This project has retired. For details please refer to its Attic page.
MasterAggregatorHandler xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.master;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  import java.util.Map;
24  import java.util.Map.Entry;
25  
26  import org.apache.giraph.aggregators.AggregatorWriter;
27  import org.apache.giraph.bsp.BspService;
28  import org.apache.giraph.bsp.SuperstepState;
29  import org.apache.giraph.comm.GlobalCommType;
30  import org.apache.giraph.comm.MasterClient;
31  import org.apache.giraph.comm.aggregators.AggregatorUtils;
32  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
33  import org.apache.giraph.reducers.ReduceOperation;
34  import org.apache.giraph.reducers.Reducer;
35  import org.apache.giraph.utils.WritableUtils;
36  import org.apache.hadoop.io.Writable;
37  import org.apache.hadoop.util.Progressable;
38  import org.apache.log4j.Logger;
39  
40  import com.google.common.base.Preconditions;
41  import com.google.common.collect.Maps;
42  
43  /** Handler for reduce/broadcast on the master */
44  public class MasterAggregatorHandler
45      implements MasterGlobalCommUsageAggregators, Writable {
46    /** Class logger */
47    private static final Logger LOG =
48        Logger.getLogger(MasterAggregatorHandler.class);
49  
50    /** Map of reducers registered for the next worker computation */
51    private final Map<String, Reducer<Object, Writable>> reducerMap =
52        Maps.newHashMap();
53    /** Map of values to be sent to workers for next computation */
54    private final Map<String, Writable> broadcastMap =
55        Maps.newHashMap();
56    /** Values reduced from previous computation */
57    private final Map<String, Writable> reducedMap =
58        Maps.newHashMap();
59  
60    /** Aggregator writer - for writing reduced values */
61    private final AggregatorWriter aggregatorWriter;
62    /** Progressable used to report progress */
63    private final Progressable progressable;
64  
65    /** Conf */
66    private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
67  
68    /**
69     * Constructor
70     *
71     * @param conf Configuration
72     * @param progressable Progress reporter
73     */
74    public MasterAggregatorHandler(
75        ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
76        Progressable progressable) {
77      this.progressable = progressable;
78      this.conf = conf;
79      aggregatorWriter = conf.createAggregatorWriter();
80    }
81  
82    @Override
83    public final <S, R extends Writable> void registerReducer(
84        String name, ReduceOperation<S, R> reduceOp) {
85      registerReducer(name, reduceOp, reduceOp.createInitialValue());
86    }
87  
88    @Override
89    public <S, R extends Writable> void registerReducer(
90        String name, ReduceOperation<S, R> reduceOp,
91        R globalInitialValue) {
92      if (reducerMap.containsKey(name)) {
93        throw new IllegalArgumentException(
94            "Reducer with name " + name + " was already registered, " +
95            " and is " + reducerMap.get(name) + ", and we are trying to " +
96            " register " + reduceOp);
97      }
98      if (reduceOp == null) {
99        throw new IllegalArgumentException(
100           "null reducer cannot be registered, with name " + name);
101     }
102     if (globalInitialValue == null) {
103       throw new IllegalArgumentException(
104           "global initial value for reducer cannot be null, but is for " +
105           reduceOp + " with naem" + name);
106     }
107 
108     Reducer<S, R> reducer = new Reducer<>(reduceOp, globalInitialValue);
109     reducerMap.put(name, (Reducer<Object, Writable>) reducer);
110   }
111 
112   @Override
113   public <T extends Writable> T getReduced(String name) {
114     T value = (T) reducedMap.get(name);
115     if (value == null) {
116       LOG.warn("getReduced: " +
117         AggregatorUtils.getUnregisteredReducerMessage(name,
118             reducedMap.size() != 0, conf));
119     }
120     return value;
121   }
122 
123   @Override
124   public void broadcast(String name, Writable object) {
125     if (broadcastMap.containsKey(name)) {
126       throw new IllegalArgumentException(
127           "Value already broadcasted for name " + name);
128     }
129     if (object == null) {
130       throw new IllegalArgumentException("null cannot be broadcasted");
131     }
132 
133     broadcastMap.put(name, object);
134   }
135 
136   /** Prepare reduced values for current superstep's master compute */
137   public void prepareSuperstep() {
138     if (LOG.isDebugEnabled()) {
139       LOG.debug("prepareSuperstep: Start preparing reducers");
140     }
141 
142     Preconditions.checkState(reducedMap.isEmpty(),
143         "reducedMap must be empty before start of the superstep");
144     Preconditions.checkState(broadcastMap.isEmpty(),
145         "broadcastMap must be empty before start of the superstep");
146 
147     for (Entry<String, Reducer<Object, Writable>> entry :
148         reducerMap.entrySet()) {
149       Writable value = entry.getValue().getCurrentValue();
150       if (value == null) {
151         value = entry.getValue().createInitialValue();
152       }
153 
154       reducedMap.put(entry.getKey(), value);
155     }
156 
157     reducerMap.clear();
158 
159     if (LOG.isDebugEnabled()) {
160       LOG.debug("prepareSuperstep: Aggregators prepared");
161     }
162   }
163 
164   /** Finalize aggregators for current superstep */
165   public void finishSuperstep() {
166     if (LOG.isDebugEnabled()) {
167       LOG.debug("finishSuperstep: Start finishing aggregators");
168     }
169 
170     reducedMap.clear();
171 
172     if (LOG.isDebugEnabled()) {
173       LOG.debug("finishSuperstep: Aggregators finished");
174     }
175   }
176 
177   /**
178    * Send data to workers (through owner workers)
179    *
180    * @param masterClient IPC client on master
181    */
182   public void sendDataToOwners(MasterClient masterClient) {
183     // send broadcast values and reduceOperations to their owners
184     try {
185       for (Entry<String, Reducer<Object, Writable>> entry :
186           reducerMap.entrySet()) {
187         masterClient.sendToOwner(entry.getKey(),
188             GlobalCommType.REDUCE_OPERATIONS,
189             entry.getValue().getReduceOp());
190         progressable.progress();
191       }
192 
193       for (Entry<String, Writable> entry : broadcastMap.entrySet()) {
194         masterClient.sendToOwner(entry.getKey(),
195             GlobalCommType.BROADCAST,
196             entry.getValue());
197         progressable.progress();
198       }
199       masterClient.finishSendingValues();
200 
201       broadcastMap.clear();
202     } catch (IOException e) {
203       throw new IllegalStateException("finishSuperstep: " +
204           "IOException occurred while sending aggregators", e);
205     }
206   }
207 
208   /**
209    * Accept reduced values sent by worker. Every value will be sent
210    * only once, by its owner.
211    * We don't need to count the number of these requests because global
212    * superstep barrier will happen after workers ensure all requests of this
213    * type have been received and processed by master.
214    *
215    * @param reducedValuesInput Input in which aggregated values are
216    *                              written in the following format:
217    *                              numReducers
218    *                              name_1  REDUCED_VALUE  value_1
219    *                              name_2  REDUCED_VALUE  value_2
220    *                              ...
221    * @throws IOException
222    */
223   public void acceptReducedValues(
224       DataInput reducedValuesInput) throws IOException {
225     int numReducers = reducedValuesInput.readInt();
226     for (int i = 0; i < numReducers; i++) {
227       String name = reducedValuesInput.readUTF();
228       GlobalCommType type =
229           GlobalCommType.values()[reducedValuesInput.readByte()];
230       if (type != GlobalCommType.REDUCED_VALUE) {
231         throw new IllegalStateException(
232             "SendReducedToMasterRequest received " + type);
233       }
234       Reducer<Object, Writable> reducer = reducerMap.get(name);
235       if (reducer == null) {
236         throw new IllegalStateException(
237             "acceptReducedValues: " +
238                 "Master received reduced value which isn't registered: " +
239                 name);
240       }
241 
242       Writable valueToReduce = reducer.createInitialValue();
243       valueToReduce.readFields(reducedValuesInput);
244 
245       if (reducer.getCurrentValue() != null) {
246         reducer.reduceMerge(valueToReduce);
247       } else {
248         reducer.setCurrentValue(valueToReduce);
249       }
250       progressable.progress();
251     }
252     if (LOG.isDebugEnabled()) {
253       LOG.debug("acceptReducedValues: Accepted one set with " +
254           numReducers + " aggregated values");
255     }
256   }
257 
258   /**
259    * Write aggregators to {@link AggregatorWriter}
260    *
261    * @param superstep      Superstep which just finished
262    * @param superstepState State of the superstep which just finished
263    */
264   public void writeAggregators(
265       long superstep, SuperstepState superstepState) {
266     try {
267       aggregatorWriter.writeAggregator(reducedMap.entrySet(),
268           (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ?
269               AggregatorWriter.LAST_SUPERSTEP : superstep);
270     } catch (IOException e) {
271       throw new IllegalStateException(
272           "coordinateSuperstep: IOException while " +
273               "writing aggregators data", e);
274     }
275   }
276 
277   /**
278    * Initialize {@link AggregatorWriter}
279    *
280    * @param service BspService
281    */
282   public void initialize(BspService service) {
283     try {
284       aggregatorWriter.initialize(service.getContext(),
285           service.getApplicationAttempt());
286     } catch (IOException e) {
287       throw new IllegalStateException("initialize: " +
288           "Couldn't initialize aggregatorWriter", e);
289     }
290   }
291 
292   /**
293    * Close {@link AggregatorWriter}
294    *
295    * @throws IOException
296    */
297   public void close() throws IOException {
298     aggregatorWriter.close();
299   }
300 
301   @Override
302   public void write(DataOutput out) throws IOException {
303     // At the end of superstep, only reduceOpMap can be non-empty
304     Preconditions.checkState(reducedMap.isEmpty(),
305         "reducedMap must be empty at the end of the superstep");
306 
307     out.writeInt(reducerMap.size());
308     for (Entry<String, Reducer<Object, Writable>> entry :
309         reducerMap.entrySet()) {
310       out.writeUTF(entry.getKey());
311       entry.getValue().write(out);
312       progressable.progress();
313     }
314 
315     out.writeInt(broadcastMap.size());
316     for (Entry<String, Writable> entry : broadcastMap.entrySet()) {
317       out.writeUTF(entry.getKey());
318       WritableUtils.writeWritableObject(entry.getValue(), out);
319     }
320   }
321 
322   @Override
323   public void readFields(DataInput in) throws IOException {
324     reducedMap.clear();
325     broadcastMap.clear();
326     reducerMap.clear();
327 
328     int numReducers = in.readInt();
329     for (int i = 0; i < numReducers; i++) {
330       String name = in.readUTF();
331       Reducer<Object, Writable> reducer = new Reducer<>();
332       reducer.readFields(in, conf);
333       reducerMap.put(name, reducer);
334     }
335 
336     int numBroadcast = in.readInt();
337     for (int i = 0; i < numBroadcast; i++) {
338       String name = in.readUTF();
339       Writable value = WritableUtils.readWritableObject(in, conf);
340       broadcastMap.put(name, value);
341     }
342   }
343 }