This project has retired. For details please refer to its Attic page.
WorkerAggregatorHandler xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.worker;
19  
20  import java.io.IOException;
21  import java.util.Map;
22  import java.util.Map.Entry;
23  import java.util.Set;
24  
25  import org.apache.giraph.bsp.CentralizedServiceWorker;
26  import org.apache.giraph.comm.GlobalCommType;
27  import org.apache.giraph.comm.aggregators.AggregatorUtils;
28  import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
29  import org.apache.giraph.comm.aggregators.GlobalCommValueOutputStream;
30  import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
31  import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
32  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
33  import org.apache.giraph.reducers.ReduceOperation;
34  import org.apache.giraph.reducers.Reducer;
35  import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
36  import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
37  import org.apache.giraph.utils.WritableUtils;
38  import org.apache.hadoop.io.Writable;
39  import org.apache.hadoop.util.Progressable;
40  import org.apache.log4j.Logger;
41  
42  import com.google.common.collect.Maps;
43  import com.google.common.collect.Sets;
44  
45  /** Handler for reduce/broadcast on the workers */
46  public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage {
47    /** Class logger */
48    private static final Logger LOG =
49        Logger.getLogger(WorkerAggregatorHandler.class);
50    /** Map of broadcasted values */
51    private final Map<String, Writable> broadcastedMap =
52        Maps.newHashMap();
53    /** Map of reducers currently being reduced */
54    private final Map<String, Reducer<Object, Writable>> reducerMap =
55        Maps.newHashMap();
56  
57    /** Service worker */
58    private final CentralizedServiceWorker<?, ?, ?> serviceWorker;
59    /** Progressable for reporting progress */
60    private final Progressable progressable;
61    /** How big a single aggregator request can be */
62    private final int maxBytesPerAggregatorRequest;
63    /** Giraph configuration */
64    private final ImmutableClassesGiraphConfiguration conf;
65  
66    /**
67     * Constructor
68     *
69     * @param serviceWorker Service worker
70     * @param conf          Giraph configuration
71     * @param progressable  Progressable for reporting progress
72     */
73    public WorkerAggregatorHandler(
74        CentralizedServiceWorker<?, ?, ?> serviceWorker,
75        ImmutableClassesGiraphConfiguration conf,
76        Progressable progressable) {
77      this.serviceWorker = serviceWorker;
78      this.progressable = progressable;
79      this.conf = conf;
80      maxBytesPerAggregatorRequest = conf.getInt(
81          AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
82          AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT);
83    }
84  
85    @Override
86    public <B extends Writable> B getBroadcast(String name) {
87      B value = (B) broadcastedMap.get(name);
88      if (value == null) {
89        LOG.warn("getBroadcast: " +
90            AggregatorUtils.getUnregisteredBroadcastMessage(name,
91                broadcastedMap.size() != 0, conf));
92      }
93      return value;
94    }
95  
96    @Override
97    public void reduce(String name, Object value) {
98      Reducer<Object, Writable> reducer = reducerMap.get(name);
99      if (reducer != null) {
100       progressable.progress();
101       synchronized (reducer) {
102         reducer.reduce(value);
103       }
104     } else {
105       throw new IllegalStateException("reduce: " +
106           AggregatorUtils.getUnregisteredReducerMessage(name,
107               reducerMap.size() != 0, conf));
108     }
109   }
110 
111   /**
112    * Combine partially reduced value into currently reduced value.
113    * @param name Name of the reducer
114    * @param valueToReduce Partial value to reduce
115    */
116   @Override
117   public void reduceMerge(String name, Writable valueToReduce) {
118     Reducer<Object, Writable> reducer = reducerMap.get(name);
119     if (reducer != null) {
120       progressable.progress();
121       synchronized (reducer) {
122         reducer.reduceMerge(valueToReduce);
123       }
124     } else {
125       throw new IllegalStateException("reduce: " +
126           AggregatorUtils.getUnregisteredReducerMessage(name,
127               reducerMap.size() != 0, conf));
128     }
129   }
130 
131   /**
132    * Prepare aggregators for current superstep
133    *
134    * @param requestProcessor Request processor for aggregators
135    */
136   public void prepareSuperstep(
137       WorkerAggregatorRequestProcessor requestProcessor) {
138     broadcastedMap.clear();
139     reducerMap.clear();
140 
141     if (LOG.isDebugEnabled()) {
142       LOG.debug("prepareSuperstep: Start preparing aggregators");
143     }
144     AllAggregatorServerData allGlobalCommData =
145         serviceWorker.getServerData().getAllAggregatorData();
146     // Wait for my aggregators
147     Iterable<byte[]> dataToDistribute =
148         allGlobalCommData.getDataFromMasterWhenReady(
149             serviceWorker.getMasterInfo());
150     try {
151       // Distribute my aggregators
152       requestProcessor.distributeReducedValues(dataToDistribute);
153     } catch (IOException e) {
154       throw new IllegalStateException("prepareSuperstep: " +
155           "IOException occurred while trying to distribute aggregators", e);
156     }
157     // Wait for all other aggregators and store them
158     allGlobalCommData.fillNextSuperstepMapsWhenReady(
159         getOtherWorkerIdsSet(), broadcastedMap,
160         reducerMap);
161     if (LOG.isDebugEnabled()) {
162       LOG.debug("prepareSuperstep: Aggregators prepared");
163     }
164   }
165 
166   /**
167    * Send aggregators to their owners and in the end to the master
168    *
169    * @param requestProcessor Request processor for aggregators
170    */
171   public void finishSuperstep(
172       WorkerAggregatorRequestProcessor requestProcessor) {
173     if (LOG.isInfoEnabled()) {
174       LOG.info("finishSuperstep: Start gathering aggregators, " +
175           "workers will send their aggregated values " +
176           "once they are done with superstep computation");
177     }
178     OwnerAggregatorServerData ownerGlobalCommData =
179         serviceWorker.getServerData().getOwnerAggregatorData();
180     // First send partial aggregated values to their owners and determine
181     // which aggregators belong to this worker
182     for (Map.Entry<String, Reducer<Object, Writable>> entry :
183         reducerMap.entrySet()) {
184       try {
185         boolean sent = requestProcessor.sendReducedValue(entry.getKey(),
186             entry.getValue().getCurrentValue());
187         if (!sent) {
188           // If it's my aggregator, add it directly
189           ownerGlobalCommData.reduce(entry.getKey(),
190               entry.getValue().getCurrentValue());
191         }
192       } catch (IOException e) {
193         throw new IllegalStateException("finishSuperstep: " +
194             "IOException occurred while sending aggregator " +
195             entry.getKey() + " to its owner", e);
196       }
197       progressable.progress();
198     }
199     try {
200       // Flush
201       requestProcessor.flush();
202     } catch (IOException e) {
203       throw new IllegalStateException("finishSuperstep: " +
204           "IOException occurred while sending aggregators to owners", e);
205     }
206 
207     // Wait to receive partial aggregated values from all other workers
208     Iterable<Map.Entry<String, Writable>> myReducedValues =
209         ownerGlobalCommData.getMyReducedValuesWhenReady(
210             getOtherWorkerIdsSet());
211 
212     // Send final aggregated values to master
213     GlobalCommValueOutputStream globalOutput =
214         new GlobalCommValueOutputStream(false);
215     for (Map.Entry<String, Writable> entry : myReducedValues) {
216       try {
217         int currentSize = globalOutput.addValue(entry.getKey(),
218             GlobalCommType.REDUCED_VALUE,
219             entry.getValue());
220         if (currentSize > maxBytesPerAggregatorRequest) {
221           requestProcessor.sendReducedValuesToMaster(
222               globalOutput.flush());
223         }
224         progressable.progress();
225       } catch (IOException e) {
226         throw new IllegalStateException("finishSuperstep: " +
227             "IOException occurred while writing aggregator " +
228             entry.getKey(), e);
229       }
230     }
231     try {
232       requestProcessor.sendReducedValuesToMaster(globalOutput.flush());
233     } catch (IOException e) {
234       throw new IllegalStateException("finishSuperstep: " +
235           "IOException occured while sending aggregators to master", e);
236     }
237     // Wait for master to receive aggregated values before proceeding
238     serviceWorker.getWorkerClient().waitAllRequests();
239 
240     ownerGlobalCommData.reset();
241     if (LOG.isDebugEnabled()) {
242       LOG.debug("finishSuperstep: Aggregators finished");
243     }
244   }
245 
246   /**
247    * Create new aggregator usage which will be used by one of the compute
248    * threads.
249    *
250    * @return New aggregator usage
251    */
252   public WorkerThreadGlobalCommUsage newThreadAggregatorUsage() {
253     if (AggregatorUtils.useThreadLocalAggregators(conf)) {
254       return new ThreadLocalWorkerGlobalCommUsage();
255     } else {
256       return this;
257     }
258   }
259 
260   @Override
261   public void finishThreadComputation() {
262     // If we don't use thread-local aggregators, all the aggregated values
263     // are already in this object
264   }
265 
266   /**
267    * Get set of all worker task ids except the current one
268    *
269    * @return Set of all other worker task ids
270    */
271   public Set<Integer> getOtherWorkerIdsSet() {
272     Set<Integer> otherWorkers = Sets.newHashSetWithExpectedSize(
273         serviceWorker.getWorkerInfoList().size());
274     for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
275       if (workerInfo.getTaskId() != serviceWorker.getWorkerInfo().getTaskId()) {
276         otherWorkers.add(workerInfo.getTaskId());
277       }
278     }
279     return otherWorkers;
280   }
281 
282   /**
283   * Not thread-safe implementation of {@link WorkerThreadGlobalCommUsage}.
284   * We can use one instance of this object per thread to prevent
285   * synchronizing on each aggregate() call. In the end of superstep,
286   * values from each of these will be aggregated back to {@link
287   * WorkerThreadGlobalCommUsage}
288   */
289   public class ThreadLocalWorkerGlobalCommUsage
290     implements WorkerThreadGlobalCommUsage {
291     /** Thread-local reducer map */
292     private final Map<String, Reducer<Object, Writable>> threadReducerMap;
293 
294     /**
295     * Constructor
296     *
297     * Creates new instances of all reducers from
298     * {@link WorkerAggregatorHandler}
299     */
300     public ThreadLocalWorkerGlobalCommUsage() {
301       threadReducerMap = Maps.newHashMapWithExpectedSize(
302           WorkerAggregatorHandler.this.reducerMap.size());
303 
304       UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
305       UnsafeReusableByteArrayInput in = new UnsafeReusableByteArrayInput();
306 
307       for (Entry<String, Reducer<Object, Writable>> entry :
308           reducerMap.entrySet()) {
309         ReduceOperation<Object, Writable> globalReduceOp =
310             entry.getValue().getReduceOp();
311 
312         ReduceOperation<Object, Writable> threadLocalCopy =
313             WritableUtils.createCopy(out, in, globalReduceOp, conf);
314 
315         threadReducerMap.put(entry.getKey(), new Reducer<>(threadLocalCopy));
316       }
317     }
318 
319     @Override
320     public void reduce(String name, Object value) {
321       Reducer<Object, Writable> reducer = threadReducerMap.get(name);
322       if (reducer != null) {
323         progressable.progress();
324         reducer.reduce(value);
325       } else {
326         throw new IllegalStateException("reduce: " +
327             AggregatorUtils.getUnregisteredAggregatorMessage(name,
328                 threadReducerMap.size() != 0, conf));
329       }
330     }
331 
332     @Override
333     public void reduceMerge(String name, Writable value) {
334       Reducer<Object, Writable> reducer = threadReducerMap.get(name);
335       if (reducer != null) {
336         progressable.progress();
337         reducer.reduceMerge(value);
338       } else {
339         throw new IllegalStateException("reduceMerge: " +
340             AggregatorUtils.getUnregisteredAggregatorMessage(name,
341                 threadReducerMap.size() != 0, conf));
342       }
343     }
344 
345     @Override
346     public <B extends Writable> B getBroadcast(String name) {
347       return WorkerAggregatorHandler.this.getBroadcast(name);
348     }
349 
350     @Override
351     public void finishThreadComputation() {
352       // Aggregate the values this thread's vertices provided back to
353       // WorkerAggregatorHandler
354       for (Entry<String, Reducer<Object, Writable>> entry :
355           threadReducerMap.entrySet()) {
356         WorkerAggregatorHandler.this.reduceMerge(entry.getKey(),
357             entry.getValue().getCurrentValue());
358       }
359     }
360   }
361 
362 }