View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.graph;
19  
20  import java.io.IOException;
21  import java.util.Collection;
22  import java.util.List;
23  import java.util.concurrent.Callable;
24  
25  import org.apache.giraph.bsp.CentralizedServiceWorker;
26  import org.apache.giraph.comm.WorkerClientRequestProcessor;
27  import org.apache.giraph.comm.messages.MessageStore;
28  import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
29  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
30  import org.apache.giraph.io.SimpleVertexWriter;
31  import org.apache.giraph.metrics.GiraphMetrics;
32  import org.apache.giraph.metrics.MetricNames;
33  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
34  import org.apache.giraph.ooc.OutOfCoreEngine;
35  import org.apache.giraph.partition.Partition;
36  import org.apache.giraph.partition.PartitionStats;
37  import org.apache.giraph.partition.PartitionStore;
38  import org.apache.giraph.time.SystemTime;
39  import org.apache.giraph.time.Time;
40  import org.apache.giraph.time.Times;
41  import org.apache.giraph.utils.MemoryUtils;
42  import org.apache.giraph.utils.TimedLogger;
43  import org.apache.giraph.utils.Trimmable;
44  import org.apache.giraph.worker.WorkerProgress;
45  import org.apache.giraph.worker.WorkerThreadGlobalCommUsage;
46  import org.apache.hadoop.io.Writable;
47  import org.apache.hadoop.io.WritableComparable;
48  import org.apache.hadoop.mapreduce.Mapper;
49  import org.apache.log4j.Logger;
50  
51  import com.google.common.collect.Iterables;
52  import com.google.common.collect.Lists;
53  import com.yammer.metrics.core.Counter;
54  import com.yammer.metrics.core.Histogram;
55  
56  /**
57   * Compute as many vertex partitions as possible.  Every thread will has its
58   * own instance of WorkerClientRequestProcessor to send requests.  Note that
59   * the partition ids are used in the partitionIdQueue rather than the actual
60   * partitions since that would cause the partitions to be loaded into memory
61   * when using the out-of-core graph partition store.  We should only load on
62   * demand.
63   *
64   * @param <I>  Vertex index value
65   * @param <V>  Vertex value
66   * @param <E>  Edge value
67   * @param <M1> Incoming message type
68   * @param <M2> Outgoing message type
69   */
70  public class ComputeCallable<I extends WritableComparable, V extends Writable,
71      E extends Writable, M1 extends Writable, M2 extends Writable>
72      implements Callable<Collection<PartitionStats>> {
73    /** Class logger */
74    private static final Logger LOG  = Logger.getLogger(ComputeCallable.class);
75    /** Class time object */
76    private static final Time TIME = SystemTime.get();
77    /** How often to update WorkerProgress */
78    private static final long VERTICES_TO_UPDATE_PROGRESS = 100000;
79    /** Context */
80    private final Mapper<?, ?, ?, ?>.Context context;
81    /** Graph state */
82    private final GraphState graphState;
83    /** Message store */
84    private final MessageStore<I, M1> messageStore;
85    /** Configuration */
86    private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
87    /** Worker (for NettyWorkerClientRequestProcessor) */
88    private final CentralizedServiceWorker<I, V, E> serviceWorker;
89    /** Dump some progress every 30 seconds */
90    private final TimedLogger timedLogger = new TimedLogger(30 * 1000, LOG);
91    /** VertexWriter for this ComputeCallable */
92    private SimpleVertexWriter<I, V, E> vertexWriter;
93    /** Get the start time in nanos */
94    private final long startNanos = TIME.getNanoseconds();
95  
96    // Per-Superstep Metrics
97    /** Messages sent */
98    private final Counter messagesSentCounter;
99    /** Message bytes sent */
100   private final Counter messageBytesSentCounter;
101   /** Compute time per partition */
102   private final Histogram histogramComputePerPartition;
103   /** GC time per compute thread */
104   private final Histogram histogramGCTimePerThread;
105   /** Wait time per compute thread */
106   private final Histogram histogramWaitTimePerThread;
107   /** Processing time per compute thread */
108   private final Histogram histogramProcessingTimePerThread;
109 
110   /**
111    * Constructor
112    *
113    * @param context Context
114    * @param graphState Current graph state (use to create own graph state)
115    * @param messageStore Message store
116    * @param configuration Configuration
117    * @param serviceWorker Service worker
118    */
119   public ComputeCallable(Mapper<?, ?, ?, ?>.Context context,
120       GraphState graphState, MessageStore<I, M1> messageStore,
121       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
122       CentralizedServiceWorker<I, V, E> serviceWorker) {
123     this.context = context;
124     this.configuration = configuration;
125     this.messageStore = messageStore;
126     this.serviceWorker = serviceWorker;
127     this.graphState = graphState;
128 
129     SuperstepMetricsRegistry metrics = GiraphMetrics.get().perSuperstep();
130     messagesSentCounter = metrics.getCounter(MetricNames.MESSAGES_SENT);
131     messageBytesSentCounter =
132       metrics.getCounter(MetricNames.MESSAGE_BYTES_SENT);
133     histogramComputePerPartition = metrics.getUniformHistogram(
134         MetricNames.HISTOGRAM_COMPUTE_PER_PARTITION);
135     histogramGCTimePerThread = metrics.getUniformHistogram("gc-per-thread-ms");
136     histogramWaitTimePerThread =
137         metrics.getUniformHistogram("wait-per-thread-ms");
138     histogramProcessingTimePerThread =
139         metrics.getUniformHistogram("processing-per-thread-ms");
140   }
141 
142   @Override
143   public Collection<PartitionStats> call() {
144     // Thread initialization (for locality)
145     WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
146         new NettyWorkerClientRequestProcessor<I, V, E>(
147             context, configuration, serviceWorker,
148             configuration.getOutgoingMessageEncodeAndStoreType().
149               useOneMessageToManyIdsEncoding());
150     WorkerThreadGlobalCommUsage aggregatorUsage =
151         serviceWorker.getAggregatorHandler().newThreadAggregatorUsage();
152 
153     vertexWriter = serviceWorker.getSuperstepOutput().getVertexWriter();
154 
155     Computation<I, V, E, M1, M2> computation =
156         (Computation<I, V, E, M1, M2>) configuration.createComputation();
157     computation.initialize(graphState, workerClientRequestProcessor,
158         serviceWorker, aggregatorUsage);
159     computation.preSuperstep();
160 
161     List<PartitionStats> partitionStatsList = Lists.newArrayList();
162     PartitionStore<I, V, E> partitionStore = serviceWorker.getPartitionStore();
163     OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine();
164     GraphTaskManager<I, V, E> taskManager = serviceWorker.getGraphTaskManager();
165     if (oocEngine != null) {
166       oocEngine.processingThreadStart();
167     }
168     long timeWaiting = 0;
169     long timeProcessing = 0;
170     long timeDoingGC = 0;
171     while (true) {
172       long startTime = System.currentTimeMillis();
173       long startGCTime = taskManager.getSuperstepGCTime();
174       Partition<I, V, E> partition = partitionStore.getNextPartition();
175       long timeDoingGCWhileWaiting =
176           taskManager.getSuperstepGCTime() - startGCTime;
177       timeDoingGC += timeDoingGCWhileWaiting;
178       timeWaiting += System.currentTimeMillis() - startTime -
179           timeDoingGCWhileWaiting;
180       if (partition == null) {
181         break;
182       }
183       long startProcessingTime = System.currentTimeMillis();
184       startGCTime = taskManager.getSuperstepGCTime();
185       try {
186         serviceWorker.getServerData().resolvePartitionMutation(partition);
187         PartitionStats partitionStats =
188             computePartition(computation, partition, oocEngine);
189         partitionStatsList.add(partitionStats);
190         long partitionMsgs = workerClientRequestProcessor.resetMessageCount();
191         partitionStats.addMessagesSentCount(partitionMsgs);
192         messagesSentCounter.inc(partitionMsgs);
193         long partitionMsgBytes =
194           workerClientRequestProcessor.resetMessageBytesCount();
195         partitionStats.addMessageBytesSentCount(partitionMsgBytes);
196         messageBytesSentCounter.inc(partitionMsgBytes);
197         timedLogger.info("call: Completed " +
198             partitionStatsList.size() + " partitions, " +
199             partitionStore.getNumPartitions() + " remaining " +
200             MemoryUtils.getRuntimeMemoryStats());
201       } catch (IOException e) {
202         throw new IllegalStateException("call: Caught unexpected IOException," +
203             " failing.", e);
204       } catch (InterruptedException e) {
205         throw new IllegalStateException("call: Caught unexpected " +
206             "InterruptedException, failing.", e);
207       } finally {
208         partitionStore.putPartition(partition);
209       }
210       long timeDoingGCWhileProcessing =
211           taskManager.getSuperstepGCTime() - startGCTime;
212       timeDoingGC += timeDoingGCWhileProcessing;
213       timeProcessing += System.currentTimeMillis() - startProcessingTime -
214           timeDoingGCWhileProcessing;
215       histogramComputePerPartition.update(
216           System.currentTimeMillis() - startTime);
217     }
218     histogramGCTimePerThread.update(timeDoingGC);
219     histogramWaitTimePerThread.update(timeWaiting);
220     histogramProcessingTimePerThread.update(timeProcessing);
221     computation.postSuperstep();
222 
223     // Return VertexWriter after the usage
224     serviceWorker.getSuperstepOutput().returnVertexWriter(vertexWriter);
225 
226     if (LOG.isInfoEnabled()) {
227       float seconds = Times.getNanosSince(TIME, startNanos) /
228           Time.NS_PER_SECOND_AS_FLOAT;
229       LOG.info("call: Computation took " + seconds + " secs for "  +
230           partitionStatsList.size() + " partitions on superstep " +
231           graphState.getSuperstep() + ".  Flushing started (time waiting on " +
232           "partitions was " +
233           String.format("%.2f s", timeWaiting / 1000.0) + ", time processing " +
234           "partitions was " + String.format("%.2f s", timeProcessing / 1000.0) +
235           ", time spent on gc was " +
236           String.format("%.2f s", timeDoingGC / 1000.0) + ")");
237     }
238     try {
239       workerClientRequestProcessor.flush();
240       // The messages flushed out from the cache is
241       // from the last partition processed
242       if (partitionStatsList.size() > 0) {
243         long partitionMsgBytes =
244           workerClientRequestProcessor.resetMessageBytesCount();
245         partitionStatsList.get(partitionStatsList.size() - 1).
246           addMessageBytesSentCount(partitionMsgBytes);
247         messageBytesSentCounter.inc(partitionMsgBytes);
248       }
249       aggregatorUsage.finishThreadComputation();
250     } catch (IOException e) {
251       throw new IllegalStateException("call: Flushing failed.", e);
252     }
253     if (oocEngine != null) {
254       oocEngine.processingThreadFinish();
255     }
256     return partitionStatsList;
257   }
258 
259   /**
260    * Compute a single partition
261    *
262    * @param computation Computation to use
263    * @param partition Partition to compute
264    * @param oocEngine out-of-core engine
265    * @return Partition stats for this computed partition
266    */
267   private PartitionStats computePartition(
268       Computation<I, V, E, M1, M2> computation,
269       Partition<I, V, E> partition, OutOfCoreEngine oocEngine)
270       throws IOException, InterruptedException {
271     PartitionStats partitionStats =
272         new PartitionStats(partition.getId(), 0, 0, 0, 0, 0);
273     long verticesComputedProgress = 0;
274     // Make sure this is thread-safe across runs
275     synchronized (partition) {
276       int count = 0;
277       for (Vertex<I, V, E> vertex : partition) {
278         // If out-of-core mechanism is used, check whether this thread
279         // can stay active or it should temporarily suspend and stop
280         // processing and generating more data for the moment.
281         if (oocEngine != null &&
282             (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
283           oocEngine.activeThreadCheckIn();
284         }
285         Iterable<M1> messages = messageStore.getVertexMessages(vertex.getId());
286         if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
287           vertex.wakeUp();
288         }
289         if (!vertex.isHalted()) {
290           context.progress();
291           computation.compute(vertex, messages);
292           // Need to unwrap the mutated edges (possibly)
293           vertex.unwrapMutableEdges();
294           //Compact edges representation if possible
295           if (vertex instanceof Trimmable) {
296             ((Trimmable) vertex).trim();
297           }
298           // Write vertex to superstep output (no-op if it is not used)
299           vertexWriter.writeVertex(vertex);
300           // Need to save the vertex changes (possibly)
301           partition.saveVertex(vertex);
302         }
303         if (vertex.isHalted()) {
304           partitionStats.incrFinishedVertexCount();
305         }
306         // Remove the messages now that the vertex has finished computation
307         messageStore.clearVertexMessages(vertex.getId());
308 
309         // Add statistics for this vertex
310         partitionStats.incrVertexCount();
311         partitionStats.addEdgeCount(vertex.getNumEdges());
312 
313         verticesComputedProgress++;
314         if (verticesComputedProgress == VERTICES_TO_UPDATE_PROGRESS) {
315           WorkerProgress.get().addVerticesComputed(verticesComputedProgress);
316           verticesComputedProgress = 0;
317         }
318       }
319 
320       messageStore.clearPartition(partition.getId());
321     }
322     WorkerProgress.get().addVerticesComputed(verticesComputedProgress);
323     WorkerProgress.get().incrementPartitionsComputed();
324     return partitionStats;
325   }
326 }
327