View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.master;
20  
21  import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
22  import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
23  import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
24  
25  import java.io.DataInputStream;
26  import java.io.IOException;
27  import java.io.PrintStream;
28  import java.nio.charset.Charset;
29  import java.util.ArrayList;
30  import java.util.Collection;
31  import java.util.Collections;
32  import java.util.Comparator;
33  import java.util.HashSet;
34  import java.util.List;
35  import java.util.Set;
36  import java.util.TreeSet;
37  import java.util.concurrent.TimeUnit;
38  
39  import net.iharder.Base64;
40  
41  import org.apache.commons.io.FilenameUtils;
42  import org.apache.giraph.bsp.ApplicationState;
43  import org.apache.giraph.bsp.BspInputFormat;
44  import org.apache.giraph.bsp.BspService;
45  import org.apache.giraph.bsp.CentralizedServiceMaster;
46  import org.apache.giraph.bsp.SuperstepState;
47  import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
48  import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
49  import org.apache.giraph.comm.MasterClient;
50  import org.apache.giraph.comm.MasterServer;
51  import org.apache.giraph.comm.netty.NettyMasterClient;
52  import org.apache.giraph.comm.netty.NettyMasterServer;
53  import org.apache.giraph.comm.requests.AddressesAndPartitionsRequest;
54  import org.apache.giraph.conf.GiraphConfiguration;
55  import org.apache.giraph.conf.GiraphConstants;
56  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
57  import org.apache.giraph.counters.GiraphStats;
58  import org.apache.giraph.graph.AddressesAndPartitionsWritable;
59  import org.apache.giraph.graph.GlobalStats;
60  import org.apache.giraph.graph.GraphFunctions;
61  import org.apache.giraph.graph.GraphState;
62  import org.apache.giraph.graph.GraphTaskManager;
63  import org.apache.giraph.io.EdgeInputFormat;
64  import org.apache.giraph.io.GiraphInputFormat;
65  import org.apache.giraph.io.MappingInputFormat;
66  import org.apache.giraph.io.VertexInputFormat;
67  import org.apache.giraph.io.InputType;
68  import org.apache.giraph.master.input.MasterInputSplitsHandler;
69  import org.apache.giraph.metrics.AggregatedMetrics;
70  import org.apache.giraph.metrics.GiraphMetrics;
71  import org.apache.giraph.metrics.GiraphTimer;
72  import org.apache.giraph.metrics.GiraphTimerContext;
73  import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
74  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
75  import org.apache.giraph.metrics.WorkerSuperstepMetrics;
76  import org.apache.giraph.partition.BasicPartitionOwner;
77  import org.apache.giraph.partition.MasterGraphPartitioner;
78  import org.apache.giraph.partition.PartitionOwner;
79  import org.apache.giraph.partition.PartitionStats;
80  import org.apache.giraph.partition.PartitionUtils;
81  import org.apache.giraph.time.SystemTime;
82  import org.apache.giraph.time.Time;
83  import org.apache.giraph.utils.CheckpointingUtils;
84  import org.apache.giraph.utils.JMapHistoDumper;
85  import org.apache.giraph.utils.ReactiveJMapHistoDumper;
86  import org.apache.giraph.utils.ReflectionUtils;
87  import org.apache.giraph.utils.WritableUtils;
88  import org.apache.giraph.worker.WorkerInfo;
89  import org.apache.giraph.zk.BspEvent;
90  import org.apache.giraph.zk.PredicateLock;
91  import org.apache.hadoop.fs.FSDataOutputStream;
92  import org.apache.hadoop.fs.FileSystem;
93  import org.apache.hadoop.fs.Path;
94  import org.apache.hadoop.io.Writable;
95  import org.apache.hadoop.io.WritableComparable;
96  import org.apache.hadoop.mapred.JobID;
97  import org.apache.hadoop.mapred.RunningJob;
98  import org.apache.hadoop.mapreduce.InputSplit;
99  import org.apache.hadoop.mapreduce.Mapper;
100 import org.apache.log4j.Logger;
101 import org.apache.zookeeper.CreateMode;
102 import org.apache.zookeeper.KeeperException;
103 import org.apache.zookeeper.WatchedEvent;
104 import org.apache.zookeeper.Watcher.Event.EventType;
105 import org.apache.zookeeper.ZooDefs.Ids;
106 import org.json.JSONException;
107 import org.json.JSONObject;
108 
109 import com.google.common.collect.Lists;
110 import com.google.common.collect.Sets;
111 
112 /**
113  * ZooKeeper-based implementation of {@link CentralizedServiceMaster}.
114  *
115  * @param <I> Vertex id
116  * @param <V> Vertex data
117  * @param <E> Edge data
118  */
119 @SuppressWarnings("rawtypes, unchecked")
120 public class BspServiceMaster<I extends WritableComparable,
121     V extends Writable, E extends Writable>
122     extends BspService<I, V, E>
123     implements CentralizedServiceMaster<I, V, E>,
124     ResetSuperstepMetricsObserver {
125   /** Print worker names only if there are 10 workers left */
126   public static final int MAX_PRINTABLE_REMAINING_WORKERS = 10;
127   /** How many threads to use when writing input splits to zookeeper*/
128   public static final String NUM_MASTER_ZK_INPUT_SPLIT_THREADS =
129       "giraph.numMasterZkInputSplitThreads";
130   /** Default number of threads to use when writing input splits to zookeeper */
131   public static final int DEFAULT_INPUT_SPLIT_THREAD_COUNT = 1;
132   /** Time instance to use for timing */
133   private static final Time TIME = SystemTime.get();
134   /** Class logger */
135   private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
136   /** Am I the master? */
137   private boolean isMaster = false;
138   /** Max number of workers */
139   private final int maxWorkers;
140   /** Min number of workers */
141   private final int minWorkers;
142   /** Max number of supersteps */
143   private final int maxNumberOfSupersteps;
144   /** Min % responded workers */
145   private final float minPercentResponded;
146   /** Msecs to wait for an event */
147   private final int eventWaitMsecs;
148   /** Max msecs to wait for a superstep to get enough workers */
149   private final int maxSuperstepWaitMsecs;
150   /** Min number of long tails before printing */
151   private final int partitionLongTailMinPrint;
152   /** Last finalized checkpoint */
153   private long lastCheckpointedSuperstep = -1;
154   /** Worker wrote checkpoint */
155   private final BspEvent workerWroteCheckpoint;
156   /** State of the superstep changed */
157   private final BspEvent superstepStateChanged;
158   /** Master graph partitioner */
159   private final MasterGraphPartitioner<I, V, E> masterGraphPartitioner;
160   /** All the partition stats from the last superstep */
161   private final List<PartitionStats> allPartitionStatsList =
162       new ArrayList<PartitionStats>();
163   /** Handler for global communication */
164   private MasterGlobalCommHandler globalCommHandler;
165   /** Handler for aggregators to reduce/broadcast translation */
166   private AggregatorToGlobalCommTranslation aggregatorTranslation;
167   /** Master class */
168   private MasterCompute masterCompute;
169   /** IPC Client */
170   private MasterClient masterClient;
171   /** IPC Server */
172   private MasterServer masterServer;
173   /** Master info */
174   private MasterInfo masterInfo;
175   /** List of workers in current superstep, sorted by task id */
176   private List<WorkerInfo> chosenWorkerInfoList = Lists.newArrayList();
177   /** Limit locality information added to each InputSplit znode */
178   private final int localityLimit = 5;
179   /** Observers over master lifecycle. */
180   private final MasterObserver[] observers;
181 
182   // Per-Superstep Metrics
183   /** MasterCompute time */
184   private GiraphTimer masterComputeTimer;
185 
186   /** Checkpoint frequency */
187   private final int checkpointFrequency;
188   /** Current checkpoint status */
189   private CheckpointStatus checkpointStatus;
190   /** Checks if checkpointing supported */
191   private final CheckpointSupportedChecker checkpointSupportedChecker;
192 
  /**
   * Constructor for setting up the master.
   *
   * @param context Mapper context
   * @param graphTaskManager GraphTaskManager for this compute node
   */
  public BspServiceMaster(
      Mapper<?, ?, ?, ?>.Context context,
      GraphTaskManager<I, V, E> graphTaskManager) {
    super(context, graphTaskManager);
    // Register both events with the BSP service so they participate in the
    // shared event reset on connection/superstep changes
    workerWroteCheckpoint = new PredicateLock(context);
    registerBspEvent(workerWroteCheckpoint);
    superstepStateChanged = new PredicateLock(context);
    registerBspEvent(superstepStateChanged);

    ImmutableClassesGiraphConfiguration<I, V, E> conf =
        getConfiguration();

    // Cache frequently used configuration values in final fields
    maxWorkers = conf.getMaxWorkers();
    minWorkers = conf.getMinWorkers();
    maxNumberOfSupersteps = conf.getMaxNumberOfSupersteps();
    minPercentResponded = GiraphConstants.MIN_PERCENT_RESPONDED.get(conf);
    eventWaitMsecs = conf.getEventWaitMsecs();
    maxSuperstepWaitMsecs = conf.getMaxMasterSuperstepWaitMsecs();
    partitionLongTailMinPrint = PARTITION_LONG_TAIL_MIN_PRINT.get(conf);
    masterGraphPartitioner =
        getGraphPartitionerFactory().createMasterGraphPartitioner();
    // JMap dumper observers must be added to the configuration before the
    // observer array is created below
    if (conf.isJMapHistogramDumpEnabled()) {
      conf.addMasterObserverClass(JMapHistoDumper.class);
    }
    if (conf.isReactiveJmapHistogramDumpEnabled()) {
      conf.addMasterObserverClass(ReactiveJMapHistoDumper.class);
    }
    observers = conf.createMasterObservers();

    this.checkpointFrequency = conf.getCheckpointFrequency();
    this.checkpointStatus = CheckpointStatus.NONE;
    this.checkpointSupportedChecker =
        ReflectionUtils.newInstance(
            GiraphConstants.CHECKPOINT_SUPPORTED_CHECKER.get(conf));

    // Wire this instance into per-superstep metrics resets and
    // initialize the global Giraph counters for this job
    GiraphMetrics.get().addSuperstepResetObserver(this);
    GiraphStats.init(context);
  }
237 
238   @Override
239   public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
240     masterComputeTimer = new GiraphTimer(superstepMetrics,
241         "master-compute-call", TimeUnit.MILLISECONDS);
242   }
243 
244   @Override
245   public void setJobState(ApplicationState state,
246       long applicationAttempt,
247       long desiredSuperstep) {
248     setJobState(state, applicationAttempt, desiredSuperstep, true);
249   }
250 
251   /**
252    * Set the job state.
253    *
254    * @param state State of the application.
255    * @param applicationAttempt Attempt to start on
256    * @param desiredSuperstep Superstep to restart from (if applicable)
257    * @param killJobOnFailure if true, and the desired state is FAILED,
258    *                         then kill this job.
259    */
260   private void setJobState(ApplicationState state,
261       long applicationAttempt,
262       long desiredSuperstep,
263       boolean killJobOnFailure) {
264     JSONObject jobState = new JSONObject();
265     try {
266       jobState.put(JSONOBJ_STATE_KEY, state.toString());
267       jobState.put(JSONOBJ_APPLICATION_ATTEMPT_KEY, applicationAttempt);
268       jobState.put(JSONOBJ_SUPERSTEP_KEY, desiredSuperstep);
269     } catch (JSONException e) {
270       throw new RuntimeException("setJobState: Couldn't put " +
271           state.toString());
272     }
273     if (LOG.isInfoEnabled()) {
274       LOG.info("setJobState: " + jobState.toString() + " on superstep " +
275           getSuperstep());
276     }
277     try {
278       getZkExt().createExt(masterJobStatePath + "/jobState",
279           jobState.toString().getBytes(Charset.defaultCharset()),
280           Ids.OPEN_ACL_UNSAFE,
281           CreateMode.PERSISTENT_SEQUENTIAL,
282           true);
283       LOG.info("setJobState: " + jobState);
284     } catch (KeeperException.NodeExistsException e) {
285       throw new IllegalStateException(
286           "setJobState: Imposible that " +
287               masterJobStatePath + " already exists!", e);
288     } catch (KeeperException e) {
289       throw new IllegalStateException(
290           "setJobState: Unknown KeeperException for " +
291               masterJobStatePath, e);
292     } catch (InterruptedException e) {
293       throw new IllegalStateException(
294           "setJobState: Unknown InterruptedException for " +
295               masterJobStatePath, e);
296     }
297     if (state == ApplicationState.FAILED && killJobOnFailure) {
298       failJob(new IllegalStateException("FAILED"));
299     }
300 
301   }
302 
  /**
   * Set the job state to FAILED. This will kill the job, and log exceptions to
   * any observers.
   *
   * @param reason The reason the job failed
   */
  private void setJobStateFailed(String reason) {
    // Record the reason with the progress tracker first, then persist the
    // FAILED state (without killing, -1 attempt/superstep as sentinels),
    // and finally kill the job via failJob so observers get the exception.
    getGraphTaskManager().getJobProgressTracker().logFailure(reason);
    setJobState(ApplicationState.FAILED, -1, -1, false);
    failJob(new IllegalStateException(reason));
  }
314 
315   /**
316    * Common method for generating vertex/edge input splits.
317    *
318    * @param inputFormat The vertex/edge input format
319    * @param minSplitCountHint Minimum number of splits to create (hint)
320    * @param inputSplitType Type of input splits (for logging purposes)
321    * @return List of input splits for the given format
322    */
323   private List<InputSplit> generateInputSplits(GiraphInputFormat inputFormat,
324                                                int minSplitCountHint,
325                                                InputType inputSplitType) {
326     String logPrefix = "generate" + inputSplitType + "InputSplits";
327     List<InputSplit> splits;
328     try {
329       splits = inputFormat.getSplits(getContext(), minSplitCountHint);
330     } catch (IOException e) {
331       throw new IllegalStateException(logPrefix + ": Got IOException", e);
332     } catch (InterruptedException e) {
333       throw new IllegalStateException(
334           logPrefix + ": Got InterruptedException", e);
335     }
336     float samplePercent =
337         INPUT_SPLIT_SAMPLE_PERCENT.get(getConfiguration());
338     if (samplePercent != INPUT_SPLIT_SAMPLE_PERCENT.getDefaultValue()) {
339       int lastIndex = (int) (samplePercent * splits.size() / 100f);
340       List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
341       LOG.warn(logPrefix + ": Using sampling - Processing only " +
342           sampleSplits.size() + " instead of " + splits.size() +
343           " expected splits.");
344       return sampleSplits;
345     } else {
346       if (LOG.isInfoEnabled()) {
347         LOG.info(logPrefix + ": Got " + splits.size() +
348             " input splits for " + minSplitCountHint + " input threads");
349       }
350       return splits;
351     }
352   }
353 
354   /**
355    * When there is no salvaging this job, fail it.
356    *
357    * @param e Exception to log to observers
358    */
359   private void failJob(Exception e) {
360     LOG.fatal("failJob: Killing job " + getJobId());
361     LOG.fatal("failJob: exception " + e.toString());
362     try {
363       if (getConfiguration().isPureYarnJob()) {
364         throw new RuntimeException(
365           "BspServiceMaster (YARN profile) is " +
366           "FAILING this task, throwing exception to end job run.", e);
367       } else {
368         @SuppressWarnings("deprecation")
369         org.apache.hadoop.mapred.JobClient jobClient =
370           new org.apache.hadoop.mapred.JobClient(
371             (org.apache.hadoop.mapred.JobConf)
372             getContext().getConfiguration());
373         @SuppressWarnings("deprecation")
374         JobID jobId = JobID.forName(getJobId());
375         RunningJob job = jobClient.getJob(jobId);
376         if (job != null) {
377           job.killJob();
378         } else {
379           LOG.error("Jon not found for jobId=" + getJobId());
380         }
381       }
382     } catch (IOException ioe) {
383       throw new RuntimeException(ioe);
384     } finally {
385       failureCleanup(e);
386     }
387   }
388 
389   /**
390    * Parse the {@link WorkerInfo} objects from a ZooKeeper path
391    * (and children).
392    *
393    * @param workerInfosPath Path where all the workers are children
394    * @param watch Watch or not?
395    * @return List of workers in that path
396    */
397   private List<WorkerInfo> getWorkerInfosFromPath(String workerInfosPath,
398       boolean watch) {
399     List<WorkerInfo> workerInfoList = new ArrayList<WorkerInfo>();
400     List<String> workerInfoPathList;
401     try {
402       workerInfoPathList =
403           getZkExt().getChildrenExt(workerInfosPath, watch, false, true);
404     } catch (KeeperException e) {
405       throw new IllegalStateException(
406           "getWorkers: Got KeeperException", e);
407     } catch (InterruptedException e) {
408       throw new IllegalStateException(
409           "getWorkers: Got InterruptedStateException", e);
410     }
411     for (String workerInfoPath : workerInfoPathList) {
412       WorkerInfo workerInfo = new WorkerInfo();
413       try {
414         WritableUtils.readFieldsFromZnode(
415             getZkExt(), workerInfoPath, true, null, workerInfo);
416         workerInfoList.add(workerInfo);
417       } catch (IllegalStateException e) {
418         LOG.warn("Can't get info from worker, did it die in between? " +
419             "workerInfoPath=" + workerInfoPath, e);
420       }
421     }
422     return workerInfoList;
423   }
424 
425   /**
426    * Get the healthy and unhealthy {@link WorkerInfo} objects for
427    * a superstep
428    *
429    * @param superstep superstep to check
430    * @param healthyWorkerInfoList filled in with current data
431    * @param unhealthyWorkerInfoList filled in with current data
432    */
433   private void getAllWorkerInfos(
434       long superstep,
435       List<WorkerInfo> healthyWorkerInfoList,
436       List<WorkerInfo> unhealthyWorkerInfoList) {
437     String healthyWorkerInfoPath =
438         getWorkerInfoHealthyPath(getApplicationAttempt(), superstep);
439     String unhealthyWorkerInfoPath =
440         getWorkerInfoUnhealthyPath(getApplicationAttempt(), superstep);
441 
442     try {
443       getZkExt().createOnceExt(healthyWorkerInfoPath,
444           null,
445           Ids.OPEN_ACL_UNSAFE,
446           CreateMode.PERSISTENT,
447           true);
448     } catch (KeeperException e) {
449       throw new IllegalStateException("getWorkers: KeeperException", e);
450     } catch (InterruptedException e) {
451       throw new IllegalStateException("getWorkers: InterruptedException", e);
452     }
453 
454     try {
455       getZkExt().createOnceExt(unhealthyWorkerInfoPath,
456           null,
457           Ids.OPEN_ACL_UNSAFE,
458           CreateMode.PERSISTENT,
459           true);
460     } catch (KeeperException e) {
461       throw new IllegalStateException("getWorkers: KeeperException", e);
462     } catch (InterruptedException e) {
463       throw new IllegalStateException("getWorkers: InterruptedException", e);
464     }
465 
466     List<WorkerInfo> currentHealthyWorkerInfoList =
467         getWorkerInfosFromPath(healthyWorkerInfoPath, true);
468     List<WorkerInfo> currentUnhealthyWorkerInfoList =
469         getWorkerInfosFromPath(unhealthyWorkerInfoPath, false);
470 
471     healthyWorkerInfoList.clear();
472     if (currentHealthyWorkerInfoList != null) {
473       for (WorkerInfo healthyWorkerInfo :
474         currentHealthyWorkerInfoList) {
475         healthyWorkerInfoList.add(healthyWorkerInfo);
476       }
477     }
478 
479     unhealthyWorkerInfoList.clear();
480     if (currentUnhealthyWorkerInfoList != null) {
481       for (WorkerInfo unhealthyWorkerInfo :
482         currentUnhealthyWorkerInfoList) {
483         unhealthyWorkerInfoList.add(unhealthyWorkerInfo);
484       }
485     }
486   }
487 
  /**
   * Wait until enough workers have reported health for this superstep.
   * Blocks up to maxSuperstepWaitMsecs, waking on health-registration
   * events, until at least minPercentResponded percent of maxWorkers have
   * responded and at least minWorkers of them are healthy.
   *
   * @return List of healthy workers, or null if the job should fail
   */
  @Override
  public List<WorkerInfo> checkWorkers() {
    boolean failJob = true;
    // Absolute deadline for receiving enough worker responses
    long failWorkerCheckMsecs =
        SystemTime.get().getMilliseconds() + maxSuperstepWaitMsecs;
    List<WorkerInfo> healthyWorkerInfoList = new ArrayList<WorkerInfo>();
    List<WorkerInfo> unhealthyWorkerInfoList = new ArrayList<WorkerInfo>();
    int totalResponses = -1;
    while (SystemTime.get().getMilliseconds() < failWorkerCheckMsecs) {
      getContext().progress();
      // Refresh the health snapshot from ZooKeeper each iteration
      getAllWorkerInfos(
          getSuperstep(), healthyWorkerInfoList, unhealthyWorkerInfoList);
      totalResponses = healthyWorkerInfoList.size() +
          unhealthyWorkerInfoList.size();
      // Responded-percentage check counts both healthy and unhealthy workers
      if ((totalResponses * 100.0f / maxWorkers) >=
          minPercentResponded) {
        failJob = false;
        break;
      }
      getContext().setStatus(getGraphTaskManager().getGraphFunctions() + " " +
          "checkWorkers: Only found " +
          totalResponses +
          " responses of " + maxWorkers +
          " needed to start superstep " +
          getSuperstep());
      // Block on the registration-changed event rather than busy-polling;
      // a triggered event means new data, so retry immediately
      if (getWorkerHealthRegistrationChangedEvent().waitMsecs(
          eventWaitMsecs)) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("checkWorkers: Got event that health " +
              "registration changed, not using poll attempt");
        }
        getWorkerHealthRegistrationChangedEvent().reset();
        continue;
      }
      if (LOG.isInfoEnabled()) {
        LOG.info("checkWorkers: Only found " + totalResponses +
            " responses of " + maxWorkers +
            " needed to start superstep " +
            getSuperstep() + ".  Reporting every " +
            eventWaitMsecs + " msecs, " +
            (failWorkerCheckMsecs - SystemTime.get().getMilliseconds()) +
            " more msecs left before giving up.");
        // Find the missing workers if there are only a few
        if ((maxWorkers - totalResponses) <=
            partitionLongTailMinPrint) {
          logMissingWorkersOnSuperstep(healthyWorkerInfoList,
              unhealthyWorkerInfoList);
        }
      }
    }
    if (failJob) {
      LOG.error("checkWorkers: Did not receive enough processes in " +
          "time (only " + totalResponses + " of " +
          minWorkers + " required) after waiting " + maxSuperstepWaitMsecs +
          "msecs).  This occurs if you do not have enough map tasks " +
          "available simultaneously on your Hadoop instance to fulfill " +
          "the number of requested workers.");
      return null;
    }

    // Enough responses arrived, but the healthy subset must still meet
    // the configured minimum
    if (healthyWorkerInfoList.size() < minWorkers) {
      LOG.error("checkWorkers: Only " + healthyWorkerInfoList.size() +
          " available when " + minWorkers + " are required.");
      logMissingWorkersOnSuperstep(healthyWorkerInfoList,
          unhealthyWorkerInfoList);
      return null;
    }

    getContext().setStatus(getGraphTaskManager().getGraphFunctions() + " " +
        "checkWorkers: Done - Found " + totalResponses +
        " responses of " + maxWorkers + " needed to start superstep " +
        getSuperstep());

    return healthyWorkerInfoList;
  }
563 
564   /**
565    * Log info level of the missing workers on the superstep
566    *
567    * @param healthyWorkerInfoList Healthy worker list
568    * @param unhealthyWorkerInfoList Unhealthy worker list
569    */
570   private void logMissingWorkersOnSuperstep(
571       List<WorkerInfo> healthyWorkerInfoList,
572       List<WorkerInfo> unhealthyWorkerInfoList) {
573     if (LOG.isInfoEnabled()) {
574       Set<Integer> partitionSet = new TreeSet<Integer>();
575       for (WorkerInfo workerInfo : healthyWorkerInfoList) {
576         partitionSet.add(workerInfo.getTaskId());
577       }
578       for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
579         partitionSet.add(workerInfo.getTaskId());
580       }
581       for (int i = 1; i <= maxWorkers; ++i) {
582         if (partitionSet.contains(Integer.valueOf(i))) {
583           continue;
584         } else if (i == getTaskPartition()) {
585           continue;
586         } else {
587           LOG.info("logMissingWorkersOnSuperstep: No response from " +
588               "partition " + i + " (could be master)");
589         }
590       }
591     }
592   }
593 
594   /**
595    * Common method for creating vertex/edge input splits.
596    *
597    * @param inputFormat The vertex/edge input format
598    * @param inputSplitType Type of input split (for logging purposes)
599    * @return Number of splits. Returns -1 on failure to create
600    *         valid input splits.
601    */
602   private int createInputSplits(GiraphInputFormat inputFormat,
603                                 InputType inputSplitType) {
604     ImmutableClassesGiraphConfiguration conf = getConfiguration();
605     String logPrefix = "create" + inputSplitType + "InputSplits";
606     // Only the 'master' should be doing this.  Wait until the number of
607     // processes that have reported health exceeds the minimum percentage.
608     // If the minimum percentage is not met, fail the job.  Otherwise
609     // generate the input splits
610     List<WorkerInfo> healthyWorkerInfoList = checkWorkers();
611     if (healthyWorkerInfoList == null) {
612       setJobStateFailed("Not enough healthy workers to create input splits");
613       return -1;
614     }
615     globalCommHandler.getInputSplitsHandler().initialize(masterClient,
616         healthyWorkerInfoList);
617 
618     // Create at least as many splits as the total number of input threads.
619     int minSplitCountHint = healthyWorkerInfoList.size() *
620         conf.getNumInputSplitsThreads();
621 
622     // Note that the input splits may only be a sample if
623     // INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100
624     List<InputSplit> splitList = generateInputSplits(inputFormat,
625         minSplitCountHint, inputSplitType);
626 
627     if (splitList.isEmpty()) {
628       LOG.fatal(logPrefix + ": Failing job due to 0 input splits, " +
629           "check input of " + inputFormat.getClass().getName() + "!");
630       getContext().setStatus("Failing job due to 0 input splits, " +
631           "check input of " + inputFormat.getClass().getName() + "!");
632       setJobStateFailed("******* PLEASE CHECK YOUR INPUT TABLES - PARTITIONS " +
633           "WHICH YOU SPECIFIED ARE MISSING (for " + inputSplitType +
634           " input). FAILING THE JOB *******");
635     }
636     if (minSplitCountHint > splitList.size()) {
637       LOG.warn(logPrefix + ": Number of inputSplits=" +
638           splitList.size() + " < " +
639           minSplitCountHint +
640           "=total number of input threads, " +
641           "some threads will be not used");
642     }
643 
644     globalCommHandler.getInputSplitsHandler().addSplits(inputSplitType,
645         splitList, inputFormat);
646 
647     return splitList.size();
648   }
649 
650   @Override
651   public int createMappingInputSplits() {
652     if (!getConfiguration().hasMappingInputFormat()) {
653       return 0;
654     }
655     MappingInputFormat<I, V, E, ? extends Writable> mappingInputFormat =
656       getConfiguration().createWrappedMappingInputFormat();
657     return createInputSplits(mappingInputFormat, InputType.MAPPING);
658   }
659 
660   @Override
661   public int createVertexInputSplits() {
662     // Short-circuit if there is no vertex input format
663     if (!getConfiguration().hasVertexInputFormat()) {
664       return 0;
665     }
666     VertexInputFormat<I, V, E> vertexInputFormat =
667         getConfiguration().createWrappedVertexInputFormat();
668     return createInputSplits(vertexInputFormat, InputType.VERTEX);
669   }
670 
671   @Override
672   public int createEdgeInputSplits() {
673     // Short-circuit if there is no edge input format
674     if (!getConfiguration().hasEdgeInputFormat()) {
675       return 0;
676     }
677     EdgeInputFormat<I, E> edgeInputFormat =
678         getConfiguration().createWrappedEdgeInputFormat();
679     return createInputSplits(edgeInputFormat, InputType.EDGE);
680   }
681 
  @Override
  public List<WorkerInfo> getWorkerInfoList() {
    // Workers chosen for the current superstep, sorted by task id
    return chosenWorkerInfoList;
  }
686 
  @Override
  public MasterGlobalCommHandler getGlobalCommHandler() {
    // Handler for global communication (aggregators, input splits)
    return globalCommHandler;
  }
691 
  @Override
  public AggregatorToGlobalCommTranslation getAggregatorTranslationHandler() {
    // Translates aggregator operations into reduce/broadcast operations
    return aggregatorTranslation;
  }
696 
  @Override
  public MasterCompute getMasterCompute() {
    // User-provided master computation instance
    return masterCompute;
  }
701 
702   /**
703    * Read the finalized checkpoint file and associated metadata files for the
704    * checkpoint.  Modifies the {@link PartitionOwner} objects to get the
705    * checkpoint prefixes.  It is an optimization to prevent all workers from
706    * searching all the files.  Also read in the aggregator data from the
707    * finalized checkpoint file and setting it.
708    *
709    * @param superstep Checkpoint set to examine.
710    * @throws IOException
711    * @throws InterruptedException
712    * @throws KeeperException
713    * @return Collection of generated partition owners.
714    */
715   private Collection<PartitionOwner> prepareCheckpointRestart(long superstep)
716     throws IOException, KeeperException, InterruptedException {
717     List<PartitionOwner> partitionOwners = new ArrayList<>();
718     FileSystem fs = getFs();
719     String finalizedCheckpointPath = getSavedCheckpointBasePath(superstep) +
720         CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX;
721     LOG.info("Loading checkpoint from " + finalizedCheckpointPath);
722     DataInputStream finalizedStream =
723         fs.open(new Path(finalizedCheckpointPath));
724     GlobalStats globalStats = new GlobalStats();
725     globalStats.readFields(finalizedStream);
726     updateCounters(globalStats);
727     SuperstepClasses superstepClasses =
728         SuperstepClasses.createToRead(getConfiguration());
729     superstepClasses.readFields(finalizedStream);
730     getConfiguration().updateSuperstepClasses(superstepClasses);
731     int prefixFileCount = finalizedStream.readInt();
732 
733     String checkpointFile =
734         finalizedStream.readUTF();
735     for (int i = 0; i < prefixFileCount; ++i) {
736       int mrTaskId = finalizedStream.readInt();
737 
738       DataInputStream metadataStream = fs.open(new Path(checkpointFile +
739           "." + mrTaskId + CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX));
740       long partitions = metadataStream.readInt();
741       WorkerInfo worker = getWorkerInfoById(mrTaskId);
742       for (long p = 0; p < partitions; ++p) {
743         int partitionId = metadataStream.readInt();
744         PartitionOwner partitionOwner = new BasicPartitionOwner(partitionId,
745             worker);
746         partitionOwners.add(partitionOwner);
747         LOG.info("prepareCheckpointRestart partitionId=" + partitionId +
748             " assigned to " + partitionOwner);
749       }
750       metadataStream.close();
751     }
752     //Ordering appears to be important as of right now we rely on this ordering
753     //in WorkerGraphPartitioner
754     Collections.sort(partitionOwners, new Comparator<PartitionOwner>() {
755       @Override
756       public int compare(PartitionOwner p1, PartitionOwner p2) {
757         return Integer.compare(p1.getPartitionId(), p2.getPartitionId());
758       }
759     });
760 
761 
762     globalCommHandler.getAggregatorHandler().readFields(finalizedStream);
763     aggregatorTranslation.readFields(finalizedStream);
764     masterCompute.readFields(finalizedStream);
765     finalizedStream.close();
766 
767     return partitionOwners;
768   }
769 
770   @Override
771   public void setup() {
772     // Might have to manually load a checkpoint.
773     // In that case, the input splits are not set, they will be faked by
774     // the checkpoint files.  Each checkpoint file will be an input split
775     // and the input split
776 
777     if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
778       GiraphStats.getInstance().getSuperstepCounter().
779         setValue(getRestartedSuperstep());
780     }
781     for (MasterObserver observer : observers) {
782       observer.preApplication();
783       getContext().progress();
784     }
785   }
786 
787   @Override
788   public boolean becomeMaster() {
789     // Create my bid to become the master, then try to become the worker
790     // or return false.
791     String myBid = null;
792     try {
793       myBid =
794           getZkExt().createExt(masterElectionPath +
795               "/" + getHostnamePartitionId(),
796               null,
797               Ids.OPEN_ACL_UNSAFE,
798               CreateMode.EPHEMERAL_SEQUENTIAL,
799               true);
800     } catch (KeeperException e) {
801       throw new IllegalStateException(
802           "becomeMaster: KeeperException", e);
803     } catch (InterruptedException e) {
804       throw new IllegalStateException(
805           "becomeMaster: IllegalStateException", e);
806     }
807     while (true) {
808       JSONObject jobState = getJobState();
809       try {
810         if ((jobState != null) &&
811             ApplicationState.valueOf(
812                 jobState.getString(JSONOBJ_STATE_KEY)) ==
813                 ApplicationState.FINISHED) {
814           LOG.info("becomeMaster: Job is finished, " +
815               "give up trying to be the master!");
816           isMaster = false;
817           return isMaster;
818         }
819       } catch (JSONException e) {
820         throw new IllegalStateException(
821             "becomeMaster: Couldn't get state from " + jobState, e);
822       }
823       try {
824         List<String> masterChildArr =
825             getZkExt().getChildrenExt(
826                 masterElectionPath, true, true, true);
827         if (LOG.isInfoEnabled()) {
828           LOG.info("becomeMaster: First child is '" +
829               masterChildArr.get(0) + "' and my bid is '" +
830               myBid + "'");
831         }
832         if (masterChildArr.get(0).equals(myBid)) {
833           GiraphStats.getInstance().getCurrentMasterTaskPartition().
834               setValue(getTaskPartition());
835 
836           globalCommHandler = new MasterGlobalCommHandler(
837               new MasterAggregatorHandler(getConfiguration(), getContext()),
838               new MasterInputSplitsHandler(
839                   getConfiguration().useInputSplitLocality()));
840           aggregatorTranslation = new AggregatorToGlobalCommTranslation(
841               getConfiguration(), globalCommHandler);
842 
843           globalCommHandler.getAggregatorHandler().initialize(this);
844           masterCompute = getConfiguration().createMasterCompute();
845           masterCompute.setMasterService(this);
846 
847           masterInfo = new MasterInfo();
848           masterServer =
849               new NettyMasterServer(getConfiguration(), this, getContext(),
850                   getGraphTaskManager().createUncaughtExceptionHandler());
851           masterInfo.setInetSocketAddress(masterServer.getMyAddress(),
852               masterServer.getLocalHostOrIp());
853           masterInfo.setTaskId(getTaskPartition());
854           masterClient =
855               new NettyMasterClient(getContext(), getConfiguration(), this,
856                   getGraphTaskManager().createUncaughtExceptionHandler());
857           masterServer.setFlowControl(masterClient.getFlowControl());
858 
859           if (LOG.isInfoEnabled()) {
860             LOG.info("becomeMaster: I am now the master!");
861           }
862           isMaster = true;
863           return isMaster;
864         }
865         LOG.info("becomeMaster: Waiting to become the master...");
866         getMasterElectionChildrenChangedEvent().waitForever();
867         getMasterElectionChildrenChangedEvent().reset();
868       } catch (KeeperException e) {
869         throw new IllegalStateException(
870             "becomeMaster: KeeperException", e);
871       } catch (InterruptedException e) {
872         throw new IllegalStateException(
873             "becomeMaster: IllegalStateException", e);
874       }
875     }
876   }
877 
  @Override
  public MasterInfo getMasterInfo() {
    // Populated in becomeMaster() once this task wins the election;
    // NOTE(review): presumably null before that — callers should be aware.
    return masterInfo;
  }
882 
883   /**
884    * Collect and aggregate the worker statistics for a particular superstep.
885    *
886    * @param superstep Superstep to aggregate on
887    * @return Global statistics aggregated on all worker statistics
888    */
889   private GlobalStats aggregateWorkerStats(long superstep) {
890     ImmutableClassesGiraphConfiguration conf = getConfiguration();
891 
892     GlobalStats globalStats = new GlobalStats();
893     // Get the stats from the all the worker selected nodes
894     String workerFinishedPath =
895         getWorkerFinishedPath(getApplicationAttempt(), superstep);
896     List<String> workerFinishedPathList = null;
897     try {
898       workerFinishedPathList =
899           getZkExt().getChildrenExt(
900               workerFinishedPath, false, false, true);
901     } catch (KeeperException e) {
902       throw new IllegalStateException(
903           "aggregateWorkerStats: KeeperException", e);
904     } catch (InterruptedException e) {
905       throw new IllegalStateException(
906           "aggregateWorkerStats: InterruptedException", e);
907     }
908 
909     AggregatedMetrics aggregatedMetrics = new AggregatedMetrics();
910 
911     for (String finishedPath : workerFinishedPathList) {
912       String hostnamePartitionId = FilenameUtils.getName(finishedPath);
913       JSONObject workerFinishedInfoObj = null;
914       try {
915         byte [] zkData =
916             getZkExt().getData(finishedPath, false, null);
917         workerFinishedInfoObj = new JSONObject(new String(zkData,
918             Charset.defaultCharset()));
919         globalStats.addMessageCount(
920             workerFinishedInfoObj.getLong(
921                 JSONOBJ_NUM_MESSAGES_KEY));
922         globalStats.addMessageBytesCount(
923           workerFinishedInfoObj.getLong(
924               JSONOBJ_NUM_MESSAGE_BYTES_KEY));
925         if (conf.metricsEnabled() &&
926             workerFinishedInfoObj.has(JSONOBJ_METRICS_KEY)) {
927           WorkerSuperstepMetrics workerMetrics = new WorkerSuperstepMetrics();
928           WritableUtils.readFieldsFromByteArray(
929               Base64.decode(
930                   workerFinishedInfoObj.getString(
931                       JSONOBJ_METRICS_KEY)),
932               workerMetrics);
933           globalStats.addOocLoadBytesCount(
934               workerMetrics.getBytesLoadedFromDisk());
935           globalStats.addOocStoreBytesCount(
936               workerMetrics.getBytesStoredOnDisk());
937           // Find the lowest percentage of graph in memory across all workers
938           // for one superstep
939           globalStats.setLowestGraphPercentageInMemory(
940               Math.min(globalStats.getLowestGraphPercentageInMemory(),
941                   (int) Math.round(
942                       workerMetrics.getGraphPercentageInMemory())));
943           aggregatedMetrics.add(workerMetrics, hostnamePartitionId);
944         }
945       } catch (JSONException e) {
946         throw new IllegalStateException(
947             "aggregateWorkerStats: JSONException", e);
948       } catch (KeeperException e) {
949         throw new IllegalStateException(
950             "aggregateWorkerStats: KeeperException", e);
951       } catch (InterruptedException e) {
952         throw new IllegalStateException(
953             "aggregateWorkerStats: InterruptedException", e);
954       } catch (IOException e) {
955         throw new IllegalStateException(
956             "aggregateWorkerStats: IOException", e);
957       }
958     }
959 
960     allPartitionStatsList.clear();
961     Iterable<PartitionStats> statsList = globalCommHandler.getAllPartitionStats(
962         workerFinishedPathList.size(), getContext());
963     for (PartitionStats partitionStats : statsList) {
964       globalStats.addPartitionStats(partitionStats);
965       allPartitionStatsList.add(partitionStats);
966     }
967 
968     if (conf.metricsEnabled()) {
969       if (GiraphConstants.METRICS_DIRECTORY.isDefaultValue(conf)) {
970         aggregatedMetrics.print(superstep, System.err);
971       } else {
972         printAggregatedMetricsToHDFS(superstep, aggregatedMetrics);
973       }
974     }
975 
976     if (LOG.isInfoEnabled()) {
977       LOG.info("aggregateWorkerStats: Aggregation found " + globalStats +
978           " on superstep = " + getSuperstep());
979     }
980     return globalStats;
981   }
982 
983   /**
984    * Write superstep metrics to own file in HDFS
985    * @param superstep the current superstep
986    * @param aggregatedMetrics the aggregated metrics to write
987    */
988   private void printAggregatedMetricsToHDFS(
989       long superstep, AggregatedMetrics aggregatedMetrics) {
990     ImmutableClassesGiraphConfiguration conf = getConfiguration();
991     PrintStream out = null;
992     Path dir = new Path(GiraphConstants.METRICS_DIRECTORY.get(conf));
993     Path outFile = new Path(GiraphConstants.METRICS_DIRECTORY.get(conf) +
994         Path.SEPARATOR_CHAR + "superstep_" + superstep + ".metrics");
995     try {
996       FileSystem fs;
997       fs = FileSystem.get(conf);
998       if (!fs.exists(dir)) {
999         fs.mkdirs(dir);
1000       }
1001       if (fs.exists(outFile)) {
1002         throw new RuntimeException(
1003             "printAggregatedMetricsToHDFS: metrics file exists");
1004       }
1005       out = new PrintStream(fs.create(outFile), false,
1006           Charset.defaultCharset().name());
1007       aggregatedMetrics.print(superstep, out);
1008     } catch (IOException e) {
1009       throw new RuntimeException(
1010           "printAggregatedMetricsToHDFS: error creating metrics file", e);
1011     } finally {
1012       if (out != null) {
1013         out.close();
1014       }
1015     }
1016   }
1017 
1018   /**
1019    * Finalize the checkpoint file prefixes by taking the chosen workers and
1020    * writing them to a finalized file.  Also write out the master
1021    * aggregated aggregator array from the previous superstep.
1022    *
1023    * @param superstep superstep to finalize
1024    * @param chosenWorkerInfoList list of chosen workers that will be finalized
1025    * @throws IOException
1026    * @throws InterruptedException
1027    * @throws KeeperException
1028    */
1029   private void finalizeCheckpoint(long superstep,
1030     List<WorkerInfo> chosenWorkerInfoList)
1031     throws IOException, KeeperException, InterruptedException {
1032     Path finalizedCheckpointPath =
1033         new Path(getCheckpointBasePath(superstep) +
1034             CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX);
1035     try {
1036       getFs().delete(finalizedCheckpointPath, false);
1037     } catch (IOException e) {
1038       LOG.warn("finalizedValidCheckpointPrefixes: Removed old file " +
1039           finalizedCheckpointPath);
1040     }
1041 
1042     // Format:
1043     // <global statistics>
1044     // <superstep classes>
1045     // <number of files>
1046     // <used file prefix 0><used file prefix 1>...
1047     // <aggregator data>
1048     // <masterCompute data>
1049     FSDataOutputStream finalizedOutputStream =
1050         getFs().create(finalizedCheckpointPath);
1051 
1052     String superstepFinishedNode =
1053         getSuperstepFinishedPath(getApplicationAttempt(), superstep - 1);
1054     finalizedOutputStream.write(
1055         getZkExt().getData(superstepFinishedNode, false, null));
1056 
1057     finalizedOutputStream.writeInt(chosenWorkerInfoList.size());
1058     finalizedOutputStream.writeUTF(getCheckpointBasePath(superstep));
1059     for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
1060       finalizedOutputStream.writeInt(getWorkerId(chosenWorkerInfo));
1061     }
1062     globalCommHandler.getAggregatorHandler().write(finalizedOutputStream);
1063     aggregatorTranslation.write(finalizedOutputStream);
1064     masterCompute.write(finalizedOutputStream);
1065     finalizedOutputStream.close();
1066     lastCheckpointedSuperstep = superstep;
1067     GiraphStats.getInstance().
1068         getLastCheckpointedSuperstep().setValue(superstep);
1069   }
1070 
1071   /**
1072    * Assign the partitions for this superstep.  If there are changes,
1073    * the workers will know how to do the exchange.  If this was a restarted
1074    * superstep, then make sure to provide information on where to find the
1075    * checkpoint file.
1076    */
1077   private void assignPartitionOwners() {
1078     Collection<PartitionOwner> partitionOwners;
1079     if (getSuperstep() == INPUT_SUPERSTEP) {
1080       partitionOwners =
1081           masterGraphPartitioner.createInitialPartitionOwners(
1082               chosenWorkerInfoList, maxWorkers);
1083       if (partitionOwners.isEmpty()) {
1084         throw new IllegalStateException(
1085             "assignAndExchangePartitions: No partition owners set");
1086       }
1087     } else if (getRestartedSuperstep() == getSuperstep()) {
1088       // If restarted, prepare the checkpoint restart
1089       try {
1090         partitionOwners = prepareCheckpointRestart(getSuperstep());
1091       } catch (IOException e) {
1092         throw new IllegalStateException(
1093             "assignPartitionOwners: IOException on preparing", e);
1094       } catch (KeeperException e) {
1095         throw new IllegalStateException(
1096             "assignPartitionOwners: KeeperException on preparing", e);
1097       } catch (InterruptedException e) {
1098         throw new IllegalStateException(
1099             "assignPartitionOwners: InteruptedException on preparing",
1100             e);
1101       }
1102       masterGraphPartitioner.setPartitionOwners(partitionOwners);
1103     } else {
1104       partitionOwners =
1105           masterGraphPartitioner.generateChangedPartitionOwners(
1106               allPartitionStatsList,
1107               chosenWorkerInfoList,
1108               maxWorkers,
1109               getSuperstep());
1110 
1111       PartitionUtils.analyzePartitionStats(partitionOwners,
1112           allPartitionStatsList);
1113     }
1114     checkPartitions(masterGraphPartitioner.getCurrentPartitionOwners());
1115 
1116 
1117 
1118     // There will be some exchange of partitions
1119     if (!partitionOwners.isEmpty()) {
1120       String vertexExchangePath =
1121           getPartitionExchangePath(getApplicationAttempt(),
1122               getSuperstep());
1123       try {
1124         getZkExt().createOnceExt(vertexExchangePath,
1125             null,
1126             Ids.OPEN_ACL_UNSAFE,
1127             CreateMode.PERSISTENT,
1128             true);
1129       } catch (KeeperException e) {
1130         throw new IllegalStateException(
1131             "assignPartitionOwners: KeeperException creating " +
1132                 vertexExchangePath);
1133       } catch (InterruptedException e) {
1134         throw new IllegalStateException(
1135             "assignPartitionOwners: InterruptedException creating " +
1136                 vertexExchangePath);
1137       }
1138     }
1139 
1140     AddressesAndPartitionsWritable addressesAndPartitions =
1141         new AddressesAndPartitionsWritable(masterInfo, chosenWorkerInfoList,
1142             partitionOwners);
1143     // Send assignments to every worker
1144     // TODO for very large number of partitions we might want to split this
1145     // across multiple requests
1146     for (WorkerInfo workerInfo : chosenWorkerInfoList) {
1147       masterClient.sendWritableRequest(workerInfo.getTaskId(),
1148           new AddressesAndPartitionsRequest(addressesAndPartitions));
1149     }
1150   }
1151 
1152   /**
1153    * Check if partition ids are valid
1154    *
1155    * @param partitionOwners List of partition ids for current superstep
1156    */
1157   private void checkPartitions(Collection<PartitionOwner> partitionOwners) {
1158     for (PartitionOwner partitionOwner : partitionOwners) {
1159       int partitionId = partitionOwner.getPartitionId();
1160       if (partitionId < 0 || partitionId >= partitionOwners.size()) {
1161         throw new IllegalStateException("checkPartitions: " +
1162             "Invalid partition id " + partitionId +
1163             " - partition ids must be values from 0 to (numPartitions - 1)");
1164       }
1165     }
1166   }
1167 
1168   /**
1169    * Check whether the workers chosen for this superstep are still alive
1170    *
1171    * @param chosenWorkerInfoHealthPath Path to the healthy workers in ZooKeeper
1172    * @param chosenWorkerInfoList List of the healthy workers
1173    * @return a list of dead workers. Empty list if all workers are alive.
1174    * @throws InterruptedException
1175    * @throws KeeperException
1176    */
1177   private Collection<WorkerInfo> superstepChosenWorkerAlive(
1178     String chosenWorkerInfoHealthPath,
1179     List<WorkerInfo> chosenWorkerInfoList)
1180     throws KeeperException, InterruptedException {
1181     List<WorkerInfo> chosenWorkerInfoHealthyList =
1182         getWorkerInfosFromPath(chosenWorkerInfoHealthPath, false);
1183     Set<WorkerInfo> chosenWorkerInfoHealthySet =
1184         new HashSet<WorkerInfo>(chosenWorkerInfoHealthyList);
1185     List<WorkerInfo> deadWorkers = new ArrayList<>();
1186     for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
1187       if (!chosenWorkerInfoHealthySet.contains(chosenWorkerInfo)) {
1188         deadWorkers.add(chosenWorkerInfo);
1189       }
1190     }
1191     return deadWorkers;
1192   }
1193 
  @Override
  public void restartFromCheckpoint(long checkpoint) {
    // Process:
    // 1. Increase the application attempt and set to the correct checkpoint
    // 2. Send command to all workers to restart their tasks
    setApplicationAttempt(getApplicationAttempt() + 1);
    // Both the cached and the restarted superstep point at the checkpoint
    // we are rolling back to.
    setCachedSuperstep(checkpoint);
    setRestartedSuperstep(checkpoint);
    // Writing the new job state is what signals the workers to restart;
    // it must come after the attempt/superstep updates above.
    setJobState(ApplicationState.START_SUPERSTEP,
        getApplicationAttempt(),
        checkpoint);
  }
1206 
1207   /**
1208    * Safely removes node from zookeeper.
1209    * Ignores if node is already removed. Can only throw runtime exception if
1210    * anything wrong.
1211    * @param path path to the node to be removed.
1212    */
1213   private void zkDeleteNode(String path) {
1214     try {
1215       getZkExt().deleteExt(path, -1, true);
1216     } catch (KeeperException.NoNodeException e) {
1217       LOG.info("zkDeleteNode: node has already been removed " + path);
1218     } catch (InterruptedException e) {
1219       throw new RuntimeException(
1220           "zkDeleteNode: InterruptedException", e);
1221     } catch (KeeperException e) {
1222       throw new RuntimeException(
1223           "zkDeleteNode: KeeperException", e);
1224     }
1225   }
1226 
1227   @Override
1228   public long getLastGoodCheckpoint() throws IOException {
1229     // Find the last good checkpoint if none have been written to the
1230     // knowledge of this master
1231     if (lastCheckpointedSuperstep == -1) {
1232       try {
1233         lastCheckpointedSuperstep = getLastCheckpointedSuperstep();
1234       } catch (IOException e) {
1235         LOG.fatal("getLastGoodCheckpoint: No last good checkpoints can be " +
1236             "found, killing the job.", e);
1237         failJob(e);
1238       }
1239     }
1240 
1241     return lastCheckpointedSuperstep;
1242   }
1243 
  /**
   * Wait for a set of workers to signal that they are done with the
   * barrier.
   *
   * Polls the ZooKeeper children of the barrier node until every expected
   * hostname id has registered.  Between polls it waits on the given event
   * with a timeout so it can keep reporting progress (to avoid the Hadoop
   * task timeout) and periodically check for workers that died before
   * reaching the barrier.
   *
   * @param finishedWorkerPath Path to where the workers will register their
   *        hostname and id
   * @param workerInfoList List of the workers to wait for
   * @param event Event to wait on for a chance to be done.
   * @param ignoreDeath In case if worker died after making it through
   *                    barrier, we will ignore death if set to true.
   * @return True if barrier was successful, false if there was a worker
   *         failure
   */
  private boolean barrierOnWorkerList(String finishedWorkerPath,
      List<WorkerInfo> workerInfoList,
      BspEvent event,
      boolean ignoreDeath) {
    // Create the barrier node (idempotent via createOnceExt).
    try {
      getZkExt().createOnceExt(finishedWorkerPath,
          null,
          Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT,
          true);
    } catch (KeeperException e) {
      throw new IllegalStateException(
          "barrierOnWorkerList: KeeperException - Couldn't create " +
              finishedWorkerPath, e);
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "barrierOnWorkerList: InterruptedException - Couldn't create " +
              finishedWorkerPath, e);
    }
    // Hostname ids we expect to see registered under the barrier node.
    List<String> hostnameIdList =
        new ArrayList<String>(workerInfoList.size());
    for (WorkerInfo workerInfo : workerInfoList) {
      hostnameIdList.add(workerInfo.getHostnameId());
    }
    String workerInfoHealthyPath =
        getWorkerInfoHealthyPath(getApplicationAttempt(), getSuperstep());
    List<String> finishedHostnameIdList = new ArrayList<>();
    long nextInfoMillis = System.currentTimeMillis();
    final int defaultTaskTimeoutMsec = 10 * 60 * 1000;  // from TaskTracker
    final int waitBetweenLogInfoMsec = 30 * 1000;
    // Check liveness at half the Hadoop task timeout so the job can react
    // before the framework kills this task.
    final int taskTimeoutMsec = getContext().getConfiguration().getInt(
        "mapred.task.timeout", defaultTaskTimeoutMsec) / 2;
    long lastRegularRunTimeMsec = 0;
    int eventLoopTimeout =  Math.min(taskTimeoutMsec, waitBetweenLogInfoMsec);
    // When true, the current iteration only emits progress logging and skips
    // the (heavier) ZooKeeper child listing and liveness checks.
    boolean logInfoOnlyRun = false;
    List<WorkerInfo> deadWorkers = new ArrayList<>();
    while (true) {
      if (! logInfoOnlyRun) {
        // Regular run: refresh the set of workers that reached the barrier.
        try {
          finishedHostnameIdList =
              getZkExt().getChildrenExt(finishedWorkerPath,
                                        true,
                                        false,
                                        false);
        } catch (KeeperException e) {
          throw new IllegalStateException(
              "barrierOnWorkerList: KeeperException - Couldn't get " +
                  "children of " + finishedWorkerPath, e);
        } catch (InterruptedException e) {
          throw new IllegalStateException(
              "barrierOnWorkerList: IllegalException - Couldn't get " +
                  "children of " + finishedWorkerPath, e);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("barrierOnWorkerList: Got finished worker list = " +
                        finishedHostnameIdList + ", size = " +
                        finishedHostnameIdList.size() +
                        ", worker list = " +
                        workerInfoList + ", size = " +
                        workerInfoList.size() +
                        " from " + finishedWorkerPath);
        }
      }

      // Throttled progress logging, at most every waitBetweenLogInfoMsec.
      if (LOG.isInfoEnabled() &&
          (System.currentTimeMillis() > nextInfoMillis)) {
        nextInfoMillis = System.currentTimeMillis() + waitBetweenLogInfoMsec;
        LOG.info("barrierOnWorkerList: " +
            finishedHostnameIdList.size() +
            " out of " + workerInfoList.size() +
            " workers finished on superstep " +
            getSuperstep() + " on path " + finishedWorkerPath);
        if (workerInfoList.size() - finishedHostnameIdList.size() <
            MAX_PRINTABLE_REMAINING_WORKERS) {
          Set<String> remainingWorkers = Sets.newHashSet(hostnameIdList);
          remainingWorkers.removeAll(finishedHostnameIdList);
          LOG.info("barrierOnWorkerList: Waiting on " + remainingWorkers);
        }
      }

      if (! logInfoOnlyRun) {
        getContext().setStatus(getGraphTaskManager().getGraphFunctions() +
                                   " - " +
                                   finishedHostnameIdList.size() +
                                   " finished out of " +
                                   workerInfoList.size() +
                                   " on superstep " + getSuperstep());
        // Barrier complete: every expected worker has registered.
        if (finishedHostnameIdList.containsAll(hostnameIdList)) {
          break;
        }

        // A worker previously pronounced dead never reached the barrier,
        // so this superstep cannot complete successfully.
        for (WorkerInfo deadWorker : deadWorkers) {
          if (!finishedHostnameIdList.contains(deadWorker.getHostnameId())) {
            LOG.error("barrierOnWorkerList: no results arived from " +
                          "worker that was pronounced dead: " + deadWorker +
                          " on superstep " + getSuperstep());
            return false;
          }
        }

        // wall-clock time skew is ignored
        lastRegularRunTimeMsec = System.currentTimeMillis();
      }

      // Wait for a signal or timeout
      boolean eventTriggered = event.waitMsecs(eventLoopTimeout);
      long elapsedTimeSinceRegularRunMsec = System.currentTimeMillis() -
          lastRegularRunTimeMsec;
      event.reset();
      getContext().progress();

      // Next iteration is a regular run when the event fired, or when the
      // loop timeout equals the task timeout, or when enough wall-clock
      // time has passed since the last regular run; otherwise only log.
      if (eventTriggered ||
          taskTimeoutMsec == eventLoopTimeout ||
          elapsedTimeSinceRegularRunMsec >= taskTimeoutMsec) {
        logInfoOnlyRun = false;
      } else {
        logInfoOnlyRun = true;
        continue;
      }

      // Did a worker die?
      try {
        deadWorkers.addAll(superstepChosenWorkerAlive(
                workerInfoHealthyPath,
                workerInfoList));
        if (!ignoreDeath && deadWorkers.size() > 0) {
          String errorMessage = "******* WORKERS " + deadWorkers +
              " FAILED *******";
          // If checkpointing is not used, we should fail the job
          if (!getConfiguration().useCheckpointing()) {
            setJobStateFailed(errorMessage);
          } else {
            LOG.error("barrierOnWorkerList: Missing chosen " +
                "workers " + deadWorkers +
                " on superstep " + getSuperstep());
            // Log worker failure to command line
            getGraphTaskManager().getJobProgressTracker().logInfo(errorMessage);
          }
          return false;
        }
      } catch (KeeperException e) {
        throw new IllegalStateException(
            "barrierOnWorkerList: KeeperException - " +
                "Couldn't get " + workerInfoHealthyPath, e);
      } catch (InterruptedException e) {
        throw new IllegalStateException(
            "barrierOnWorkerList: InterruptedException - " +
                "Couldn't get " + workerInfoHealthyPath, e);
      }
    }

    return true;
  }
1410 
1411   /**
1412    * Clean up old superstep data from Zookeeper
1413    *
1414    * @param removeableSuperstep Supersteo to clean up
1415    * @throws InterruptedException
1416    */
1417   private void cleanUpOldSuperstep(long removeableSuperstep) throws
1418       InterruptedException {
1419     if (KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration()) &&
1420         (removeableSuperstep >= 0)) {
1421       String oldSuperstepPath =
1422           getSuperstepPath(getApplicationAttempt()) + "/" +
1423               removeableSuperstep;
1424       try {
1425         if (LOG.isInfoEnabled()) {
1426           LOG.info("coordinateSuperstep: Cleaning up old Superstep " +
1427               oldSuperstepPath);
1428         }
1429         getZkExt().deleteExt(oldSuperstepPath,
1430             -1,
1431             true);
1432       } catch (KeeperException.NoNodeException e) {
1433         LOG.warn("coordinateBarrier: Already cleaned up " +
1434             oldSuperstepPath);
1435       } catch (KeeperException e) {
1436         throw new IllegalStateException(
1437             "coordinateSuperstep: KeeperException on " +
1438                 "finalizing checkpoint", e);
1439       }
1440     }
1441   }
1442 
1443   /**
1444    * Coordinate the exchange of vertex/edge input splits among workers.
1445    */
1446   private void coordinateInputSplits() {
1447     // Coordinate the workers finishing sending their vertices/edges to the
1448     // correct workers and signal when everything is done.
1449     if (!barrierOnWorkerList(inputSplitsWorkerDonePath,
1450         chosenWorkerInfoList,
1451         getInputSplitsWorkerDoneEvent(),
1452         false)) {
1453       throw new IllegalStateException("coordinateInputSplits: Worker failed " +
1454           "during input split (currently not supported)");
1455     }
1456     try {
1457       getZkExt().createExt(inputSplitsAllDonePath,
1458           null,
1459           Ids.OPEN_ACL_UNSAFE,
1460           CreateMode.PERSISTENT,
1461           false);
1462     } catch (KeeperException.NodeExistsException e) {
1463       LOG.info("coordinateInputSplits: Node " +
1464           inputSplitsAllDonePath + " already exists.");
1465     } catch (KeeperException e) {
1466       throw new IllegalStateException(
1467           "coordinateInputSplits: KeeperException", e);
1468     } catch (InterruptedException e) {
1469       throw new IllegalStateException(
1470           "coordinateInputSplits: IllegalStateException", e);
1471     }
1472   }
1473 
1474   /**
1475    * Initialize aggregator at the master side
1476    * before vertex/edge loading.
1477    * This methods cooperates with other code
1478    * to enables aggregation usage at INPUT_SUPERSTEP
1479    * Other codes are:
1480    *  BSPServiceWorker:
1481    *  aggregatorHandler.prepareSuperstep in
1482    *  setup
1483    *  set aggregator usage in vertexReader and
1484    *  edgeReader
1485    *
1486    * @throws InterruptedException
1487    */
1488   private void initializeAggregatorInputSuperstep()
1489     throws InterruptedException {
1490     globalCommHandler.getAggregatorHandler().prepareSuperstep();
1491 
1492     prepareMasterCompute(getSuperstep());
1493     try {
1494       masterCompute.initialize();
1495     } catch (InstantiationException e) {
1496       LOG.fatal(
1497         "initializeAggregatorInputSuperstep: Failed in instantiation", e);
1498       throw new RuntimeException(
1499         "initializeAggregatorInputSuperstep: Failed in instantiation", e);
1500     } catch (IllegalAccessException e) {
1501       LOG.fatal("initializeAggregatorInputSuperstep: Failed in access", e);
1502       throw new RuntimeException(
1503         "initializeAggregatorInputSuperstep: Failed in access", e);
1504     }
1505     aggregatorTranslation.postMasterCompute();
1506     globalCommHandler.getAggregatorHandler().finishSuperstep();
1507 
1508     globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
1509   }
1510 
1511   /**
1512    * This is required before initialization
1513    * and run of MasterCompute
1514    *
1515    * @param superstep superstep for which to run masterCompute
1516    * @return Superstep classes set by masterCompute
1517    */
1518   private SuperstepClasses prepareMasterCompute(long superstep) {
1519     GraphState graphState = new GraphState(superstep ,
1520         GiraphStats.getInstance().getVertices().getValue(),
1521         GiraphStats.getInstance().getEdges().getValue(),
1522         getContext());
1523     SuperstepClasses superstepClasses =
1524         SuperstepClasses.createAndExtractTypes(getConfiguration());
1525     masterCompute.setGraphState(graphState);
1526     masterCompute.setSuperstepClasses(superstepClasses);
1527     return superstepClasses;
1528   }
1529 
  /**
   * Coordinate one superstep end-to-end: choose and watch healthy workers,
   * assign partitions, drive checkpointing, wait on worker barriers, run
   * the master compute and publish the aggregated state to ZooKeeper.
   *
   * @return Resulting state of this superstep (done, all done, worker
   *         failure, or checkpoint-and-halt)
   */
  @Override
  public SuperstepState coordinateSuperstep() throws
  KeeperException, InterruptedException {
    // 1. Get chosen workers and set up watches on them.
    // 2. Assign partitions to the workers
    //    (possibly reloading from a superstep)
    // 3. Wait for all workers to complete
    // 4. Collect and process aggregators
    // 5. Create superstep finished node
    // 6. If the checkpoint frequency is met, finalize the checkpoint

    // Let observers run (and report progress) before the superstep starts
    for (MasterObserver observer : observers) {
      observer.preSuperstep(getSuperstep());
      getContext().progress();
    }

    chosenWorkerInfoList = checkWorkers();
    if (chosenWorkerInfoList == null) {
      // NOTE(review): execution continues past this call with
      // chosenWorkerInfoList still null (it is dereferenced below) --
      // presumably setJobStateFailed() terminates the job; confirm it
      // cannot return normally.
      setJobStateFailed("coordinateSuperstep: Not enough healthy workers for " +
                    "superstep " + getSuperstep());
    } else {
      // Sort this list, so order stays the same over supersteps
      Collections.sort(chosenWorkerInfoList, new Comparator<WorkerInfo>() {
        @Override
        public int compare(WorkerInfo wi1, WorkerInfo wi2) {
          return Integer.compare(wi1.getTaskId(), wi2.getTaskId());
        }
      });
      // Set a watch (exists(..., true)) on each chosen worker's healthy
      // znode so the master hears about failures during this superstep.
      for (WorkerInfo workerInfo : chosenWorkerInfoList) {
        String workerInfoHealthyPath =
            getWorkerInfoHealthyPath(getApplicationAttempt(),
                getSuperstep()) + "/" +
                workerInfo.getHostnameId();
        if (getZkExt().exists(workerInfoHealthyPath, true) == null) {
          // NOTE(review): despite the message this only warns; the actual
          // failure is detected later via the worker barriers.
          LOG.warn("coordinateSuperstep: Chosen worker " +
              workerInfoHealthyPath +
              " is no longer valid, failing superstep");
        }
      }
    }

    // We need to finalize aggregators from previous superstep
    if (getSuperstep() >= 0) {
      aggregatorTranslation.postMasterCompute();
      globalCommHandler.getAggregatorHandler().finishSuperstep();
    }

    masterClient.openConnections();

    GiraphStats.getInstance().
        getCurrentWorkers().setValue(chosenWorkerInfoList.size());
    assignPartitionOwners();

    // Finalize the valid checkpoint file prefixes and possibly
    // the aggregators.
    if (checkpointStatus != CheckpointStatus.NONE) {
      String workerWroteCheckpointPath =
          getWorkerWroteCheckpointPath(getApplicationAttempt(),
              getSuperstep());
      // first wait for all the workers to write their checkpoint data
      if (!barrierOnWorkerList(workerWroteCheckpointPath,
          chosenWorkerInfoList,
          getWorkerWroteCheckpointEvent(),
          checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT)) {
        return SuperstepState.WORKER_FAILURE;
      }
      try {
        finalizeCheckpoint(getSuperstep(), chosenWorkerInfoList);
      } catch (IOException e) {
        throw new IllegalStateException(
            "coordinateSuperstep: IOException on finalizing checkpoint",
            e);
      }
      if (checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT) {
        return SuperstepState.CHECKPOINT_AND_HALT;
      }
    }

    // We need to send aggregators to worker owners after new worker assignments
    if (getSuperstep() >= 0) {
      globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
    }

    if (getSuperstep() == INPUT_SUPERSTEP) {
      // Initialize aggregators before coordinating
      initializeAggregatorInputSuperstep();
      coordinateInputSplits();
    }

    // Barrier: wait until every chosen worker reports this superstep done
    String finishedWorkerPath =
        getWorkerFinishedPath(getApplicationAttempt(), getSuperstep());
    if (!barrierOnWorkerList(finishedWorkerPath,
        chosenWorkerInfoList,
        getSuperstepStateChangedEvent(),
        false)) {
      return SuperstepState.WORKER_FAILURE;
    }

    // Collect aggregator values, then run the master.compute() and
    // finally save the aggregator values
    globalCommHandler.getAggregatorHandler().prepareSuperstep();
    aggregatorTranslation.prepareSuperstep();

    SuperstepClasses superstepClasses =
      prepareMasterCompute(getSuperstep() + 1);
    doMasterCompute();

    // If the master is halted or all the vertices voted to halt and there
    // are no more messages in the system, stop the computation
    GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
    if (masterCompute.isHalted() ||
        (globalStats.getFinishedVertexCount() ==
        globalStats.getVertexCount() &&
        globalStats.getMessageCount() == 0)) {
      globalStats.setHaltComputation(true);
    } else if (getZkExt().exists(haltComputationPath, false) != null) {
      // External halt request: a user created the halt znode
      if (LOG.isInfoEnabled()) {
        LOG.info("Halting computation because halt zookeeper node was created");
      }
      globalStats.setHaltComputation(true);
    }

    // If we have completed the maximum number of supersteps, stop
    // the computation
    if (maxNumberOfSupersteps !=
        GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS.getDefaultValue() &&
        (getSuperstep() == maxNumberOfSupersteps - 1)) {
      if (LOG.isInfoEnabled()) {
        LOG.info("coordinateSuperstep: Finished " + maxNumberOfSupersteps +
            " supersteps (max specified by the user), halting");
      }
      globalStats.setHaltComputation(true);
    }

    // Superstep 0 doesn't need to have matching types (Message types may not
    // match) and if the computation is halted, no need to check any of
    // the types.
    if (!globalStats.getHaltComputation()) {
      superstepClasses.verifyTypesMatch(getSuperstep() > 0);
    }
    getConfiguration().updateSuperstepClasses(superstepClasses);

    // Signal workers that we want to checkpoint on the next superstep
    checkpointStatus = getCheckpointStatus(getSuperstep() + 1);
    globalStats.setCheckpointStatus(checkpointStatus);
    // Let everyone know the aggregated application state through the
    // superstep finishing znode.
    String superstepFinishedNode =
        getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());

    WritableUtils.writeToZnode(
        getZkExt(), superstepFinishedNode, -1, globalStats, superstepClasses);
    updateCounters(globalStats);

    cleanUpOldSuperstep(getSuperstep() - 1);
    incrCachedSuperstep();
    // Counter starts at zero, so no need to increment
    if (getSuperstep() > 0) {
      GiraphStats.getInstance().getSuperstepCounter().increment();
    }
    SuperstepState superstepState;
    if (globalStats.getHaltComputation()) {
      superstepState = SuperstepState.ALL_SUPERSTEPS_DONE;
    } else {
      superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
    }
    globalCommHandler.getAggregatorHandler().writeAggregators(
        getSuperstep(), superstepState);

    return superstepState;
  }
1701 
1702   /**
1703    * Should checkpoint on this superstep?  If checkpointing, always
1704    * checkpoint the first user superstep.  If restarting, the first
1705    * checkpoint is after the frequency has been met.
1706    *
1707    * @param superstep Decide if checkpointing no this superstep
1708    * @return True if this superstep should be checkpointed, false otherwise
1709    */
1710   private CheckpointStatus getCheckpointStatus(long superstep) {
1711     try {
1712       if (getZkExt().
1713           exists(basePath + FORCE_CHECKPOINT_USER_FLAG, false) != null) {
1714         if (isCheckpointingSupported(getConfiguration(), masterCompute)) {
1715           return CheckpointStatus.CHECKPOINT_AND_HALT;
1716         } else {
1717           LOG.warn("Attempted to manually checkpoint the job that " +
1718               "does not support checkpoints. Ignoring");
1719         }
1720       }
1721     } catch (KeeperException e) {
1722       throw new IllegalStateException(
1723           "cleanupZooKeeper: Got KeeperException", e);
1724     } catch (InterruptedException e) {
1725       throw new IllegalStateException(
1726           "cleanupZooKeeper: Got IllegalStateException", e);
1727     }
1728     if (checkpointFrequency == 0) {
1729       return CheckpointStatus.NONE;
1730     }
1731     long firstCheckpoint = INPUT_SUPERSTEP + 1 + checkpointFrequency;
1732     if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
1733       firstCheckpoint = getRestartedSuperstep() + checkpointFrequency;
1734     }
1735     if (superstep < firstCheckpoint) {
1736       return CheckpointStatus.NONE;
1737     }
1738     if (((superstep - firstCheckpoint) % checkpointFrequency) == 0) {
1739       if (isCheckpointingSupported(getConfiguration(), masterCompute)) {
1740         return CheckpointStatus.CHECKPOINT;
1741       }
1742     }
1743     return CheckpointStatus.NONE;
1744   }
1745 
1746   /**
1747    * Returns false if job doesn't support checkpoints.
1748    * Job may not support checkpointing if it does output during
1749    * computation, uses static variables to keep data between supersteps,
1750    * starts new threads etc.
1751    * @param conf Immutable configuration of the job
1752    * @param masterCompute instance of master compute
1753    * @return true if it is safe to checkpoint the job
1754    */
1755   private boolean isCheckpointingSupported(
1756       GiraphConfiguration conf, MasterCompute masterCompute) {
1757     return checkpointSupportedChecker.isCheckpointSupported(
1758         conf, masterCompute);
1759   }
1760 
1761 
1762   /**
1763    * This doMasterCompute is only called
1764    * after masterCompute is initialized
1765    */
1766   private void doMasterCompute() {
1767     GiraphTimerContext timerContext = masterComputeTimer.time();
1768     masterCompute.compute();
1769     timerContext.stop();
1770   }
1771 
1772   /**
1773    * Need to clean up ZooKeeper nicely.  Make sure all the masters and workers
1774    * have reported ending their ZooKeeper connections.
1775    */
1776   private void cleanUpZooKeeper() {
1777     try {
1778       getZkExt().createExt(cleanedUpPath,
1779           null,
1780           Ids.OPEN_ACL_UNSAFE,
1781           CreateMode.PERSISTENT,
1782           true);
1783     } catch (KeeperException.NodeExistsException e) {
1784       if (LOG.isInfoEnabled()) {
1785         LOG.info("cleanUpZooKeeper: Node " + cleanedUpPath +
1786             " already exists, no need to create.");
1787       }
1788     } catch (KeeperException e) {
1789       throw new IllegalStateException(
1790           "cleanupZooKeeper: Got KeeperException", e);
1791     } catch (InterruptedException e) {
1792       throw new IllegalStateException(
1793           "cleanupZooKeeper: Got IllegalStateException", e);
1794     }
1795     // Need to wait for the number of workers and masters to complete
1796     int maxTasks = BspInputFormat.getMaxTasks(getConfiguration());
1797     if ((getGraphTaskManager().getGraphFunctions() == GraphFunctions.ALL) ||
1798         (getGraphTaskManager().getGraphFunctions() ==
1799         GraphFunctions.ALL_EXCEPT_ZOOKEEPER)) {
1800       maxTasks *= 2;
1801     }
1802     List<String> cleanedUpChildrenList = null;
1803     while (true) {
1804       try {
1805         cleanedUpChildrenList =
1806             getZkExt().getChildrenExt(
1807                 cleanedUpPath, true, false, true);
1808         if (LOG.isInfoEnabled()) {
1809           LOG.info("cleanUpZooKeeper: Got " +
1810               cleanedUpChildrenList.size() + " of " +
1811               maxTasks  +  " desired children from " +
1812               cleanedUpPath);
1813         }
1814         if (cleanedUpChildrenList.size() == maxTasks) {
1815           break;
1816         }
1817         if (LOG.isInfoEnabled()) {
1818           LOG.info("cleanedUpZooKeeper: Waiting for the " +
1819               "children of " + cleanedUpPath +
1820               " to change since only got " +
1821               cleanedUpChildrenList.size() + " nodes.");
1822         }
1823       } catch (KeeperException e) {
1824         // We are in the cleanup phase -- just log the error
1825         LOG.error("cleanUpZooKeeper: Got KeeperException, " +
1826             "but will continue", e);
1827         return;
1828       } catch (InterruptedException e) {
1829         // We are in the cleanup phase -- just log the error
1830         LOG.error("cleanUpZooKeeper: Got InterruptedException, " +
1831             "but will continue", e);
1832         return;
1833       }
1834 
1835       getCleanedUpChildrenChangedEvent().waitForever();
1836       getCleanedUpChildrenChangedEvent().reset();
1837     }
1838 
1839     // At this point, all processes have acknowledged the cleanup,
1840     // and the master can do any final cleanup if the ZooKeeper service was
1841     // provided (not dynamically started) and we don't want to keep the data
1842     try {
1843       if (getConfiguration().isZookeeperExternal() &&
1844           KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration())) {
1845         if (LOG.isInfoEnabled()) {
1846           LOG.info("cleanupZooKeeper: Removing the following path " +
1847               "and all children - " + basePath + " from ZooKeeper list " +
1848               getConfiguration().getZookeeperList());
1849         }
1850         getZkExt().deleteExt(basePath, -1, true);
1851       }
1852     } catch (KeeperException e) {
1853       LOG.error("cleanupZooKeeper: Failed to do cleanup of " +
1854           basePath + " due to KeeperException", e);
1855     } catch (InterruptedException e) {
1856       LOG.error("cleanupZooKeeper: Failed to do cleanup of " +
1857           basePath + " due to InterruptedException", e);
1858     }
1859   }
1860 
1861   @Override
1862   public void postApplication() {
1863     for (MasterObserver observer : observers) {
1864       observer.postApplication();
1865       getContext().progress();
1866     }
1867   }
1868 
1869   @Override
1870   public void postSuperstep() {
1871     for (MasterObserver observer : observers) {
1872       observer.postSuperstep(getSuperstep());
1873       getContext().progress();
1874     }
1875   }
1876 
1877   @Override
1878   public void failureCleanup(Exception e) {
1879     for (MasterObserver observer : observers) {
1880       try {
1881         observer.applicationFailed(e);
1882         // CHECKSTYLE: stop IllegalCatchCheck
1883       } catch (RuntimeException re) {
1884         // CHECKSTYLE: resume IllegalCatchCheck
1885         LOG.error(re.getClass().getName() + " from observer " +
1886             observer.getClass().getName(), re);
1887       }
1888       getContext().progress();
1889     }
1890   }
1891 
1892   @Override
1893   public void cleanup(SuperstepState superstepState) throws IOException {
1894     ImmutableClassesGiraphConfiguration conf = getConfiguration();
1895 
1896     // All master processes should denote they are done by adding special
1897     // znode.  Once the number of znodes equals the number of partitions
1898     // for workers and masters, the master will clean up the ZooKeeper
1899     // znodes associated with this job.
1900     String masterCleanedUpPath = cleanedUpPath  + "/" +
1901         getTaskPartition() + MASTER_SUFFIX;
1902     try {
1903       String finalFinishedPath =
1904           getZkExt().createExt(masterCleanedUpPath,
1905               null,
1906               Ids.OPEN_ACL_UNSAFE,
1907               CreateMode.PERSISTENT,
1908               true);
1909       if (LOG.isInfoEnabled()) {
1910         LOG.info("cleanup: Notifying master its okay to cleanup with " +
1911             finalFinishedPath);
1912       }
1913     } catch (KeeperException.NodeExistsException e) {
1914       if (LOG.isInfoEnabled()) {
1915         LOG.info("cleanup: Couldn't create finished node '" +
1916             masterCleanedUpPath);
1917       }
1918     } catch (KeeperException e) {
1919       LOG.error("cleanup: Got KeeperException, continuing", e);
1920     } catch (InterruptedException e) {
1921       LOG.error("cleanup: Got InterruptedException, continuing", e);
1922     }
1923 
1924     if (isMaster) {
1925       getGraphTaskManager().setIsMaster(true);
1926       cleanUpZooKeeper();
1927       // If desired, cleanup the checkpoint directory
1928       if (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE &&
1929           GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(conf)) {
1930         boolean success =
1931             getFs().delete(new Path(checkpointBasePath), true);
1932         if (LOG.isInfoEnabled()) {
1933           LOG.info("cleanup: Removed HDFS checkpoint directory (" +
1934               checkpointBasePath + ") with return = " +
1935               success + " since the job " + getContext().getJobName() +
1936               " succeeded ");
1937         }
1938       }
1939       if (superstepState == SuperstepState.CHECKPOINT_AND_HALT) {
1940         getFs().create(CheckpointingUtils.getCheckpointMarkPath(conf,
1941             getJobId()), true);
1942         failJob(new Exception("Checkpoint and halt requested. " +
1943             "Killing this job."));
1944       }
1945       globalCommHandler.getAggregatorHandler().close();
1946       masterClient.closeConnections();
1947       masterServer.close();
1948     }
1949 
1950     try {
1951       getZkExt().close();
1952     } catch (InterruptedException e) {
1953       // cleanup phase -- just log the error
1954       LOG.error("cleanup: Zookeeper failed to close", e);
1955     }
1956   }
1957 
1958   /**
1959    * Event that the master watches that denotes when a worker wrote checkpoint
1960    *
1961    * @return Event that denotes when a worker wrote checkpoint
1962    */
1963   public final BspEvent getWorkerWroteCheckpointEvent() {
1964     return workerWroteCheckpoint;
1965   }
1966 
1967   /**
1968    * Event that the master watches that denotes if a worker has done something
1969    * that changes the state of a superstep (either a worker completed or died)
1970    *
1971    * @return Event that denotes a superstep state change
1972    */
1973   public final BspEvent getSuperstepStateChangedEvent() {
1974     return superstepStateChanged;
1975   }
1976 
1977   /**
1978    * Should this worker failure cause the current superstep to fail?
1979    *
1980    * @param failedWorkerPath Full path to the failed worker
1981    */
1982   private void checkHealthyWorkerFailure(String failedWorkerPath) {
1983     if (getSuperstepFromPath(failedWorkerPath) < getSuperstep()) {
1984       return;
1985     }
1986 
1987     Collection<PartitionOwner> partitionOwners =
1988         masterGraphPartitioner.getCurrentPartitionOwners();
1989     String hostnameId =
1990         getHealthyHostnameIdFromPath(failedWorkerPath);
1991     for (PartitionOwner partitionOwner : partitionOwners) {
1992       WorkerInfo workerInfo = partitionOwner.getWorkerInfo();
1993       WorkerInfo previousWorkerInfo =
1994           partitionOwner.getPreviousWorkerInfo();
1995       if (workerInfo.getHostnameId().equals(hostnameId) ||
1996           ((previousWorkerInfo != null) &&
1997               previousWorkerInfo.getHostnameId().equals(hostnameId))) {
1998         LOG.warn("checkHealthyWorkerFailure: " +
1999             "at least one healthy worker went down " +
2000             "for superstep " + getSuperstep() + " - " +
2001             hostnameId + ", will try to restart from " +
2002             "checkpointed superstep " +
2003             lastCheckpointedSuperstep);
2004         superstepStateChanged.signal();
2005       }
2006     }
2007   }
2008 
2009   @Override
2010   public boolean processEvent(WatchedEvent event) {
2011     boolean foundEvent = false;
2012     if (event.getPath().contains(WORKER_HEALTHY_DIR) &&
2013         (event.getType() == EventType.NodeDeleted)) {
2014       if (LOG.isDebugEnabled()) {
2015         LOG.debug("processEvent: Healthy worker died (node deleted) " +
2016             "in " + event.getPath());
2017       }
2018       checkHealthyWorkerFailure(event.getPath());
2019       superstepStateChanged.signal();
2020       foundEvent = true;
2021     } else if (event.getPath().contains(WORKER_FINISHED_DIR) &&
2022         event.getType() == EventType.NodeChildrenChanged) {
2023       if (LOG.isDebugEnabled()) {
2024         LOG.debug("processEvent: Worker finished (node change) " +
2025             "event - superstepStateChanged signaled");
2026       }
2027       superstepStateChanged.signal();
2028       foundEvent = true;
2029     } else if (event.getPath().contains(WORKER_WROTE_CHECKPOINT_DIR) &&
2030         event.getType() == EventType.NodeChildrenChanged) {
2031       if (LOG.isDebugEnabled()) {
2032         LOG.debug("processEvent: Worker wrote checkpoint (node change) " +
2033             "event - workerWroteCheckpoint signaled");
2034       }
2035       workerWroteCheckpoint.signal();
2036       foundEvent = true;
2037     }
2038 
2039     return foundEvent;
2040   }
2041 
2042   /**
2043    * Set values of counters to match the ones from {@link GlobalStats}
2044    *
2045    * @param globalStats Global statistics which holds new counter values
2046    */
2047   private void updateCounters(GlobalStats globalStats) {
2048     GiraphStats gs = GiraphStats.getInstance();
2049     gs.getVertices().setValue(globalStats.getVertexCount());
2050     gs.getFinishedVertexes().setValue(globalStats.getFinishedVertexCount());
2051     gs.getEdges().setValue(globalStats.getEdgeCount());
2052     gs.getSentMessages().setValue(globalStats.getMessageCount());
2053     gs.getSentMessageBytes().setValue(globalStats.getMessageBytesCount());
2054     gs.getAggregateSentMessages().increment(globalStats.getMessageCount());
2055     gs.getAggregateSentMessageBytes()
2056       .increment(globalStats.getMessageBytesCount());
2057     gs.getAggregateOOCBytesLoaded()
2058       .increment(globalStats.getOocLoadBytesCount());
2059     gs.getAggregateOOCBytesStored()
2060       .increment(globalStats.getOocStoreBytesCount());
2061     // Updating the lowest percentage of graph in memory throughout the
2062     // execution across all the supersteps
2063     int percentage = (int) gs.getLowestGraphPercentageInMemory().getValue();
2064     gs.getLowestGraphPercentageInMemory().setValue(
2065         Math.min(percentage, globalStats.getLowestGraphPercentageInMemory()));
2066   }
2067 }