This project has retired. For details please refer to its
Attic page.
ComputeCallable xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.graph;
19
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.List;
23 import java.util.concurrent.Callable;
24
25 import org.apache.giraph.bsp.CentralizedServiceWorker;
26 import org.apache.giraph.comm.WorkerClientRequestProcessor;
27 import org.apache.giraph.comm.messages.MessageStore;
28 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
29 import org.apache.giraph.conf.GiraphConstants;
30 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
31 import org.apache.giraph.function.primitive.PrimitiveRefs.LongRef;
32 import org.apache.giraph.io.SimpleVertexWriter;
33 import org.apache.giraph.metrics.GiraphMetrics;
34 import org.apache.giraph.metrics.MetricNames;
35 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
36 import org.apache.giraph.ooc.OutOfCoreEngine;
37 import org.apache.giraph.partition.Partition;
38 import org.apache.giraph.partition.PartitionStats;
39 import org.apache.giraph.partition.PartitionStore;
40 import org.apache.giraph.time.SystemTime;
41 import org.apache.giraph.time.Time;
42 import org.apache.giraph.time.Times;
43 import org.apache.giraph.utils.MemoryUtils;
44 import org.apache.giraph.utils.TimedLogger;
45 import org.apache.giraph.utils.Trimmable;
46 import org.apache.giraph.worker.WorkerProgress;
47 import org.apache.giraph.worker.WorkerThreadGlobalCommUsage;
48 import org.apache.hadoop.io.Writable;
49 import org.apache.hadoop.io.WritableComparable;
50 import org.apache.hadoop.mapreduce.Mapper;
51 import org.apache.hadoop.util.Progressable;
52 import org.apache.log4j.Logger;
53
54 import com.google.common.base.Preconditions;
55 import com.google.common.collect.Iterables;
56 import com.google.common.collect.Lists;
57 import com.yammer.metrics.core.Counter;
58 import com.yammer.metrics.core.Histogram;
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 public class ComputeCallable<I extends WritableComparable, V extends Writable,
75 E extends Writable, M1 extends Writable, M2 extends Writable>
76 implements Callable<Collection<PartitionStats>> {
77
/** Class logger. */
78 private static final Logger LOG = Logger.getLogger(ComputeCallable.class);
79
/** Time source used to measure how long {@link #call()} takes. */
80 private static final Time TIME = SystemTime.get();
81
/**
 * Number of computed vertices to accumulate before reporting them to
 * {@link WorkerProgress}; read from
 * {@link GiraphConstants#VERTICES_TO_UPDATE_PROGRESS} in the constructor.
 */
82 private final long verticesToUpdateProgress;
83
/**
 * Mapper context; handed to the request processor and used to report
 * progress while vertices are computed.
 */
84 private final Mapper<?, ?, ?, ?>.Context context;
85
/** Graph state passed to the computation; also supplies the superstep. */
86 private final GraphState graphState;
87
/** Store the incoming messages (type M1) for each vertex are read from. */
88 private final MessageStore<I, M1> messageStore;
89
/** Configuration used to create the computation and the request processor. */
90 private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
91
/**
 * Worker this thread computes for; gives access to the partition store,
 * aggregator handler, superstep output and server data.
 */
92 private final CentralizedServiceWorker<I, V, E> serviceWorker;
93
/**
 * Logger for the per-partition progress message in {@link #call()}.
 * NOTE(review): the 30 * 1000 argument is presumably a minimum interval in
 * milliseconds between messages - confirm against TimedLogger.
 */
94 private final TimedLogger timedLogger = new TimedLogger(30 * 1000, LOG);
95
/**
 * Vertex writer obtained from the superstep output at the start of
 * {@link #call()} and returned to it after the partitions are computed.
 */
96 private SimpleVertexWriter<I, V, E> vertexWriter;
97
/** Nanosecond timestamp taken from {@link #TIME} when this object is built. */
98 private final long startNanos = TIME.getNanoseconds();
99
100
101
/** Per-superstep counter of messages sent. */
102 private final Counter messagesSentCounter;
103
/** Per-superstep counter of message bytes sent. */
104 private final Counter messageBytesSentCounter;
105
/** Histogram updated once per partition handled in {@link #call()}. */
106 private final Histogram histogramComputePerPartition;
107
/** Histogram of the GC time (ms) observed by this thread during a call. */
108 private final Histogram histogramGCTimePerThread;
109
/** Histogram of the time (ms) this thread spent waiting for partitions. */
110 private final Histogram histogramWaitTimePerThread;
111
/** Histogram of the time (ms) this thread spent processing partitions. */
112 private final Histogram histogramProcessingTimePerThread;
113
114
115
116
117
118
119
120
121
122
123 public ComputeCallable(Mapper<?, ?, ?, ?>.Context context,
124 GraphState graphState, MessageStore<I, M1> messageStore,
125 ImmutableClassesGiraphConfiguration<I, V, E> configuration,
126 CentralizedServiceWorker<I, V, E> serviceWorker) {
127 this.context = context;
128 this.configuration = configuration;
129 this.messageStore = messageStore;
130 this.serviceWorker = serviceWorker;
131 this.graphState = graphState;
132
133 SuperstepMetricsRegistry metrics = GiraphMetrics.get().perSuperstep();
134 messagesSentCounter = metrics.getCounter(MetricNames.MESSAGES_SENT);
135 messageBytesSentCounter =
136 metrics.getCounter(MetricNames.MESSAGE_BYTES_SENT);
137 histogramComputePerPartition = metrics.getUniformHistogram(
138 MetricNames.HISTOGRAM_COMPUTE_PER_PARTITION);
139 histogramGCTimePerThread = metrics.getUniformHistogram("gc-per-thread-ms");
140 histogramWaitTimePerThread =
141 metrics.getUniformHistogram("wait-per-thread-ms");
142 histogramProcessingTimePerThread =
143 metrics.getUniformHistogram("processing-per-thread-ms");
144 verticesToUpdateProgress =
145 GiraphConstants.VERTICES_TO_UPDATE_PROGRESS.get(configuration);
146 }
147
148 @Override
149 public Collection<PartitionStats> call() {
150
151 WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
152 new NettyWorkerClientRequestProcessor<I, V, E>(
153 context, configuration, serviceWorker,
154 configuration.getOutgoingMessageEncodeAndStoreType().
155 useOneMessageToManyIdsEncoding());
156 WorkerThreadGlobalCommUsage aggregatorUsage =
157 serviceWorker.getAggregatorHandler().newThreadAggregatorUsage();
158
159 vertexWriter = serviceWorker.getSuperstepOutput().getVertexWriter();
160
161 Computation<I, V, E, M1, M2> computation =
162 (Computation<I, V, E, M1, M2>) configuration.createComputation();
163 computation.initialize(graphState, workerClientRequestProcessor,
164 serviceWorker, aggregatorUsage);
165 computation.preSuperstep();
166
167 List<PartitionStats> partitionStatsList = Lists.newArrayList();
168 PartitionStore<I, V, E> partitionStore = serviceWorker.getPartitionStore();
169 OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine();
170 GraphTaskManager<I, V, E> taskManager = serviceWorker.getGraphTaskManager();
171 if (oocEngine != null) {
172 oocEngine.processingThreadStart();
173 }
174 long timeWaiting = 0;
175 long timeProcessing = 0;
176 long timeDoingGC = 0;
177 while (true) {
178 long startTime = System.currentTimeMillis();
179 long startGCTime = taskManager.getSuperstepGCTime();
180 Partition<I, V, E> partition = partitionStore.getNextPartition();
181 long timeDoingGCWhileWaiting =
182 taskManager.getSuperstepGCTime() - startGCTime;
183 timeDoingGC += timeDoingGCWhileWaiting;
184 timeWaiting += System.currentTimeMillis() - startTime -
185 timeDoingGCWhileWaiting;
186 if (partition == null) {
187 break;
188 }
189 long startProcessingTime = System.currentTimeMillis();
190 startGCTime = taskManager.getSuperstepGCTime();
191 try {
192 serviceWorker.getServerData().resolvePartitionMutation(partition);
193 PartitionStats partitionStats = computePartition(
194 computation, partition, oocEngine,
195 serviceWorker.getConfiguration().getIncomingMessageClasses()
196 .ignoreExistingVertices());
197 partitionStatsList.add(partitionStats);
198 long partitionMsgs = workerClientRequestProcessor.resetMessageCount();
199 partitionStats.addMessagesSentCount(partitionMsgs);
200 messagesSentCounter.inc(partitionMsgs);
201 long partitionMsgBytes =
202 workerClientRequestProcessor.resetMessageBytesCount();
203 partitionStats.addMessageBytesSentCount(partitionMsgBytes);
204 messageBytesSentCounter.inc(partitionMsgBytes);
205 timedLogger.info("call: Completed " +
206 partitionStatsList.size() + " partitions, " +
207 partitionStore.getNumPartitions() + " remaining " +
208 MemoryUtils.getRuntimeMemoryStats());
209 long timeDoingGCWhileProcessing =
210 taskManager.getSuperstepGCTime() - startGCTime;
211 timeDoingGC += timeDoingGCWhileProcessing;
212 long timeProcessingPartition =
213 System.currentTimeMillis() - startProcessingTime -
214 timeDoingGCWhileProcessing;
215 timeProcessing += timeProcessingPartition;
216 partitionStats.setComputeMs(timeProcessingPartition);
217 } catch (IOException e) {
218 throw new IllegalStateException("call: Caught unexpected IOException," +
219 " failing.", e);
220 } catch (InterruptedException e) {
221 throw new IllegalStateException("call: Caught unexpected " +
222 "InterruptedException, failing.", e);
223 } finally {
224 partitionStore.putPartition(partition);
225 }
226 histogramComputePerPartition.update(
227 System.currentTimeMillis() - startTime);
228 }
229 histogramGCTimePerThread.update(timeDoingGC);
230 histogramWaitTimePerThread.update(timeWaiting);
231 histogramProcessingTimePerThread.update(timeProcessing);
232 computation.postSuperstep();
233
234
235 serviceWorker.getSuperstepOutput().returnVertexWriter(vertexWriter);
236
237 if (LOG.isInfoEnabled()) {
238 float seconds = Times.getNanosSince(TIME, startNanos) /
239 Time.NS_PER_SECOND_AS_FLOAT;
240 LOG.info("call: Computation took " + seconds + " secs for " +
241 partitionStatsList.size() + " partitions on superstep " +
242 graphState.getSuperstep() + ". Flushing started (time waiting on " +
243 "partitions was " +
244 String.format("%.2f s", timeWaiting / 1000.0) + ", time processing " +
245 "partitions was " + String.format("%.2f s", timeProcessing / 1000.0) +
246 ", time spent on gc was " +
247 String.format("%.2f s", timeDoingGC / 1000.0) + ")");
248 }
249 try {
250 workerClientRequestProcessor.flush();
251
252
253 if (partitionStatsList.size() > 0) {
254 long partitionMsgBytes =
255 workerClientRequestProcessor.resetMessageBytesCount();
256 partitionStatsList.get(partitionStatsList.size() - 1).
257 addMessageBytesSentCount(partitionMsgBytes);
258 messageBytesSentCounter.inc(partitionMsgBytes);
259 }
260 aggregatorUsage.finishThreadComputation();
261 } catch (IOException e) {
262 throw new IllegalStateException("call: Flushing failed.", e);
263 }
264 if (oocEngine != null) {
265 oocEngine.processingThreadFinish();
266 }
267 return partitionStatsList;
268 }
269
270
271
272
273
274
275
276
277
278
279 private PartitionStats computePartition(
280 Computation<I, V, E, M1, M2> computation,
281 Partition<I, V, E> partition, OutOfCoreEngine oocEngine,
282 boolean ignoreExistingVertices)
283 throws IOException, InterruptedException {
284 PartitionStats partitionStats =
285 new PartitionStats(partition.getId(), 0, 0, 0, 0, 0,
286 serviceWorker.getWorkerInfo().getHostnameId());
287 final LongRef verticesComputedProgress = new LongRef(0);
288
289 Progressable verticesProgressable = new Progressable() {
290 @Override
291 public void progress() {
292 verticesComputedProgress.value++;
293 if (verticesComputedProgress.value == verticesToUpdateProgress) {
294 WorkerProgress.get().addVerticesComputed(
295 verticesComputedProgress.value);
296 verticesComputedProgress.value = 0;
297 }
298 }
299 };
300
301 synchronized (partition) {
302 if (ignoreExistingVertices) {
303 Iterable<I> destinations =
304 messageStore.getPartitionDestinationVertices(partition.getId());
305 if (!Iterables.isEmpty(destinations)) {
306 OnlyIdVertex<I> vertex = new OnlyIdVertex<>();
307
308 for (I vertexId : destinations) {
309 Iterable<M1> messages = messageStore.getVertexMessages(vertexId);
310 Preconditions.checkState(!Iterables.isEmpty(messages));
311 vertex.setId(vertexId);
312 computation.compute((Vertex) vertex, messages);
313
314
315 messageStore.clearVertexMessages(vertexId);
316
317
318 partitionStats.incrVertexCount();
319
320 verticesProgressable.progress();
321 }
322 }
323 } else {
324 int count = 0;
325 for (Vertex<I, V, E> vertex : partition) {
326
327
328
329 if (oocEngine != null &&
330 (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
331 oocEngine.activeThreadCheckIn();
332 }
333 Iterable<M1> messages =
334 messageStore.getVertexMessages(vertex.getId());
335 if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
336 vertex.wakeUp();
337 }
338 if (!vertex.isHalted()) {
339 context.progress();
340 computation.compute(vertex, messages);
341
342 vertex.unwrapMutableEdges();
343
344 if (vertex instanceof Trimmable) {
345 ((Trimmable) vertex).trim();
346 }
347
348 vertexWriter.writeVertex(vertex);
349
350 partition.saveVertex(vertex);
351 }
352 if (vertex.isHalted()) {
353 partitionStats.incrFinishedVertexCount();
354 }
355
356 messageStore.clearVertexMessages(vertex.getId());
357
358
359 partitionStats.incrVertexCount();
360 partitionStats.addEdgeCount(vertex.getNumEdges());
361
362 verticesProgressable.progress();
363 }
364 }
365 messageStore.clearPartition(partition.getId());
366 }
367 WorkerProgress.get().addVerticesComputed(verticesComputedProgress.value);
368 WorkerProgress.get().incrementPartitionsComputed();
369 return partitionStats;
370 }
371 }
372