This project has retired. For details please refer to its Attic page.
ComputeCallable xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.graph;
19  
20  import java.io.IOException;
21  import java.util.Collection;
22  import java.util.List;
23  import java.util.concurrent.Callable;
24  
25  import org.apache.giraph.bsp.CentralizedServiceWorker;
26  import org.apache.giraph.comm.WorkerClientRequestProcessor;
27  import org.apache.giraph.comm.messages.MessageStore;
28  import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
29  import org.apache.giraph.conf.GiraphConstants;
30  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
31  import org.apache.giraph.function.primitive.PrimitiveRefs.LongRef;
32  import org.apache.giraph.io.SimpleVertexWriter;
33  import org.apache.giraph.metrics.GiraphMetrics;
34  import org.apache.giraph.metrics.MetricNames;
35  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
36  import org.apache.giraph.ooc.OutOfCoreEngine;
37  import org.apache.giraph.partition.Partition;
38  import org.apache.giraph.partition.PartitionStats;
39  import org.apache.giraph.partition.PartitionStore;
40  import org.apache.giraph.time.SystemTime;
41  import org.apache.giraph.time.Time;
42  import org.apache.giraph.time.Times;
43  import org.apache.giraph.utils.MemoryUtils;
44  import org.apache.giraph.utils.TimedLogger;
45  import org.apache.giraph.utils.Trimmable;
46  import org.apache.giraph.worker.WorkerProgress;
47  import org.apache.giraph.worker.WorkerThreadGlobalCommUsage;
48  import org.apache.hadoop.io.Writable;
49  import org.apache.hadoop.io.WritableComparable;
50  import org.apache.hadoop.mapreduce.Mapper;
51  import org.apache.hadoop.util.Progressable;
52  import org.apache.log4j.Logger;
53  
54  import com.google.common.base.Preconditions;
55  import com.google.common.collect.Iterables;
56  import com.google.common.collect.Lists;
57  import com.yammer.metrics.core.Counter;
58  import com.yammer.metrics.core.Histogram;
59  
60  /**
61   * Compute as many vertex partitions as possible.  Every thread will have its
62   * own instance of WorkerClientRequestProcessor to send requests.  Note that
63   * the partition ids are used in the partitionIdQueue rather than the actual
64   * partitions since that would cause the partitions to be loaded into memory
65   * when using the out-of-core graph partition store.  We should only load on
66   * demand.
67   *
68   * @param <I>  Vertex index value
69   * @param <V>  Vertex value
70   * @param <E>  Edge value
71   * @param <M1> Incoming message type
72   * @param <M2> Outgoing message type
73   */
74  public class ComputeCallable<I extends WritableComparable, V extends Writable,
75      E extends Writable, M1 extends Writable, M2 extends Writable>
76      implements Callable<Collection<PartitionStats>> {
77    /** Class logger */
78    private static final Logger LOG  = Logger.getLogger(ComputeCallable.class);
79    /** Class time object */
80    private static final Time TIME = SystemTime.get();
81    /** How often to update WorkerProgress */
82    private final long verticesToUpdateProgress;
83    /** Context */
84    private final Mapper<?, ?, ?, ?>.Context context;
85    /** Graph state */
86    private final GraphState graphState;
87    /** Message store */
88    private final MessageStore<I, M1> messageStore;
89    /** Configuration */
90    private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
91    /** Worker (for NettyWorkerClientRequestProcessor) */
92    private final CentralizedServiceWorker<I, V, E> serviceWorker;
93    /** Dump some progress every 30 seconds */
94    private final TimedLogger timedLogger = new TimedLogger(30 * 1000, LOG);
95    /** VertexWriter for this ComputeCallable */
96    private SimpleVertexWriter<I, V, E> vertexWriter;
97    /** Get the start time in nanos */
98    private final long startNanos = TIME.getNanoseconds();
99  
100   // Per-Superstep Metrics
101   /** Messages sent */
102   private final Counter messagesSentCounter;
103   /** Message bytes sent */
104   private final Counter messageBytesSentCounter;
105   /** Compute time per partition */
106   private final Histogram histogramComputePerPartition;
107   /** GC time per compute thread */
108   private final Histogram histogramGCTimePerThread;
109   /** Wait time per compute thread */
110   private final Histogram histogramWaitTimePerThread;
111   /** Processing time per compute thread */
112   private final Histogram histogramProcessingTimePerThread;
113 
114   /**
115    * Constructor
116    *
117    * @param context Context
118    * @param graphState Current graph state (use to create own graph state)
119    * @param messageStore Message store
120    * @param configuration Configuration
121    * @param serviceWorker Service worker
122    */
123   public ComputeCallable(Mapper<?, ?, ?, ?>.Context context,
124       GraphState graphState, MessageStore<I, M1> messageStore,
125       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
126       CentralizedServiceWorker<I, V, E> serviceWorker) {
127     this.context = context;
128     this.configuration = configuration;
129     this.messageStore = messageStore;
130     this.serviceWorker = serviceWorker;
131     this.graphState = graphState;
132 
133     SuperstepMetricsRegistry metrics = GiraphMetrics.get().perSuperstep();
134     messagesSentCounter = metrics.getCounter(MetricNames.MESSAGES_SENT);
135     messageBytesSentCounter =
136       metrics.getCounter(MetricNames.MESSAGE_BYTES_SENT);
137     histogramComputePerPartition = metrics.getUniformHistogram(
138         MetricNames.HISTOGRAM_COMPUTE_PER_PARTITION);
139     histogramGCTimePerThread = metrics.getUniformHistogram("gc-per-thread-ms");
140     histogramWaitTimePerThread =
141         metrics.getUniformHistogram("wait-per-thread-ms");
142     histogramProcessingTimePerThread =
143         metrics.getUniformHistogram("processing-per-thread-ms");
144     verticesToUpdateProgress =
145         GiraphConstants.VERTICES_TO_UPDATE_PROGRESS.get(configuration);
146   }
147 
148   @Override
149   public Collection<PartitionStats> call() {
150     // Thread initialization (for locality)
151     WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
152         new NettyWorkerClientRequestProcessor<I, V, E>(
153             context, configuration, serviceWorker,
154             configuration.getOutgoingMessageEncodeAndStoreType().
155               useOneMessageToManyIdsEncoding());
156     WorkerThreadGlobalCommUsage aggregatorUsage =
157         serviceWorker.getAggregatorHandler().newThreadAggregatorUsage();
158 
159     vertexWriter = serviceWorker.getSuperstepOutput().getVertexWriter();
160 
161     Computation<I, V, E, M1, M2> computation =
162         (Computation<I, V, E, M1, M2>) configuration.createComputation();
163     computation.initialize(graphState, workerClientRequestProcessor,
164         serviceWorker, aggregatorUsage);
165     computation.preSuperstep();
166 
167     List<PartitionStats> partitionStatsList = Lists.newArrayList();
168     PartitionStore<I, V, E> partitionStore = serviceWorker.getPartitionStore();
169     OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine();
170     GraphTaskManager<I, V, E> taskManager = serviceWorker.getGraphTaskManager();
171     if (oocEngine != null) {
172       oocEngine.processingThreadStart();
173     }
174     long timeWaiting = 0;
175     long timeProcessing = 0;
176     long timeDoingGC = 0;
177     while (true) {
178       long startTime = System.currentTimeMillis();
179       long startGCTime = taskManager.getSuperstepGCTime();
180       Partition<I, V, E> partition = partitionStore.getNextPartition();
181       long timeDoingGCWhileWaiting =
182           taskManager.getSuperstepGCTime() - startGCTime;
183       timeDoingGC += timeDoingGCWhileWaiting;
184       timeWaiting += System.currentTimeMillis() - startTime -
185           timeDoingGCWhileWaiting;
186       if (partition == null) {
187         break;
188       }
189       long startProcessingTime = System.currentTimeMillis();
190       startGCTime = taskManager.getSuperstepGCTime();
191       try {
192         serviceWorker.getServerData().resolvePartitionMutation(partition);
193         PartitionStats partitionStats = computePartition(
194             computation, partition, oocEngine,
195             serviceWorker.getConfiguration().getIncomingMessageClasses()
196               .ignoreExistingVertices());
197         partitionStatsList.add(partitionStats);
198         long partitionMsgs = workerClientRequestProcessor.resetMessageCount();
199         partitionStats.addMessagesSentCount(partitionMsgs);
200         messagesSentCounter.inc(partitionMsgs);
201         long partitionMsgBytes =
202           workerClientRequestProcessor.resetMessageBytesCount();
203         partitionStats.addMessageBytesSentCount(partitionMsgBytes);
204         messageBytesSentCounter.inc(partitionMsgBytes);
205         timedLogger.info("call: Completed " +
206             partitionStatsList.size() + " partitions, " +
207             partitionStore.getNumPartitions() + " remaining " +
208             MemoryUtils.getRuntimeMemoryStats());
209         long timeDoingGCWhileProcessing =
210             taskManager.getSuperstepGCTime() - startGCTime;
211         timeDoingGC += timeDoingGCWhileProcessing;
212         long timeProcessingPartition =
213             System.currentTimeMillis() - startProcessingTime -
214                 timeDoingGCWhileProcessing;
215         timeProcessing += timeProcessingPartition;
216         partitionStats.setComputeMs(timeProcessingPartition);
217       } catch (IOException e) {
218         throw new IllegalStateException("call: Caught unexpected IOException," +
219             " failing.", e);
220       } catch (InterruptedException e) {
221         throw new IllegalStateException("call: Caught unexpected " +
222             "InterruptedException, failing.", e);
223       } finally {
224         partitionStore.putPartition(partition);
225       }
226       histogramComputePerPartition.update(
227           System.currentTimeMillis() - startTime);
228     }
229     histogramGCTimePerThread.update(timeDoingGC);
230     histogramWaitTimePerThread.update(timeWaiting);
231     histogramProcessingTimePerThread.update(timeProcessing);
232     computation.postSuperstep();
233 
234     // Return VertexWriter after the usage
235     serviceWorker.getSuperstepOutput().returnVertexWriter(vertexWriter);
236 
237     if (LOG.isInfoEnabled()) {
238       float seconds = Times.getNanosSince(TIME, startNanos) /
239           Time.NS_PER_SECOND_AS_FLOAT;
240       LOG.info("call: Computation took " + seconds + " secs for "  +
241           partitionStatsList.size() + " partitions on superstep " +
242           graphState.getSuperstep() + ".  Flushing started (time waiting on " +
243           "partitions was " +
244           String.format("%.2f s", timeWaiting / 1000.0) + ", time processing " +
245           "partitions was " + String.format("%.2f s", timeProcessing / 1000.0) +
246           ", time spent on gc was " +
247           String.format("%.2f s", timeDoingGC / 1000.0) + ")");
248     }
249     try {
250       workerClientRequestProcessor.flush();
251       // The messages flushed out from the cache is
252       // from the last partition processed
253       if (partitionStatsList.size() > 0) {
254         long partitionMsgBytes =
255           workerClientRequestProcessor.resetMessageBytesCount();
256         partitionStatsList.get(partitionStatsList.size() - 1).
257           addMessageBytesSentCount(partitionMsgBytes);
258         messageBytesSentCounter.inc(partitionMsgBytes);
259       }
260       aggregatorUsage.finishThreadComputation();
261     } catch (IOException e) {
262       throw new IllegalStateException("call: Flushing failed.", e);
263     }
264     if (oocEngine != null) {
265       oocEngine.processingThreadFinish();
266     }
267     return partitionStatsList;
268   }
269 
270   /**
271    * Compute a single partition
272    *
273    * @param computation Computation to use
274    * @param partition Partition to compute
275    * @param oocEngine out-of-core engine
276    * @param ignoreExistingVertices whether to ignore existing vertices
277    * @return Partition stats for this computed partition
278    */
279   private PartitionStats computePartition(
280       Computation<I, V, E, M1, M2> computation,
281       Partition<I, V, E> partition, OutOfCoreEngine oocEngine,
282       boolean ignoreExistingVertices)
283       throws IOException, InterruptedException {
284     PartitionStats partitionStats =
285         new PartitionStats(partition.getId(), 0, 0, 0, 0, 0,
286             serviceWorker.getWorkerInfo().getHostnameId());
287     final LongRef verticesComputedProgress = new LongRef(0);
288 
289     Progressable verticesProgressable = new Progressable() {
290       @Override
291       public void progress() {
292         verticesComputedProgress.value++;
293         if (verticesComputedProgress.value == verticesToUpdateProgress) {
294           WorkerProgress.get().addVerticesComputed(
295               verticesComputedProgress.value);
296           verticesComputedProgress.value = 0;
297         }
298       }
299     };
300     // Make sure this is thread-safe across runs
301     synchronized (partition) {
302       if (ignoreExistingVertices) {
303         Iterable<I> destinations =
304             messageStore.getPartitionDestinationVertices(partition.getId());
305         if (!Iterables.isEmpty(destinations)) {
306           OnlyIdVertex<I> vertex = new OnlyIdVertex<>();
307 
308           for (I vertexId : destinations) {
309             Iterable<M1> messages = messageStore.getVertexMessages(vertexId);
310             Preconditions.checkState(!Iterables.isEmpty(messages));
311             vertex.setId(vertexId);
312             computation.compute((Vertex) vertex, messages);
313 
314             // Remove the messages now that the vertex has finished computation
315             messageStore.clearVertexMessages(vertexId);
316 
317             // Add statistics for this vertex
318             partitionStats.incrVertexCount();
319 
320             verticesProgressable.progress();
321           }
322         }
323       } else {
324         int count = 0;
325         for (Vertex<I, V, E> vertex : partition) {
326           // If out-of-core mechanism is used, check whether this thread
327           // can stay active or it should temporarily suspend and stop
328           // processing and generating more data for the moment.
329           if (oocEngine != null &&
330               (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
331             oocEngine.activeThreadCheckIn();
332           }
333           Iterable<M1> messages =
334               messageStore.getVertexMessages(vertex.getId());
335           if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
336             vertex.wakeUp();
337           }
338           if (!vertex.isHalted()) {
339             context.progress();
340             computation.compute(vertex, messages);
341             // Need to unwrap the mutated edges (possibly)
342             vertex.unwrapMutableEdges();
343             //Compact edges representation if possible
344             if (vertex instanceof Trimmable) {
345               ((Trimmable) vertex).trim();
346             }
347             // Write vertex to superstep output (no-op if it is not used)
348             vertexWriter.writeVertex(vertex);
349             // Need to save the vertex changes (possibly)
350             partition.saveVertex(vertex);
351           }
352           if (vertex.isHalted()) {
353             partitionStats.incrFinishedVertexCount();
354           }
355           // Remove the messages now that the vertex has finished computation
356           messageStore.clearVertexMessages(vertex.getId());
357 
358           // Add statistics for this vertex
359           partitionStats.incrVertexCount();
360           partitionStats.addEdgeCount(vertex.getNumEdges());
361 
362           verticesProgressable.progress();
363         }
364       }
365       messageStore.clearPartition(partition.getId());
366     }
367     WorkerProgress.get().addVerticesComputed(verticesComputedProgress.value);
368     WorkerProgress.get().incrementPartitionsComputed();
369     return partitionStats;
370   }
371 }
372