1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.comm.aggregators;
20
21 import java.util.AbstractMap;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.ConcurrentMap;
25
26 import org.apache.giraph.reducers.ReduceOperation;
27 import org.apache.giraph.reducers.Reducer;
28 import org.apache.giraph.utils.TaskIdsPermitsBarrier;
29 import org.apache.hadoop.io.Writable;
30 import org.apache.hadoop.util.Progressable;
31 import org.apache.log4j.Logger;
32
33 import com.google.common.base.Function;
34 import com.google.common.collect.Iterables;
35 import com.google.common.collect.Maps;
36
37 /**
38 * Class for holding aggregators which current worker owns,
39 * and aggregating partial aggregator values from workers.
40 *
41 * Protocol:
42 * 1. Before the beginning of superstep, worker receives its aggregators
43 * from master, and these aggregators will be registered to this class.
44 * Multiple registrations can be called concurrently.
45 * 2. During the superstep, whenever a worker finishes computation,
46 * it will send partial aggregated values to worker owner. This class is used
47 * to help deserialize the arriving aggregator values, and aggregate the values
48 * at the destination owner worker; these can happen concurrently.
49 * (we know step 1. is finished before anything from step 2. happens because
50 * other workers can't start computation before they receive aggregators
51 * which this worker owns)
52 * 3. This class also tracks the number of partial aggregator requests which
53 * worker received. In the end of superstep, getMyAggregatorValuesWhenReady
54 * will be called to ensure everything was received and get the values which
55 * need to be sent to master.
56 * Because of this counting, in step 2. even if worker owns no aggregators,
57 * it will still send a message without aggregator data.
58 * 4. In the end we reset to prepare for the next superstep.
59 */
60 public class OwnerAggregatorServerData {
61 /** Class logger */
62 private static final Logger LOG =
63 Logger.getLogger(OwnerAggregatorServerData.class);
64 /** Map of aggregators which current worker owns */
65 private final ConcurrentMap<String, Reducer<Object, Writable>>
66 myReducerMap = Maps.newConcurrentMap();
67 /**
68 * Counts the requests with partial aggregated values from other workers.
69 * It uses GlobalCommType.SPECIAL_COUNT to know how many requests it
70 * has to receive.
71 */
72 private final TaskIdsPermitsBarrier workersBarrier;
73 /** Progressable used to report progress */
74 private final Progressable progressable;
75
76 /**
77 * Constructor
78 *
79 * @param progressable Progressable used to report progress
80 */
81 public OwnerAggregatorServerData(Progressable progressable) {
82 this.progressable = progressable;
83 workersBarrier = new TaskIdsPermitsBarrier(progressable);
84 }
85
86 /**
87 * Register a reducer which current worker owns. Thread-safe.
88 *
89 * @param name Name of aggregator
90 * @param reduceOp Reduce operation
91 */
92 public void registerReducer(String name,
93 ReduceOperation<Object, Writable> reduceOp) {
94 if (LOG.isDebugEnabled() && myReducerMap.isEmpty()) {
95 LOG.debug("registerAggregator: The first registration after a reset()");
96 }
97 myReducerMap.putIfAbsent(name, new Reducer<>(reduceOp));
98 progressable.progress();
99 }
100
101 /**
102 * Reduce partial value of one of current worker's reducers.
103 *
104 * Thread-safe. Call only after reducers have been registered.
105 *
106 * @param name Name of the reducer
107 * @param value Value to reduce to it
108 */
109 public void reduce(String name, Writable value) {
110 Reducer<Object, Writable> reducer = myReducerMap.get(name);
111 synchronized (reducer) {
112 reducer.reduceMerge(value);
113 }
114 progressable.progress();
115 }
116
117
118 /**
119 * Create initial value for a reducer. Used so requests
120 * would be able to deserialize data.
121 *
122 * Thread-safe. Call only after reducer has been registered.
123 *
124 * @param name Name of the reducer
125 * @return Empty value
126 */
127 public Writable createInitialValue(String name) {
128 Reducer<Object, Writable> reducer = myReducerMap.get(name);
129 synchronized (reducer) {
130 return reducer.createInitialValue();
131 }
132 }
133
134 /**
135 * Notify this object that a partial aggregated values request from some
136 * worker have been received. Thread-safe.
137 */
138 public void receivedRequestFromWorker() {
139 workersBarrier.releaseOnePermit();
140 }
141
142 /**
143 * Notify this object about the total number of requests which should
144 * arrive from one of the workers. Thread-safe.
145 *
146 * @param requestCount Number of requests which should arrive
147 * @param taskId Task id of that worker
148 */
149 public void receivedRequestCountFromWorker(long requestCount, int taskId) {
150 workersBarrier.requirePermits(requestCount, taskId);
151 }
152
153 /**
154 * This function will wait until all partial aggregated values from all
155 * workers are ready and aggregated, and return final aggregated values
156 * afterwards.
157 *
158 * @param workerIds All workers in the job apart from the current one
159 * @return Iterable through final aggregated values which this worker owns
160 */
161 public Iterable<Map.Entry<String, Writable>>
162 getMyReducedValuesWhenReady(Set<Integer> workerIds) {
163 workersBarrier.waitForRequiredPermits(workerIds);
164 if (LOG.isDebugEnabled()) {
165 LOG.debug("getMyAggregatorValuesWhenReady: Values ready");
166 }
167 return Iterables.transform(myReducerMap.entrySet(),
168 new Function<Map.Entry<String, Reducer<Object, Writable>>,
169 Map.Entry<String, Writable>>() {
170 @Override
171 public Map.Entry<String, Writable> apply(
172 Map.Entry<String, Reducer<Object, Writable>> aggregator) {
173 return new AbstractMap.SimpleEntry<String, Writable>(
174 aggregator.getKey(),
175 aggregator.getValue().getCurrentValue());
176 }
177 });
178 }
179
180 /**
181 * Prepare for next superstep
182 */
183 public void reset() {
184 myReducerMap.clear();
185 if (LOG.isDebugEnabled()) {
186 LOG.debug("reset: Ready for next superstep");
187 }
188 }
189
190 }