This project has retired. For details please refer to its
Attic page.
MasterAggregatorHandler xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.master;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23 import java.util.Map;
24 import java.util.Map.Entry;
25
26 import org.apache.giraph.aggregators.AggregatorWriter;
27 import org.apache.giraph.bsp.BspService;
28 import org.apache.giraph.bsp.SuperstepState;
29 import org.apache.giraph.comm.GlobalCommType;
30 import org.apache.giraph.comm.MasterClient;
31 import org.apache.giraph.comm.aggregators.AggregatorUtils;
32 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
33 import org.apache.giraph.reducers.ReduceOperation;
34 import org.apache.giraph.reducers.Reducer;
35 import org.apache.giraph.utils.WritableUtils;
36 import org.apache.hadoop.io.Writable;
37 import org.apache.hadoop.util.Progressable;
38 import org.apache.log4j.Logger;
39
40 import com.google.common.base.Preconditions;
41 import com.google.common.collect.Maps;
42
43
44 public class MasterAggregatorHandler
45 implements MasterGlobalCommUsageAggregators, Writable {
46
47 private static final Logger LOG =
48 Logger.getLogger(MasterAggregatorHandler.class);
49
50
51 private final Map<String, Reducer<Object, Writable>> reducerMap =
52 Maps.newHashMap();
53
54 private final Map<String, Writable> broadcastMap =
55 Maps.newHashMap();
56
57 private final Map<String, Writable> reducedMap =
58 Maps.newHashMap();
59
60
61 private final AggregatorWriter aggregatorWriter;
62
63 private final Progressable progressable;
64
65
66 private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
67
68
69
70
71
72
73
74 public MasterAggregatorHandler(
75 ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
76 Progressable progressable) {
77 this.progressable = progressable;
78 this.conf = conf;
79 aggregatorWriter = conf.createAggregatorWriter();
80 }
81
82 @Override
83 public final <S, R extends Writable> void registerReducer(
84 String name, ReduceOperation<S, R> reduceOp) {
85 registerReducer(name, reduceOp, reduceOp.createInitialValue());
86 }
87
88 @Override
89 public <S, R extends Writable> void registerReducer(
90 String name, ReduceOperation<S, R> reduceOp,
91 R globalInitialValue) {
92 if (reducerMap.containsKey(name)) {
93 throw new IllegalArgumentException(
94 "Reducer with name " + name + " was already registered, " +
95 " and is " + reducerMap.get(name) + ", and we are trying to " +
96 " register " + reduceOp);
97 }
98 if (reduceOp == null) {
99 throw new IllegalArgumentException(
100 "null reducer cannot be registered, with name " + name);
101 }
102 if (globalInitialValue == null) {
103 throw new IllegalArgumentException(
104 "global initial value for reducer cannot be null, but is for " +
105 reduceOp + " with naem" + name);
106 }
107
108 Reducer<S, R> reducer = new Reducer<>(reduceOp, globalInitialValue);
109 reducerMap.put(name, (Reducer<Object, Writable>) reducer);
110 }
111
112 @Override
113 public <T extends Writable> T getReduced(String name) {
114 T value = (T) reducedMap.get(name);
115 if (value == null) {
116 LOG.warn("getReduced: " +
117 AggregatorUtils.getUnregisteredReducerMessage(name,
118 reducedMap.size() != 0, conf));
119 }
120 return value;
121 }
122
123 @Override
124 public void broadcast(String name, Writable object) {
125 if (broadcastMap.containsKey(name)) {
126 throw new IllegalArgumentException(
127 "Value already broadcasted for name " + name);
128 }
129 if (object == null) {
130 throw new IllegalArgumentException("null cannot be broadcasted");
131 }
132
133 broadcastMap.put(name, object);
134 }
135
136
137 public void prepareSuperstep() {
138 if (LOG.isDebugEnabled()) {
139 LOG.debug("prepareSuperstep: Start preparing reducers");
140 }
141
142 Preconditions.checkState(reducedMap.isEmpty(),
143 "reducedMap must be empty before start of the superstep");
144 Preconditions.checkState(broadcastMap.isEmpty(),
145 "broadcastMap must be empty before start of the superstep");
146
147 for (Entry<String, Reducer<Object, Writable>> entry :
148 reducerMap.entrySet()) {
149 Writable value = entry.getValue().getCurrentValue();
150 if (value == null) {
151 value = entry.getValue().createInitialValue();
152 }
153
154 reducedMap.put(entry.getKey(), value);
155 }
156
157 reducerMap.clear();
158
159 if (LOG.isDebugEnabled()) {
160 LOG.debug("prepareSuperstep: Aggregators prepared");
161 }
162 }
163
164
165 public void finishSuperstep() {
166 if (LOG.isDebugEnabled()) {
167 LOG.debug("finishSuperstep: Start finishing aggregators");
168 }
169
170 reducedMap.clear();
171
172 if (LOG.isDebugEnabled()) {
173 LOG.debug("finishSuperstep: Aggregators finished");
174 }
175 }
176
177
178
179
180
181
182 public void sendDataToOwners(MasterClient masterClient) {
183
184 try {
185 for (Entry<String, Reducer<Object, Writable>> entry :
186 reducerMap.entrySet()) {
187 masterClient.sendToOwner(entry.getKey(),
188 GlobalCommType.REDUCE_OPERATIONS,
189 entry.getValue().getReduceOp());
190 progressable.progress();
191 }
192
193 for (Entry<String, Writable> entry : broadcastMap.entrySet()) {
194 masterClient.sendToOwner(entry.getKey(),
195 GlobalCommType.BROADCAST,
196 entry.getValue());
197 progressable.progress();
198 }
199 masterClient.finishSendingValues();
200
201 broadcastMap.clear();
202 } catch (IOException e) {
203 throw new IllegalStateException("finishSuperstep: " +
204 "IOException occurred while sending aggregators", e);
205 }
206 }
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223 public void acceptReducedValues(
224 DataInput reducedValuesInput) throws IOException {
225 int numReducers = reducedValuesInput.readInt();
226 for (int i = 0; i < numReducers; i++) {
227 String name = reducedValuesInput.readUTF();
228 GlobalCommType type =
229 GlobalCommType.values()[reducedValuesInput.readByte()];
230 if (type != GlobalCommType.REDUCED_VALUE) {
231 throw new IllegalStateException(
232 "SendReducedToMasterRequest received " + type);
233 }
234 Reducer<Object, Writable> reducer = reducerMap.get(name);
235 if (reducer == null) {
236 throw new IllegalStateException(
237 "acceptReducedValues: " +
238 "Master received reduced value which isn't registered: " +
239 name);
240 }
241
242 Writable valueToReduce = reducer.createInitialValue();
243 valueToReduce.readFields(reducedValuesInput);
244
245 if (reducer.getCurrentValue() != null) {
246 reducer.reduceMerge(valueToReduce);
247 } else {
248 reducer.setCurrentValue(valueToReduce);
249 }
250 progressable.progress();
251 }
252 if (LOG.isDebugEnabled()) {
253 LOG.debug("acceptReducedValues: Accepted one set with " +
254 numReducers + " aggregated values");
255 }
256 }
257
258
259
260
261
262
263
264 public void writeAggregators(
265 long superstep, SuperstepState superstepState) {
266 try {
267 aggregatorWriter.writeAggregator(reducedMap.entrySet(),
268 (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ?
269 AggregatorWriter.LAST_SUPERSTEP : superstep);
270 } catch (IOException e) {
271 throw new IllegalStateException(
272 "coordinateSuperstep: IOException while " +
273 "writing aggregators data", e);
274 }
275 }
276
277
278
279
280
281
282 public void initialize(BspService service) {
283 try {
284 aggregatorWriter.initialize(service.getContext(),
285 service.getApplicationAttempt());
286 } catch (IOException e) {
287 throw new IllegalStateException("initialize: " +
288 "Couldn't initialize aggregatorWriter", e);
289 }
290 }
291
292
293
294
295
296
297 public void close() throws IOException {
298 aggregatorWriter.close();
299 }
300
301 @Override
302 public void write(DataOutput out) throws IOException {
303
304 Preconditions.checkState(reducedMap.isEmpty(),
305 "reducedMap must be empty at the end of the superstep");
306
307 out.writeInt(reducerMap.size());
308 for (Entry<String, Reducer<Object, Writable>> entry :
309 reducerMap.entrySet()) {
310 out.writeUTF(entry.getKey());
311 entry.getValue().write(out);
312 progressable.progress();
313 }
314
315 out.writeInt(broadcastMap.size());
316 for (Entry<String, Writable> entry : broadcastMap.entrySet()) {
317 out.writeUTF(entry.getKey());
318 WritableUtils.writeWritableObject(entry.getValue(), out);
319 }
320 }
321
322 @Override
323 public void readFields(DataInput in) throws IOException {
324 reducedMap.clear();
325 broadcastMap.clear();
326 reducerMap.clear();
327
328 int numReducers = in.readInt();
329 for (int i = 0; i < numReducers; i++) {
330 String name = in.readUTF();
331 Reducer<Object, Writable> reducer = new Reducer<>();
332 reducer.readFields(in, conf);
333 reducerMap.put(name, reducer);
334 }
335
336 int numBroadcast = in.readInt();
337 for (int i = 0; i < numBroadcast; i++) {
338 String name = in.readUTF();
339 Writable value = WritableUtils.readWritableObject(in, conf);
340 broadcastMap.put(name, value);
341 }
342 }
343 }