View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.master;
20  
21  import com.google.common.collect.Lists;
22  import com.google.common.collect.Sets;
23  import net.iharder.Base64;
24  import org.apache.commons.io.FilenameUtils;
25  import org.apache.giraph.bsp.ApplicationState;
26  import org.apache.giraph.bsp.BspInputFormat;
27  import org.apache.giraph.bsp.BspService;
28  import org.apache.giraph.bsp.CentralizedServiceMaster;
29  import org.apache.giraph.bsp.SuperstepState;
30  import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
31  import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
32  import org.apache.giraph.comm.MasterClient;
33  import org.apache.giraph.comm.MasterServer;
34  import org.apache.giraph.comm.netty.NettyMasterClient;
35  import org.apache.giraph.comm.netty.NettyMasterServer;
36  import org.apache.giraph.comm.requests.AddressesAndPartitionsRequest;
37  import org.apache.giraph.conf.GiraphConfiguration;
38  import org.apache.giraph.conf.GiraphConstants;
39  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
40  import org.apache.giraph.counters.GiraphStats;
41  import org.apache.giraph.graph.AddressesAndPartitionsWritable;
42  import org.apache.giraph.graph.GlobalStats;
43  import org.apache.giraph.graph.GraphFunctions;
44  import org.apache.giraph.graph.GraphState;
45  import org.apache.giraph.graph.GraphTaskManager;
46  import org.apache.giraph.io.EdgeInputFormat;
47  import org.apache.giraph.io.GiraphInputFormat;
48  import org.apache.giraph.io.InputType;
49  import org.apache.giraph.io.MappingInputFormat;
50  import org.apache.giraph.io.VertexInputFormat;
51  import org.apache.giraph.master.input.MasterInputSplitsHandler;
52  import org.apache.giraph.metrics.AggregatedMetrics;
53  import org.apache.giraph.metrics.GiraphMetrics;
54  import org.apache.giraph.metrics.GiraphTimer;
55  import org.apache.giraph.metrics.GiraphTimerContext;
56  import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
57  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
58  import org.apache.giraph.metrics.WorkerSuperstepMetrics;
59  import org.apache.giraph.partition.BasicPartitionOwner;
60  import org.apache.giraph.partition.MasterGraphPartitioner;
61  import org.apache.giraph.partition.PartitionOwner;
62  import org.apache.giraph.partition.PartitionStats;
63  import org.apache.giraph.partition.PartitionUtils;
64  import org.apache.giraph.time.SystemTime;
65  import org.apache.giraph.time.Time;
66  import org.apache.giraph.utils.CheckpointingUtils;
67  import org.apache.giraph.utils.JMapHistoDumper;
68  import org.apache.giraph.utils.ReactiveJMapHistoDumper;
69  import org.apache.giraph.utils.ReflectionUtils;
70  import org.apache.giraph.utils.WritableUtils;
71  import org.apache.giraph.worker.WorkerInfo;
72  import org.apache.giraph.zk.BspEvent;
73  import org.apache.giraph.zk.PredicateLock;
74  import org.apache.hadoop.fs.FSDataOutputStream;
75  import org.apache.hadoop.fs.FileSystem;
76  import org.apache.hadoop.fs.Path;
77  import org.apache.hadoop.io.Writable;
78  import org.apache.hadoop.io.WritableComparable;
79  import org.apache.hadoop.mapred.JobID;
80  import org.apache.hadoop.mapred.RunningJob;
81  import org.apache.hadoop.mapreduce.InputSplit;
82  import org.apache.hadoop.mapreduce.Mapper;
83  import org.apache.log4j.Logger;
84  import org.apache.zookeeper.CreateMode;
85  import org.apache.zookeeper.KeeperException;
86  import org.apache.zookeeper.WatchedEvent;
87  import org.apache.zookeeper.Watcher.Event.EventType;
88  import org.apache.zookeeper.ZooDefs.Ids;
89  import org.json.JSONException;
90  import org.json.JSONObject;
91  
92  import java.io.DataInputStream;
93  import java.io.IOException;
94  import java.io.PrintStream;
95  import java.nio.charset.Charset;
96  import java.util.ArrayList;
97  import java.util.Collection;
98  import java.util.Collections;
99  import java.util.Comparator;
100 import java.util.HashSet;
101 import java.util.List;
102 import java.util.Set;
103 import java.util.TreeSet;
104 import java.util.concurrent.TimeUnit;
105 
106 import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
107 import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
108 import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
109 
/**
 * ZooKeeper-based implementation of {@link CentralizedServiceMaster}.
 *
 * <p>Coordinates the whole BSP computation: chooses workers, creates input
 * splits, runs {@link MasterCompute}, and drives checkpoint/restart.
 *
 * @param <I> Vertex id
 * @param <V> Vertex data
 * @param <E> Edge data
 */
@SuppressWarnings("rawtypes, unchecked")
public class BspServiceMaster<I extends WritableComparable,
    V extends Writable, E extends Writable>
    extends BspService<I, V, E>
    implements CentralizedServiceMaster<I, V, E>,
    ResetSuperstepMetricsObserver {
  /** Print worker names only if there are 10 workers left */
  public static final int MAX_PRINTABLE_REMAINING_WORKERS = 10;
  /** How many threads to use when writing input splits to zookeeper*/
  public static final String NUM_MASTER_ZK_INPUT_SPLIT_THREADS =
      "giraph.numMasterZkInputSplitThreads";
  /** Default number of threads to use when writing input splits to zookeeper */
  public static final int DEFAULT_INPUT_SPLIT_THREAD_COUNT = 1;
  /** Time instance to use for timing */
  private static final Time TIME = SystemTime.get();
  /** Class logger */
  private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
  /** Am I the master? */
  private boolean isMaster = false;
  /** Max number of workers */
  private final int maxWorkers;
  /** Min number of workers */
  private final int minWorkers;
  /** Max number of supersteps */
  private final int maxNumberOfSupersteps;
  /** Min % responded workers (threshold used by checkWorkers) */
  private final float minPercentResponded;
  /** Msecs to wait for an event */
  private final int eventWaitMsecs;
  /** Max msecs to wait for a superstep to get enough workers */
  private final int maxSuperstepWaitMsecs;
  /** Min number of long tails before printing */
  private final int partitionLongTailMinPrint;
  /** Last finalized checkpoint */
  private long lastCheckpointedSuperstep = -1;
  /** Worker wrote checkpoint */
  private final BspEvent workerWroteCheckpoint;
  /** State of the superstep changed */
  private final BspEvent superstepStateChanged;
  /** Master graph partitioner */
  private final MasterGraphPartitioner<I, V, E> masterGraphPartitioner;
  /** All the partition stats from the last superstep */
  private final List<PartitionStats> allPartitionStatsList =
      new ArrayList<PartitionStats>();
  /** Handler for global communication */
  private MasterGlobalCommHandler globalCommHandler;
  /** Handler for aggregators to reduce/broadcast translation */
  private AggregatorToGlobalCommTranslation aggregatorTranslation;
  /** Master class */
  private MasterCompute masterCompute;
  /** IPC Client */
  private MasterClient masterClient;
  /** IPC Server */
  private MasterServer masterServer;
  /** Master info */
  private MasterInfo masterInfo;
  /** List of workers in current superstep, sorted by task id */
  private List<WorkerInfo> chosenWorkerInfoList = Lists.newArrayList();
  /** Observers over master lifecycle. */
  private final MasterObserver[] observers;

  // Per-Superstep Metrics
  /** MasterCompute time (re-created each superstep via newSuperstep) */
  private GiraphTimer masterComputeTimer;

  /** Checkpoint frequency */
  private final int checkpointFrequency;
  /** Current checkpoint status */
  private CheckpointStatus checkpointStatus;
  /** Checks if checkpointing supported */
  private final CheckpointSupportedChecker checkpointSupportedChecker;
188 
  /**
   * Constructor for setting up the master.
   *
   * @param context Mapper context
   * @param graphTaskManager GraphTaskManager for this compute node
   */
  public BspServiceMaster(
      Mapper<?, ?, ?, ?>.Context context,
      GraphTaskManager<I, V, E> graphTaskManager) {
    super(context, graphTaskManager);
    // Events the master waits on; registration lets BspService signal them.
    workerWroteCheckpoint = new PredicateLock(context);
    registerBspEvent(workerWroteCheckpoint);
    superstepStateChanged = new PredicateLock(context);
    registerBspEvent(superstepStateChanged);

    ImmutableClassesGiraphConfiguration<I, V, E> conf =
        getConfiguration();

    maxWorkers = conf.getMaxWorkers();
    minWorkers = conf.getMinWorkers();
    maxNumberOfSupersteps = conf.getMaxNumberOfSupersteps();
    minPercentResponded = GiraphConstants.MIN_PERCENT_RESPONDED.get(conf);
    eventWaitMsecs = conf.getEventWaitMsecs();
    maxSuperstepWaitMsecs = conf.getMaxMasterSuperstepWaitMsecs();
    partitionLongTailMinPrint = PARTITION_LONG_TAIL_MIN_PRINT.get(conf);
    masterGraphPartitioner =
        getGraphPartitionerFactory().createMasterGraphPartitioner();
    // JMap observer classes must be added before createMasterObservers runs.
    if (conf.isJMapHistogramDumpEnabled()) {
      conf.addMasterObserverClass(JMapHistoDumper.class);
    }
    if (conf.isReactiveJmapHistogramDumpEnabled()) {
      conf.addMasterObserverClass(ReactiveJMapHistoDumper.class);
    }
    observers = conf.createMasterObservers(context);

    this.checkpointFrequency = conf.getCheckpointFrequency();
    this.checkpointStatus = CheckpointStatus.NONE;
    this.checkpointSupportedChecker =
        ReflectionUtils.newInstance(
            GiraphConstants.CHECKPOINT_SUPPORTED_CHECKER.get(conf));

    GiraphMetrics.get().addSuperstepResetObserver(this);
    GiraphStats.init(context);
  }
233 
234   @Override
235   public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
236     masterComputeTimer = new GiraphTimer(superstepMetrics,
237         "master-compute-call", TimeUnit.MILLISECONDS);
238   }
239 
240   @Override
241   public void setJobState(ApplicationState state,
242       long applicationAttempt,
243       long desiredSuperstep) {
244     setJobState(state, applicationAttempt, desiredSuperstep, true);
245   }
246 
247   /**
248    * Set the job state.
249    *
250    * @param state State of the application.
251    * @param applicationAttempt Attempt to start on
252    * @param desiredSuperstep Superstep to restart from (if applicable)
253    * @param killJobOnFailure if true, and the desired state is FAILED,
254    *                         then kill this job.
255    */
256   private void setJobState(ApplicationState state,
257       long applicationAttempt,
258       long desiredSuperstep,
259       boolean killJobOnFailure) {
260     JSONObject jobState = new JSONObject();
261     try {
262       jobState.put(JSONOBJ_STATE_KEY, state.toString());
263       jobState.put(JSONOBJ_APPLICATION_ATTEMPT_KEY, applicationAttempt);
264       jobState.put(JSONOBJ_SUPERSTEP_KEY, desiredSuperstep);
265     } catch (JSONException e) {
266       throw new RuntimeException("setJobState: Couldn't put " +
267           state.toString());
268     }
269     if (LOG.isInfoEnabled()) {
270       LOG.info("setJobState: " + jobState.toString() + " on superstep " +
271           getSuperstep());
272     }
273     try {
274       getZkExt().createExt(masterJobStatePath + "/jobState",
275           jobState.toString().getBytes(Charset.defaultCharset()),
276           Ids.OPEN_ACL_UNSAFE,
277           CreateMode.PERSISTENT_SEQUENTIAL,
278           true);
279       LOG.info("setJobState: " + jobState);
280     } catch (KeeperException.NodeExistsException e) {
281       throw new IllegalStateException(
282           "setJobState: Imposible that " +
283               masterJobStatePath + " already exists!", e);
284     } catch (KeeperException e) {
285       throw new IllegalStateException(
286           "setJobState: Unknown KeeperException for " +
287               masterJobStatePath, e);
288     } catch (InterruptedException e) {
289       throw new IllegalStateException(
290           "setJobState: Unknown InterruptedException for " +
291               masterJobStatePath, e);
292     }
293     if (state == ApplicationState.FAILED && killJobOnFailure) {
294       failJob(new IllegalStateException("FAILED"));
295     }
296 
297   }
298 
299   /**
300    * Set the job state to FAILED. This will kill the job, and log exceptions to
301    * any observers.
302    *
303    * @param reason The reason the job failed
304    */
305   private void setJobStateFailed(String reason) {
306     getGraphTaskManager().getJobProgressTracker().logFailure(reason);
307     setJobState(ApplicationState.FAILED, -1, -1, false);
308     failJob(new IllegalStateException(reason));
309   }
310 
311   /**
312    * Common method for generating vertex/edge input splits.
313    *
314    * @param inputFormat The vertex/edge input format
315    * @param minSplitCountHint Minimum number of splits to create (hint)
316    * @param inputSplitType Type of input splits (for logging purposes)
317    * @return List of input splits for the given format
318    */
319   private List<InputSplit> generateInputSplits(GiraphInputFormat inputFormat,
320                                                int minSplitCountHint,
321                                                InputType inputSplitType) {
322     String logPrefix = "generate" + inputSplitType + "InputSplits";
323     List<InputSplit> splits;
324     try {
325       splits = inputFormat.getSplits(getContext(), minSplitCountHint);
326     } catch (IOException e) {
327       throw new IllegalStateException(logPrefix + ": Got IOException", e);
328     } catch (InterruptedException e) {
329       throw new IllegalStateException(
330           logPrefix + ": Got InterruptedException", e);
331     }
332     float samplePercent =
333         INPUT_SPLIT_SAMPLE_PERCENT.get(getConfiguration());
334     if (samplePercent != INPUT_SPLIT_SAMPLE_PERCENT.getDefaultValue()) {
335       int lastIndex = (int) (samplePercent * splits.size() / 100f);
336       Collections.shuffle(splits);
337       List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
338       LOG.warn(logPrefix + ": Using sampling - Processing only " +
339           sampleSplits.size() + " instead of " + splits.size() +
340           " expected splits.");
341       return sampleSplits;
342     } else {
343       if (LOG.isInfoEnabled()) {
344         LOG.info(logPrefix + ": Got " + splits.size() +
345             " input splits for " + minSplitCountHint + " input threads");
346       }
347       return splits;
348     }
349   }
350 
351   /**
352    * When there is no salvaging this job, fail it.
353    *
354    * @param e Exception to log to observers
355    */
356   private void failJob(Exception e) {
357     LOG.fatal("failJob: Killing job " + getJobId());
358     LOG.fatal("failJob: exception " + e.toString());
359     try {
360       if (getConfiguration().isPureYarnJob()) {
361         throw new RuntimeException(
362           "BspServiceMaster (YARN profile) is " +
363           "FAILING this task, throwing exception to end job run.", e);
364       } else {
365         @SuppressWarnings("deprecation")
366         org.apache.hadoop.mapred.JobClient jobClient =
367           new org.apache.hadoop.mapred.JobClient(
368             (org.apache.hadoop.mapred.JobConf)
369             getContext().getConfiguration());
370         try {
371           @SuppressWarnings("deprecation")
372           JobID jobId = JobID.forName(getJobId());
373           RunningJob job = jobClient.getJob(jobId);
374           if (job != null) {
375             job.killJob();
376           } else {
377             LOG.error("Job not found for jobId=" + getJobId());
378           }
379         } catch (IllegalArgumentException iae) {
380           LOG.info("This job (" + getJobId() +
381                        ") is not a legacy Hadoop job and will " +
382                        "continue with failure cleanup." +
383                        e.getMessage(),
384                    e);
385         }
386       }
387     } catch (IOException ioe) {
388       throw new RuntimeException(ioe);
389     } finally {
390       failureCleanup(e);
391     }
392   }
393 
394   /**
395    * Parse the {@link WorkerInfo} objects from a ZooKeeper path
396    * (and children).
397    *
398    * @param workerInfosPath Path where all the workers are children
399    * @param watch Watch or not?
400    * @return List of workers in that path
401    */
402   private List<WorkerInfo> getWorkerInfosFromPath(String workerInfosPath,
403       boolean watch) {
404     List<WorkerInfo> workerInfoList = new ArrayList<WorkerInfo>();
405     List<String> workerInfoPathList;
406     try {
407       workerInfoPathList =
408           getZkExt().getChildrenExt(workerInfosPath, watch, false, true);
409     } catch (KeeperException e) {
410       throw new IllegalStateException(
411           "getWorkers: Got KeeperException", e);
412     } catch (InterruptedException e) {
413       throw new IllegalStateException(
414           "getWorkers: Got InterruptedStateException", e);
415     }
416     for (String workerInfoPath : workerInfoPathList) {
417       WorkerInfo workerInfo = new WorkerInfo();
418       try {
419         WritableUtils.readFieldsFromZnode(
420             getZkExt(), workerInfoPath, true, null, workerInfo);
421         workerInfoList.add(workerInfo);
422       } catch (IllegalStateException e) {
423         LOG.warn("Can't get info from worker, did it die in between? " +
424             "workerInfoPath=" + workerInfoPath, e);
425       }
426     }
427     return workerInfoList;
428   }
429 
430   /**
431    * Get the healthy and unhealthy {@link WorkerInfo} objects for
432    * a superstep
433    *
434    * @param superstep superstep to check
435    * @param healthyWorkerInfoList filled in with current data
436    * @param unhealthyWorkerInfoList filled in with current data
437    */
438   private void getAllWorkerInfos(
439       long superstep,
440       List<WorkerInfo> healthyWorkerInfoList,
441       List<WorkerInfo> unhealthyWorkerInfoList) {
442     String healthyWorkerInfoPath =
443         getWorkerInfoHealthyPath(getApplicationAttempt(), superstep);
444     String unhealthyWorkerInfoPath =
445         getWorkerInfoUnhealthyPath(getApplicationAttempt(), superstep);
446 
447     try {
448       getZkExt().createOnceExt(healthyWorkerInfoPath,
449           null,
450           Ids.OPEN_ACL_UNSAFE,
451           CreateMode.PERSISTENT,
452           true);
453     } catch (KeeperException e) {
454       throw new IllegalStateException("getWorkers: KeeperException", e);
455     } catch (InterruptedException e) {
456       throw new IllegalStateException("getWorkers: InterruptedException", e);
457     }
458 
459     try {
460       getZkExt().createOnceExt(unhealthyWorkerInfoPath,
461           null,
462           Ids.OPEN_ACL_UNSAFE,
463           CreateMode.PERSISTENT,
464           true);
465     } catch (KeeperException e) {
466       throw new IllegalStateException("getWorkers: KeeperException", e);
467     } catch (InterruptedException e) {
468       throw new IllegalStateException("getWorkers: InterruptedException", e);
469     }
470 
471     List<WorkerInfo> currentHealthyWorkerInfoList =
472         getWorkerInfosFromPath(healthyWorkerInfoPath, true);
473     List<WorkerInfo> currentUnhealthyWorkerInfoList =
474         getWorkerInfosFromPath(unhealthyWorkerInfoPath, false);
475 
476     healthyWorkerInfoList.clear();
477     if (currentHealthyWorkerInfoList != null) {
478       for (WorkerInfo healthyWorkerInfo :
479         currentHealthyWorkerInfoList) {
480         healthyWorkerInfoList.add(healthyWorkerInfo);
481       }
482     }
483 
484     unhealthyWorkerInfoList.clear();
485     if (currentUnhealthyWorkerInfoList != null) {
486       for (WorkerInfo unhealthyWorkerInfo :
487         currentUnhealthyWorkerInfoList) {
488         unhealthyWorkerInfoList.add(unhealthyWorkerInfo);
489       }
490     }
491   }
492 
  @Override
  public List<WorkerInfo> checkWorkers() {
    // Poll until enough workers have reported health or the deadline passes.
    boolean failJob = true;
    long failWorkerCheckMsecs =
        SystemTime.get().getMilliseconds() + maxSuperstepWaitMsecs;
    List<WorkerInfo> healthyWorkerInfoList = new ArrayList<WorkerInfo>();
    List<WorkerInfo> unhealthyWorkerInfoList = new ArrayList<WorkerInfo>();
    int totalResponses = -1;
    while (SystemTime.get().getMilliseconds() < failWorkerCheckMsecs) {
      getContext().progress();
      getAllWorkerInfos(
          getSuperstep(), healthyWorkerInfoList, unhealthyWorkerInfoList);
      totalResponses = healthyWorkerInfoList.size() +
          unhealthyWorkerInfoList.size();
      // Enough responses (healthy or not) relative to maxWorkers?
      if ((totalResponses * 100.0f / maxWorkers) >=
          minPercentResponded) {
        failJob = false;
        break;
      }
      getContext().setStatus(getGraphTaskManager().getGraphFunctions() + " " +
          "checkWorkers: Only found " +
          totalResponses +
          " responses of " + maxWorkers +
          " needed to start superstep " +
          getSuperstep());
      // If a health-registration event fired, re-poll immediately;
      // reset must happen before the next getAllWorkerInfos pass.
      if (getWorkerHealthRegistrationChangedEvent().waitMsecs(
          eventWaitMsecs)) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("checkWorkers: Got event that health " +
              "registration changed, not using poll attempt");
        }
        getWorkerHealthRegistrationChangedEvent().reset();
        continue;
      }
      if (LOG.isInfoEnabled()) {
        LOG.info("checkWorkers: Only found " + totalResponses +
            " responses of " + maxWorkers +
            " needed to start superstep " +
            getSuperstep() + ".  Reporting every " +
            eventWaitMsecs + " msecs, " +
            (failWorkerCheckMsecs - SystemTime.get().getMilliseconds()) +
            " more msecs left before giving up.");
        // Find the missing workers if there are only a few
        if ((maxWorkers - totalResponses) <=
            partitionLongTailMinPrint) {
          logMissingWorkersOnSuperstep(healthyWorkerInfoList,
              unhealthyWorkerInfoList);
        }
      }
    }
    // Timed out without reaching minPercentResponded.
    if (failJob) {
      LOG.error("checkWorkers: Did not receive enough processes in " +
          "time (only " + totalResponses + " of " +
          minWorkers + " required) after waiting " + maxSuperstepWaitMsecs +
          "msecs).  This occurs if you do not have enough map tasks " +
          "available simultaneously on your Hadoop instance to fulfill " +
          "the number of requested workers.");
      return null;
    }

    // Responded, but too few of them are actually healthy.
    if (healthyWorkerInfoList.size() < minWorkers) {
      LOG.error("checkWorkers: Only " + healthyWorkerInfoList.size() +
          " available when " + minWorkers + " are required.");
      logMissingWorkersOnSuperstep(healthyWorkerInfoList,
          unhealthyWorkerInfoList);
      return null;
    }

    getContext().setStatus(getGraphTaskManager().getGraphFunctions() + " " +
        "checkWorkers: Done - Found " + totalResponses +
        " responses of " + maxWorkers + " needed to start superstep " +
        getSuperstep());

    return healthyWorkerInfoList;
  }
568 
569   /**
570    * Log info level of the missing workers on the superstep
571    *
572    * @param healthyWorkerInfoList Healthy worker list
573    * @param unhealthyWorkerInfoList Unhealthy worker list
574    */
575   private void logMissingWorkersOnSuperstep(
576       List<WorkerInfo> healthyWorkerInfoList,
577       List<WorkerInfo> unhealthyWorkerInfoList) {
578     if (LOG.isInfoEnabled()) {
579       Set<Integer> partitionSet = new TreeSet<Integer>();
580       for (WorkerInfo workerInfo : healthyWorkerInfoList) {
581         partitionSet.add(workerInfo.getTaskId() % maxWorkers);
582       }
583       for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
584         partitionSet.add(workerInfo.getTaskId() % maxWorkers);
585       }
586       for (int i = 1; i <= maxWorkers; ++i) {
587         if (partitionSet.contains(Integer.valueOf(i))) {
588           continue;
589         } else if (i == getTaskId() % maxWorkers) {
590           continue;
591         } else {
592           LOG.info("logMissingWorkersOnSuperstep: No response from " +
593               "partition " + i + " (could be master)");
594         }
595       }
596     }
597   }
598 
599   /**
600    * Common method for creating vertex/edge input splits.
601    *
602    * @param inputFormat The vertex/edge input format
603    * @param inputSplitType Type of input split (for logging purposes)
604    * @return Number of splits. Returns -1 on failure to create
605    *         valid input splits.
606    */
607   private int createInputSplits(GiraphInputFormat inputFormat,
608                                 InputType inputSplitType) {
609     ImmutableClassesGiraphConfiguration conf = getConfiguration();
610     String logPrefix = "create" + inputSplitType + "InputSplits";
611     // Only the 'master' should be doing this.  Wait until the number of
612     // processes that have reported health exceeds the minimum percentage.
613     // If the minimum percentage is not met, fail the job.  Otherwise
614     // generate the input splits
615     List<WorkerInfo> healthyWorkerInfoList = checkWorkers();
616     if (healthyWorkerInfoList == null) {
617       setJobStateFailed("Not enough healthy workers to create input splits");
618       return -1;
619     }
620     globalCommHandler.getInputSplitsHandler().initialize(masterClient,
621         healthyWorkerInfoList);
622 
623     // Create at least as many splits as the total number of input threads.
624     int minSplitCountHint = healthyWorkerInfoList.size() *
625         conf.getNumInputSplitsThreads();
626 
627     // Note that the input splits may only be a sample if
628     // INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100
629     List<InputSplit> splitList = generateInputSplits(inputFormat,
630         minSplitCountHint, inputSplitType);
631 
632     if (splitList.isEmpty()) {
633       LOG.fatal(logPrefix + ": Failing job due to 0 input splits, " +
634           "check input of " + inputFormat.getClass().getName() + "!");
635       getContext().setStatus("Failing job due to 0 input splits, " +
636           "check input of " + inputFormat.getClass().getName() + "!");
637       setJobStateFailed("******* PLEASE CHECK YOUR INPUT TABLES - PARTITIONS " +
638           "WHICH YOU SPECIFIED ARE MISSING (for " + inputSplitType +
639           " input). FAILING THE JOB *******");
640     }
641     if (minSplitCountHint > splitList.size()) {
642       LOG.warn(logPrefix + ": Number of inputSplits=" +
643           splitList.size() + " < " +
644           minSplitCountHint +
645           "=total number of input threads, " +
646           "some threads will be not used");
647     }
648 
649     globalCommHandler.getInputSplitsHandler().addSplits(inputSplitType,
650         splitList, inputFormat);
651 
652     return splitList.size();
653   }
654 
655   @Override
656   public int createMappingInputSplits() {
657     if (!getConfiguration().hasMappingInputFormat()) {
658       return 0;
659     }
660     MappingInputFormat<I, V, E, ? extends Writable> mappingInputFormat =
661       getConfiguration().createWrappedMappingInputFormat();
662     return createInputSplits(mappingInputFormat, InputType.MAPPING);
663   }
664 
665   @Override
666   public int createVertexInputSplits() {
667     int splits = 0;
668     if (getConfiguration().hasVertexInputFormat()) {
669       VertexInputFormat<I, V, E> vertexInputFormat =
670           getConfiguration().createWrappedVertexInputFormat();
671       splits = createInputSplits(vertexInputFormat, InputType.VERTEX);
672     }
673     MasterProgress.get().setVertexInputSplitCount(splits);
674     getJobProgressTracker().updateMasterProgress(MasterProgress.get());
675     return splits;
676   }
677 
678   @Override
679   public int createEdgeInputSplits() {
680     int splits = 0;
681     if (getConfiguration().hasEdgeInputFormat()) {
682       EdgeInputFormat<I, E> edgeInputFormat =
683           getConfiguration().createWrappedEdgeInputFormat();
684       splits = createInputSplits(edgeInputFormat, InputType.EDGE);
685     }
686     MasterProgress.get().setEdgeInputSplitsCount(splits);
687     getJobProgressTracker().updateMasterProgress(MasterProgress.get());
688     return splits;
689   }
690 
  @Override
  public List<WorkerInfo> getWorkerInfoList() {
    // Workers chosen for the current superstep, sorted by task id.
    return chosenWorkerInfoList;
  }
695 
  @Override
  public MasterGlobalCommHandler getGlobalCommHandler() {
    // Handler for reduce/broadcast global communication.
    return globalCommHandler;
  }
700 
  @Override
  public AggregatorToGlobalCommTranslation getAggregatorTranslationHandler() {
    // Translates legacy aggregators to reduce/broadcast operations.
    return aggregatorTranslation;
  }
705 
  @Override
  public MasterCompute getMasterCompute() {
    // The user's MasterCompute instance run once per superstep.
    return masterCompute;
  }
710 
711   /**
712    * Read the finalized checkpoint file and associated metadata files for the
713    * checkpoint.  Modifies the {@link PartitionOwner} objects to get the
714    * checkpoint prefixes.  It is an optimization to prevent all workers from
715    * searching all the files.  Also read in the aggregator data from the
716    * finalized checkpoint file and setting it.
717    *
718    * @param superstep Checkpoint set to examine.
719    * @throws IOException
720    * @throws InterruptedException
721    * @throws KeeperException
722    * @return Collection of generated partition owners.
723    */
724   private Collection<PartitionOwner> prepareCheckpointRestart(long superstep)
725     throws IOException, KeeperException, InterruptedException {
726     List<PartitionOwner> partitionOwners = new ArrayList<>();
727     FileSystem fs = getFs();
728     String finalizedCheckpointPath = getSavedCheckpointBasePath(superstep) +
729         CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX;
730     LOG.info("Loading checkpoint from " + finalizedCheckpointPath);
731     DataInputStream finalizedStream =
732         fs.open(new Path(finalizedCheckpointPath));
733     GlobalStats globalStats = new GlobalStats();
734     globalStats.readFields(finalizedStream);
735     updateCounters(globalStats);
736     SuperstepClasses superstepClasses =
737         SuperstepClasses.createToRead(getConfiguration());
738     superstepClasses.readFields(finalizedStream);
739     getConfiguration().updateSuperstepClasses(superstepClasses);
740     int prefixFileCount = finalizedStream.readInt();
741 
742     String checkpointFile =
743         finalizedStream.readUTF();
744     for (int i = 0; i < prefixFileCount; ++i) {
745       int mrTaskId = finalizedStream.readInt();
746 
747       DataInputStream metadataStream = fs.open(new Path(checkpointFile +
748           "." + mrTaskId + CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX));
749       long partitions = metadataStream.readInt();
750       WorkerInfo worker = getWorkerInfoById(mrTaskId);
751       for (long p = 0; p < partitions; ++p) {
752         int partitionId = metadataStream.readInt();
753         PartitionOwner partitionOwner = new BasicPartitionOwner(partitionId,
754             worker);
755         partitionOwners.add(partitionOwner);
756         LOG.info("prepareCheckpointRestart partitionId=" + partitionId +
757             " assigned to " + partitionOwner);
758       }
759       metadataStream.close();
760     }
761     //Ordering appears to be important as of right now we rely on this ordering
762     //in WorkerGraphPartitioner
763     Collections.sort(partitionOwners, new Comparator<PartitionOwner>() {
764       @Override
765       public int compare(PartitionOwner p1, PartitionOwner p2) {
766         return Integer.compare(p1.getPartitionId(), p2.getPartitionId());
767       }
768     });
769 
770 
771     globalCommHandler.getAggregatorHandler().readFields(finalizedStream);
772     aggregatorTranslation.readFields(finalizedStream);
773     masterCompute.readFields(finalizedStream);
774     finalizedStream.close();
775 
776     return partitionOwners;
777   }
778 
779   @Override
780   public void setup() {
781     // Might have to manually load a checkpoint.
782     // In that case, the input splits are not set, they will be faked by
783     // the checkpoint files.  Each checkpoint file will be an input split
784     // and the input split
785 
786     if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
787       GiraphStats.getInstance().getSuperstepCounter().
788         setValue(getRestartedSuperstep());
789     }
790     for (MasterObserver observer : observers) {
791       observer.preApplication();
792       getContext().progress();
793     }
794   }
795 
796   @Override
797   public boolean becomeMaster() {
798     // Create my bid to become the master, then try to become the worker
799     // or return false.
800     String myBid = null;
801     try {
802       myBid =
803           getZkExt().createExt(masterElectionPath +
804               "/" + getHostnameTaskId(),
805               null,
806               Ids.OPEN_ACL_UNSAFE,
807               CreateMode.EPHEMERAL_SEQUENTIAL,
808               true);
809     } catch (KeeperException e) {
810       throw new IllegalStateException(
811           "becomeMaster: KeeperException", e);
812     } catch (InterruptedException e) {
813       throw new IllegalStateException(
814           "becomeMaster: IllegalStateException", e);
815     }
816     while (true) {
817       JSONObject jobState = getJobState();
818       try {
819         if ((jobState != null) &&
820             ApplicationState.valueOf(
821                 jobState.getString(JSONOBJ_STATE_KEY)) ==
822                 ApplicationState.FINISHED) {
823           LOG.info("becomeMaster: Job is finished, " +
824               "give up trying to be the master!");
825           isMaster = false;
826           return isMaster;
827         }
828       } catch (JSONException e) {
829         throw new IllegalStateException(
830             "becomeMaster: Couldn't get state from " + jobState, e);
831       }
832       try {
833         List<String> masterChildArr =
834             getZkExt().getChildrenExt(
835                 masterElectionPath, true, true, true);
836         if (LOG.isInfoEnabled()) {
837           LOG.info("becomeMaster: First child is '" +
838               masterChildArr.get(0) + "' and my bid is '" +
839               myBid + "'");
840         }
841         if (masterChildArr.get(0).equals(myBid)) {
842           GiraphStats.getInstance().getCurrentMasterTaskPartition().
843               setValue(getTaskId());
844 
845           globalCommHandler = new MasterGlobalCommHandler(
846               new MasterAggregatorHandler(getConfiguration(), getContext()),
847               new MasterInputSplitsHandler(
848                   getConfiguration().useInputSplitLocality(), getContext()));
849           aggregatorTranslation = new AggregatorToGlobalCommTranslation(
850               getConfiguration(), globalCommHandler);
851 
852           globalCommHandler.getAggregatorHandler().initialize(this);
853           masterCompute = getConfiguration().createMasterCompute();
854           masterCompute.setMasterService(this);
855 
856           masterInfo = new MasterInfo();
857           masterServer =
858               new NettyMasterServer(getConfiguration(), this, getContext(),
859                   getGraphTaskManager().createUncaughtExceptionHandler());
860           masterInfo.setInetSocketAddress(masterServer.getMyAddress(),
861               masterServer.getLocalHostOrIp());
862           masterInfo.setTaskId(getTaskId());
863           masterClient =
864               new NettyMasterClient(getContext(), getConfiguration(), this,
865                   getGraphTaskManager().createUncaughtExceptionHandler());
866           masterServer.setFlowControl(masterClient.getFlowControl());
867 
868           if (LOG.isInfoEnabled()) {
869             LOG.info("becomeMaster: I am now the master!");
870           }
871           isMaster = true;
872           return isMaster;
873         }
874         LOG.info("becomeMaster: Waiting to become the master...");
875         getMasterElectionChildrenChangedEvent().waitForTimeoutOrFail(
876             GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(
877                 getConfiguration()));
878         getMasterElectionChildrenChangedEvent().reset();
879       } catch (KeeperException e) {
880         throw new IllegalStateException(
881             "becomeMaster: KeeperException", e);
882       } catch (InterruptedException e) {
883         throw new IllegalStateException(
884             "becomeMaster: IllegalStateException", e);
885       }
886     }
887   }
888 
  @Override
  public MasterInfo getMasterInfo() {
    // Populated in becomeMaster() with this master's address and task id.
    return masterInfo;
  }
893 
  /**
   * Collect and aggregate the worker statistics for a particular superstep.
   * Reads each worker's "finished" znode (JSON payload), sums message
   * counts/bytes into a {@link GlobalStats}, optionally aggregates
   * per-worker metrics, and folds in the partition stats collected by the
   * global communication handler.
   *
   * @param superstep Superstep to aggregate on
   * @return Global statistics aggregated on all worker statistics
   */
  private GlobalStats aggregateWorkerStats(long superstep) {
    ImmutableClassesGiraphConfiguration conf = getConfiguration();

    GlobalStats globalStats = new GlobalStats();
    // Get the stats from the all the worker selected nodes
    String workerFinishedPath =
        getWorkerFinishedPath(getApplicationAttempt(), superstep);
    List<String> workerFinishedPathList = null;
    try {
      workerFinishedPathList =
          getZkExt().getChildrenExt(
              workerFinishedPath, false, false, true);
    } catch (KeeperException e) {
      throw new IllegalStateException(
          "aggregateWorkerStats: KeeperException", e);
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "aggregateWorkerStats: InterruptedException", e);
    }

    AggregatedMetrics aggregatedMetrics = new AggregatedMetrics();

    for (String finishedPath : workerFinishedPathList) {
      String hostnamePartitionId = FilenameUtils.getName(finishedPath);
      JSONObject workerFinishedInfoObj = null;
      try {
        // Each worker published its per-superstep stats as JSON in the
        // data of its "finished" znode.
        byte [] zkData =
            getZkExt().getData(finishedPath, false, null);
        workerFinishedInfoObj = new JSONObject(new String(zkData,
            Charset.defaultCharset()));
        globalStats.addMessageCount(
            workerFinishedInfoObj.getLong(
                JSONOBJ_NUM_MESSAGES_KEY));
        globalStats.addMessageBytesCount(
          workerFinishedInfoObj.getLong(
              JSONOBJ_NUM_MESSAGE_BYTES_KEY));
        if (conf.metricsEnabled() &&
            workerFinishedInfoObj.has(JSONOBJ_METRICS_KEY)) {
          // Worker metrics travel as Base64-encoded Writable bytes
          // inside the JSON payload.
          WorkerSuperstepMetrics workerMetrics = new WorkerSuperstepMetrics();
          WritableUtils.readFieldsFromByteArray(
              Base64.decode(
                  workerFinishedInfoObj.getString(
                      JSONOBJ_METRICS_KEY)),
              workerMetrics);
          globalStats.addOocLoadBytesCount(
              workerMetrics.getBytesLoadedFromDisk());
          globalStats.addOocStoreBytesCount(
              workerMetrics.getBytesStoredOnDisk());
          // Find the lowest percentage of graph in memory across all workers
          // for one superstep
          globalStats.setLowestGraphPercentageInMemory(
              Math.min(globalStats.getLowestGraphPercentageInMemory(),
                  (int) Math.round(
                      workerMetrics.getGraphPercentageInMemory())));
          aggregatedMetrics.add(workerMetrics, hostnamePartitionId);
        }
      } catch (JSONException e) {
        throw new IllegalStateException(
            "aggregateWorkerStats: JSONException", e);
      } catch (KeeperException e) {
        throw new IllegalStateException(
            "aggregateWorkerStats: KeeperException", e);
      } catch (InterruptedException e) {
        throw new IllegalStateException(
            "aggregateWorkerStats: InterruptedException", e);
      } catch (IOException e) {
        throw new IllegalStateException(
            "aggregateWorkerStats: IOException", e);
      }
    }

    // Rebuild the cached per-partition stats list for this superstep from
    // the global communication handler (one entry per finished worker).
    allPartitionStatsList.clear();
    Iterable<PartitionStats> statsList = globalCommHandler.getAllPartitionStats(
        workerFinishedPathList.size(), getContext());
    for (PartitionStats partitionStats : statsList) {
      globalStats.addPartitionStats(partitionStats);
      allPartitionStatsList.add(partitionStats);
    }

    if (conf.metricsEnabled()) {
      // Metrics go to stderr unless an explicit HDFS directory is configured.
      if (GiraphConstants.METRICS_DIRECTORY.isDefaultValue(conf)) {
        aggregatedMetrics.print(superstep, System.err);
      } else {
        printAggregatedMetricsToHDFS(superstep, aggregatedMetrics);
      }
      for (MasterObserver observer : observers) {
        observer.superstepMetricsUpdate(
            superstep, aggregatedMetrics, allPartitionStatsList);
      }
    }

    if (LOG.isInfoEnabled()) {
      LOG.info("aggregateWorkerStats: Aggregation found " + globalStats +
          " on superstep = " + getSuperstep());
    }
    return globalStats;
  }
997 
998   /**
999    * Write superstep metrics to own file in HDFS
1000    * @param superstep the current superstep
1001    * @param aggregatedMetrics the aggregated metrics to write
1002    */
1003   private void printAggregatedMetricsToHDFS(
1004       long superstep, AggregatedMetrics aggregatedMetrics) {
1005     ImmutableClassesGiraphConfiguration conf = getConfiguration();
1006     PrintStream out = null;
1007     Path dir = new Path(GiraphConstants.METRICS_DIRECTORY.get(conf));
1008     Path outFile = new Path(GiraphConstants.METRICS_DIRECTORY.get(conf) +
1009         Path.SEPARATOR_CHAR + "superstep_" + superstep + ".metrics");
1010     try {
1011       FileSystem fs;
1012       fs = FileSystem.get(conf);
1013       if (!fs.exists(dir)) {
1014         fs.mkdirs(dir);
1015       }
1016       if (fs.exists(outFile)) {
1017         throw new RuntimeException(
1018             "printAggregatedMetricsToHDFS: metrics file exists");
1019       }
1020       out = new PrintStream(fs.create(outFile), false,
1021           Charset.defaultCharset().name());
1022       aggregatedMetrics.print(superstep, out);
1023     } catch (IOException e) {
1024       throw new RuntimeException(
1025           "printAggregatedMetricsToHDFS: error creating metrics file", e);
1026     } finally {
1027       if (out != null) {
1028         out.close();
1029       }
1030     }
1031   }
1032 
1033   /**
1034    * Finalize the checkpoint file prefixes by taking the chosen workers and
1035    * writing them to a finalized file.  Also write out the master
1036    * aggregated aggregator array from the previous superstep.
1037    *
1038    * @param superstep superstep to finalize
1039    * @param chosenWorkerInfoList list of chosen workers that will be finalized
1040    * @throws IOException
1041    * @throws InterruptedException
1042    * @throws KeeperException
1043    */
1044   private void finalizeCheckpoint(long superstep,
1045     List<WorkerInfo> chosenWorkerInfoList)
1046     throws IOException, KeeperException, InterruptedException {
1047     Path finalizedCheckpointPath =
1048         new Path(getCheckpointBasePath(superstep) +
1049             CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX);
1050     try {
1051       getFs().delete(finalizedCheckpointPath, false);
1052     } catch (IOException e) {
1053       LOG.warn("finalizedValidCheckpointPrefixes: Removed old file " +
1054           finalizedCheckpointPath);
1055     }
1056 
1057     // Format:
1058     // <global statistics>
1059     // <superstep classes>
1060     // <number of files>
1061     // <used file prefix 0><used file prefix 1>...
1062     // <aggregator data>
1063     // <masterCompute data>
1064     FSDataOutputStream finalizedOutputStream =
1065         getFs().create(finalizedCheckpointPath);
1066 
1067     String superstepFinishedNode =
1068         getSuperstepFinishedPath(getApplicationAttempt(), superstep - 1);
1069     finalizedOutputStream.write(
1070         getZkExt().getData(superstepFinishedNode, false, null));
1071 
1072     finalizedOutputStream.writeInt(chosenWorkerInfoList.size());
1073     finalizedOutputStream.writeUTF(getCheckpointBasePath(superstep));
1074     for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
1075       finalizedOutputStream.writeInt(getWorkerId(chosenWorkerInfo));
1076     }
1077     globalCommHandler.getAggregatorHandler().write(finalizedOutputStream);
1078     aggregatorTranslation.write(finalizedOutputStream);
1079     masterCompute.write(finalizedOutputStream);
1080     finalizedOutputStream.close();
1081     lastCheckpointedSuperstep = superstep;
1082     GiraphStats.getInstance().
1083         getLastCheckpointedSuperstep().setValue(superstep);
1084   }
1085 
1086   /**
1087    * Assign the partitions for this superstep.  If there are changes,
1088    * the workers will know how to do the exchange.  If this was a restarted
1089    * superstep, then make sure to provide information on where to find the
1090    * checkpoint file.
1091    */
1092   private void assignPartitionOwners() {
1093     Collection<PartitionOwner> partitionOwners;
1094     if (getSuperstep() == INPUT_SUPERSTEP) {
1095       partitionOwners =
1096           masterGraphPartitioner.createInitialPartitionOwners(
1097               chosenWorkerInfoList, maxWorkers);
1098       if (partitionOwners.isEmpty()) {
1099         throw new IllegalStateException(
1100             "assignAndExchangePartitions: No partition owners set");
1101       }
1102     } else if (getRestartedSuperstep() == getSuperstep()) {
1103       // If restarted, prepare the checkpoint restart
1104       try {
1105         partitionOwners = prepareCheckpointRestart(getSuperstep());
1106       } catch (IOException e) {
1107         throw new IllegalStateException(
1108             "assignPartitionOwners: IOException on preparing", e);
1109       } catch (KeeperException e) {
1110         throw new IllegalStateException(
1111             "assignPartitionOwners: KeeperException on preparing", e);
1112       } catch (InterruptedException e) {
1113         throw new IllegalStateException(
1114             "assignPartitionOwners: InteruptedException on preparing",
1115             e);
1116       }
1117       masterGraphPartitioner.setPartitionOwners(partitionOwners);
1118     } else {
1119       partitionOwners =
1120           masterGraphPartitioner.generateChangedPartitionOwners(
1121               allPartitionStatsList,
1122               chosenWorkerInfoList,
1123               maxWorkers,
1124               getSuperstep());
1125 
1126       PartitionUtils.analyzePartitionStats(partitionOwners,
1127           allPartitionStatsList);
1128     }
1129     checkPartitions(masterGraphPartitioner.getCurrentPartitionOwners());
1130 
1131 
1132 
1133     // There will be some exchange of partitions
1134     if (!partitionOwners.isEmpty()) {
1135       String vertexExchangePath =
1136           getPartitionExchangePath(getApplicationAttempt(),
1137               getSuperstep());
1138       try {
1139         getZkExt().createOnceExt(vertexExchangePath,
1140             null,
1141             Ids.OPEN_ACL_UNSAFE,
1142             CreateMode.PERSISTENT,
1143             true);
1144       } catch (KeeperException e) {
1145         throw new IllegalStateException(
1146             "assignPartitionOwners: KeeperException creating " +
1147                 vertexExchangePath);
1148       } catch (InterruptedException e) {
1149         throw new IllegalStateException(
1150             "assignPartitionOwners: InterruptedException creating " +
1151                 vertexExchangePath);
1152       }
1153     }
1154 
1155     AddressesAndPartitionsWritable addressesAndPartitions =
1156         new AddressesAndPartitionsWritable(masterInfo, chosenWorkerInfoList,
1157             partitionOwners);
1158     // Send assignments to every worker
1159     // TODO for very large number of partitions we might want to split this
1160     // across multiple requests
1161     for (WorkerInfo workerInfo : chosenWorkerInfoList) {
1162       masterClient.sendWritableRequest(workerInfo.getTaskId(),
1163           new AddressesAndPartitionsRequest(addressesAndPartitions));
1164     }
1165   }
1166 
1167   /**
1168    * Check if partition ids are valid
1169    *
1170    * @param partitionOwners List of partition ids for current superstep
1171    */
1172   private void checkPartitions(Collection<PartitionOwner> partitionOwners) {
1173     for (PartitionOwner partitionOwner : partitionOwners) {
1174       int partitionId = partitionOwner.getPartitionId();
1175       if (partitionId < 0 || partitionId >= partitionOwners.size()) {
1176         throw new IllegalStateException("checkPartitions: " +
1177             "Invalid partition id " + partitionId +
1178             " - partition ids must be values from 0 to (numPartitions - 1)");
1179       }
1180     }
1181   }
1182 
1183   /**
1184    * Check whether the workers chosen for this superstep are still alive
1185    *
1186    * @param chosenWorkerInfoHealthPath Path to the healthy workers in ZooKeeper
1187    * @param chosenWorkerInfoList List of the healthy workers
1188    * @return a list of dead workers. Empty list if all workers are alive.
1189    * @throws InterruptedException
1190    * @throws KeeperException
1191    */
1192   private Collection<WorkerInfo> superstepChosenWorkerAlive(
1193     String chosenWorkerInfoHealthPath,
1194     List<WorkerInfo> chosenWorkerInfoList)
1195     throws KeeperException, InterruptedException {
1196     List<WorkerInfo> chosenWorkerInfoHealthyList =
1197         getWorkerInfosFromPath(chosenWorkerInfoHealthPath, false);
1198     Set<WorkerInfo> chosenWorkerInfoHealthySet =
1199         new HashSet<WorkerInfo>(chosenWorkerInfoHealthyList);
1200     List<WorkerInfo> deadWorkers = new ArrayList<>();
1201     for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
1202       if (!chosenWorkerInfoHealthySet.contains(chosenWorkerInfo)) {
1203         deadWorkers.add(chosenWorkerInfo);
1204       }
1205     }
1206     return deadWorkers;
1207   }
1208 
  @Override
  public void restartFromCheckpoint(long checkpoint) {
    // Process:
    // 1. Increase the application attempt and set to the correct checkpoint
    // 2. Send command to all workers to restart their tasks
    setApplicationAttempt(getApplicationAttempt() + 1);
    setCachedSuperstep(checkpoint);
    setRestartedSuperstep(checkpoint);
    // Clear any pending checkpoint decision before resuming.
    checkpointStatus = CheckpointStatus.NONE;
    // Publishing the new job state is the signal the workers react to.
    setJobState(ApplicationState.START_SUPERSTEP,
        getApplicationAttempt(),
        checkpoint);
  }
1222 
  /**
   * Safely removes node from zookeeper.
   * Ignores if node is already removed. Can only throw runtime exception if
   * anything wrong.
   * @param path path to the node to be removed.
   */
  private void zkDeleteNode(String path) {
    try {
      // Version -1 matches any node version; the boolean presumably
      // requests recursive deletion (see ZooKeeperExt.deleteExt) - confirm.
      getZkExt().deleteExt(path, -1, true);
    } catch (KeeperException.NoNodeException e) {
      // Benign race: somebody else already deleted the node.
      LOG.info("zkDeleteNode: node has already been removed " + path);
    } catch (InterruptedException e) {
      // NOTE(review): the interrupt status is not restored before rethrowing;
      // verify callers do not rely on it.
      throw new RuntimeException(
          "zkDeleteNode: InterruptedException", e);
    } catch (KeeperException e) {
      throw new RuntimeException(
          "zkDeleteNode: KeeperException", e);
    }
  }
1242 
  @Override
  public long getLastGoodCheckpoint() throws IOException {
    // Find the last good checkpoint if none have been written to the
    // knowledge of this master
    if (lastCheckpointedSuperstep == -1) {
      try {
        // Delegates to getLastCheckpointedSuperstep() (defined elsewhere in
        // this class) to discover the newest usable checkpoint.
        lastCheckpointedSuperstep = getLastCheckpointedSuperstep();
      } catch (IOException e) {
        // No usable checkpoint exists anywhere - the job cannot recover.
        // failJob() has side effects; afterwards the stale -1 may still be
        // returned below.
        LOG.fatal("getLastGoodCheckpoint: No last good checkpoints can be " +
            "found, killing the job.", e);
        failJob(e);
      }
    }

    return lastCheckpointedSuperstep;
  }
1259 
1260   /**
1261    * Wait for a set of workers to signal that they are done with the
1262    * barrier.
1263    *
1264    * @param finishedWorkerPath Path to where the workers will register their
1265    *        hostname and id
1266    * @param workerInfoList List of the workers to wait for
1267    * @param event Event to wait on for a chance to be done.
1268    * @param ignoreDeath In case if worker died after making it through
1269    *                    barrier, we will ignore death if set to true.
1270    * @return True if barrier was successful, false if there was a worker
1271    *         failure
1272    */
1273   private boolean barrierOnWorkerList(String finishedWorkerPath,
1274       List<WorkerInfo> workerInfoList,
1275       BspEvent event,
1276       boolean ignoreDeath) {
1277     try {
1278       getZkExt().createOnceExt(finishedWorkerPath,
1279           null,
1280           Ids.OPEN_ACL_UNSAFE,
1281           CreateMode.PERSISTENT,
1282           true);
1283     } catch (KeeperException e) {
1284       throw new IllegalStateException(
1285           "barrierOnWorkerList: KeeperException - Couldn't create " +
1286               finishedWorkerPath, e);
1287     } catch (InterruptedException e) {
1288       throw new IllegalStateException(
1289           "barrierOnWorkerList: InterruptedException - Couldn't create " +
1290               finishedWorkerPath, e);
1291     }
1292     List<String> hostnameIdList =
1293         new ArrayList<String>(workerInfoList.size());
1294     for (WorkerInfo workerInfo : workerInfoList) {
1295       hostnameIdList.add(workerInfo.getHostnameId());
1296     }
1297     String workerInfoHealthyPath =
1298         getWorkerInfoHealthyPath(getApplicationAttempt(), getSuperstep());
1299     List<String> finishedHostnameIdList = new ArrayList<>();
1300     List<String> tmpFinishedHostnameIdList;
1301     long nextInfoMillis = System.currentTimeMillis();
1302     final int defaultTaskTimeoutMsec = 10 * 60 * 1000;  // from TaskTracker
1303     final int waitBetweenLogInfoMsec = 30 * 1000;
1304     final int taskTimeoutMsec = getContext().getConfiguration().getInt(
1305         "mapred.task.timeout", defaultTaskTimeoutMsec) / 2;
1306     long lastRegularRunTimeMsec = 0;
1307     int eventLoopTimeout =  Math.min(taskTimeoutMsec, waitBetweenLogInfoMsec);
1308     boolean logInfoOnlyRun = false;
1309     List<WorkerInfo> deadWorkers = new ArrayList<>();
1310     while (true) {
1311       if (! logInfoOnlyRun) {
1312         try {
1313           tmpFinishedHostnameIdList =
1314               getZkExt().getChildrenExt(finishedWorkerPath,
1315                                         true,
1316                                         false,
1317                                         false);
1318         } catch (KeeperException e) {
1319           throw new IllegalStateException(
1320               "barrierOnWorkerList: KeeperException - Couldn't get " +
1321                   "children of " + finishedWorkerPath, e);
1322         } catch (InterruptedException e) {
1323           throw new IllegalStateException(
1324               "barrierOnWorkerList: IllegalException - Couldn't get " +
1325                   "children of " + finishedWorkerPath, e);
1326         }
1327         if (LOG.isDebugEnabled()) {
1328           // Log the names of the new workers that have finished since last time
1329           Set<String> newFinishedHostnames = Sets.difference(
1330             Sets.newHashSet(tmpFinishedHostnameIdList),
1331             Sets.newHashSet(finishedHostnameIdList));
1332           LOG.debug("barrierOnWorkerList: Got new finished worker list = " +
1333                         newFinishedHostnames + ", size = " +
1334                         newFinishedHostnames.size() +
1335                         " from " + finishedWorkerPath);
1336         }
1337         finishedHostnameIdList = tmpFinishedHostnameIdList;
1338       }
1339 
1340       if (LOG.isInfoEnabled() &&
1341           (System.currentTimeMillis() > nextInfoMillis)) {
1342         nextInfoMillis = System.currentTimeMillis() + waitBetweenLogInfoMsec;
1343         LOG.info("barrierOnWorkerList: " +
1344             finishedHostnameIdList.size() +
1345             " out of " + workerInfoList.size() +
1346             " workers finished on superstep " +
1347             getSuperstep() + " on path " + finishedWorkerPath);
1348         if (workerInfoList.size() - finishedHostnameIdList.size() <
1349             MAX_PRINTABLE_REMAINING_WORKERS) {
1350           Set<String> remainingWorkers = Sets.newHashSet(hostnameIdList);
1351           remainingWorkers.removeAll(finishedHostnameIdList);
1352           LOG.info("barrierOnWorkerList: Waiting on " + remainingWorkers);
1353         }
1354       }
1355 
1356       if (! logInfoOnlyRun) {
1357         getContext().setStatus(getGraphTaskManager().getGraphFunctions() +
1358                                    " - " +
1359                                    finishedHostnameIdList.size() +
1360                                    " finished out of " +
1361                                    workerInfoList.size() +
1362                                    " on superstep " + getSuperstep());
1363         if (finishedHostnameIdList.containsAll(hostnameIdList)) {
1364           break;
1365         }
1366 
1367         for (WorkerInfo deadWorker : deadWorkers) {
1368           if (!finishedHostnameIdList.contains(deadWorker.getHostnameId())) {
1369             LOG.error("barrierOnWorkerList: no results arived from " +
1370                           "worker that was pronounced dead: " + deadWorker +
1371                           " on superstep " + getSuperstep());
1372             return false;
1373           }
1374         }
1375 
1376         // wall-clock time skew is ignored
1377         lastRegularRunTimeMsec = System.currentTimeMillis();
1378       }
1379 
1380       // Wait for a signal or timeout
1381       boolean eventTriggered = event.waitMsecs(eventLoopTimeout);
1382 
1383       // If the event was triggered, we reset it. In the next loop run, we will
1384       // read ZK to get the new hosts.
1385       if (eventTriggered) {
1386         event.reset();
1387       }
1388 
1389       long elapsedTimeSinceRegularRunMsec = System.currentTimeMillis() -
1390           lastRegularRunTimeMsec;
1391       getContext().progress();
1392 
1393       if (eventTriggered ||
1394           taskTimeoutMsec == eventLoopTimeout ||
1395           elapsedTimeSinceRegularRunMsec >= taskTimeoutMsec) {
1396         logInfoOnlyRun = false;
1397       } else {
1398         logInfoOnlyRun = true;
1399         continue;
1400       }
1401 
1402       // Did a worker die?
1403       try {
1404         deadWorkers.addAll(superstepChosenWorkerAlive(
1405                 workerInfoHealthyPath,
1406                 workerInfoList));
1407         if (!ignoreDeath && deadWorkers.size() > 0) {
1408           String errorMessage = "******* WORKERS " + deadWorkers +
1409               " FAILED *******";
1410           // If checkpointing is not used, we should fail the job
1411           if (!getConfiguration().useCheckpointing()) {
1412             setJobStateFailed(errorMessage);
1413           } else {
1414             LOG.error("barrierOnWorkerList: Missing chosen " +
1415                 "workers " + deadWorkers +
1416                 " on superstep " + getSuperstep());
1417             // Log worker failure to command line
1418             getGraphTaskManager().getJobProgressTracker().logInfo(errorMessage);
1419           }
1420           return false;
1421         }
1422       } catch (KeeperException e) {
1423         throw new IllegalStateException(
1424             "barrierOnWorkerList: KeeperException - " +
1425                 "Couldn't get " + workerInfoHealthyPath, e);
1426       } catch (InterruptedException e) {
1427         throw new IllegalStateException(
1428             "barrierOnWorkerList: InterruptedException - " +
1429                 "Couldn't get " + workerInfoHealthyPath, e);
1430       }
1431     }
1432 
1433     return true;
1434   }
1435 
1436   /**
1437    * Clean up old superstep data from Zookeeper
1438    *
1439    * @param removeableSuperstep Supersteo to clean up
1440    * @throws InterruptedException
1441    */
1442   private void cleanUpOldSuperstep(long removeableSuperstep) throws
1443       InterruptedException {
1444     if (KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration()) &&
1445         (removeableSuperstep >= 0)) {
1446       String oldSuperstepPath =
1447           getSuperstepPath(getApplicationAttempt()) + "/" +
1448               removeableSuperstep;
1449       try {
1450         if (LOG.isInfoEnabled()) {
1451           LOG.info("coordinateSuperstep: Cleaning up old Superstep " +
1452               oldSuperstepPath);
1453         }
1454         getZkExt().deleteExt(oldSuperstepPath,
1455             -1,
1456             true);
1457       } catch (KeeperException.NoNodeException e) {
1458         LOG.warn("coordinateBarrier: Already cleaned up " +
1459             oldSuperstepPath);
1460       } catch (KeeperException e) {
1461         throw new IllegalStateException(
1462             "coordinateSuperstep: KeeperException on " +
1463                 "finalizing checkpoint", e);
1464       }
1465     }
1466   }
1467 
1468   /**
1469    * Coordinate the exchange of vertex/edge input splits among workers.
1470    */
1471   private void coordinateInputSplits() {
1472     // Coordinate the workers finishing sending their vertices/edges to the
1473     // correct workers and signal when everything is done.
1474     if (!barrierOnWorkerList(inputSplitsWorkerDonePath,
1475         chosenWorkerInfoList,
1476         getInputSplitsWorkerDoneEvent(),
1477         false)) {
1478       throw new IllegalStateException("coordinateInputSplits: Worker failed " +
1479           "during input split (currently not supported)");
1480     }
1481     try {
1482       getZkExt().createExt(inputSplitsAllDonePath,
1483           null,
1484           Ids.OPEN_ACL_UNSAFE,
1485           CreateMode.PERSISTENT,
1486           false);
1487     } catch (KeeperException.NodeExistsException e) {
1488       LOG.info("coordinateInputSplits: Node " +
1489           inputSplitsAllDonePath + " already exists.");
1490     } catch (KeeperException e) {
1491       throw new IllegalStateException(
1492           "coordinateInputSplits: KeeperException", e);
1493     } catch (InterruptedException e) {
1494       throw new IllegalStateException(
1495           "coordinateInputSplits: IllegalStateException", e);
1496     }
1497   }
1498 
  /**
   * Initialize aggregators at the master side before vertex/edge loading.
   * This method cooperates with other code to enable aggregator usage at
   * INPUT_SUPERSTEP.  The cooperating code is in BSPServiceWorker:
   * aggregatorHandler.prepareSuperstep in setup(), and setting aggregator
   * usage in vertexReader and edgeReader.
   *
   * @throws InterruptedException
   */
  private void initializeAggregatorInputSuperstep()
    throws InterruptedException {
    globalCommHandler.getAggregatorHandler().prepareSuperstep();

    prepareMasterCompute(getSuperstep());
    try {
      // MasterCompute.initialize() is where user code registers aggregators,
      // so it must run before workers start loading input
      masterCompute.initialize();
    } catch (InstantiationException e) {
      LOG.fatal(
        "initializeAggregatorInputSuperstep: Failed in instantiation", e);
      throw new RuntimeException(
        "initializeAggregatorInputSuperstep: Failed in instantiation", e);
    } catch (IllegalAccessException e) {
      LOG.fatal("initializeAggregatorInputSuperstep: Failed in access", e);
      throw new RuntimeException(
        "initializeAggregatorInputSuperstep: Failed in access", e);
    }
    aggregatorTranslation.postMasterCompute();
    globalCommHandler.getAggregatorHandler().finishSuperstep();

    // Ship the initial aggregator values to the workers that own them
    globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
  }
1535 
1536   /**
1537    * This is required before initialization
1538    * and run of MasterCompute
1539    *
1540    * @param superstep superstep for which to run masterCompute
1541    * @return Superstep classes set by masterCompute
1542    */
1543   private SuperstepClasses prepareMasterCompute(long superstep) {
1544     GraphState graphState = new GraphState(superstep ,
1545         GiraphStats.getInstance().getVertices().getValue(),
1546         GiraphStats.getInstance().getEdges().getValue(),
1547         getContext());
1548     SuperstepClasses superstepClasses =
1549         SuperstepClasses.createAndExtractTypes(getConfiguration());
1550     masterCompute.setGraphState(graphState);
1551     masterCompute.setSuperstepClasses(superstepClasses);
1552     return superstepClasses;
1553   }
1554 
  @Override
  public SuperstepState coordinateSuperstep() throws
  KeeperException, InterruptedException {
    // 1. Get chosen workers and set up watches on them.
    // 2. Assign partitions to the workers
    //    (possibly reloading from a superstep)
    // 3. Wait for all workers to complete
    // 4. Collect and process aggregators
    // 5. Create superstep finished node
    // 6. If the checkpoint frequency is met, finalize the checkpoint

    // Notify observers before any coordination work starts
    for (MasterObserver observer : observers) {
      observer.preSuperstep(getSuperstep());
      getContext().progress();
    }

    chosenWorkerInfoList = checkWorkers();
    if (chosenWorkerInfoList == null) {
      // NOTE(review): setJobStateFailed is presumably terminal; if it ever
      // returned, chosenWorkerInfoList would still be null and the size()
      // call below would NPE -- confirm against BspService.
      setJobStateFailed("coordinateSuperstep: Not enough healthy workers for " +
                    "superstep " + getSuperstep());
    } else {
      // Sort this list, so order stays the same over supersteps
      Collections.sort(chosenWorkerInfoList, new Comparator<WorkerInfo>() {
        @Override
        public int compare(WorkerInfo wi1, WorkerInfo wi2) {
          return Integer.compare(wi1.getTaskId(), wi2.getTaskId());
        }
      });
      for (WorkerInfo workerInfo : chosenWorkerInfoList) {
        String workerInfoHealthyPath =
            getWorkerInfoHealthyPath(getApplicationAttempt(),
                getSuperstep()) + "/" +
                workerInfo.getHostnameId();
        // exists() with watch=true also re-registers the watch on the znode
        if (getZkExt().exists(workerInfoHealthyPath, true) == null) {
          // NOTE(review): this only logs; the superstep is not failed here.
          // Actual failure is detected later via the worker barrier.
          LOG.warn("coordinateSuperstep: Chosen worker " +
              workerInfoHealthyPath +
              " is no longer valid, failing superstep");
        }
      }
    }

    // We need to finalize aggregators from previous superstep
    if (getSuperstep() >= 0) {
      aggregatorTranslation.postMasterCompute();
      globalCommHandler.getAggregatorHandler().finishSuperstep();
    }

    masterClient.openConnections();

    GiraphStats.getInstance().
        getCurrentWorkers().setValue(chosenWorkerInfoList.size());
    assignPartitionOwners();

    // Finalize the valid checkpoint file prefixes and possibly
    // the aggregators.
    if (checkpointStatus != CheckpointStatus.NONE) {
      String workerWroteCheckpointPath =
          getWorkerWroteCheckpointPath(getApplicationAttempt(),
              getSuperstep());
      // first wait for all the workers to write their checkpoint data
      if (!barrierOnWorkerList(workerWroteCheckpointPath,
          chosenWorkerInfoList,
          getWorkerWroteCheckpointEvent(),
          checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT)) {
        return SuperstepState.WORKER_FAILURE;
      }
      try {
        finalizeCheckpoint(getSuperstep(), chosenWorkerInfoList);
      } catch (IOException e) {
        throw new IllegalStateException(
            "coordinateSuperstep: IOException on finalizing checkpoint",
            e);
      }
      // Checkpoint-and-halt stops coordination right after the checkpoint
      if (checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT) {
        return SuperstepState.CHECKPOINT_AND_HALT;
      }
    }

    // We need to send aggregators to worker owners after new worker assignments
    if (getSuperstep() >= 0) {
      globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
    }

    if (getSuperstep() == INPUT_SUPERSTEP) {
      // Initialize aggregators before coordinating
      initializeAggregatorInputSuperstep();
      coordinateInputSplits();
    }

    // Wait for all chosen workers to report this superstep finished
    String finishedWorkerPath =
        getWorkerFinishedPath(getApplicationAttempt(), getSuperstep());
    if (!barrierOnWorkerList(finishedWorkerPath,
        chosenWorkerInfoList,
        getSuperstepStateChangedEvent(),
        false)) {
      return SuperstepState.WORKER_FAILURE;
    }

    // Collect aggregator values, then run the master.compute() and
    // finally save the aggregator values
    globalCommHandler.getAggregatorHandler().prepareSuperstep();
    aggregatorTranslation.prepareSuperstep();

    SuperstepClasses superstepClasses =
      prepareMasterCompute(getSuperstep() + 1);
    doMasterCompute();

    // If the master is halted or all the vertices voted to halt and there
    // are no more messages in the system, stop the computation
    GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
    if (masterCompute.isHalted() ||
        (globalStats.getFinishedVertexCount() ==
        globalStats.getVertexCount() &&
        globalStats.getMessageCount() == 0)) {
      globalStats.setHaltComputation(true);
    } else if (getZkExt().exists(haltComputationPath, false) != null) {
      // External kill switch: a user created the halt znode
      if (LOG.isInfoEnabled()) {
        LOG.info("Halting computation because halt zookeeper node was created");
      }
      globalStats.setHaltComputation(true);
    }

    // If we have completed the maximum number of supersteps, stop
    // the computation
    if (maxNumberOfSupersteps !=
        GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS.getDefaultValue() &&
        (getSuperstep() == maxNumberOfSupersteps - 1)) {
      if (LOG.isInfoEnabled()) {
        LOG.info("coordinateSuperstep: Finished " + maxNumberOfSupersteps +
            " supersteps (max specified by the user), halting");
      }
      globalStats.setHaltComputation(true);
    }

    // Superstep 0 doesn't need to have matching types (Message types may not
    // match) and if the computation is halted, no need to check any of
    // the types.
    if (!globalStats.getHaltComputation()) {
      superstepClasses.verifyTypesMatch(getSuperstep() > 0);
    }
    getConfiguration().updateSuperstepClasses(superstepClasses);

    //Signal workers that we want to checkpoint
    checkpointStatus = getCheckpointStatus(getSuperstep() + 1);
    globalStats.setCheckpointStatus(checkpointStatus);
    // Let everyone know the aggregated application state through the
    // superstep finishing znode.
    String superstepFinishedNode =
        getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());

    WritableUtils.writeToZnode(
        getZkExt(), superstepFinishedNode, -1, globalStats, superstepClasses);
    updateCounters(globalStats);

    cleanUpOldSuperstep(getSuperstep() - 1);
    incrCachedSuperstep();
    // Counter starts at zero, so no need to increment
    if (getSuperstep() > 0) {
      GiraphStats.getInstance().getSuperstepCounter().increment();
    }
    SuperstepState superstepState;
    if (globalStats.getHaltComputation()) {
      superstepState = SuperstepState.ALL_SUPERSTEPS_DONE;
    } else {
      superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
    }
    // Persist aggregator values for this superstep (used on restart)
    globalCommHandler.getAggregatorHandler().writeAggregators(
        getSuperstep(), superstepState);

    return superstepState;
  }
1726 
1727   /**
1728    * Should checkpoint on this superstep?  If checkpointing, always
1729    * checkpoint the first user superstep.  If restarting, the first
1730    * checkpoint is after the frequency has been met.
1731    *
1732    * @param superstep Decide if checkpointing no this superstep
1733    * @return True if this superstep should be checkpointed, false otherwise
1734    */
1735   private CheckpointStatus getCheckpointStatus(long superstep) {
1736     try {
1737       if (getZkExt().
1738           exists(basePath + FORCE_CHECKPOINT_USER_FLAG, false) != null) {
1739         if (isCheckpointingSupported(getConfiguration(), masterCompute)) {
1740           return CheckpointStatus.CHECKPOINT_AND_HALT;
1741         } else {
1742           LOG.warn("Attempted to manually checkpoint the job that " +
1743               "does not support checkpoints. Ignoring");
1744         }
1745       }
1746     } catch (KeeperException e) {
1747       throw new IllegalStateException(
1748           "cleanupZooKeeper: Got KeeperException", e);
1749     } catch (InterruptedException e) {
1750       throw new IllegalStateException(
1751           "cleanupZooKeeper: Got IllegalStateException", e);
1752     }
1753     if (checkpointFrequency == 0) {
1754       return CheckpointStatus.NONE;
1755     }
1756     long firstCheckpoint = INPUT_SUPERSTEP + 1;
1757     if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
1758       firstCheckpoint = getRestartedSuperstep() + checkpointFrequency;
1759     }
1760     if (superstep < firstCheckpoint) {
1761       return CheckpointStatus.NONE;
1762     }
1763     if (((superstep - firstCheckpoint) % checkpointFrequency) == 0) {
1764       if (isCheckpointingSupported(getConfiguration(), masterCompute)) {
1765         return CheckpointStatus.CHECKPOINT;
1766       }
1767     }
1768     return CheckpointStatus.NONE;
1769   }
1770 
1771   /**
1772    * Returns false if job doesn't support checkpoints.
1773    * Job may not support checkpointing if it does output during
1774    * computation, uses static variables to keep data between supersteps,
1775    * starts new threads etc.
1776    * @param conf Immutable configuration of the job
1777    * @param masterCompute instance of master compute
1778    * @return true if it is safe to checkpoint the job
1779    */
1780   private boolean isCheckpointingSupported(
1781       GiraphConfiguration conf, MasterCompute masterCompute) {
1782     return checkpointSupportedChecker.isCheckpointSupported(
1783         conf, masterCompute);
1784   }
1785 
1786 
1787   /**
1788    * This doMasterCompute is only called
1789    * after masterCompute is initialized
1790    */
1791   private void doMasterCompute() {
1792     GiraphTimerContext timerContext = masterComputeTimer.time();
1793     masterCompute.compute();
1794     timerContext.stop();
1795   }
1796 
1797   /**
1798    * Need to clean up ZooKeeper nicely.  Make sure all the masters and workers
1799    * have reported ending their ZooKeeper connections.
1800    */
1801   private void cleanUpZooKeeper() {
1802     try {
1803       getZkExt().createExt(cleanedUpPath,
1804           null,
1805           Ids.OPEN_ACL_UNSAFE,
1806           CreateMode.PERSISTENT,
1807           true);
1808     } catch (KeeperException.NodeExistsException e) {
1809       if (LOG.isInfoEnabled()) {
1810         LOG.info("cleanUpZooKeeper: Node " + cleanedUpPath +
1811             " already exists, no need to create.");
1812       }
1813     } catch (KeeperException e) {
1814       throw new IllegalStateException(
1815           "cleanupZooKeeper: Got KeeperException", e);
1816     } catch (InterruptedException e) {
1817       throw new IllegalStateException(
1818           "cleanupZooKeeper: Got IllegalStateException", e);
1819     }
1820     // Need to wait for the number of workers and masters to complete
1821     int maxTasks = BspInputFormat.getMaxTasks(getConfiguration());
1822     if ((getGraphTaskManager().getGraphFunctions() == GraphFunctions.ALL) ||
1823         (getGraphTaskManager().getGraphFunctions() ==
1824         GraphFunctions.ALL_EXCEPT_ZOOKEEPER)) {
1825       maxTasks *= 2;
1826     }
1827     List<String> cleanedUpChildrenList = null;
1828     while (true) {
1829       try {
1830         cleanedUpChildrenList =
1831             getZkExt().getChildrenExt(
1832                 cleanedUpPath, true, false, true);
1833         if (LOG.isInfoEnabled()) {
1834           LOG.info("cleanUpZooKeeper: Got " +
1835               cleanedUpChildrenList.size() + " of " +
1836               maxTasks  +  " desired children from " +
1837               cleanedUpPath);
1838         }
1839         if (cleanedUpChildrenList.size() == maxTasks) {
1840           break;
1841         }
1842         if (LOG.isInfoEnabled()) {
1843           LOG.info("cleanedUpZooKeeper: Waiting for the " +
1844               "children of " + cleanedUpPath +
1845               " to change since only got " +
1846               cleanedUpChildrenList.size() + " nodes.");
1847         }
1848       } catch (KeeperException e) {
1849         // We are in the cleanup phase -- just log the error
1850         LOG.error("cleanUpZooKeeper: Got KeeperException, " +
1851             "but will continue", e);
1852         return;
1853       } catch (InterruptedException e) {
1854         // We are in the cleanup phase -- just log the error
1855         LOG.error("cleanUpZooKeeper: Got InterruptedException, " +
1856             "but will continue", e);
1857         return;
1858       }
1859 
1860       getCleanedUpChildrenChangedEvent().waitForTimeoutOrFail(
1861           GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
1862               getConfiguration()));
1863       getCleanedUpChildrenChangedEvent().reset();
1864     }
1865 
1866     // At this point, all processes have acknowledged the cleanup,
1867     // and the master can do any final cleanup if the ZooKeeper service was
1868     // provided (not dynamically started) and we don't want to keep the data
1869     try {
1870       if (getConfiguration().isZookeeperExternal() &&
1871           KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration())) {
1872         if (LOG.isInfoEnabled()) {
1873           LOG.info("cleanupZooKeeper: Removing the following path " +
1874               "and all children - " + basePath + " from ZooKeeper list " +
1875               getConfiguration().getZookeeperList());
1876         }
1877         getZkExt().deleteExt(basePath, -1, true);
1878       }
1879     } catch (KeeperException e) {
1880       LOG.error("cleanupZooKeeper: Failed to do cleanup of " +
1881           basePath + " due to KeeperException", e);
1882     } catch (InterruptedException e) {
1883       LOG.error("cleanupZooKeeper: Failed to do cleanup of " +
1884           basePath + " due to InterruptedException", e);
1885     }
1886   }
1887 
1888   @Override
1889   public void postApplication() {
1890     for (MasterObserver observer : observers) {
1891       observer.postApplication();
1892       getContext().progress();
1893     }
1894   }
1895 
1896   @Override
1897   public void postSuperstep() {
1898     for (MasterObserver observer : observers) {
1899       observer.postSuperstep(getSuperstep());
1900       getContext().progress();
1901     }
1902   }
1903 
1904   @Override
1905   public void failureCleanup(Exception e) {
1906     for (MasterObserver observer : observers) {
1907       try {
1908         observer.applicationFailed(e);
1909         // CHECKSTYLE: stop IllegalCatchCheck
1910       } catch (RuntimeException re) {
1911         // CHECKSTYLE: resume IllegalCatchCheck
1912         LOG.error(re.getClass().getName() + " from observer " +
1913             observer.getClass().getName(), re);
1914       }
1915       getContext().progress();
1916     }
1917   }
1918 
  @Override
  public void cleanup(SuperstepState superstepState) throws IOException {
    ImmutableClassesGiraphConfiguration conf = getConfiguration();

    // All master processes should denote they are done by adding special
    // znode.  Once the number of znodes equals the number of partitions
    // for workers and masters, the master will clean up the ZooKeeper
    // znodes associated with this job.
    String masterCleanedUpPath = cleanedUpPath  + "/" +
        getTaskId() + MASTER_SUFFIX;
    try {
      String finalFinishedPath =
          getZkExt().createExt(masterCleanedUpPath,
              null,
              Ids.OPEN_ACL_UNSAFE,
              CreateMode.PERSISTENT,
              true);
      if (LOG.isInfoEnabled()) {
        LOG.info("cleanup: Notifying master its okay to cleanup with " +
            finalFinishedPath);
      }
    } catch (KeeperException.NodeExistsException e) {
      // Benign: a previous attempt already created this task's node
      if (LOG.isInfoEnabled()) {
        LOG.info("cleanup: Couldn't create finished node '" +
            masterCleanedUpPath);
      }
    } catch (KeeperException e) {
      // Cleanup phase -- log and continue shutting down
      LOG.error("cleanup: Got KeeperException, continuing", e);
    } catch (InterruptedException e) {
      // Cleanup phase -- log and continue shutting down
      LOG.error("cleanup: Got InterruptedException, continuing", e);
    }

    if (isMaster) {
      getGraphTaskManager().setIsMaster(true);
      // Wait for all tasks to report done, then remove the job's znodes
      cleanUpZooKeeper();
      // If desired, cleanup the checkpoint directory
      if (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE &&
          GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(conf)) {
        boolean success =
            getFs().delete(new Path(checkpointBasePath), true);
        if (LOG.isInfoEnabled()) {
          LOG.info("cleanup: Removed HDFS checkpoint directory (" +
              checkpointBasePath + ") with return = " +
              success + " since the job " + getContext().getJobName() +
              " succeeded ");
        }
      }
      if (superstepState == SuperstepState.CHECKPOINT_AND_HALT) {
        // Leave a marker so a restart resumes from this checkpoint,
        // then deliberately fail the job to halt all tasks
        getFs().create(CheckpointingUtils.getCheckpointMarkPath(conf,
            getJobId()), true);
        failJob(new Exception("Checkpoint and halt requested. " +
            "Killing this job."));
      }
      globalCommHandler.getAggregatorHandler().close();
      masterClient.closeConnections();
      masterServer.close();
    }

    try {
      getZkExt().close();
    } catch (InterruptedException e) {
      // cleanup phase -- just log the error
      LOG.error("cleanup: Zookeeper failed to close", e);
    }
  }
1984 
  /**
   * Event that the master watches that denotes when a worker wrote checkpoint.
   *
   * @return Event signaled when a worker reports having written its
   *         checkpoint data
   */
  public final BspEvent getWorkerWroteCheckpointEvent() {
    return workerWroteCheckpoint;
  }
1993 
  /**
   * Event that the master watches that denotes if a worker has done something
   * that changes the state of a superstep (either a worker completed or died).
   *
   * @return Event that denotes a superstep state change
   */
  public final BspEvent getSuperstepStateChangedEvent() {
    return superstepStateChanged;
  }
2003 
2004   /**
2005    * Should this worker failure cause the current superstep to fail?
2006    *
2007    * @param failedWorkerPath Full path to the failed worker
2008    */
2009   private void checkHealthyWorkerFailure(String failedWorkerPath) {
2010     if (getSuperstepFromPath(failedWorkerPath) < getSuperstep()) {
2011       return;
2012     }
2013 
2014     Collection<PartitionOwner> partitionOwners =
2015         masterGraphPartitioner.getCurrentPartitionOwners();
2016     String hostnameId =
2017         getHealthyHostnameIdFromPath(failedWorkerPath);
2018     for (PartitionOwner partitionOwner : partitionOwners) {
2019       WorkerInfo workerInfo = partitionOwner.getWorkerInfo();
2020       WorkerInfo previousWorkerInfo =
2021           partitionOwner.getPreviousWorkerInfo();
2022       if (workerInfo.getHostnameId().equals(hostnameId) ||
2023           ((previousWorkerInfo != null) &&
2024               previousWorkerInfo.getHostnameId().equals(hostnameId))) {
2025         LOG.warn("checkHealthyWorkerFailure: " +
2026             "at least one healthy worker went down " +
2027             "for superstep " + getSuperstep() + " - " +
2028             hostnameId + ", will try to restart from " +
2029             "checkpointed superstep " +
2030             lastCheckpointedSuperstep);
2031         superstepStateChanged.signal();
2032       }
2033     }
2034   }
2035 
2036   @Override
2037   public boolean processEvent(WatchedEvent event) {
2038     boolean foundEvent = false;
2039     if (event.getPath().contains(WORKER_HEALTHY_DIR) &&
2040         (event.getType() == EventType.NodeDeleted)) {
2041       if (LOG.isDebugEnabled()) {
2042         LOG.debug("processEvent: Healthy worker died (node deleted) " +
2043             "in " + event.getPath());
2044       }
2045       checkHealthyWorkerFailure(event.getPath());
2046       superstepStateChanged.signal();
2047       foundEvent = true;
2048     } else if (event.getPath().contains(WORKER_FINISHED_DIR) &&
2049         event.getType() == EventType.NodeChildrenChanged) {
2050       if (LOG.isDebugEnabled()) {
2051         LOG.debug("processEvent: Worker finished (node change) " +
2052             "event - superstepStateChanged signaled");
2053       }
2054       superstepStateChanged.signal();
2055       foundEvent = true;
2056     } else if (event.getPath().contains(WORKER_WROTE_CHECKPOINT_DIR) &&
2057         event.getType() == EventType.NodeChildrenChanged) {
2058       if (LOG.isDebugEnabled()) {
2059         LOG.debug("processEvent: Worker wrote checkpoint (node change) " +
2060             "event - workerWroteCheckpoint signaled");
2061       }
2062       workerWroteCheckpoint.signal();
2063       foundEvent = true;
2064     }
2065 
2066     return foundEvent;
2067   }
2068 
2069   /**
2070    * Set values of counters to match the ones from {@link GlobalStats}
2071    *
2072    * @param globalStats Global statistics which holds new counter values
2073    */
2074   private void updateCounters(GlobalStats globalStats) {
2075     GiraphStats gs = GiraphStats.getInstance();
2076     gs.getVertices().setValue(globalStats.getVertexCount());
2077     gs.getFinishedVertexes().setValue(globalStats.getFinishedVertexCount());
2078     gs.getEdges().setValue(globalStats.getEdgeCount());
2079     gs.getSentMessages().setValue(globalStats.getMessageCount());
2080     gs.getSentMessageBytes().setValue(globalStats.getMessageBytesCount());
2081     gs.getAggregateSentMessages().increment(globalStats.getMessageCount());
2082     gs.getAggregateSentMessageBytes()
2083       .increment(globalStats.getMessageBytesCount());
2084     gs.getAggregateOOCBytesLoaded()
2085       .increment(globalStats.getOocLoadBytesCount());
2086     gs.getAggregateOOCBytesStored()
2087       .increment(globalStats.getOocStoreBytesCount());
2088     // Updating the lowest percentage of graph in memory throughout the
2089     // execution across all the supersteps
2090     int percentage = (int) gs.getLowestGraphPercentageInMemory().getValue();
2091     gs.getLowestGraphPercentageInMemory().setValue(
2092         Math.min(percentage, globalStats.getLowestGraphPercentageInMemory()));
2093   }
2094 }