BspServiceMaster xref
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18
19 package org.apache.giraph.master;
20
21 import com.google.common.collect.Lists;
22 import com.google.common.collect.Sets;
23 import net.iharder.Base64;
24 import org.apache.commons.io.FilenameUtils;
25 import org.apache.giraph.bsp.ApplicationState;
26 import org.apache.giraph.bsp.BspInputFormat;
27 import org.apache.giraph.bsp.BspService;
28 import org.apache.giraph.bsp.CentralizedServiceMaster;
29 import org.apache.giraph.bsp.SuperstepState;
30 import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
31 import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
32 import org.apache.giraph.comm.MasterClient;
33 import org.apache.giraph.comm.MasterServer;
34 import org.apache.giraph.comm.netty.NettyClient;
35 import org.apache.giraph.comm.netty.NettyMasterClient;
36 import org.apache.giraph.comm.netty.NettyMasterServer;
37 import org.apache.giraph.comm.requests.AddressesAndPartitionsRequest;
38 import org.apache.giraph.conf.GiraphConfiguration;
39 import org.apache.giraph.conf.GiraphConstants;
40 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
41 import org.apache.giraph.counters.CustomCounter;
42 import org.apache.giraph.counters.CustomCounters;
43 import org.apache.giraph.counters.GiraphCountersThriftStruct;
44 import org.apache.giraph.counters.GiraphStats;
45 import org.apache.giraph.counters.GiraphTimers;
46 import org.apache.giraph.graph.AddressesAndPartitionsWritable;
47 import org.apache.giraph.graph.GlobalStats;
48 import org.apache.giraph.graph.GraphFunctions;
49 import org.apache.giraph.graph.GraphState;
50 import org.apache.giraph.graph.GraphTaskManager;
51 import org.apache.giraph.io.EdgeInputFormat;
52 import org.apache.giraph.io.GiraphInputFormat;
53 import org.apache.giraph.io.InputType;
54 import org.apache.giraph.io.MappingInputFormat;
55 import org.apache.giraph.io.VertexInputFormat;
56 import org.apache.giraph.master.input.MasterInputSplitsHandler;
57 import org.apache.giraph.metrics.AggregatedMetrics;
58 import org.apache.giraph.metrics.GiraphMetrics;
59 import org.apache.giraph.metrics.GiraphTimer;
60 import org.apache.giraph.metrics.GiraphTimerContext;
61 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
62 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
63 import org.apache.giraph.metrics.WorkerSuperstepMetrics;
64 import org.apache.giraph.partition.BasicPartitionOwner;
65 import org.apache.giraph.partition.MasterGraphPartitioner;
66 import org.apache.giraph.partition.PartitionOwner;
67 import org.apache.giraph.partition.PartitionStats;
68 import org.apache.giraph.partition.PartitionUtils;
69 import org.apache.giraph.time.SystemTime;
70 import org.apache.giraph.time.Time;
71 import org.apache.giraph.utils.CheckpointingUtils;
72 import org.apache.giraph.utils.JMapHistoDumper;
73 import org.apache.giraph.utils.ReactiveJMapHistoDumper;
74 import org.apache.giraph.utils.ReflectionUtils;
75 import org.apache.giraph.utils.WritableUtils;
76 import org.apache.giraph.worker.WorkerInfo;
77 import org.apache.giraph.zk.BspEvent;
78 import org.apache.giraph.zk.PredicateLock;
79 import org.apache.hadoop.fs.FSDataOutputStream;
80 import org.apache.hadoop.fs.FileSystem;
81 import org.apache.hadoop.fs.Path;
82 import org.apache.hadoop.io.Writable;
83 import org.apache.hadoop.io.WritableComparable;
84 import org.apache.hadoop.mapred.JobID;
85 import org.apache.hadoop.mapred.RunningJob;
86 import org.apache.hadoop.mapreduce.Counter;
87 import org.apache.hadoop.mapreduce.InputSplit;
88 import org.apache.hadoop.mapreduce.Mapper;
89 import org.apache.log4j.Logger;
90 import org.apache.zookeeper.CreateMode;
91 import org.apache.zookeeper.KeeperException;
92 import org.apache.zookeeper.WatchedEvent;
93 import org.apache.zookeeper.Watcher.Event.EventType;
94 import org.apache.zookeeper.ZooDefs.Ids;
95 import org.json.JSONArray;
96 import org.json.JSONException;
97 import org.json.JSONObject;
98
99 import java.io.DataInputStream;
100 import java.io.IOException;
101 import java.io.PrintStream;
102 import java.nio.charset.Charset;
103 import java.util.ArrayList;
104 import java.util.Collection;
105 import java.util.Collections;
106 import java.util.Comparator;
107 import java.util.HashSet;
108 import java.util.List;
109 import java.util.Map;
110 import java.util.Set;
111 import java.util.TreeSet;
112 import java.util.concurrent.TimeUnit;
113
114 import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
115 import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
116 import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
117
118 /**
119  * ZooKeeper-based implementation of {@link CentralizedServiceMaster}.
120  *
121  * @param <I> Vertex id
122  * @param <V> Vertex data
123  * @param <E> Edge data
124  */
125 @SuppressWarnings({ "rawtypes", "unchecked" })
126 public class BspServiceMaster<I extends WritableComparable,
127 V extends Writable, E extends Writable>
128 extends BspService<I, V, E>
129 implements CentralizedServiceMaster<I, V, E>,
130 ResetSuperstepMetricsObserver {
131
132 public static final int MAX_PRINTABLE_REMAINING_WORKERS = 10;
133
134 public static final String NUM_MASTER_ZK_INPUT_SPLIT_THREADS =
135 "giraph.numMasterZkInputSplitThreads";
136
137 public static final int DEFAULT_INPUT_SPLIT_THREAD_COUNT = 1;
138
139 private static final Time TIME = SystemTime.get();
140
141 private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
142
143 private boolean isMaster = false;
144
145 private final int maxWorkers;
146
147 private final int minWorkers;
148
149 private final int maxNumberOfSupersteps;
150
151 private final float minPercentResponded;
152
153 private final int eventWaitMsecs;
154
155 private final int maxSuperstepWaitMsecs;
156
157
158 private final int maxCounterWaitMsecs;
159
160 private final int partitionLongTailMinPrint;
161
162 private long lastCheckpointedSuperstep = -1;
163
164 private final BspEvent workerWroteCheckpoint;
165
166 private final BspEvent superstepStateChanged;
167
168 private final MasterGraphPartitioner<I, V, E> masterGraphPartitioner;
169
170 private final List<PartitionStats> allPartitionStatsList =
171 new ArrayList<PartitionStats>();
172
173 private MasterGlobalCommHandler globalCommHandler;
174
175 private AggregatorToGlobalCommTranslation aggregatorTranslation;
176
177 private MasterCompute masterCompute;
178
179 private MasterClient masterClient;
180
181 private MasterServer masterServer;
182
183 private MasterInfo masterInfo;
184
185 private List<WorkerInfo> chosenWorkerInfoList = Lists.newArrayList();
186
187 private final MasterObserver[] observers;
188
189
190
191 private GiraphTimer masterComputeTimer;
192
193
194 private final int checkpointFrequency;
195
196 private CheckpointStatus checkpointStatus;
197
198 private final CheckpointSupportedChecker checkpointSupportedChecker;
199
200 private final GiraphCountersThriftStruct giraphCountersThriftStruct;
201
202 /**
203  * Constructor for setting up the master.
204  *
205  * @param context Mapper context
206  * @param graphTaskManager GraphTaskManager for this compute node
207  */
208 public BspServiceMaster(
209 Mapper<?, ?, ?, ?>.Context context,
210 GraphTaskManager<I, V, E> graphTaskManager) {
211 super(context, graphTaskManager);
212 workerWroteCheckpoint = new PredicateLock(context);
213 registerBspEvent(workerWroteCheckpoint);
214 superstepStateChanged = new PredicateLock(context);
215 registerBspEvent(superstepStateChanged);
216
217 ImmutableClassesGiraphConfiguration<I, V, E> conf =
218 getConfiguration();
219
220 maxWorkers = conf.getMaxWorkers();
221 minWorkers = conf.getMinWorkers();
222 maxNumberOfSupersteps = conf.getMaxNumberOfSupersteps();
223 minPercentResponded = GiraphConstants.MIN_PERCENT_RESPONDED.get(conf);
224 eventWaitMsecs = conf.getEventWaitMsecs();
225 maxSuperstepWaitMsecs = conf.getMaxMasterSuperstepWaitMsecs();
226 maxCounterWaitMsecs = conf.getMaxCounterWaitMsecs();
227 partitionLongTailMinPrint = PARTITION_LONG_TAIL_MIN_PRINT.get(conf);
228 masterGraphPartitioner =
229 getGraphPartitionerFactory().createMasterGraphPartitioner();
230 if (conf.isJMapHistogramDumpEnabled()) {
231 conf.addMasterObserverClass(JMapHistoDumper.class);
232 }
233 if (conf.isReactiveJmapHistogramDumpEnabled()) {
234 conf.addMasterObserverClass(ReactiveJMapHistoDumper.class);
235 }
236 observers = conf.createMasterObservers(context);
237
238 this.checkpointFrequency = conf.getCheckpointFrequency();
239 this.checkpointStatus = CheckpointStatus.NONE;
240 this.checkpointSupportedChecker =
241 ReflectionUtils.newInstance(
242 GiraphConstants.CHECKPOINT_SUPPORTED_CHECKER.get(conf));
243 this.giraphCountersThriftStruct = new GiraphCountersThriftStruct();
244
245 GiraphMetrics.get().addSuperstepResetObserver(this);
246 GiraphStats.init(context);
247 }
248
249 @Override
250 public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
251 masterComputeTimer = new GiraphTimer(superstepMetrics,
252 "master-compute-call", TimeUnit.MILLISECONDS);
253 }
254
255 @Override
256 public void setJobState(ApplicationState state,
257 long applicationAttempt,
258 long desiredSuperstep) {
259 setJobState(state, applicationAttempt, desiredSuperstep, true);
260 }
261
262 /**
263  * Set the job state.
264  *
265  * @param state State of the application
266  * @param applicationAttempt Attempt to start on
267  * @param desiredSuperstep Superstep to restart from (if applicable)
268  * @param killJobOnFailure If true and the desired state is FAILED,
269  *        also kill this job
270  */
271 private void setJobState(ApplicationState state,
272 long applicationAttempt,
273 long desiredSuperstep,
274 boolean killJobOnFailure) {
275 JSONObject jobState = new JSONObject();
276 try {
277 jobState.put(JSONOBJ_STATE_KEY, state.toString());
278 jobState.put(JSONOBJ_APPLICATION_ATTEMPT_KEY, applicationAttempt);
279 jobState.put(JSONOBJ_SUPERSTEP_KEY, desiredSuperstep);
280 } catch (JSONException e) {
281 throw new RuntimeException("setJobState: Couldn't put " +
282 state.toString());
283 }
284 if (LOG.isInfoEnabled()) {
285 LOG.info("setJobState: " + jobState.toString() + " on superstep " +
286 getSuperstep());
287 }
288 try {
289 getZkExt().createExt(masterJobStatePath + "/jobState",
290 jobState.toString().getBytes(Charset.defaultCharset()),
291 Ids.OPEN_ACL_UNSAFE,
292 CreateMode.PERSISTENT_SEQUENTIAL,
293 true);
294 LOG.info("setJobState: " + jobState);
295 } catch (KeeperException.NodeExistsException e) {
296 throw new IllegalStateException(
297 "setJobState: Imposible that " +
298 masterJobStatePath + " already exists!", e);
299 } catch (KeeperException e) {
300 throw new IllegalStateException(
301 "setJobState: Unknown KeeperException for " +
302 masterJobStatePath, e);
303 } catch (InterruptedException e) {
304 throw new IllegalStateException(
305 "setJobState: Unknown InterruptedException for " +
306 masterJobStatePath, e);
307 }
308 if (state == ApplicationState.FAILED && killJobOnFailure) {
309 failJob(new IllegalStateException("FAILED"));
310 }
311
312 }
313
314 
315 /**
316  * Set the job state to failed and fail the job.
317  *
318  * @param reason Reason the job failed
319  */
320 private void setJobStateFailed(String reason) {
321 getGraphTaskManager().getJobProgressTracker().logFailure(reason);
322 setJobState(ApplicationState.FAILED, -1, -1, false);
323 failJob(new IllegalStateException(reason));
324 }
325 /**
326  * Generate the input splits for the given input format, optionally
327  * processing only a random sample of them.
328  *
329  * @param inputFormat The input format
330  * @param minSplitCountHint Minimum number of splits to create (hint)
331  * @param inputSplitType Type of input splits (for logging purposes)
332  * @return List of input splits for the input format
333  */
334 private List<InputSplit> generateInputSplits(GiraphInputFormat inputFormat,
335 int minSplitCountHint,
336 InputType inputSplitType) {
337 String logPrefix = "generate" + inputSplitType + "InputSplits";
338 List<InputSplit> splits;
339 try {
340 splits = inputFormat.getSplits(getContext(), minSplitCountHint);
341 } catch (IOException e) {
342 throw new IllegalStateException(logPrefix + ": Got IOException", e);
343 } catch (InterruptedException e) {
344 throw new IllegalStateException(
345 logPrefix + ": Got InterruptedException", e);
346 }
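// If INPUT_SPLIT_SAMPLE_PERCENT was changed from its default, shuffle the
// splits and keep only that percentage of them; this is useful for test
// runs over a fraction of the input.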
347 float samplePercent =
348 INPUT_SPLIT_SAMPLE_PERCENT.get(getConfiguration());
349 if (samplePercent != INPUT_SPLIT_SAMPLE_PERCENT.getDefaultValue()) {
350 int lastIndex = (int) (samplePercent * splits.size() / 100f);
351 Collections.shuffle(splits);
352 List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
353 LOG.warn(logPrefix + ": Using sampling - Processing only " +
354 sampleSplits.size() + " instead of " + splits.size() +
355 " expected splits.");
356 return sampleSplits;
357 } else {
358 if (LOG.isInfoEnabled()) {
359 LOG.info(logPrefix + ": Got " + splits.size() +
360 " input splits for " + minSplitCountHint + " input threads");
361 }
362 return splits;
363 }
364 }
365
366 /**
367  * When there is no salvaging this job, fail it and kill the Hadoop job.
368  *
369  * @param e Exception that caused the failure
370  */
371 private void failJob(Exception e) {
372 LOG.fatal("failJob: Killing job " + getJobId());
373 LOG.fatal("failJob: exception " + e.toString());
374 try {
375 if (getConfiguration().isPureYarnJob()) {
376 throw new RuntimeException(
377 "BspServiceMaster (YARN profile) is " +
378 "FAILING this task, throwing exception to end job run.", e);
379 } else {
380 @SuppressWarnings("deprecation")
381 org.apache.hadoop.mapred.JobClient jobClient =
382 new org.apache.hadoop.mapred.JobClient(
383 (org.apache.hadoop.mapred.JobConf)
384 getContext().getConfiguration());
385 try {
386 @SuppressWarnings("deprecation")
387 JobID jobId = JobID.forName(getJobId());
388 RunningJob job = jobClient.getJob(jobId);
389 if (job != null) {
390 job.killJob();
391 } else {
392 LOG.error("Job not found for jobId=" + getJobId());
393 }
394 } catch (IllegalArgumentException iae) {
395 LOG.info("This job (" + getJobId() +
396 ") is not a legacy Hadoop job and will " +
397 "continue with failure cleanup." +
398 e.getMessage(),
399 e);
400 }
401 }
402 } catch (IOException ioe) {
403 throw new RuntimeException(ioe);
404 } finally {
405 failureCleanup(e);
406 }
407 }
408
409 /**
410  * Parse the {@link WorkerInfo} objects from a ZooKeeper path (and its
411  * children).
412  *
413  * @param workerInfosPath Path where all the workers are children
414  * @param watch Whether to set a watch on the path
415  * @return List of workers in that path
416  */
417 private List<WorkerInfo> getWorkerInfosFromPath(String workerInfosPath,
418 boolean watch) {
419 List<WorkerInfo> workerInfoList = new ArrayList<WorkerInfo>();
420 List<String> workerInfoPathList;
421 try {
422 workerInfoPathList =
423 getZkExt().getChildrenExt(workerInfosPath, watch, false, true);
424 } catch (KeeperException e) {
425 throw new IllegalStateException(
426 "getWorkers: Got KeeperException", e);
427 } catch (InterruptedException e) {
428 throw new IllegalStateException(
429 "getWorkers: Got InterruptedStateException", e);
430 }
431 for (String workerInfoPath : workerInfoPathList) {
432 WorkerInfo workerInfo = new WorkerInfo();
433 try {
434 WritableUtils.readFieldsFromZnode(
435 getZkExt(), workerInfoPath, true, null, workerInfo);
436 workerInfoList.add(workerInfo);
437 } catch (IllegalStateException e) {
438 LOG.warn("Can't get info from worker, did it die in between? " +
439 "workerInfoPath=" + workerInfoPath, e);
440 }
441 }
442 return workerInfoList;
443 }
444
445 /**
446  * Get the healthy and unhealthy {@link WorkerInfo} objects for a
447  * superstep.
448  *
449  * @param superstep Superstep to check
450  * @param healthyWorkerInfoList Filled in with the healthy workers
451  * @param unhealthyWorkerInfoList Filled in with the unhealthy workers
452  */
453 private void getAllWorkerInfos(
454 long superstep,
455 List<WorkerInfo> healthyWorkerInfoList,
456 List<WorkerInfo> unhealthyWorkerInfoList) {
457 String healthyWorkerInfoPath =
458 getWorkerInfoHealthyPath(getApplicationAttempt(), superstep);
459 String unhealthyWorkerInfoPath =
460 getWorkerInfoUnhealthyPath(getApplicationAttempt(), superstep);
461
462 try {
463 getZkExt().createOnceExt(healthyWorkerInfoPath,
464 null,
465 Ids.OPEN_ACL_UNSAFE,
466 CreateMode.PERSISTENT,
467 true);
468 } catch (KeeperException e) {
469 throw new IllegalStateException("getWorkers: KeeperException", e);
470 } catch (InterruptedException e) {
471 throw new IllegalStateException("getWorkers: InterruptedException", e);
472 }
473
474 try {
475 getZkExt().createOnceExt(unhealthyWorkerInfoPath,
476 null,
477 Ids.OPEN_ACL_UNSAFE,
478 CreateMode.PERSISTENT,
479 true);
480 } catch (KeeperException e) {
481 throw new IllegalStateException("getWorkers: KeeperException", e);
482 } catch (InterruptedException e) {
483 throw new IllegalStateException("getWorkers: InterruptedException", e);
484 }
485
486 List<WorkerInfo> currentHealthyWorkerInfoList =
487 getWorkerInfosFromPath(healthyWorkerInfoPath, true);
488 List<WorkerInfo> currentUnhealthyWorkerInfoList =
489 getWorkerInfosFromPath(unhealthyWorkerInfoPath, false);
490
491 healthyWorkerInfoList.clear();
492 if (currentHealthyWorkerInfoList != null) {
493 for (WorkerInfo healthyWorkerInfo :
494 currentHealthyWorkerInfoList) {
495 healthyWorkerInfoList.add(healthyWorkerInfo);
496 }
497 }
498
499 unhealthyWorkerInfoList.clear();
500 if (currentUnhealthyWorkerInfoList != null) {
501 for (WorkerInfo unhealthyWorkerInfo :
502 currentUnhealthyWorkerInfoList) {
503 unhealthyWorkerInfoList.add(unhealthyWorkerInfo);
504 }
505 }
506 }
507
508 @Override
509 public List<WorkerInfo> checkWorkers() {
510 boolean failJob = true;
511 long failWorkerCheckMsecs =
512 SystemTime.get().getMilliseconds() + maxSuperstepWaitMsecs;
513 List<WorkerInfo> healthyWorkerInfoList = new ArrayList<WorkerInfo>();
514 List<WorkerInfo> unhealthyWorkerInfoList = new ArrayList<WorkerInfo>();
515 int totalResponses = -1;
516 while (SystemTime.get().getMilliseconds() < failWorkerCheckMsecs) {
517 getContext().progress();
518 getAllWorkerInfos(
519 getSuperstep(), healthyWorkerInfoList, unhealthyWorkerInfoList);
520 totalResponses = healthyWorkerInfoList.size() +
521 unhealthyWorkerInfoList.size();
522 if ((totalResponses * 100.0f / maxWorkers) >=
523 minPercentResponded) {
524 failJob = false;
525 break;
526 }
527 getContext().setStatus(getGraphTaskManager().getGraphFunctions() + " " +
528 "checkWorkers: Only found " +
529 totalResponses +
530 " responses of " + maxWorkers +
531 " needed to start superstep " +
532 getSuperstep());
533 if (getWorkerHealthRegistrationChangedEvent().waitMsecs(
534 eventWaitMsecs)) {
535 if (LOG.isDebugEnabled()) {
536 LOG.debug("checkWorkers: Got event that health " +
537 "registration changed, not using poll attempt");
538 }
539 getWorkerHealthRegistrationChangedEvent().reset();
540 continue;
541 }
542 if (LOG.isInfoEnabled()) {
543 LOG.info("checkWorkers: Only found " + totalResponses +
544 " responses of " + maxWorkers +
545 " needed to start superstep " +
546 getSuperstep() + ". Reporting every " +
547 eventWaitMsecs + " msecs, " +
548 (failWorkerCheckMsecs - SystemTime.get().getMilliseconds()) +
549 " more msecs left before giving up.");
550
551 if ((maxWorkers - totalResponses) <=
552 partitionLongTailMinPrint) {
553 logMissingWorkersOnSuperstep(healthyWorkerInfoList,
554 unhealthyWorkerInfoList);
555 }
556 }
557 }
558 if (failJob) {
559 LOG.error("checkWorkers: Did not receive enough processes in " +
560 "time (only " + totalResponses + " of " +
561 minWorkers + " required) after waiting " + maxSuperstepWaitMsecs +
562 "msecs). This occurs if you do not have enough map tasks " +
563 "available simultaneously on your Hadoop instance to fulfill " +
564 "the number of requested workers.");
565 return null;
566 }
567
568 if (healthyWorkerInfoList.size() < minWorkers) {
569 LOG.error("checkWorkers: Only " + healthyWorkerInfoList.size() +
570 " available when " + minWorkers + " are required.");
571 logMissingWorkersOnSuperstep(healthyWorkerInfoList,
572 unhealthyWorkerInfoList);
573 return null;
574 }
575
576 getContext().setStatus(getGraphTaskManager().getGraphFunctions() + " " +
577 "checkWorkers: Done - Found " + totalResponses +
578 " responses of " + maxWorkers + " needed to start superstep " +
579 getSuperstep());
580
581 return healthyWorkerInfoList;
582 }
583
584 /**
585  * Log, at info level, the workers that did not respond on this superstep.
586  *
587  * @param healthyWorkerInfoList Healthy worker list
588  * @param unhealthyWorkerInfoList Unhealthy worker list
589  */
590 private void logMissingWorkersOnSuperstep(
591 List<WorkerInfo> healthyWorkerInfoList,
592 List<WorkerInfo> unhealthyWorkerInfoList) {
593 if (LOG.isInfoEnabled()) {
594 Set<Integer> partitionSet = new TreeSet<Integer>();
595 for (WorkerInfo workerInfo : healthyWorkerInfoList) {
596 partitionSet.add(workerInfo.getTaskId() % maxWorkers);
597 }
598 for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
599 partitionSet.add(workerInfo.getTaskId() % maxWorkers);
600 }
601 for (int i = 1; i <= maxWorkers; ++i) {
602 if (partitionSet.contains(Integer.valueOf(i))) {
603 continue;
604 } else if (i == getTaskId() % maxWorkers) {
605 continue;
606 } else {
607 LOG.info("logMissingWorkersOnSuperstep: No response from " +
608 "partition " + i + " (could be master)");
609 }
610 }
611 }
612 }
613
614 /**
615  * Common method for creating vertex/edge/mapping input splits.
616  *
617  * @param inputFormat The input format
618  * @param inputSplitType Type of input splits
619  * @return Number of splits created, or -1 if there were not enough
620  *         healthy workers to create valid input splits
621  */
622 private int createInputSplits(GiraphInputFormat inputFormat,
623 InputType inputSplitType) {
624 ImmutableClassesGiraphConfiguration conf = getConfiguration();
625 String logPrefix = "create" + inputSplitType + "InputSplits";
626 
627 // Check worker health first: splits can only be handed out to healthy
628 // workers, and the job is failed if there are not enough of them
629 
630 List<WorkerInfo> healthyWorkerInfoList = checkWorkers();
631 if (healthyWorkerInfoList == null) {
632 setJobStateFailed("Not enough healthy workers to create input splits");
633 return -1;
634 }
635 globalCommHandler.getInputSplitsHandler().initialize(masterClient,
636 healthyWorkerInfoList);
637 
638 // Create at least as many splits as the total number of input threads
639 int minSplitCountHint = healthyWorkerInfoList.size() *
640 conf.getNumInputSplitsThreads();
641
642
643
644 List<InputSplit> splitList = generateInputSplits(inputFormat,
645 minSplitCountHint, inputSplitType);
646
647 if (splitList.isEmpty() && GiraphConstants.FAIL_ON_EMPTY_INPUT.get(conf)) {
648 LOG.fatal(logPrefix + ": Failing job due to 0 input splits, " +
649 "check input of " + inputFormat.getClass().getName() + "!");
650 getContext().setStatus("Failing job due to 0 input splits, " +
651 "check input of " + inputFormat.getClass().getName() + "!");
652 setJobStateFailed("******* PLEASE CHECK YOUR INPUT TABLES - PARTITIONS " +
653 "WHICH YOU SPECIFIED ARE MISSING (for " + inputSplitType +
654 " input). FAILING THE JOB *******");
655 }
656 if (minSplitCountHint > splitList.size()) {
657 LOG.warn(logPrefix + ": Number of inputSplits=" +
658 splitList.size() + " < " +
659 minSplitCountHint +
660 "=total number of input threads, " +
661 "some threads will be not used");
662 }
663
664 globalCommHandler.getInputSplitsHandler().addSplits(inputSplitType,
665 splitList, inputFormat);
666
667 return splitList.size();
668 }
669
670 @Override
671 public int createMappingInputSplits() {
672 if (!getConfiguration().hasMappingInputFormat()) {
673 return 0;
674 }
675 MappingInputFormat<I, V, E, ? extends Writable> mappingInputFormat =
676 getConfiguration().createWrappedMappingInputFormat();
677 return createInputSplits(mappingInputFormat, InputType.MAPPING);
678 }
679
680 @Override
681 public int createVertexInputSplits() {
682 int splits = 0;
683 if (getConfiguration().hasVertexInputFormat()) {
684 VertexInputFormat<I, V, E> vertexInputFormat =
685 getConfiguration().createWrappedVertexInputFormat();
686 splits = createInputSplits(vertexInputFormat, InputType.VERTEX);
687 }
688 MasterProgress.get().setVertexInputSplitCount(splits);
689 getJobProgressTracker().updateMasterProgress(MasterProgress.get());
690 return splits;
691 }
692
693 @Override
694 public int createEdgeInputSplits() {
695 int splits = 0;
696 if (getConfiguration().hasEdgeInputFormat()) {
697 EdgeInputFormat<I, E> edgeInputFormat =
698 getConfiguration().createWrappedEdgeInputFormat();
699 splits = createInputSplits(edgeInputFormat, InputType.EDGE);
700 }
701 MasterProgress.get().setEdgeInputSplitsCount(splits);
702 getJobProgressTracker().updateMasterProgress(MasterProgress.get());
703 return splits;
704 }
705
706 @Override
707 public List<WorkerInfo> getWorkerInfoList() {
708 return chosenWorkerInfoList;
709 }
710
711 @Override
712 public MasterGlobalCommHandler getGlobalCommHandler() {
713 return globalCommHandler;
714 }
715
716 @Override
717 public AggregatorToGlobalCommTranslation getAggregatorTranslationHandler() {
718 return aggregatorTranslation;
719 }
720
721 @Override
722 public MasterCompute getMasterCompute() {
723 return masterCompute;
724 }
725
726 /**
727  * Read the finalized checkpoint file and the per-worker metadata files
728  * for the checkpoint being restarted from.  Rebuilds the partition
729  * owners from the metadata and restores the counters, superstep
730  * classes, aggregator data and master compute state saved in the
731  * checkpoint.
732  *
733  * @param superstep Checkpointed superstep to restart from
734  * @return Collection of generated partition owners
735  * @throws IOException
736  * @throws KeeperException
737  * @throws InterruptedException
738  */
739 private Collection<PartitionOwner> prepareCheckpointRestart(long superstep)
740 throws IOException, KeeperException, InterruptedException {
741 List<PartitionOwner> partitionOwners = new ArrayList<>();
742 FileSystem fs = getFs();
743 String finalizedCheckpointPath = getSavedCheckpointBasePath(superstep) +
744 CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX;
745 LOG.info("Loading checkpoint from " + finalizedCheckpointPath);
746 DataInputStream finalizedStream =
747 fs.open(new Path(finalizedCheckpointPath));
748 GlobalStats globalStats = new GlobalStats();
749 globalStats.readFields(finalizedStream);
750 updateCounters(globalStats);
751 SuperstepClasses superstepClasses =
752 SuperstepClasses.createToRead(getConfiguration());
753 superstepClasses.readFields(finalizedStream);
754 getConfiguration().updateSuperstepClasses(superstepClasses);
755 int prefixFileCount = finalizedStream.readInt();
756
757 String checkpointFile =
758 finalizedStream.readUTF();
759 for (int i = 0; i < prefixFileCount; ++i) {
760 int mrTaskId = finalizedStream.readInt();
761
762 DataInputStream metadataStream = fs.open(new Path(checkpointFile +
763 "." + mrTaskId + CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX));
764 long partitions = metadataStream.readInt();
765 WorkerInfo worker = getWorkerInfoById(mrTaskId);
766 for (long p = 0; p < partitions; ++p) {
767 int partitionId = metadataStream.readInt();
768 PartitionOwner partitionOwner = new BasicPartitionOwner(partitionId,
769 worker);
770 partitionOwners.add(partitionOwner);
771 LOG.info("prepareCheckpointRestart partitionId=" + partitionId +
772 " assigned to " + partitionOwner);
773 }
774 metadataStream.close();
775 }
776 
777 // Keep the partition owners sorted by partition id
778 Collections.sort(partitionOwners, new Comparator<PartitionOwner>() {
779 @Override
780 public int compare(PartitionOwner p1, PartitionOwner p2) {
781 return Integer.compare(p1.getPartitionId(), p2.getPartitionId());
782 }
783 });
784 
785 // Restore the aggregator values and master compute state from the checkpoint
786 globalCommHandler.getAggregatorHandler().readFields(finalizedStream);
787 aggregatorTranslation.readFields(finalizedStream);
788 masterCompute.readFields(finalizedStream);
789 finalizedStream.close();
790
791 return partitionOwners;
792 }
793
794 @Override
795 public void setup() {
796 
797 // If restarting from a checkpoint, move the superstep counter to the
798 // superstep we are restarting from before notifying the master
799 // observers that the application is starting
800 
801 if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
802 GiraphStats.getInstance().getSuperstepCounter().
803 setValue(getRestartedSuperstep());
804 }
805 for (MasterObserver observer : observers) {
806 observer.preApplication();
807 getContext().progress();
808 }
809 }
810
811 @Override
812 public boolean becomeMaster() {
813 // Create my bid to become the master, then check whether my bid is the
814 // first child of the election path; if not, wait and retry
815 String myBid = null;
816 try {
817 myBid =
818 getZkExt().createExt(masterElectionPath +
819 "/" + getHostnameTaskId(),
820 null,
821 Ids.OPEN_ACL_UNSAFE,
822 CreateMode.EPHEMERAL_SEQUENTIAL,
823 true);
824 } catch (KeeperException e) {
825 throw new IllegalStateException(
826 "becomeMaster: KeeperException", e);
827 } catch (InterruptedException e) {
828 throw new IllegalStateException(
829 "becomeMaster: IllegalStateException", e);
830 }
831 while (true) {
832 JSONObject jobState = getJobState();
833 try {
834 if ((jobState != null) &&
835 ApplicationState.valueOf(
836 jobState.getString(JSONOBJ_STATE_KEY)) ==
837 ApplicationState.FINISHED) {
838 LOG.info("becomeMaster: Job is finished, " +
839 "give up trying to be the master!");
840 isMaster = false;
841 return isMaster;
842 }
843 } catch (JSONException e) {
844 throw new IllegalStateException(
845 "becomeMaster: Couldn't get state from " + jobState, e);
846 }
847 try {
848 List<String> masterChildArr =
849 getZkExt().getChildrenExt(
850 masterElectionPath, true, true, true);
851 if (LOG.isInfoEnabled()) {
852 LOG.info("becomeMaster: First child is '" +
853 masterChildArr.get(0) + "' and my bid is '" +
854 myBid + "'");
855 }
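// The election winner is the task whose EPHEMERAL_SEQUENTIAL bid znode
// sorts first among the children of the election path; all other tasks
// keep waiting for the children to change and then re-check.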
856 if (masterChildArr.get(0).equals(myBid)) {
857 GiraphStats.getInstance().getCurrentMasterTaskPartition().
858 setValue(getTaskId());
859
860 globalCommHandler = new MasterGlobalCommHandler(
861 new MasterAggregatorHandler(getConfiguration(), getContext()),
862 new MasterInputSplitsHandler(
863 getConfiguration().useInputSplitLocality(), getContext()));
864 aggregatorTranslation = new AggregatorToGlobalCommTranslation(
865 getConfiguration(), globalCommHandler);
866
867 globalCommHandler.getAggregatorHandler().initialize(this);
868 masterCompute = getConfiguration().createMasterCompute();
869 masterCompute.setMasterService(this);
870
871 masterInfo = new MasterInfo();
872 masterServer =
873 new NettyMasterServer(getConfiguration(), this, getContext(),
874 getGraphTaskManager().createUncaughtExceptionHandler());
875 masterInfo.setInetSocketAddress(masterServer.getMyAddress(),
876 masterServer.getLocalHostOrIp());
877 masterInfo.setTaskId(getTaskId());
878 masterClient =
879 new NettyMasterClient(getContext(), getConfiguration(), this,
880 getGraphTaskManager().createUncaughtExceptionHandler());
881 masterServer.setFlowControl(masterClient.getFlowControl());
882
883 if (LOG.isInfoEnabled()) {
884 LOG.info("becomeMaster: I am now the master!");
885 }
886 isMaster = true;
887 return isMaster;
888 }
889 LOG.info("becomeMaster: Waiting to become the master...");
890 getMasterElectionChildrenChangedEvent().waitForTimeoutOrFail(
891 GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(
892 getConfiguration()));
893 getMasterElectionChildrenChangedEvent().reset();
894 } catch (KeeperException e) {
895 throw new IllegalStateException(
896 "becomeMaster: KeeperException", e);
897 } catch (InterruptedException e) {
898 throw new IllegalStateException(
899 "becomeMaster: IllegalStateException", e);
900 }
901 }
902 }
903
904 @Override
905 public MasterInfo getMasterInfo() {
906 return masterInfo;
907 }
908
909 /**
910  * Collect and aggregate the worker statistics for a particular superstep.
911  *
912  * @param superstep Superstep to aggregate on
913  * @return Global statistics aggregated over all worker statistics
914  */
915 private GlobalStats aggregateWorkerStats(long superstep) {
916 ImmutableClassesGiraphConfiguration conf = getConfiguration();
917
918 GlobalStats globalStats = new GlobalStats();
919
920 String workerFinishedPath =
921 getWorkerMetricsFinishedPath(getApplicationAttempt(), superstep);
922 List<String> workerFinishedPathList = null;
923 try {
924 workerFinishedPathList =
925 getZkExt().getChildrenExt(
926 workerFinishedPath, false, false, true);
927 } catch (KeeperException e) {
928 throw new IllegalStateException(
929 "aggregateWorkerStats: KeeperException", e);
930 } catch (InterruptedException e) {
931 throw new IllegalStateException(
932 "aggregateWorkerStats: InterruptedException", e);
933 }
934
935 AggregatedMetrics aggregatedMetrics = new AggregatedMetrics();
936
937 for (String finishedPath : workerFinishedPathList) {
938 String hostnamePartitionId = FilenameUtils.getName(finishedPath);
939 JSONObject workerFinishedInfoObj = null;
940 try {
941 byte [] zkData =
942 getZkExt().getData(finishedPath, false, null);
943 workerFinishedInfoObj = new JSONObject(new String(zkData,
944 Charset.defaultCharset()));
945 globalStats.addMessageCount(
946 workerFinishedInfoObj.getLong(
947 JSONOBJ_NUM_MESSAGES_KEY));
948 globalStats.addMessageBytesCount(
949 workerFinishedInfoObj.getLong(
950 JSONOBJ_NUM_MESSAGE_BYTES_KEY));
951 if (conf.metricsEnabled() &&
952 workerFinishedInfoObj.has(JSONOBJ_METRICS_KEY)) {
953 WorkerSuperstepMetrics workerMetrics = new WorkerSuperstepMetrics();
954 WritableUtils.readFieldsFromByteArray(
955 Base64.decode(
956 workerFinishedInfoObj.getString(
957 JSONOBJ_METRICS_KEY)),
958 workerMetrics);
959 globalStats.addOocLoadBytesCount(
960 workerMetrics.getBytesLoadedFromDisk());
961 globalStats.addOocStoreBytesCount(
962 workerMetrics.getBytesStoredOnDisk());
963
964
965 globalStats.setLowestGraphPercentageInMemory(
966 Math.min(globalStats.getLowestGraphPercentageInMemory(),
967 (int) Math.round(
968 workerMetrics.getGraphPercentageInMemory())));
969 aggregatedMetrics.add(workerMetrics, hostnamePartitionId);
970 }
971 } catch (JSONException e) {
972 throw new IllegalStateException(
973 "aggregateWorkerStats: JSONException", e);
974 } catch (KeeperException e) {
975 throw new IllegalStateException(
976 "aggregateWorkerStats: KeeperException", e);
977 } catch (InterruptedException e) {
978 throw new IllegalStateException(
979 "aggregateWorkerStats: InterruptedException", e);
980 } catch (IOException e) {
981 throw new IllegalStateException(
982 "aggregateWorkerStats: IOException", e);
983 }
984 }
985
986 allPartitionStatsList.clear();
987 Iterable<PartitionStats> statsList = globalCommHandler.getAllPartitionStats(
988 workerFinishedPathList.size(), getContext());
989 for (PartitionStats partitionStats : statsList) {
990 globalStats.addPartitionStats(partitionStats);
991 allPartitionStatsList.add(partitionStats);
992 }
993
994 if (conf.metricsEnabled()) {
995 if (GiraphConstants.METRICS_DIRECTORY.isDefaultValue(conf)) {
996 aggregatedMetrics.print(superstep, System.err);
997 } else {
998 printAggregatedMetricsToHDFS(superstep, aggregatedMetrics);
999 }
1000 for (MasterObserver observer : observers) {
1001 observer.superstepMetricsUpdate(
1002 superstep, aggregatedMetrics, allPartitionStatsList);
1003 }
1004 }
1005
1006 if (LOG.isInfoEnabled()) {
1007 LOG.info("aggregateWorkerStats: Aggregation found " + globalStats +
1008 " on superstep = " + getSuperstep());
1009 }
1010 return globalStats;
1011 }
1012
1013 /**
1014  * Write the aggregated metrics for this superstep to a file in HDFS.
1015  * @param superstep Superstep the metrics belong to
1016  * @param aggregatedMetrics Aggregated metrics to write
1017  */
1018 private void printAggregatedMetricsToHDFS(
1019 long superstep, AggregatedMetrics aggregatedMetrics) {
1020 ImmutableClassesGiraphConfiguration conf = getConfiguration();
1021 PrintStream out = null;
1022 Path dir = new Path(GiraphConstants.METRICS_DIRECTORY.get(conf));
1023 Path outFile = new Path(GiraphConstants.METRICS_DIRECTORY.get(conf) +
1024 Path.SEPARATOR_CHAR + "superstep_" + superstep + ".metrics");
1025 try {
1026 FileSystem fs;
1027 fs = FileSystem.get(conf);
1028 if (!fs.exists(dir)) {
1029 fs.mkdirs(dir);
1030 }
1031 if (fs.exists(outFile)) {
1032 throw new RuntimeException(
1033 "printAggregatedMetricsToHDFS: metrics file exists");
1034 }
1035 out = new PrintStream(fs.create(outFile), false,
1036 Charset.defaultCharset().name());
1037 aggregatedMetrics.print(superstep, out);
1038 } catch (IOException e) {
1039 throw new RuntimeException(
1040 "printAggregatedMetricsToHDFS: error creating metrics file", e);
1041 } finally {
1042 if (out != null) {
1043 out.close();
1044 }
1045 }
1046 }
1047 /**
1048  * Finalize the checkpoint by writing the chosen workers and the master
1049  * state (aggregators and master compute) to the finalized checkpoint
1050  * file, which marks the checkpoint as complete and usable for
1051  * restarting.
1052  *
1053  * @param superstep Superstep to finalize
1054  * @param chosenWorkerInfoList Chosen workers for this superstep
1055  * @throws IOException
1056  * @throws KeeperException
1057  * @throws InterruptedException
1058  */
1059 private void finalizeCheckpoint(long superstep,
1060 List<WorkerInfo> chosenWorkerInfoList)
1061 throws IOException, KeeperException, InterruptedException {
1062 Path finalizedCheckpointPath =
1063 new Path(getCheckpointBasePath(superstep) +
1064 CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX);
1065 try {
1066 getFs().delete(finalizedCheckpointPath, false);
1067 } catch (IOException e) {
1068 LOG.warn("finalizedValidCheckpointPrefixes: Removed old file " +
1069 finalizedCheckpointPath);
1070 }
1071 
1072 // The finalized checkpoint file contains:
1073 //   - the superstep-finished znode data (global stats and superstep classes)
1074 //   - the number of chosen workers and the checkpoint base path
1075 //   - the worker id of every chosen worker
1076 //   - the aggregator data, aggregator translation and master compute state
1077 
1078 
1079 FSDataOutputStream finalizedOutputStream =
1080 getFs().create(finalizedCheckpointPath);
1081
1082 String superstepFinishedNode =
1083 getSuperstepFinishedPath(getApplicationAttempt(), superstep - 1);
1084 finalizedOutputStream.write(
1085 getZkExt().getData(superstepFinishedNode, false, null));
1086
1087 finalizedOutputStream.writeInt(chosenWorkerInfoList.size());
1088 finalizedOutputStream.writeUTF(getCheckpointBasePath(superstep));
1089 for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
1090 finalizedOutputStream.writeInt(getWorkerId(chosenWorkerInfo));
1091 }
1092 globalCommHandler.getAggregatorHandler().write(finalizedOutputStream);
1093 aggregatorTranslation.write(finalizedOutputStream);
1094 masterCompute.write(finalizedOutputStream);
1095 finalizedOutputStream.close();
1096 lastCheckpointedSuperstep = superstep;
1097 GiraphStats.getInstance().
1098 getLastCheckpointedSuperstep().setValue(superstep);
1099 }
1100
1101 /**
1102  * Assign the partition owners for this superstep: create them on the
1103  * input superstep, restore them when restarting from a checkpoint, or
1104  * rebalance them based on the previous superstep's statistics, and then
1105  * send the result to all chosen workers.
1106  */
1107 private void assignPartitionOwners() {
1108 Collection<PartitionOwner> partitionOwners;
1109 if (getSuperstep() == INPUT_SUPERSTEP) {
1110 partitionOwners =
1111 masterGraphPartitioner.createInitialPartitionOwners(
1112 chosenWorkerInfoList, maxWorkers);
1113 if (partitionOwners.isEmpty()) {
1114 throw new IllegalStateException(
1115 "assignAndExchangePartitions: No partition owners set");
1116 }
1117 } else if (getRestartedSuperstep() == getSuperstep()) {
1118
1119 try {
1120 partitionOwners = prepareCheckpointRestart(getSuperstep());
1121 } catch (IOException e) {
1122 throw new IllegalStateException(
1123 "assignPartitionOwners: IOException on preparing", e);
1124 } catch (KeeperException e) {
1125 throw new IllegalStateException(
1126 "assignPartitionOwners: KeeperException on preparing", e);
1127 } catch (InterruptedException e) {
1128 throw new IllegalStateException(
1129 "assignPartitionOwners: InteruptedException on preparing",
1130 e);
1131 }
1132 masterGraphPartitioner.setPartitionOwners(partitionOwners);
1133 } else {
1134 partitionOwners =
1135 masterGraphPartitioner.generateChangedPartitionOwners(
1136 allPartitionStatsList,
1137 chosenWorkerInfoList,
1138 maxWorkers,
1139 getSuperstep());
1140
1141 PartitionUtils.analyzePartitionStats(partitionOwners,
1142 allPartitionStatsList);
1143 }
1144 checkPartitions(masterGraphPartitioner.getCurrentPartitionOwners());
1145
1146 
1147 // There will be some partition exchange between workers, so make sure
1148 // the vertex exchange znode exists before they need it
1149 if (!partitionOwners.isEmpty()) {
1150 String vertexExchangePath =
1151 getPartitionExchangePath(getApplicationAttempt(),
1152 getSuperstep());
1153 try {
1154 getZkExt().createOnceExt(vertexExchangePath,
1155 null,
1156 Ids.OPEN_ACL_UNSAFE,
1157 CreateMode.PERSISTENT,
1158 true);
1159 } catch (KeeperException e) {
1160 throw new IllegalStateException(
1161 "assignPartitionOwners: KeeperException creating " +
1162 vertexExchangePath);
1163 } catch (InterruptedException e) {
1164 throw new IllegalStateException(
1165 "assignPartitionOwners: InterruptedException creating " +
1166 vertexExchangePath);
1167 }
1168 }
1169
1170 AddressesAndPartitionsWritable addressesAndPartitions =
1171 new AddressesAndPartitionsWritable(masterInfo, chosenWorkerInfoList,
1172 partitionOwners);
1173 
1174 // Send the master info, worker list and partition assignments to all
1175 // chosen workers
1176 for (WorkerInfo workerInfo : chosenWorkerInfoList) {
1177 masterClient.sendWritableRequest(workerInfo.getTaskId(),
1178 new AddressesAndPartitionsRequest(addressesAndPartitions));
1179 }
1180 }
1181
1182 /**
1183  * Check that the partition ids are valid.
1184  *
1185  * @param partitionOwners Partition owners for the current superstep
1186  */
1187 private void checkPartitions(Collection<PartitionOwner> partitionOwners) {
1188 for (PartitionOwner partitionOwner : partitionOwners) {
1189 int partitionId = partitionOwner.getPartitionId();
1190 if (partitionId < 0 || partitionId >= partitionOwners.size()) {
1191 throw new IllegalStateException("checkPartitions: " +
1192 "Invalid partition id " + partitionId +
1193 " - partition ids must be values from 0 to (numPartitions - 1)");
1194 }
1195 }
1196 }
1197
1198 /**
1199  * Check whether the workers chosen for this superstep are still alive.
1200  *
1201  * @param chosenWorkerInfoHealthPath Path to the healthy workers in ZooKeeper
1202  * @param chosenWorkerInfoList List of workers to check
1203  * @return List of dead workers (empty if all chosen workers are alive)
1204  * @throws KeeperException
1205  * @throws InterruptedException
1206  */
1207 private Collection<WorkerInfo> superstepChosenWorkerAlive(
1208 String chosenWorkerInfoHealthPath,
1209 List<WorkerInfo> chosenWorkerInfoList)
1210 throws KeeperException, InterruptedException {
1211 List<WorkerInfo> chosenWorkerInfoHealthyList =
1212 getWorkerInfosFromPath(chosenWorkerInfoHealthPath, false);
1213 Set<WorkerInfo> chosenWorkerInfoHealthySet =
1214 new HashSet<WorkerInfo>(chosenWorkerInfoHealthyList);
1215 List<WorkerInfo> deadWorkers = new ArrayList<>();
1216 for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
1217 if (!chosenWorkerInfoHealthySet.contains(chosenWorkerInfo)) {
1218 deadWorkers.add(chosenWorkerInfo);
1219 }
1220 }
1221 return deadWorkers;
1222 }
1223
1224 @Override
1225 public void restartFromCheckpoint(long checkpoint) {
1226 // Bump the application attempt, point the superstep counters at the
1227 // checkpoint we are restarting from, and tell the workers to start
1228 // the superstep from there
1229 setApplicationAttempt(getApplicationAttempt() + 1);
1230 setCachedSuperstep(checkpoint);
1231 setRestartedSuperstep(checkpoint);
1232 checkpointStatus = CheckpointStatus.NONE;
1233 setJobState(ApplicationState.START_SUPERSTEP,
1234 getApplicationAttempt(),
1235 checkpoint);
1236 }
1237
1238 /**
1239  * Safely remove a node from ZooKeeper, ignoring the case where the
1240  * node has already been removed.
1241  *
1242  * @param path Path of the node to be removed
1243  */
1244 private void zkDeleteNode(String path) {
1245 try {
1246 getZkExt().deleteExt(path, -1, true);
1247 } catch (KeeperException.NoNodeException e) {
1248 LOG.info("zkDeleteNode: node has already been removed " + path);
1249 } catch (InterruptedException e) {
1250 throw new RuntimeException(
1251 "zkDeleteNode: InterruptedException", e);
1252 } catch (KeeperException e) {
1253 throw new RuntimeException(
1254 "zkDeleteNode: KeeperException", e);
1255 }
1256 }
1257
1258 @Override
1259 public long getLastGoodCheckpoint() throws IOException {
1260 // Find the last good checkpoint if none has been recorded by this
1261 // master yet
1262 if (lastCheckpointedSuperstep == -1) {
1263 try {
1264 lastCheckpointedSuperstep = getLastCheckpointedSuperstep();
1265 } catch (IOException e) {
1266 LOG.fatal("getLastGoodCheckpoint: No last good checkpoints can be " +
1267 "found, killing the job.", e);
1268 failJob(e);
1269 }
1270 }
1271
1272 return lastCheckpointedSuperstep;
1273 }
1274
1275 /**
1276  * Wait for a set of workers to signal that they passed the barrier by
1277  * registering themselves under the given ZooKeeper path.
1278  *
1279  * @param finishedWorkerPath Path where the workers register their
1280  *        hostname and id once they are done
1281  * @param workerInfoList List of workers to wait for
1282  * @param event Event to wait on for a chance to be done
1283  * @param ignoreDeath If true, ignore workers that died after making it
1284  *        through the barrier
1285  * @return True if the barrier was successful, false if there was a
1286  *         worker failure
1287  */
1288 private boolean barrierOnWorkerList(String finishedWorkerPath,
1289 List<WorkerInfo> workerInfoList,
1290 BspEvent event,
1291 boolean ignoreDeath) {
1292 try {
1293 getZkExt().createOnceExt(finishedWorkerPath,
1294 null,
1295 Ids.OPEN_ACL_UNSAFE,
1296 CreateMode.PERSISTENT,
1297 true);
1298 } catch (KeeperException e) {
1299 throw new IllegalStateException(
1300 "barrierOnWorkerList: KeeperException - Couldn't create " +
1301 finishedWorkerPath, e);
1302 } catch (InterruptedException e) {
1303 throw new IllegalStateException(
1304 "barrierOnWorkerList: InterruptedException - Couldn't create " +
1305 finishedWorkerPath, e);
1306 }
1307 List<String> hostnameIdList =
1308 new ArrayList<String>(workerInfoList.size());
1309 for (WorkerInfo workerInfo : workerInfoList) {
1310 hostnameIdList.add(workerInfo.getHostnameId());
1311 }
1312 String workerInfoHealthyPath =
1313 getWorkerInfoHealthyPath(getApplicationAttempt(), getSuperstep());
1314 List<String> finishedHostnameIdList = new ArrayList<>();
1315 List<String> tmpFinishedHostnameIdList;
1316 long nextInfoMillis = System.currentTimeMillis();
1317 final int defaultTaskTimeoutMsec = 10 * 60 * 1000;
1318 final int waitBetweenLogInfoMsec = 30 * 1000;
1319 final int taskTimeoutMsec = getContext().getConfiguration().getInt(
1320 "mapred.task.timeout", defaultTaskTimeoutMsec) / 2;
1321 long lastRegularRunTimeMsec = 0;
1322 int eventLoopTimeout = Math.min(taskTimeoutMsec, waitBetweenLogInfoMsec);
1323 boolean logInfoOnlyRun = false;
1324 List<WorkerInfo> deadWorkers = new ArrayList<>();
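// The loop below alternates between full passes, which re-read the list
// of finished workers from ZooKeeper and check for dead workers, and
// cheaper log-info-only passes that just report progress while waiting
// for the barrier event or the next timeout.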
1325 while (true) {
1326 if (! logInfoOnlyRun) {
1327 try {
1328 tmpFinishedHostnameIdList =
1329 getZkExt().getChildrenExt(finishedWorkerPath,
1330 true,
1331 false,
1332 false);
1333 } catch (KeeperException e) {
1334 throw new IllegalStateException(
1335 "barrierOnWorkerList: KeeperException - Couldn't get " +
1336 "children of " + finishedWorkerPath, e);
1337 } catch (InterruptedException e) {
1338 throw new IllegalStateException(
1339 "barrierOnWorkerList: IllegalException - Couldn't get " +
1340 "children of " + finishedWorkerPath, e);
1341 }
1342 if (LOG.isDebugEnabled()) {
1343
1344 Set<String> newFinishedHostnames = Sets.difference(
1345 Sets.newHashSet(tmpFinishedHostnameIdList),
1346 Sets.newHashSet(finishedHostnameIdList));
1347 LOG.debug("barrierOnWorkerList: Got new finished worker list = " +
1348 newFinishedHostnames + ", size = " +
1349 newFinishedHostnames.size() +
1350 " from " + finishedWorkerPath);
1351 }
1352 finishedHostnameIdList = tmpFinishedHostnameIdList;
1353 }
1354
1355 if (LOG.isInfoEnabled() &&
1356 (System.currentTimeMillis() > nextInfoMillis)) {
1357 nextInfoMillis = System.currentTimeMillis() + waitBetweenLogInfoMsec;
1358 LOG.info("barrierOnWorkerList: " +
1359 finishedHostnameIdList.size() +
1360 " out of " + workerInfoList.size() +
1361 " workers finished on superstep " +
1362 getSuperstep() + " on path " + finishedWorkerPath);
1363 if (workerInfoList.size() - finishedHostnameIdList.size() <
1364 MAX_PRINTABLE_REMAINING_WORKERS) {
1365 Set<String> remainingWorkers = Sets.newHashSet(hostnameIdList);
1366 remainingWorkers.removeAll(finishedHostnameIdList);
1367 LOG.info("barrierOnWorkerList: Waiting on " + remainingWorkers);
1368 }
1369 }
1370
1371 if (! logInfoOnlyRun) {
1372 getContext().setStatus(getGraphTaskManager().getGraphFunctions() +
1373 " - " +
1374 finishedHostnameIdList.size() +
1375 " finished out of " +
1376 workerInfoList.size() +
1377 " on superstep " + getSuperstep());
1378 if (finishedHostnameIdList.containsAll(hostnameIdList)) {
1379 break;
1380 }
1381
1382 for (WorkerInfo deadWorker : deadWorkers) {
1383 if (!finishedHostnameIdList.contains(deadWorker.getHostnameId())) {
1384 LOG.error("barrierOnWorkerList: no results arived from " +
1385 "worker that was pronounced dead: " + deadWorker +
1386 " on superstep " + getSuperstep());
1387 return false;
1388 }
1389 }
1390 
1391 // Remember when the last full (non log-only) pass was done
1392 lastRegularRunTimeMsec = System.currentTimeMillis();
1393 }
1394
1395 // Wait for the barrier event or the event-loop timeout
1396 boolean eventTriggered = event.waitMsecs(eventLoopTimeout);
1397 
1398 // If the event was triggered, reset it so that the next full pass
1399 // re-reads the list of finished workers
1400 if (eventTriggered) {
1401 event.reset();
1402 }
1403
1404 long elapsedTimeSinceRegularRunMsec = System.currentTimeMillis() -
1405 lastRegularRunTimeMsec;
1406 getContext().progress();
1407
1408 if (eventTriggered ||
1409 taskTimeoutMsec == eventLoopTimeout ||
1410 elapsedTimeSinceRegularRunMsec >= taskTimeoutMsec) {
1411 logInfoOnlyRun = false;
1412 } else {
1413 logInfoOnlyRun = true;
1414 continue;
1415 }
1416 
1417 // On a full pass, check whether any of the chosen workers died
1418 try {
1419 deadWorkers.addAll(superstepChosenWorkerAlive(
1420 workerInfoHealthyPath,
1421 workerInfoList));
1422 if (!ignoreDeath && deadWorkers.size() > 0) {
1423 String errorMessage = "******* WORKERS " + deadWorkers +
1424 " FAILED *******";
1425
1426 if (!getConfiguration().useCheckpointing()) {
1427 setJobStateFailed(errorMessage);
1428 } else {
1429 LOG.error("barrierOnWorkerList: Missing chosen " +
1430 "workers " + deadWorkers +
1431 " on superstep " + getSuperstep());
1432
1433 getGraphTaskManager().getJobProgressTracker().logInfo(errorMessage);
1434 }
1435 return false;
1436 }
1437 } catch (KeeperException e) {
1438 throw new IllegalStateException(
1439 "barrierOnWorkerList: KeeperException - " +
1440 "Couldn't get " + workerInfoHealthyPath, e);
1441 } catch (InterruptedException e) {
1442 throw new IllegalStateException(
1443 "barrierOnWorkerList: InterruptedException - " +
1444 "Couldn't get " + workerInfoHealthyPath, e);
1445 }
1446 }
1447
1448 return true;
1449 }
1450
1451 /**
1452  * Clean up the data from an old superstep in ZooKeeper.
1453  *
1454  * @param removeableSuperstep Superstep to clean up
1455  * @throws InterruptedException
1456  */
1457 private void cleanUpOldSuperstep(long removeableSuperstep) throws
1458 InterruptedException {
1459 if (KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration()) &&
1460 (removeableSuperstep >= 0)) {
1461 String oldSuperstepPath =
1462 getSuperstepPath(getApplicationAttempt()) + "/" +
1463 removeableSuperstep;
1464 try {
1465 if (LOG.isInfoEnabled()) {
1466 LOG.info("coordinateSuperstep: Cleaning up old Superstep " +
1467 oldSuperstepPath);
1468 }
1469 getZkExt().deleteExt(oldSuperstepPath,
1470 -1,
1471 true);
1472 } catch (KeeperException.NoNodeException e) {
1473 LOG.warn("coordinateBarrier: Already cleaned up " +
1474 oldSuperstepPath);
1475 } catch (KeeperException e) {
1476 throw new IllegalStateException(
1477 "coordinateSuperstep: KeeperException on " +
1478 "finalizing checkpoint", e);
1479 }
1480 }
1481 }
1482 
1483 /**
1484  * Coordinate the workers finishing the input splits.
1485  */
1486 private void coordinateInputSplits() {
1487 // Wait for the workers to finish loading their input splits, then
1488 // publish the "all done" znode so that everyone can move on
1489 if (!barrierOnWorkerList(inputSplitsWorkerDonePath,
1490 chosenWorkerInfoList,
1491 getInputSplitsWorkerDoneEvent(),
1492 false)) {
1493 throw new IllegalStateException("coordinateInputSplits: Worker failed " +
1494 "during input split (currently not supported)");
1495 }
1496 try {
1497 getZkExt().createExt(inputSplitsAllDonePath,
1498 null,
1499 Ids.OPEN_ACL_UNSAFE,
1500 CreateMode.PERSISTENT,
1501 false);
1502 } catch (KeeperException.NodeExistsException e) {
1503 LOG.info("coordinateInputSplits: Node " +
1504 inputSplitsAllDonePath + " already exists.");
1505 } catch (KeeperException e) {
1506 throw new IllegalStateException(
1507 "coordinateInputSplits: KeeperException", e);
1508 } catch (InterruptedException e) {
1509 throw new IllegalStateException(
1510 "coordinateInputSplits: IllegalStateException", e);
1511 }
1512 }
1513 
1514 /**
1515  * Initialize aggregators on the master side before vertex/edge loading
1516  * so that aggregation can already be used during the INPUT_SUPERSTEP.
1517  *
1518  * This cooperates with the corresponding worker-side setup so that the
1519  * input-superstep aggregator values are available to the vertex and
1520  * edge readers.
1521  *
1522  * Here the master prepares the superstep, runs
1523  * MasterCompute.initialize(), finishes the superstep and sends the
1524  * resulting aggregator values to their owners.
1525  *
1526  * @throws InterruptedException
1527  */
1528 private void initializeAggregatorInputSuperstep()
1529 throws InterruptedException {
1530 globalCommHandler.getAggregatorHandler().prepareSuperstep();
1531
1532 prepareMasterCompute(getSuperstep());
1533 try {
1534 masterCompute.initialize();
1535 } catch (InstantiationException e) {
1536 LOG.fatal(
1537 "initializeAggregatorInputSuperstep: Failed in instantiation", e);
1538 throw new RuntimeException(
1539 "initializeAggregatorInputSuperstep: Failed in instantiation", e);
1540 } catch (IllegalAccessException e) {
1541 LOG.fatal("initializeAggregatorInputSuperstep: Failed in access", e);
1542 throw new RuntimeException(
1543 "initializeAggregatorInputSuperstep: Failed in access", e);
1544 }
1545 aggregatorTranslation.postMasterCompute();
1546 globalCommHandler.getAggregatorHandler().finishSuperstep();
1547
1548 globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
1549 }
1550
1551 /**
1552  * Set the master compute's graph state and superstep classes for the
1553  * given superstep.
1554  *
1555  * @param superstep Superstep to prepare for
1556  * @return SuperstepClasses that the master compute may modify
1557  */
1558 private SuperstepClasses prepareMasterCompute(long superstep) {
1559 GraphState graphState = new GraphState(superstep,
1560 GiraphStats.getInstance().getVertices().getValue(),
1561 GiraphStats.getInstance().getEdges().getValue(),
1562 getContext());
1563 SuperstepClasses superstepClasses =
1564 SuperstepClasses.createAndExtractTypes(getConfiguration());
1565 masterCompute.setGraphState(graphState);
1566 masterCompute.setSuperstepClasses(superstepClasses);
1567 return superstepClasses;
1568 }
1569
1570 @Override
1571 public SuperstepState coordinateSuperstep() throws
1572 KeeperException, InterruptedException {
1573 // Superstep coordination:
1574 // 1. Get the chosen workers and send them the partition assignments
1575 // 2. If a checkpoint is due, wait for the workers to write it and
1576 //    then finalize it
1577 // 3. Wait for all workers to finish the superstep
1578 // 4. Run the master compute and aggregate worker stats and counters
1579 // 5. Write the superstep finished node and decide whether to halt
1580 
1581 for (MasterObserver observer : observers) {
1582 observer.preSuperstep(getSuperstep());
1583 getContext().progress();
1584 }
1585
1586 chosenWorkerInfoList = checkWorkers();
1587 if (chosenWorkerInfoList == null) {
1588 setJobStateFailed("coordinateSuperstep: Not enough healthy workers for " +
1589 "superstep " + getSuperstep());
1590 } else {
1591
1592 Collections.sort(chosenWorkerInfoList, new Comparator<WorkerInfo>() {
1593 @Override
1594 public int compare(WorkerInfo wi1, WorkerInfo wi2) {
1595 return Integer.compare(wi1.getTaskId(), wi2.getTaskId());
1596 }
1597 });
1598 for (WorkerInfo workerInfo : chosenWorkerInfoList) {
1599 String workerInfoHealthyPath =
1600 getWorkerInfoHealthyPath(getApplicationAttempt(),
1601 getSuperstep()) + "/" +
1602 workerInfo.getHostnameId();
1603 if (getZkExt().exists(workerInfoHealthyPath, true) == null) {
1604 LOG.warn("coordinateSuperstep: Chosen worker " +
1605 workerInfoHealthyPath +
1606 " is no longer valid, failing superstep");
1607 }
1608 }
1609 }
1610 
1611 // Finalize the aggregators from the previous superstep (if any)
1612 if (getSuperstep() >= 0) {
1613 aggregatorTranslation.postMasterCompute();
1614 globalCommHandler.getAggregatorHandler().finishSuperstep();
1615 }
1616
1617 masterClient.openConnections();
1618
1619 GiraphStats.getInstance().
1620 getCurrentWorkers().setValue(chosenWorkerInfoList.size());
1621 assignPartitionOwners();
1622
1623 // If the workers were asked to write a checkpoint on this superstep,
1624 // wait for all of them to finish and then finalize the checkpoint
1625 if (checkpointStatus != CheckpointStatus.NONE) {
1626 String workerWroteCheckpointPath =
1627 getWorkerWroteCheckpointPath(getApplicationAttempt(),
1628 getSuperstep());
1629
1630 if (!barrierOnWorkerList(workerWroteCheckpointPath,
1631 chosenWorkerInfoList,
1632 getWorkerWroteCheckpointEvent(),
1633 checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT)) {
1634 return SuperstepState.WORKER_FAILURE;
1635 }
1636 try {
1637 finalizeCheckpoint(getSuperstep(), chosenWorkerInfoList);
1638 } catch (IOException e) {
1639 throw new IllegalStateException(
1640 "coordinateSuperstep: IOException on finalizing checkpoint",
1641 e);
1642 }
1643 if (checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT) {
1644 return SuperstepState.CHECKPOINT_AND_HALT;
1645 }
1646 }
1647 
1648 // Ship the aggregator values for this superstep to their owner workers
1649 if (getSuperstep() >= 0) {
1650 globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
1651 }
1652
1653 if (getSuperstep() == INPUT_SUPERSTEP) {
1654
1655 initializeAggregatorInputSuperstep();
1656 coordinateInputSplits();
1657 }
1658
1659 String finishedWorkerPath =
1660 getWorkerMetricsFinishedPath(getApplicationAttempt(), getSuperstep());
1661 if (!barrierOnWorkerList(finishedWorkerPath,
1662 chosenWorkerInfoList,
1663 getSuperstepStateChangedEvent(),
1664 false)) {
1665 return SuperstepState.WORKER_FAILURE;
1666 }
1667 
1668 // Collect the aggregator values from the workers, run the master
1669 // compute, and prepare the aggregators for the next superstep
1670 globalCommHandler.getAggregatorHandler().prepareSuperstep();
1671 aggregatorTranslation.prepareSuperstep();
1672
1673 SuperstepClasses superstepClasses =
1674 prepareMasterCompute(getSuperstep() + 1);
1675 doMasterCompute();
1676
1677
1678
1679 GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
1680 aggregateCountersFromWorkersAndMaster();
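// Halt when the master compute votes to halt, when every vertex has
// finished and no messages are in flight, or when the external halt
// znode has been created.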
1681 if (masterCompute.isHalted() ||
1682 (globalStats.getFinishedVertexCount() ==
1683 globalStats.getVertexCount() &&
1684 globalStats.getMessageCount() == 0)) {
1685 globalStats.setHaltComputation(true);
1686 } else if (getZkExt().exists(haltComputationPath, false) != null) {
1687 if (LOG.isInfoEnabled()) {
1688 LOG.info("Halting computation because halt zookeeper node was created");
1689 }
1690 globalStats.setHaltComputation(true);
1691 }
1692 
1693 // If the user-specified maximum number of supersteps has been
1694 // reached, stop the computation
1695 if (maxNumberOfSupersteps !=
1696 GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS.getDefaultValue() &&
1697 (getSuperstep() == maxNumberOfSupersteps - 1)) {
1698 if (LOG.isInfoEnabled()) {
1699 LOG.info("coordinateSuperstep: Finished " + maxNumberOfSupersteps +
1700 " supersteps (max specified by the user), halting");
1701 }
1702 globalStats.setHaltComputation(true);
1703 }
1704
1705 
1706 // Unless the computation is halting, verify that the superstep
1707 // classes chosen for the next superstep have consistent types
1708 if (!globalStats.getHaltComputation()) {
1709 superstepClasses.verifyTypesMatch(getSuperstep() > 0);
1710 }
1711 getConfiguration().updateSuperstepClasses(superstepClasses);
1712 
1713 // Decide whether the next superstep should write a checkpoint
1714 checkpointStatus = getCheckpointStatus(getSuperstep() + 1);
1715 globalStats.setCheckpointStatus(checkpointStatus);
1716 // Let everyone know the aggregated application state through the
1717 // superstep finished znode
1718 String superstepFinishedNode =
1719 getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
1720
1721 WritableUtils.writeToZnode(
1722 getZkExt(), superstepFinishedNode, -1, globalStats, superstepClasses);
1723 updateCounters(globalStats);
1724
1725 cleanUpOldSuperstep(getSuperstep() - 1);
1726 incrCachedSuperstep();
1727
1728 if (getSuperstep() > 0) {
1729 GiraphStats.getInstance().getSuperstepCounter().increment();
1730 }
1731 SuperstepState superstepState;
1732 if (globalStats.getHaltComputation()) {
1733 superstepState = SuperstepState.ALL_SUPERSTEPS_DONE;
1734 } else {
1735 superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
1736 }
1737 globalCommHandler.getAggregatorHandler().writeAggregators(
1738 getSuperstep(), superstepState);
1739
1740 return superstepState;
1741 }
1742
1743 /**
1744  * Should checkpointing be done on this superstep?  Checks the user's
1745  * manual checkpoint flag in ZooKeeper as well as the configured
1746  * checkpoint frequency.
1747  *
1748  * @param superstep The next superstep
1749  * @return CheckpointStatus to send to the workers
1750  */
1751 private CheckpointStatus getCheckpointStatus(long superstep) {
1752 try {
1753 if (getZkExt().
1754 exists(basePath + FORCE_CHECKPOINT_USER_FLAG, false) != null) {
1755 if (isCheckpointingSupported(getConfiguration(), masterCompute)) {
1756 return CheckpointStatus.CHECKPOINT_AND_HALT;
1757 } else {
1758 LOG.warn("Attempted to manually checkpoint the job that " +
1759 "does not support checkpoints. Ignoring");
1760 }
1761 }
1762 } catch (KeeperException e) {
1763 throw new IllegalStateException(
1764 "cleanupZooKeeper: Got KeeperException", e);
1765 } catch (InterruptedException e) {
1766 throw new IllegalStateException(
1767 "cleanupZooKeeper: Got IllegalStateException", e);
1768 }
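// A checkpoint frequency of 0 disables checkpointing.  Otherwise the
// first checkpoint is due one superstep after the input superstep (or,
// after a restart, checkpointFrequency supersteps past the restarted
// superstep), and then every checkpointFrequency supersteps.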
1769 if (checkpointFrequency == 0) {
1770 return CheckpointStatus.NONE;
1771 }
1772 long firstCheckpoint = INPUT_SUPERSTEP + 1;
1773 if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
1774 firstCheckpoint = getRestartedSuperstep() + checkpointFrequency;
1775 }
1776 if (superstep < firstCheckpoint) {
1777 return CheckpointStatus.NONE;
1778 }
1779 if (((superstep - firstCheckpoint) % checkpointFrequency) == 0) {
1780 if (isCheckpointingSupported(getConfiguration(), masterCompute)) {
1781 return CheckpointStatus.CHECKPOINT;
1782 }
1783 }
1784 return CheckpointStatus.NONE;
1785 }
1786
1787 /**
1788  * Check whether checkpointing is supported for this job, as decided by
1789  * the configured CheckpointSupportedChecker, based on the configuration
1790  * and the current master compute.
1791  *
1792  * @param conf Giraph configuration
1793  * @param masterCompute Current master compute
1794  * @return True if this job can be checkpointed
1795  */
1796 private boolean isCheckpointingSupported(
1797 GiraphConfiguration conf, MasterCompute masterCompute) {
1798 return checkpointSupportedChecker.isCheckpointSupported(
1799 conf, masterCompute);
1800 }
1801
1802 
1803 /**
1804  * Run the master compute for this superstep, timing the call with the
1805  * master-compute timer.
1806  */
1807 private void doMasterCompute() {
1808 GiraphTimerContext timerContext = masterComputeTimer.time();
1809 masterCompute.compute();
1810 timerContext.stop();
1811 }
1812
1813 /**
1814  * Look up the given counter group/name pairs in the mapper context and
1815  * add them, with their current values, to the given counter set.
1816  * @param context Mapper context to read the counter values from
1817  * @param counterGroupAndNames Counter group names mapped to counter names
1818  * @param counters Set the populated CustomCounter objects are added to
1819  */
1820 private void populateCountersFromContext(Mapper.Context context,
1821 Map<String, Set<String>> counterGroupAndNames,
1822 Set<CustomCounter> counters) {
1823 Counter counter;
1824 for (Map.Entry<String, Set<String>> entry :
1825 counterGroupAndNames.entrySet()) {
1826 String groupName = entry.getKey();
1827 for (String counterName: entry.getValue()) {
1828 CustomCounter customCounter = new CustomCounter(groupName, counterName,
1829 CustomCounter.Aggregation.SUM);
1830 counter = context.getCounter(groupName, counterName);
1831 customCounter.setValue(counter.getValue());
1832 counters.add(customCounter);
1833 }
1834 }
1835 }
1836
1837 /**
1838  * Aggregate the custom counters published by the workers in ZooKeeper
1839  * together with the counters owned by the master itself, so that they
1840  * can be reported for this superstep.
1841  *
1842  * Waits up to the configured counter timeout for all workers to
1843  * publish their counter values before merging whatever has arrived.
1844  */
1845 private void aggregateCountersFromWorkersAndMaster() {
1846 CustomCounters customCounters = new CustomCounters();
1847 long superstep = getSuperstep();
1848
1849 String workerFinishedPath = getWorkerCountersFinishedPath(
1850 getApplicationAttempt(), superstep);
1851 try {
1852 getZkExt().createOnceExt(workerFinishedPath,
1853 null,
1854 Ids.OPEN_ACL_UNSAFE,
1855 CreateMode.PERSISTENT,
1856 true);
1857 } catch (KeeperException e) {
1858 LOG.warn("aggregateCounters: KeeperException - " +
1859 "Couldn't create " + workerFinishedPath, e);
1860 } catch (InterruptedException e) {
1861 LOG.warn("barrierOnWorkerList: InterruptedException - " +
1862 "Couldn't create " + workerFinishedPath, e);
1863 }
1864 List<String> workerFinishedPathList = new ArrayList<>();
1865 long waitForCountersTimeout =
1866 SystemTime.get().getMilliseconds() + maxCounterWaitMsecs;
1867
1868 int numWorkers = BspInputFormat.getMaxTasks(getConfiguration()) - 1;
1869 if (numWorkers == 0) {
1870 // With a single task the master also acts as a worker, so expect one
1871 // set of worker-written counters instead of zero
1872 numWorkers += 1;
1873 }
1874 // Poll ZooKeeper until every worker has written its counters or the
1875 // wait times out
1876 while (SystemTime.get().getMilliseconds() < waitForCountersTimeout) {
1877 try {
1878 workerFinishedPathList = getZkExt().getChildrenExt(
1879 workerFinishedPath, true,
1880 false, true);
1881 LOG.info(String.format("Fetching counter values from " +
1882 "workers for superstep %d. Got %d out of %d",
1883 superstep, workerFinishedPathList.size(), numWorkers));
1884 if (workerFinishedPathList.size() == numWorkers) {
1885 break;
1886 }
1887 } catch (KeeperException e) {
1888 LOG.warn("Got Keeper exception, but will retry: ", e);
1889 } catch (InterruptedException e) {
1890 LOG.warn("aggregateCounters: InterruptedException", e);
1891 }
1892 getWrittenCountersToZKEvent().waitMsecs(eventWaitMsecs);
1893 getWrittenCountersToZKEvent().reset();
1894 }
1895 for (String finishedPath : workerFinishedPathList) {
1896 JSONArray jsonCounters = null;
1897 try {
1898 byte [] zkData =
1899 getZkExt().getData(finishedPath, false, null);
1900 jsonCounters = new JSONArray(new String(zkData,
1901 Charset.defaultCharset()));
1902 Set<CustomCounter> workerCounters = new HashSet<>();
1903 for (int i = 0; i < jsonCounters.length(); i++) {
1904 CustomCounter customCounter = new CustomCounter();
1905 WritableUtils.readFieldsFromByteArray(Base64.decode(
1906 jsonCounters.getString(i)), customCounter);
1907 workerCounters.add(customCounter);
1908 }
1909 customCounters.mergeCounters(workerCounters);
1910 } catch (JSONException e) {
1911 LOG.warn("aggregateCounters: JSONException", e);
1912 } catch (KeeperException e) {
1913 LOG.warn("aggregateCounters: KeeperException", e);
1914 } catch (InterruptedException e) {
1915 LOG.warn("aggregateCounters: InterruptedException", e);
1916 } catch (IOException e) {
1917 LOG.warn("aggregateCounters: IOException", e);
1918 }
1919 }
1920 Mapper.Context context = getContext();
1921 Set<CustomCounter> masterCounters = new HashSet<>();
1922
1923 if (numWorkers != 1) {
1924 // Include the counters accumulated by the master task itself. With a
1925 // single task the master shares its mapper context with the worker,
1926 // so those counters are already covered by the worker's report.
1927 Counter counter;
1928 Set<CustomCounter> masterCounterNames =
1929 CustomCounters.getAndClearCustomCounters();
1930 for (CustomCounter customCounter : masterCounterNames) {
1931 String groupName = customCounter.getGroupName();
1932 String counterName = customCounter.getCounterName();
1933 counter = context.getCounter(groupName, counterName);
1934 customCounter.setValue(counter.getValue());
1935 masterCounters.add(customCounter);
1936 }
1937
1938 Map<String, Set<String>> nettyCounters =
1939 NettyClient.getCounterGroupsAndNames();
1940 populateCountersFromContext(context, nettyCounters, masterCounters);
1941 }
1942
1943 // Counters tracked by the master input splits handler are collected
1944 // regardless of the number of workers
1945 Map<String, Set<String>> inputSplitCounter =
1946 MasterInputSplitsHandler.getCounterGroupAndNames();
1947 populateCountersFromContext(context, inputSplitCounter, masterCounters);
1948 customCounters.mergeCounters(masterCounters);
1949
1950 List<CustomCounter> allCounters = new ArrayList<>();
1951 allCounters.addAll(GiraphStats.getInstance().getCounterList());
1952
1953 allCounters.addAll(customCounters.getCounterList());
1954
1955 giraphCountersThriftStruct.setCounters(allCounters);
1956 }
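// Illustrative sketch (not part of the original source): the decode loop
// above assumes each worker publishes its counters as a JSON array of
// Base64-encoded, Writable-serialized CustomCounter entries. The reverse
// (encoding) side would look roughly like this; the helper name is made up.
private static String encodeCountersSketch(Set<CustomCounter> counters) {
  JSONArray jsonCounters = new JSONArray();
  for (CustomCounter counter : counters) {
    // Serialize the Writable, then Base64-encode it so it survives as text
    jsonCounters.put(
        Base64.encodeBytes(WritableUtils.writeToByteArray(counter)));
  }
  return jsonCounters.toString();
}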
1957
1958 /**
1959 * Adds the GiraphTimers recorded for the given superstep to the
1960 * aggregated counters and sends the complete set of master counters
1961 * to the job progress tracker.
1962 *
1963 * @param superstep Superstep whose timers should be included in the
1964 * counters that are sent
1965 */
1966 public void addGiraphTimersAndSendCounters(long superstep) {
1967 List<CustomCounter> giraphCounters =
1968 giraphCountersThriftStruct.getCounters();
1969 giraphCounters.addAll(GiraphTimers.getInstance().getCounterList(superstep));
1970 giraphCountersThriftStruct.setCounters(giraphCounters);
1971 getJobProgressTracker().sendMasterCounters(giraphCountersThriftStruct);
1972 }
1973
1974 /**
1975 * Waits until the expected number of children appear under the
1976 * cleaned-up znode, i.e. until all workers have finished cleanup.
1977 */
1978 private void waitForWorkersToCleanup() {
1979 try {
1980 getZkExt().createExt(cleanedUpPath,
1981 null,
1982 Ids.OPEN_ACL_UNSAFE,
1983 CreateMode.PERSISTENT,
1984 true);
1985 } catch (KeeperException.NodeExistsException e) {
1986 if (LOG.isInfoEnabled()) {
1987 LOG.info("cleanUpZooKeeper: Node " + cleanedUpPath +
1988 " already exists, no need to create.");
1989 }
1990 } catch (KeeperException e) {
1991 throw new IllegalStateException(
1992 "cleanupZooKeeper: Got KeeperException", e);
1993 } catch (InterruptedException e) {
1994 throw new IllegalStateException(
1995 "cleanupZooKeeper: Got IllegalStateException", e);
1996 }
1997
1998 int maxTasks = BspInputFormat.getMaxTasks(getConfiguration());
1999 if ((getGraphTaskManager().getGraphFunctions() == GraphFunctions.ALL) ||
2000 (getGraphTaskManager().getGraphFunctions() ==
2001 GraphFunctions.ALL_EXCEPT_ZOOKEEPER)) {
2002 maxTasks *= 2;
2003 }
2004 List<String> cleanedUpChildrenList = null;
2005 while (true) {
2006 try {
2007 cleanedUpChildrenList =
2008 getZkExt().getChildrenExt(
2009 cleanedUpPath, true, false, true);
2010 if (LOG.isInfoEnabled()) {
2011 LOG.info("cleanUpZooKeeper: Got " +
2012 cleanedUpChildrenList.size() + " of " +
2013 maxTasks + " desired children from " +
2014 cleanedUpPath);
2015 }
2016 if (cleanedUpChildrenList.size() == maxTasks) {
2017 break;
2018 }
2019 if (LOG.isInfoEnabled()) {
2020 LOG.info("cleanedUpZooKeeper: Waiting for the " +
2021 "children of " + cleanedUpPath +
2022 " to change since only got " +
2023 cleanedUpChildrenList.size() + " nodes.");
2024 }
2025 } catch (KeeperException e) {
2026
2027 LOG.error("cleanUpZooKeeper: Got KeeperException, " +
2028 "but will continue", e);
2029 return;
2030 } catch (InterruptedException e) {
2031
2032 LOG.error("cleanUpZooKeeper: Got InterruptedException, " +
2033 "but will continue", e);
2034 return;
2035 }
2036
2037 getCleanedUpChildrenChangedEvent().waitForTimeoutOrFail(
2038 GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
2039 getConfiguration()));
2040 getCleanedUpChildrenChangedEvent().reset();
2041 }
2042 }
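// Illustrative sketch (not part of the original source): the loop above is
// a classic ZooKeeper barrier - every task creates a child under a shared
// path and the master polls the child count until all expected tasks have
// checked in, sleeping on an event between polls. Distilled form (the
// helper and its parameters are made up):
private static void awaitBarrierSketch(
    org.apache.giraph.zk.ZooKeeperExt zk, String barrierPath,
    int expectedChildren, BspEvent childrenChangedEvent, int pollMsecs)
    throws KeeperException, InterruptedException {
  while (true) {
    // Re-register the watch on each pass so later changes wake us up again
    List<String> children = zk.getChildrenExt(barrierPath, true, false, true);
    if (children.size() >= expectedChildren) {
      return;
    }
    childrenChangedEvent.waitMsecs(pollMsecs);
    childrenChangedEvent.reset();
  }
}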
2043
2044 /**
2045 * Removes this job's base znode and all of its children from an
2046 * external ZooKeeper, unless the user chose to keep the data.
2047 */
2048 private void cleanUpZooKeeper() {
2049
2050
2051
2052 try {
2053 if (getConfiguration().isZookeeperExternal() &&
2054 KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration())) {
2055 if (LOG.isInfoEnabled()) {
2056 LOG.info("cleanupZooKeeper: Removing the following path " +
2057 "and all children - " + basePath + " from ZooKeeper list " +
2058 getConfiguration().getZookeeperList());
2059 }
2060 getZkExt().deleteExt(basePath, -1, true);
2061 }
2062 } catch (KeeperException e) {
2063 LOG.error("cleanupZooKeeper: Failed to do cleanup of " +
2064 basePath + " due to KeeperException", e);
2065 } catch (InterruptedException e) {
2066 LOG.error("cleanupZooKeeper: Failed to do cleanup of " +
2067 basePath + " due to InterruptedException", e);
2068 }
2069 }
2070
2071 @Override
2072 public void postApplication() {
2073 for (MasterObserver observer : observers) {
2074 observer.postApplication();
2075 getContext().progress();
2076 }
2077 }
2078
2079 @Override
2080 public void postSuperstep() {
2081 for (MasterObserver observer : observers) {
2082 observer.postSuperstep(getSuperstep());
2083 getContext().progress();
2084 }
2085 }
2086
2087 @Override
2088 public void failureCleanup(Exception e) {
2089 for (MasterObserver observer : observers) {
2090 try {
2091 observer.applicationFailed(e);
2092
2093 } catch (RuntimeException re) {
2094 // Don't let one broken observer stop the remaining observers
2095 LOG.error(re.getClass().getName() + " from observer " +
2096 observer.getClass().getName(), re);
2097 }
2098 getContext().progress();
2099 }
2100 }
2101
2102 @Override
2103 public void cleanup(SuperstepState superstepState) throws IOException {
2104 ImmutableClassesGiraphConfiguration conf = getConfiguration();
2105 // Every task, including this master, announces that it is done by
2106 // creating a child under the cleaned-up znode. Once all expected
2107 // children exist, the elected master tears down the ZooKeeper state
2108 // for this job and, on success, removes the checkpoints.
2109
2110 String masterCleanedUpPath = cleanedUpPath + "/" +
2111 getTaskId() + MASTER_SUFFIX;
2112 try {
2113 String finalFinishedPath =
2114 getZkExt().createExt(masterCleanedUpPath,
2115 null,
2116 Ids.OPEN_ACL_UNSAFE,
2117 CreateMode.PERSISTENT,
2118 true);
2119 if (LOG.isInfoEnabled()) {
2120 LOG.info("cleanup: Notifying master its okay to cleanup with " +
2121 finalFinishedPath);
2122 }
2123 } catch (KeeperException.NodeExistsException e) {
2124 if (LOG.isInfoEnabled()) {
2125 LOG.info("cleanup: Couldn't create finished node '" +
2126 masterCleanedUpPath);
2127 }
2128 } catch (KeeperException e) {
2129 LOG.error("cleanup: Got KeeperException, continuing", e);
2130 } catch (InterruptedException e) {
2131 LOG.error("cleanup: Got InterruptedException, continuing", e);
2132 }
2133
2134 if (isMaster) {
2135 getGraphTaskManager().setIsMaster(true);
2136 waitForWorkersToCleanup();
2137 aggregateCountersFromWorkersAndMaster();
2138 cleanUpZooKeeper();
2139
2140 if (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE &&
2141 GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(conf)) {
2142 boolean success =
2143 getFs().delete(new Path(checkpointBasePath), true);
2144 if (LOG.isInfoEnabled()) {
2145 LOG.info("cleanup: Removed HDFS checkpoint directory (" +
2146 checkpointBasePath + ") with return = " +
2147 success + " since the job " + getContext().getJobName() +
2148 " succeeded ");
2149 }
2150 }
2151 if (superstepState == SuperstepState.CHECKPOINT_AND_HALT) {
2152 getFs().create(CheckpointingUtils.getCheckpointMarkPath(conf,
2153 getJobId()), true);
2154 failJob(new Exception("Checkpoint and halt requested. " +
2155 "Killing this job."));
2156 }
2157 globalCommHandler.getAggregatorHandler().close();
2158 masterClient.closeConnections();
2159 masterServer.close();
2160 }
2161
2162 try {
2163 getZkExt().close();
2164 } catch (InterruptedException e) {
2165
2166 LOG.error("cleanup: Zookeeper failed to close", e);
2167 }
2168 }
2169
2170 /**
2171 * Event signaled when a worker has written a checkpoint.
2172 *
2173 * @return Event that denotes a worker wrote its checkpoint
2174 */
2175 public final BspEvent getWorkerWroteCheckpointEvent() {
2176 return workerWroteCheckpoint;
2177 }
2178
2179 /**
2180 * Event signaled when the superstep state changes, for example when
2181 * workers finish the current superstep or a healthy worker fails.
2182 *
2183 * @return Event that denotes a superstep state change
2184 */
2185 public final BspEvent getSuperstepStateChangedEvent() {
2186 return superstepStateChanged;
2187 }
2188
2189 /**
2190 * Checks whether a failed, previously healthy worker owns a partition
2191 * in this superstep; if so, signals a superstep state change so the
2192 * job can restart from the last checkpointed superstep.
2193 */
2194 private void checkHealthyWorkerFailure(String failedWorkerPath) {
2195 if (getSuperstepFromPath(failedWorkerPath) < getSuperstep()) {
2196 return;
2197 }
2198
2199 Collection<PartitionOwner> partitionOwners =
2200 masterGraphPartitioner.getCurrentPartitionOwners();
2201 String hostnameId =
2202 getHealthyHostnameIdFromPath(failedWorkerPath);
2203 for (PartitionOwner partitionOwner : partitionOwners) {
2204 WorkerInfo workerInfo = partitionOwner.getWorkerInfo();
2205 WorkerInfo previousWorkerInfo =
2206 partitionOwner.getPreviousWorkerInfo();
2207 if (workerInfo.getHostnameId().equals(hostnameId) ||
2208 ((previousWorkerInfo != null) &&
2209 previousWorkerInfo.getHostnameId().equals(hostnameId))) {
2210 LOG.warn("checkHealthyWorkerFailure: " +
2211 "at least one healthy worker went down " +
2212 "for superstep " + getSuperstep() + " - " +
2213 hostnameId + ", will try to restart from " +
2214 "checkpointed superstep " +
2215 lastCheckpointedSuperstep);
2216 superstepStateChanged.signal();
2217 }
2218 }
2219 }
2220
2221 @Override
2222 public boolean processEvent(WatchedEvent event) {
2223 boolean foundEvent = false;
2224 if (event.getPath().contains(WORKER_HEALTHY_DIR) &&
2225 (event.getType() == EventType.NodeDeleted)) {
2226 if (LOG.isDebugEnabled()) {
2227 LOG.debug("processEvent: Healthy worker died (node deleted) " +
2228 "in " + event.getPath());
2229 }
2230 checkHealthyWorkerFailure(event.getPath());
2231 superstepStateChanged.signal();
2232 foundEvent = true;
2233 } else if (event.getPath().endsWith(METRICS_DIR) &&
2234 event.getType() == EventType.NodeChildrenChanged) {
2235 if (LOG.isDebugEnabled()) {
2236 LOG.debug("processEvent: Worker finished (node change) " +
2237 "event - superstepStateChanged signaled");
2238 }
2239 superstepStateChanged.signal();
2240 foundEvent = true;
2241 } else if (event.getPath().contains(WORKER_WROTE_CHECKPOINT_DIR) &&
2242 event.getType() == EventType.NodeChildrenChanged) {
2243 if (LOG.isDebugEnabled()) {
2244 LOG.debug("processEvent: Worker wrote checkpoint (node change) " +
2245 "event - workerWroteCheckpoint signaled");
2246 }
2247 workerWroteCheckpoint.signal();
2248 foundEvent = true;
2249 }
2250
2251 return foundEvent;
2252 }
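// Illustrative sketch (not part of the original source): processEvent()
// only signals events; the coordination loops in this class block on the
// same BspEvent and re-check their condition after every wake-up, so a
// spurious or early signal is harmless. The general shape (helper and
// parameters are made up):
private static void waitUntilSketch(
    BspEvent event, java.util.concurrent.Callable<Boolean> condition,
    int pollMsecs) throws Exception {
  while (!condition.call()) {
    event.waitMsecs(pollMsecs);
    event.reset();
  }
}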
2253
2254 /**
2255 * Updates the Hadoop counters from the aggregated global statistics
2256 * of this superstep (vertices, edges, messages, out-of-core bytes).
2257 * @param globalStats Global statistics for the superstep
2258 */
2259 private void updateCounters(GlobalStats globalStats) {
2260 GiraphStats gs = GiraphStats.getInstance();
2261 gs.getVertices().setValue(globalStats.getVertexCount());
2262 gs.getFinishedVertexes().setValue(globalStats.getFinishedVertexCount());
2263 gs.getEdges().setValue(globalStats.getEdgeCount());
2264 gs.getSentMessages().setValue(globalStats.getMessageCount());
2265 gs.getSentMessageBytes().setValue(globalStats.getMessageBytesCount());
2266 gs.getAggregateSentMessages().increment(globalStats.getMessageCount());
2267 gs.getAggregateSentMessageBytes()
2268 .increment(globalStats.getMessageBytesCount());
2269 gs.getAggregateOOCBytesLoaded()
2270 .increment(globalStats.getOocLoadBytesCount());
2271 gs.getAggregateOOCBytesStored()
2272 .increment(globalStats.getOocStoreBytesCount());
2273 // Track the lowest fraction of the graph that stayed in memory over
2274 // the whole execution (only meaningful for out-of-core runs)
2275 int percentage = (int) gs.getLowestGraphPercentageInMemory().getValue();
2276 gs.getLowestGraphPercentageInMemory().setValue(
2277 Math.min(percentage, globalStats.getLowestGraphPercentageInMemory()));
2278 }
2279 }