This project has retired. For details please refer to its
Attic page.
GraphTaskManager xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.graph;
20
21 import java.io.IOException;
22 import java.lang.management.GarbageCollectorMXBean;
23 import java.lang.management.ManagementFactory;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Enumeration;
27 import java.util.List;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.TimeUnit;
30
31 import com.sun.management.GarbageCollectionNotificationInfo;
32 import com.yammer.metrics.core.Counter;
33
34 import org.apache.commons.lang3.exception.ExceptionUtils;
35 import org.apache.giraph.bsp.BspService;
36 import org.apache.giraph.bsp.CentralizedServiceMaster;
37 import org.apache.giraph.bsp.CentralizedServiceWorker;
38 import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
39 import org.apache.giraph.comm.messages.MessageStore;
40 import org.apache.giraph.conf.ClassConfOption;
41 import org.apache.giraph.conf.GiraphConstants;
42 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
43 import org.apache.giraph.job.JobProgressTracker;
44 import org.apache.giraph.master.BspServiceMaster;
45 import org.apache.giraph.master.MasterThread;
46 import org.apache.giraph.metrics.GiraphMetrics;
47 import org.apache.giraph.metrics.GiraphMetricsRegistry;
48 import org.apache.giraph.metrics.GiraphTimer;
49 import org.apache.giraph.metrics.GiraphTimerContext;
50 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
51 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
52 import org.apache.giraph.ooc.OutOfCoreEngine;
53 import org.apache.giraph.partition.PartitionOwner;
54 import org.apache.giraph.partition.PartitionStats;
55 import org.apache.giraph.partition.PartitionStore;
56 import org.apache.giraph.scripting.ScriptLoader;
57 import org.apache.giraph.utils.CallableFactory;
58 import org.apache.giraph.utils.GcObserver;
59 import org.apache.giraph.utils.MemoryUtils;
60 import org.apache.giraph.utils.ProgressableUtils;
61 import org.apache.giraph.worker.BspServiceWorker;
62 import org.apache.giraph.worker.InputSplitsCallable;
63 import org.apache.giraph.worker.WorkerContext;
64 import org.apache.giraph.worker.WorkerObserver;
65 import org.apache.giraph.worker.WorkerProgress;
66 import org.apache.giraph.writable.kryo.KryoWritableWrapper;
67 import org.apache.giraph.zk.ZooKeeperManager;
68 import org.apache.hadoop.conf.Configuration;
69 import org.apache.hadoop.fs.Path;
70 import org.apache.hadoop.io.Writable;
71 import org.apache.hadoop.io.WritableComparable;
72 import org.apache.hadoop.mapreduce.Mapper;
73 import org.apache.log4j.Appender;
74 import org.apache.log4j.Level;
75 import org.apache.log4j.LogManager;
76 import org.apache.log4j.Logger;
77 import org.apache.log4j.PatternLayout;
78
79 import javax.management.Notification;
80 import javax.management.NotificationEmitter;
81 import javax.management.NotificationListener;
82 import javax.management.openmbean.CompositeData;
83
84
85
86
87
88
89
90
91
92
93
94
95 @SuppressWarnings("rawtypes")
96 public class GraphTaskManager<I extends WritableComparable, V extends Writable,
97 E extends Writable> implements
98 ResetSuperstepMetricsObserver {
99
100
101
102
103
104
105
106
107
108 public static final ClassConfOption<CheckerIfWorkerShouldFailAfterException>
109 CHECKER_IF_WORKER_SHOULD_FAIL_AFTER_EXCEPTION_CLASS = ClassConfOption.create(
110 "giraph.checkerIfWorkerShouldFailAfterExceptionClass",
111 FailWithEveryExceptionOccurred.class,
112 CheckerIfWorkerShouldFailAfterException.class,
113 "Class which checks if an exception on some thread should cause worker " +
114 "to fail, by default all exceptions cause failure");
115
116 public static final String TIMER_SUPERSTEP_TIME = "superstep-time-ms";
117
118 public static final String TIMER_COMPUTE_ALL = "compute-all-ms";
119
120 public static final String TIMER_TIME_TO_FIRST_MSG =
121 "time-to-first-message-ms";
122
123 public static final String TIMER_COMMUNICATION_TIME = "communication-time-ms";
124
125 public static final String TIMER_SUPERSTEP_GC_TIME = "superstep-gc-time-ms";
126
127
128 private static final Logger LOG = Logger.getLogger(GraphTaskManager.class);
129
130 private CentralizedServiceWorker<I, V, E> serviceWorker;
131
132 private CentralizedServiceMaster<I, V, E> serviceMaster;
133
134 private Thread masterThread = null;
135
136 private boolean alreadyRun = false;
137
138 private ZooKeeperManager zkManager;
139
140 private ImmutableClassesGiraphConfiguration<I, V, E> conf;
141
142 private boolean done = false;
143
144 private GraphFunctions graphFunctions = GraphFunctions.UNKNOWN;
145
146 private FinishedSuperstepStats finishedSuperstepStats =
147 new FinishedSuperstepStats(0, false, 0, 0, false, CheckpointStatus.NONE);
148
149 private JobProgressTrackerClient jobProgressTracker;
150
151
152
153 private GiraphTimer wcPreAppTimer;
154
155 private GiraphTimer wcPostAppTimer;
156
157
158
159 private GiraphTimer superstepTimer;
160
161 private GiraphTimer computeAll;
162
163 private GiraphTimer timeToFirstMessage;
164
165 private GiraphTimerContext timeToFirstMessageTimerContext;
166
167 private GiraphTimer communicationTimer;
168
169 private GiraphTimerContext communicationTimerContext;
170
171 private GiraphTimer wcPreSuperstepTimer;
172
173 private Counter gcTimeMetric;
174
175 private final Mapper<?, ?, ?, ?>.Context context;
176
177 private boolean isMaster;
178
179 private MapperObserver[] mapperObservers;
180
181
182
183
184
185
/**
 * Creates a task manager bound to the given mapper context. The manager
 * starts out as a non-master; use {@link #setIsMaster(boolean)} to change it.
 *
 * @param context Mapper context of the task this manager runs in
 */
public GraphTaskManager(Mapper<?, ?, ?, ?>.Context context) {
  this.isMaster = false;
  this.context = context;
}
190
191
192
193
194 private void checkInput() {
195 if (conf.hasEdgeInputFormat()) {
196 conf.createWrappedEdgeInputFormat().checkInputSpecs(conf);
197 }
198 if (conf.hasVertexInputFormat()) {
199 conf.createWrappedVertexInputFormat().checkInputSpecs(conf);
200 }
201 }
202
203
204
205
206
207
208
209
210 private void createZooKeeperCounter(String serverPortList) {
211
212 context.getCounter(GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP,
213 serverPortList);
214 }
215
216
217
218
219
220
221 public void setup(Path[] zkPathList)
222 throws IOException, InterruptedException {
223 context.setStatus("setup: Beginning worker setup.");
224 Configuration hadoopConf = context.getConfiguration();
225 conf = new ImmutableClassesGiraphConfiguration<I, V, E>(hadoopConf);
226 initializeJobProgressTracker();
227
228 Thread.setDefaultUncaughtExceptionHandler(createUncaughtExceptionHandler());
229 setupMapperObservers();
230
231
232
233 conf.getGiraphTypes().writeIfUnset(conf);
234
235 initializeAndConfigureLogging();
236
237 setupAndInitializeGiraphMetrics();
238
239 checkInput();
240
241 ScriptLoader.loadScripts(conf);
242
243 conf.createComputationFactory().initialize(conf);
244
245 context.setStatus("setup: Initializing Zookeeper services.");
246 String serverPortList = conf.getZookeeperList();
247 if (serverPortList.isEmpty()) {
248 if (startZooKeeperManager()) {
249 return;
250 }
251 } else {
252 createZooKeeperCounter(serverPortList);
253 }
254 if (zkManager != null && zkManager.runsZooKeeper()) {
255 if (LOG.isInfoEnabled()) {
256 LOG.info("setup: Chosen to run ZooKeeper...");
257 }
258 }
259 context
260 .setStatus("setup: Connected to Zookeeper service " + serverPortList);
261 this.graphFunctions = determineGraphFunctions(conf, zkManager);
262 if (zkManager != null && this.graphFunctions.isMaster()) {
263 zkManager.cleanupOnExit();
264 }
265 try {
266 instantiateBspService();
267 } catch (IOException e) {
268 LOG.error("setup: Caught exception just before end of setup", e);
269 if (zkManager != null) {
270 zkManager.offlineZooKeeperServers(ZooKeeperManager.State.FAILED);
271 }
272 throw new RuntimeException(
273 "setup: Offlining servers due to exception...", e);
274 }
275 context.setStatus(getGraphFunctions().toString() + " starting...");
276 }
277
278
279
280
281
282
283 private void initializeJobProgressTracker() {
284 if (!conf.trackJobProgressOnClient()) {
285 jobProgressTracker = new JobProgressTrackerClientNoOp();
286 } else {
287 jobProgressTracker =
288 GiraphConstants.JOB_PROGRESS_TRACKER_CLIENT_CLASS.newInstance(conf);
289 try {
290 jobProgressTracker.init(conf);
291
292 } catch (Exception e) {
293
294 throw new RuntimeException(
295 "Failed to initialize JobProgressTrackerClient", e);
296 }
297 }
298 jobProgressTracker.mapperStarted();
299 }
300
301
302
303
304
305
306
307
308
309 public void execute() throws IOException, InterruptedException {
310 if (checkTaskState()) {
311 return;
312 }
313 preLoadOnWorkerObservers();
314 GiraphTimerContext superstepTimerContext = superstepTimer.time();
315 finishedSuperstepStats = serviceWorker.setup();
316 superstepTimerContext.stop();
317 if (collectInputSuperstepStats(finishedSuperstepStats)) {
318 return;
319 }
320 prepareGraphStateAndWorkerContext();
321 List<PartitionStats> partitionStatsList = new ArrayList<PartitionStats>();
322 int numComputeThreads = conf.getNumComputeThreads();
323
324
325 while (!finishedSuperstepStats.allVerticesHalted()) {
326 final long superstep = serviceWorker.getSuperstep();
327 superstepTimerContext = getTimerForThisSuperstep(superstep);
328 GraphState graphState = new GraphState(superstep,
329 finishedSuperstepStats.getVertexCount(),
330 finishedSuperstepStats.getEdgeCount(),
331 context);
332 Collection<? extends PartitionOwner> masterAssignedPartitionOwners =
333 serviceWorker.startSuperstep();
334 if (LOG.isDebugEnabled()) {
335 LOG.debug("execute: " + MemoryUtils.getRuntimeMemoryStats());
336 }
337 context.progress();
338 serviceWorker.exchangeVertexPartitions(masterAssignedPartitionOwners);
339 context.progress();
340 boolean hasBeenRestarted = checkSuperstepRestarted(superstep);
341
342 GlobalStats globalStats = serviceWorker.getGlobalStats();
343
344 if (hasBeenRestarted) {
345 graphState = new GraphState(superstep,
346 finishedSuperstepStats.getVertexCount(),
347 finishedSuperstepStats.getEdgeCount(),
348 context);
349 } else if (storeCheckpoint(globalStats.getCheckpointStatus())) {
350 break;
351 }
352 serviceWorker.getServerData().prepareResolveMutations();
353 context.progress();
354 prepareForSuperstep(graphState);
355 context.progress();
356 MessageStore<I, Writable> messageStore =
357 serviceWorker.getServerData().getCurrentMessageStore();
358 int numPartitions = serviceWorker.getPartitionStore().getNumPartitions();
359 int numThreads = Math.min(numComputeThreads, numPartitions);
360 if (LOG.isInfoEnabled()) {
361 LOG.info("execute: " + numPartitions + " partitions to process with " +
362 numThreads + " compute thread(s), originally " +
363 numComputeThreads + " thread(s) on superstep " + superstep);
364 }
365 partitionStatsList.clear();
366
367 if (numPartitions > 0) {
368 processGraphPartitions(context, partitionStatsList, graphState,
369 messageStore, numThreads);
370 }
371 finishedSuperstepStats = completeSuperstepAndCollectStats(
372 partitionStatsList, superstepTimerContext);
373
374
375 }
376
377 if (LOG.isInfoEnabled()) {
378 LOG.info("execute: BSP application done (global vertices marked done)");
379 }
380 updateSuperstepGraphState();
381 postApplication();
382 }
383
384
385
386
387 private void postApplication() throws IOException, InterruptedException {
388 GiraphTimerContext postAppTimerContext = wcPostAppTimer.time();
389 serviceWorker.getWorkerContext().postApplication();
390 serviceWorker.getSuperstepOutput().postApplication();
391 postAppTimerContext.stop();
392 context.progress();
393
394 for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
395 obs.postApplication();
396 context.progress();
397 }
398 }
399
400
401
402
403
404 public void setIsMaster(final boolean im) {
405 this.isMaster = im;
406 }
407
408
409
410
411
412
413 public boolean isMaster() {
414 return isMaster;
415 }
416
417
418
419
420
421
422
423 private GiraphTimerContext getTimerForThisSuperstep(long superstep) {
424 GiraphMetrics.get().resetSuperstepMetrics(superstep);
425 return superstepTimer.time();
426 }
427
428
429
430
431 private void setupAndInitializeGiraphMetrics() {
432 GiraphMetrics.init(conf);
433 GiraphMetrics.get().addSuperstepResetObserver(this);
434 initJobMetrics();
435 MemoryUtils.initMetrics();
436 InputSplitsCallable.initMetrics();
437 }
438
439
440
441
442
443
444
445 private boolean startZooKeeperManager()
446 throws IOException, InterruptedException {
447 zkManager = new ZooKeeperManager(context, conf);
448 context.setStatus("setup: Setting up Zookeeper manager.");
449 zkManager.setup();
450 if (zkManager.computationDone()) {
451 done = true;
452 return true;
453 }
454 zkManager.onlineZooKeeperServer();
455 String serverPortList = zkManager.getZooKeeperServerPortString();
456 conf.setZookeeperList(serverPortList);
457 createZooKeeperCounter(serverPortList);
458 return false;
459 }
460
461
462
463
464 private void updateSuperstepGraphState() {
465 serviceWorker.getWorkerContext().setGraphState(
466 new GraphState(serviceWorker.getSuperstep(),
467 finishedSuperstepStats.getVertexCount(),
468 finishedSuperstepStats.getEdgeCount(), context));
469 }
470
471
472
473
474
475
476
477
478 private FinishedSuperstepStats completeSuperstepAndCollectStats(
479 List<PartitionStats> partitionStatsList,
480 GiraphTimerContext superstepTimerContext) {
481
482
483
484
485 finishedSuperstepStats =
486 serviceWorker.finishSuperstep(partitionStatsList, superstepTimerContext);
487 if (conf.metricsEnabled()) {
488 GiraphMetrics.get().perSuperstep().printSummary(System.err);
489 }
490 return finishedSuperstepStats;
491 }
492
493
494
495
496
497
498 private void prepareForSuperstep(GraphState graphState) {
499 serviceWorker.prepareSuperstep();
500
501 serviceWorker.getWorkerContext().setGraphState(graphState);
502 serviceWorker.getWorkerContext().setupSuperstep(serviceWorker);
503 GiraphTimerContext preSuperstepTimer = wcPreSuperstepTimer.time();
504 serviceWorker.getWorkerContext().preSuperstep();
505 preSuperstepTimer.stop();
506 context.progress();
507
508 for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
509 obs.preSuperstep(graphState.getSuperstep());
510 context.progress();
511 }
512 }
513
514
515
516
517 private void prepareGraphStateAndWorkerContext() {
518 updateSuperstepGraphState();
519 workerContextPreApp();
520 }
521
522
523
524
525
526
527
528 public GraphFunctions getGraphFunctions() {
529 return graphFunctions;
530 }
531
532 public final WorkerContext getWorkerContext() {
533 return serviceWorker.getWorkerContext();
534 }
535
536 public JobProgressTracker getJobProgressTracker() {
537 return jobProgressTracker;
538 }
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556 private static GraphFunctions determineGraphFunctions(
557 ImmutableClassesGiraphConfiguration conf,
558 ZooKeeperManager zkManager) {
559 boolean splitMasterWorker = conf.getSplitMasterWorker();
560 int taskPartition = conf.getTaskPartition();
561 boolean zkAlreadyProvided = conf.isZookeeperExternal();
562 GraphFunctions functions = GraphFunctions.UNKNOWN;
563
564 if (!splitMasterWorker) {
565 if ((zkManager != null) && zkManager.runsZooKeeper()) {
566 functions = GraphFunctions.ALL;
567 } else {
568 functions = GraphFunctions.ALL_EXCEPT_ZOOKEEPER;
569 }
570 } else {
571 if (zkAlreadyProvided) {
572 if (taskPartition == 0) {
573 functions = GraphFunctions.MASTER_ONLY;
574 } else {
575 functions = GraphFunctions.WORKER_ONLY;
576 }
577 } else {
578 if ((zkManager != null) && zkManager.runsZooKeeper()) {
579 functions = GraphFunctions.MASTER_ZOOKEEPER_ONLY;
580 } else {
581 functions = GraphFunctions.WORKER_ONLY;
582 }
583 }
584 }
585 return functions;
586 }
587
588
589
590
591
592 private void instantiateBspService()
593 throws IOException, InterruptedException {
594 if (graphFunctions.isMaster()) {
595 if (LOG.isInfoEnabled()) {
596 LOG.info("setup: Starting up BspServiceMaster " +
597 "(master thread)...");
598 }
599 serviceMaster = new BspServiceMaster<I, V, E>(context, this);
600 masterThread = new MasterThread<I, V, E>(serviceMaster, context);
601 masterThread.setUncaughtExceptionHandler(
602 createUncaughtExceptionHandler());
603 masterThread.start();
604 }
605 if (graphFunctions.isWorker()) {
606 if (LOG.isInfoEnabled()) {
607 LOG.info("setup: Starting up BspServiceWorker...");
608 }
609 serviceWorker = new BspServiceWorker<I, V, E>(context, this);
610 installGCMonitoring();
611 if (LOG.isInfoEnabled()) {
612 LOG.info("setup: Registering health of this worker...");
613 }
614 }
615 }
616
617
618
619
620
621 private void installGCMonitoring() {
622 final GcObserver[] gcObservers = conf.createGcObservers(context);
623 List<GarbageCollectorMXBean> mxBeans = ManagementFactory
624 .getGarbageCollectorMXBeans();
625 final OutOfCoreEngine oocEngine =
626 serviceWorker.getServerData().getOocEngine();
627 for (GarbageCollectorMXBean gcBean : mxBeans) {
628 NotificationEmitter emitter = (NotificationEmitter) gcBean;
629 NotificationListener listener = new NotificationListener() {
630 @Override
631 public void handleNotification(Notification notification,
632 Object handle) {
633 if (notification.getType().equals(GarbageCollectionNotificationInfo
634 .GARBAGE_COLLECTION_NOTIFICATION)) {
635 GarbageCollectionNotificationInfo info =
636 GarbageCollectionNotificationInfo.from(
637 (CompositeData) notification.getUserData());
638
639 if (LOG.isInfoEnabled()) {
640 LOG.info("installGCMonitoring: name = " + info.getGcName() +
641 ", action = " + info.getGcAction() + ", cause = " +
642 info.getGcCause() + ", duration = " +
643 info.getGcInfo().getDuration() + "ms");
644 }
645 gcTimeMetric.inc(info.getGcInfo().getDuration());
646 GiraphMetrics.get().getGcTracker().gcOccurred(info.getGcInfo());
647 for (GcObserver gcObserver : gcObservers) {
648 gcObserver.gcOccurred(info);
649 }
650 if (oocEngine != null) {
651 oocEngine.gcCompleted(info);
652 }
653 }
654 }
655 };
656
657 emitter.addNotificationListener(listener, null, null);
658 }
659 }
660
661
662
663
664 private void initializeAndConfigureLogging() {
665
666 String logLevel = conf.getLocalLevel();
667 if (!Logger.getRootLogger().getLevel().equals(Level.toLevel(logLevel))) {
668 Logger.getRootLogger().setLevel(Level.toLevel(logLevel));
669 if (LOG.isInfoEnabled()) {
670 LOG.info("setup: Set log level to " + logLevel);
671 }
672 } else {
673 if (LOG.isInfoEnabled()) {
674 LOG.info("setup: Log level remains at " + logLevel);
675 }
676 }
677
678 if (conf.useLogThreadLayout()) {
679 PatternLayout layout =
680 new PatternLayout("%-7p %d [%t] %c %x - %m%n");
681 Enumeration<Appender> appenderEnum =
682 Logger.getRootLogger().getAllAppenders();
683 while (appenderEnum.hasMoreElements()) {
684 appenderEnum.nextElement().setLayout(layout);
685 }
686 }
687
688
689 if (conf.getLocalTestMode()) {
690 LogManager.getLogger(org.apache.zookeeper.server.PrepRequestProcessor.
691 class.getName()).setLevel(Level.ERROR);
692 }
693 }
694
695
696
697
698 private void initJobMetrics() {
699 GiraphMetricsRegistry jobMetrics = GiraphMetrics.get().perJobOptional();
700 wcPreAppTimer = new GiraphTimer(jobMetrics, "worker-context-pre-app",
701 TimeUnit.MILLISECONDS);
702 wcPostAppTimer = new GiraphTimer(jobMetrics, "worker-context-post-app",
703 TimeUnit.MILLISECONDS);
704 }
705
706 @Override
707 public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
708 superstepTimer = new GiraphTimer(superstepMetrics,
709 TIMER_SUPERSTEP_TIME, TimeUnit.MILLISECONDS);
710 computeAll = new GiraphTimer(superstepMetrics,
711 TIMER_COMPUTE_ALL, TimeUnit.MILLISECONDS);
712 timeToFirstMessage = new GiraphTimer(superstepMetrics,
713 TIMER_TIME_TO_FIRST_MSG, TimeUnit.MICROSECONDS);
714 communicationTimer = new GiraphTimer(superstepMetrics,
715 TIMER_COMMUNICATION_TIME, TimeUnit.MILLISECONDS);
716 gcTimeMetric = superstepMetrics.getCounter(TIMER_SUPERSTEP_GC_TIME);
717 wcPreSuperstepTimer = new GiraphTimer(superstepMetrics,
718 "worker-context-pre-superstep", TimeUnit.MILLISECONDS);
719 }
720
721
722
723
724 public void notifySentMessages() {
725
726
727 GiraphTimerContext tmp = timeToFirstMessageTimerContext;
728 if (tmp != null) {
729 synchronized (timeToFirstMessage) {
730 if (timeToFirstMessageTimerContext != null) {
731 timeToFirstMessageTimerContext.stop();
732 timeToFirstMessageTimerContext = null;
733 communicationTimerContext = communicationTimer.time();
734 }
735 }
736 }
737 }
738
739
740
741
742
743 public void notifyFinishedCommunication() {
744 GiraphTimerContext tmp = communicationTimerContext;
745 if (tmp != null) {
746 synchronized (communicationTimer) {
747 if (communicationTimerContext != null) {
748 communicationTimerContext.stop();
749 communicationTimerContext = null;
750 }
751 }
752 }
753 }
754
755
756
757
758
759
760
761
762
763 private void processGraphPartitions(final Mapper<?, ?, ?, ?>.Context context,
764 List<PartitionStats> partitionStatsList,
765 final GraphState graphState,
766 final MessageStore<I, Writable> messageStore,
767 int numThreads) {
768 PartitionStore<I, V, E> partitionStore = serviceWorker.getPartitionStore();
769 long verticesToCompute = 0;
770 for (Integer partitionId : partitionStore.getPartitionIds()) {
771 verticesToCompute += partitionStore.getPartitionVertexCount(partitionId);
772 }
773 WorkerProgress.get().startSuperstep(
774 serviceWorker.getSuperstep(), verticesToCompute,
775 serviceWorker.getPartitionStore().getNumPartitions());
776 partitionStore.startIteration();
777
778 GiraphTimerContext computeAllTimerContext = computeAll.time();
779 timeToFirstMessageTimerContext = timeToFirstMessage.time();
780
781 CallableFactory<Collection<PartitionStats>> callableFactory =
782 new CallableFactory<Collection<PartitionStats>>() {
783 @Override
784 public Callable<Collection<PartitionStats>> newCallable(
785 int callableId) {
786 return new ComputeCallable<I, V, E, Writable, Writable>(
787 context,
788 graphState,
789 messageStore,
790 conf,
791 serviceWorker);
792 }
793 };
794 List<Collection<PartitionStats>> results =
795 ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
796 "compute-%d", context);
797
798 for (Collection<PartitionStats> result : results) {
799 partitionStatsList.addAll(result);
800 }
801
802 computeAllTimerContext.stop();
803 }
804
805
806
807
808
809
810 private boolean checkSuperstepRestarted(long superstep) throws IOException {
811
812
813 if (serviceWorker.getRestartedSuperstep() == superstep) {
814 if (LOG.isInfoEnabled()) {
815 LOG.info("execute: Loading from checkpoint " + superstep);
816 }
817 VertexEdgeCount vertexEdgeCount = serviceWorker.loadCheckpoint(
818 serviceWorker.getRestartedSuperstep());
819 finishedSuperstepStats = new FinishedSuperstepStats(0, false,
820 vertexEdgeCount.getVertexCount(), vertexEdgeCount.getEdgeCount(),
821 false, CheckpointStatus.NONE);
822 return true;
823 }
824 return false;
825 }
826
827
828
829
830
831
832
833
834 private boolean storeCheckpoint(CheckpointStatus checkpointStatus)
835 throws IOException {
836 if (checkpointStatus != CheckpointStatus.NONE) {
837 serviceWorker.storeCheckpoint();
838 }
839 return checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT;
840 }
841
842
843
844
845
846
847
848
849
850 private boolean collectInputSuperstepStats(
851 FinishedSuperstepStats inputSuperstepStats) {
852 if (inputSuperstepStats.getVertexCount() == 0 &&
853 !inputSuperstepStats.mustLoadCheckpoint()) {
854 LOG.warn("map: No vertices in the graph, exiting.");
855 return true;
856 }
857 if (conf.metricsEnabled()) {
858 GiraphMetrics.get().perSuperstep().printSummary(System.err);
859 }
860 return false;
861 }
862
863
864
865
866
867 private boolean checkTaskState() {
868 if (done) {
869 return true;
870 }
871 GiraphMetrics.get().resetSuperstepMetrics(BspService.INPUT_SUPERSTEP);
872 if (graphFunctions.isNotAWorker()) {
873 if (LOG.isInfoEnabled()) {
874 LOG.info("map: No need to do anything when not a worker");
875 }
876 return true;
877 }
878 if (alreadyRun) {
879 throw new RuntimeException("map: In BSP, map should have only been" +
880 " run exactly once, (already run)");
881 }
882 alreadyRun = true;
883 return false;
884 }
885
886
887
888
889 private void workerContextPreApp() {
890 GiraphTimerContext preAppTimerContext = wcPreAppTimer.time();
891 try {
892 serviceWorker.getWorkerContext().preApplication();
893 } catch (InstantiationException e) {
894 LOG.fatal("execute: preApplication failed in instantiation", e);
895 throw new RuntimeException(
896 "execute: preApplication failed in instantiation", e);
897 } catch (IllegalAccessException e) {
898 LOG.fatal("execute: preApplication failed in access", e);
899 throw new RuntimeException(
900 "execute: preApplication failed in access", e);
901 }
902 preAppTimerContext.stop();
903 context.progress();
904
905 for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
906 obs.preApplication();
907 context.progress();
908 }
909 }
910
911
912
913
914 public void setupMapperObservers() {
915 mapperObservers = conf.createMapperObservers(context);
916 for (MapperObserver mapperObserver : mapperObservers) {
917 mapperObserver.setup();
918 }
919 }
920
921
922
923
924 private void preLoadOnWorkerObservers() {
925 for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
926 obs.preLoad();
927 context.progress();
928 }
929 }
930
931
932
933
934 private void postSaveOnWorkerObservers() {
935 for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
936 obs.postSave();
937 context.progress();
938 }
939 }
940
941
942
943
944 public void cleanup()
945 throws IOException, InterruptedException {
946 if (LOG.isInfoEnabled()) {
947 LOG.info("cleanup: Starting for " + getGraphFunctions());
948 }
949 jobProgressTracker.cleanup();
950 if (done) {
951 return;
952 }
953
954 if (serviceWorker != null) {
955 serviceWorker.cleanup(finishedSuperstepStats);
956 }
957 }
958
959
960
961
962
963 public void sendWorkerCountersAndFinishCleanup() {
964 if (serviceWorker != null) {
965 postSaveOnWorkerObservers();
966 serviceWorker.storeCountersInZooKeeper(true);
967 serviceWorker.closeZooKeeper();
968 }
969 try {
970 if (masterThread != null) {
971 masterThread.join();
972 LOG.info("cleanup: Joined with master thread");
973 }
974 } catch (InterruptedException e) {
975
976 LOG.error("cleanup: Master thread couldn't join");
977 }
978 if (zkManager != null) {
979 LOG.info("cleanup: Offlining ZooKeeper servers");
980 try {
981 zkManager.offlineZooKeeperServers(ZooKeeperManager.State.FINISHED);
982
983
984
985
986
987
988
989 } catch (Throwable e) {
990
991 LOG.error("cleanup: Error offlining zookeeper", e);
992 }
993 }
994
995
996 GiraphMetrics.get().shutdown();
997 }
998
999
1000
1001
1002
1003 public void zooKeeperCleanup() {
1004 if (graphFunctions.isZooKeeper()) {
1005
1006 if (zkManager != null) {
1007 zkManager.cleanup();
1008 }
1009 }
1010 }
1011
1012
1013
1014
1015
1016 public void workerFailureCleanup() {
1017 try {
1018 if (graphFunctions.isWorker()) {
1019 serviceWorker.failureCleanup();
1020 }
1021
1022 GiraphMetrics.get().shutdown();
1023
1024
1025
1026 } catch (RuntimeException e1) {
1027
1028 LOG.error("run: Worker failure failed on another RuntimeException, " +
1029 "original expection will be rethrown", e1);
1030 }
1031 }
1032
1033
1034
1035
1036
1037
1038 public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler() {
1039 return new OverrideExceptionHandler(
1040 CHECKER_IF_WORKER_SHOULD_FAIL_AFTER_EXCEPTION_CLASS.newInstance(
1041 getConf()), getJobProgressTracker());
1042 }
1043
1044
1045
1046
1047
1048
1049
1050
1051 public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler(
1052 CheckerIfWorkerShouldFailAfterException checker) {
1053 return new OverrideExceptionHandler(checker, getJobProgressTracker());
1054 }
1055
1056 public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
1057 return conf;
1058 }
1059
1060
1061
1062
1063 public long getSuperstepGCTime() {
1064 return (gcTimeMetric == null) ? 0 : gcTimeMetric.count();
1065 }
1066
1067
1068
1069
1070
1071
1072
1073
1074 public String getZookeeperList() {
1075 if (zkManager != null) {
1076 return zkManager.getZooKeeperServerPortString();
1077 } else {
1078 return conf.getZookeeperList();
1079 }
1080 }
1081
1082
1083
1084
1085
1086 class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
1087
1088 private final CheckerIfWorkerShouldFailAfterException checker;
1089
1090 private final JobProgressTracker jobProgressTracker;
1091
1092
1093
1094
1095
1096
1097
1098
1099 public OverrideExceptionHandler(
1100 CheckerIfWorkerShouldFailAfterException checker,
1101 JobProgressTracker jobProgressTracker) {
1102 this.checker = checker;
1103 this.jobProgressTracker = jobProgressTracker;
1104 }
1105
1106 @Override
1107 public void uncaughtException(final Thread t, final Throwable e) {
1108 if (!checker.checkIfWorkerShouldFail(t, e)) {
1109 LOG.error(
1110 "uncaughtException: OverrideExceptionHandler on thread " +
1111 t.getName() + ", msg = " + e.getMessage(), e);
1112 return;
1113 }
1114 try {
1115 LOG.fatal(
1116 "uncaughtException: OverrideExceptionHandler on thread " +
1117 t.getName() + ", msg = " + e.getMessage() + ", exiting...", e);
1118 byte [] exByteArray = KryoWritableWrapper.convertToByteArray(e);
1119 jobProgressTracker.logError(ExceptionUtils.getStackTrace(e),
1120 exByteArray);
1121 zooKeeperCleanup();
1122 workerFailureCleanup();
1123 } finally {
1124 System.exit(1);
1125 }
1126 }
1127 }
1128
1129
1130
1131
/**
 * Strategy that decides whether an exception which escaped some thread
 * should cause the worker to fail.
 */
public interface CheckerIfWorkerShouldFailAfterException {
  /**
   * Decides whether the worker should fail because of the given exception.
   *
   * @param thread Thread the exception escaped from
   * @param exception The uncaught exception
   * @return true if the worker should fail, false to only log the exception
   */
  boolean checkIfWorkerShouldFail(Thread thread, Throwable exception);
}
1142
1143
1144
1145
1146 public static class FailWithEveryExceptionOccurred
1147 implements CheckerIfWorkerShouldFailAfterException {
1148 @Override
1149 public boolean checkIfWorkerShouldFail(Thread thread, Throwable exception) {
1150 return true;
1151 }
1152 }
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162 public static boolean isConnectionResetByPeer(Throwable throwable) {
1163 return throwable.getMessage().startsWith(
1164 "Connection reset by peer") ? true : false;
1165 }
1166 }