View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.master;
20  
21  import com.google.common.collect.Lists;
22  import com.google.common.collect.Sets;
23  import net.iharder.Base64;
24  import org.apache.commons.io.FilenameUtils;
25  import org.apache.giraph.bsp.ApplicationState;
26  import org.apache.giraph.bsp.BspInputFormat;
27  import org.apache.giraph.bsp.BspService;
28  import org.apache.giraph.bsp.CentralizedServiceMaster;
29  import org.apache.giraph.bsp.SuperstepState;
30  import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
31  import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
32  import org.apache.giraph.comm.MasterClient;
33  import org.apache.giraph.comm.MasterServer;
34  import org.apache.giraph.comm.netty.NettyMasterClient;
35  import org.apache.giraph.comm.netty.NettyMasterServer;
36  import org.apache.giraph.comm.requests.AddressesAndPartitionsRequest;
37  import org.apache.giraph.conf.GiraphConfiguration;
38  import org.apache.giraph.conf.GiraphConstants;
39  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
40  import org.apache.giraph.counters.GiraphStats;
41  import org.apache.giraph.graph.AddressesAndPartitionsWritable;
42  import org.apache.giraph.graph.GlobalStats;
43  import org.apache.giraph.graph.GraphFunctions;
44  import org.apache.giraph.graph.GraphState;
45  import org.apache.giraph.graph.GraphTaskManager;
46  import org.apache.giraph.io.EdgeInputFormat;
47  import org.apache.giraph.io.GiraphInputFormat;
48  import org.apache.giraph.io.InputType;
49  import org.apache.giraph.io.MappingInputFormat;
50  import org.apache.giraph.io.VertexInputFormat;
51  import org.apache.giraph.master.input.MasterInputSplitsHandler;
52  import org.apache.giraph.metrics.AggregatedMetrics;
53  import org.apache.giraph.metrics.GiraphMetrics;
54  import org.apache.giraph.metrics.GiraphTimer;
55  import org.apache.giraph.metrics.GiraphTimerContext;
56  import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
57  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
58  import org.apache.giraph.metrics.WorkerSuperstepMetrics;
59  import org.apache.giraph.partition.BasicPartitionOwner;
60  import org.apache.giraph.partition.MasterGraphPartitioner;
61  import org.apache.giraph.partition.PartitionOwner;
62  import org.apache.giraph.partition.PartitionStats;
63  import org.apache.giraph.partition.PartitionUtils;
64  import org.apache.giraph.time.SystemTime;
65  import org.apache.giraph.time.Time;
66  import org.apache.giraph.utils.CheckpointingUtils;
67  import org.apache.giraph.utils.JMapHistoDumper;
68  import org.apache.giraph.utils.ReactiveJMapHistoDumper;
69  import org.apache.giraph.utils.ReflectionUtils;
70  import org.apache.giraph.utils.WritableUtils;
71  import org.apache.giraph.worker.WorkerInfo;
72  import org.apache.giraph.zk.BspEvent;
73  import org.apache.giraph.zk.PredicateLock;
74  import org.apache.hadoop.fs.FSDataOutputStream;
75  import org.apache.hadoop.fs.FileSystem;
76  import org.apache.hadoop.fs.Path;
77  import org.apache.hadoop.io.Writable;
78  import org.apache.hadoop.io.WritableComparable;
79  import org.apache.hadoop.mapred.JobID;
80  import org.apache.hadoop.mapred.RunningJob;
81  import org.apache.hadoop.mapreduce.InputSplit;
82  import org.apache.hadoop.mapreduce.Mapper;
83  import org.apache.log4j.Logger;
84  import org.apache.zookeeper.CreateMode;
85  import org.apache.zookeeper.KeeperException;
86  import org.apache.zookeeper.WatchedEvent;
87  import org.apache.zookeeper.Watcher.Event.EventType;
88  import org.apache.zookeeper.ZooDefs.Ids;
89  import org.json.JSONException;
90  import org.json.JSONObject;
91  
92  import java.io.DataInputStream;
93  import java.io.IOException;
94  import java.io.PrintStream;
95  import java.nio.charset.Charset;
96  import java.util.ArrayList;
97  import java.util.Collection;
98  import java.util.Collections;
99  import java.util.Comparator;
100 import java.util.HashSet;
101 import java.util.List;
102 import java.util.Set;
103 import java.util.TreeSet;
104 import java.util.concurrent.TimeUnit;
105 
106 import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
107 import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
108 import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
109 
110 /**
111  * ZooKeeper-based implementation of {@link CentralizedServiceMaster}.
112  *
113  * @param <I> Vertex id
114  * @param <V> Vertex data
115  * @param <E> Edge data
116  */
117 @SuppressWarnings("rawtypes, unchecked")
118 public class BspServiceMaster<I extends WritableComparable,
119     V extends Writable, E extends Writable>
120     extends BspService<I, V, E>
121     implements CentralizedServiceMaster<I, V, E>,
122     ResetSuperstepMetricsObserver {
123   /** Print worker names only if there are 10 workers left */
124   public static final int MAX_PRINTABLE_REMAINING_WORKERS = 10;
125   /** How many threads to use when writing input splits to zookeeper*/
126   public static final String NUM_MASTER_ZK_INPUT_SPLIT_THREADS =
127       "giraph.numMasterZkInputSplitThreads";
128   /** Default number of threads to use when writing input splits to zookeeper */
129   public static final int DEFAULT_INPUT_SPLIT_THREAD_COUNT = 1;
130   /** Time instance to use for timing */
131   private static final Time TIME = SystemTime.get();
132   /** Class logger */
133   private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
134   /** Am I the master? */
135   private boolean isMaster = false;
136   /** Max number of workers */
137   private final int maxWorkers;
138   /** Min number of workers */
139   private final int minWorkers;
140   /** Max number of supersteps */
141   private final int maxNumberOfSupersteps;
142   /** Min % responded workers */
143   private final float minPercentResponded;
144   /** Msecs to wait for an event */
145   private final int eventWaitMsecs;
146   /** Max msecs to wait for a superstep to get enough workers */
147   private final int maxSuperstepWaitMsecs;
148   /** Min number of long tails before printing */
149   private final int partitionLongTailMinPrint;
150   /** Last finalized checkpoint */
151   private long lastCheckpointedSuperstep = -1;
152   /** Worker wrote checkpoint */
153   private final BspEvent workerWroteCheckpoint;
154   /** State of the superstep changed */
155   private final BspEvent superstepStateChanged;
156   /** Master graph partitioner */
157   private final MasterGraphPartitioner<I, V, E> masterGraphPartitioner;
158   /** All the partition stats from the last superstep */
159   private final List<PartitionStats> allPartitionStatsList =
160       new ArrayList<PartitionStats>();
161   /** Handler for global communication */
162   private MasterGlobalCommHandler globalCommHandler;
163   /** Handler for aggregators to reduce/broadcast translation */
164   private AggregatorToGlobalCommTranslation aggregatorTranslation;
165   /** Master class */
166   private MasterCompute masterCompute;
167   /** IPC Client */
168   private MasterClient masterClient;
169   /** IPC Server */
170   private MasterServer masterServer;
171   /** Master info */
172   private MasterInfo masterInfo;
173   /** List of workers in current superstep, sorted by task id */
174   private List<WorkerInfo> chosenWorkerInfoList = Lists.newArrayList();
175   /** Limit locality information added to each InputSplit znode */
176   private final int localityLimit = 5;
177   /** Observers over master lifecycle. */
178   private final MasterObserver[] observers;
179 
180   // Per-Superstep Metrics
181   /** MasterCompute time */
182   private GiraphTimer masterComputeTimer;
183 
184   /** Checkpoint frequency */
185   private final int checkpointFrequency;
186   /** Current checkpoint status */
187   private CheckpointStatus checkpointStatus;
188   /** Checks if checkpointing supported */
189   private final CheckpointSupportedChecker checkpointSupportedChecker;
190 
  /**
   * Constructor for setting up the master.
   *
   * Registers the ZooKeeper-backed events this service waits on, reads all
   * tunables from the immutable configuration, creates the master graph
   * partitioner, and installs any configured master observers.
   *
   * @param context Mapper context
   * @param graphTaskManager GraphTaskManager for this compute node
   */
  public BspServiceMaster(
      Mapper<?, ?, ?, ?>.Context context,
      GraphTaskManager<I, V, E> graphTaskManager) {
    super(context, graphTaskManager);
    // Events must be registered with BspService so ZooKeeper watches can
    // signal them later
    workerWroteCheckpoint = new PredicateLock(context);
    registerBspEvent(workerWroteCheckpoint);
    superstepStateChanged = new PredicateLock(context);
    registerBspEvent(superstepStateChanged);

    ImmutableClassesGiraphConfiguration<I, V, E> conf =
        getConfiguration();

    maxWorkers = conf.getMaxWorkers();
    minWorkers = conf.getMinWorkers();
    maxNumberOfSupersteps = conf.getMaxNumberOfSupersteps();
    minPercentResponded = GiraphConstants.MIN_PERCENT_RESPONDED.get(conf);
    eventWaitMsecs = conf.getEventWaitMsecs();
    maxSuperstepWaitMsecs = conf.getMaxMasterSuperstepWaitMsecs();
    partitionLongTailMinPrint = PARTITION_LONG_TAIL_MIN_PRINT.get(conf);
    masterGraphPartitioner =
        getGraphPartitionerFactory().createMasterGraphPartitioner();
    // Optional JMap histogram dump observers (memory-debugging aids) are
    // added before the observer array is materialized below
    if (conf.isJMapHistogramDumpEnabled()) {
      conf.addMasterObserverClass(JMapHistoDumper.class);
    }
    if (conf.isReactiveJmapHistogramDumpEnabled()) {
      conf.addMasterObserverClass(ReactiveJMapHistoDumper.class);
    }
    observers = conf.createMasterObservers(context);

    this.checkpointFrequency = conf.getCheckpointFrequency();
    this.checkpointStatus = CheckpointStatus.NONE;
    this.checkpointSupportedChecker =
        ReflectionUtils.newInstance(
            GiraphConstants.CHECKPOINT_SUPPORTED_CHECKER.get(conf));

    // Re-register per-superstep metrics (masterComputeTimer) on each
    // superstep reset
    GiraphMetrics.get().addSuperstepResetObserver(this);
    GiraphStats.init(context);
  }
235 
236   @Override
237   public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
238     masterComputeTimer = new GiraphTimer(superstepMetrics,
239         "master-compute-call", TimeUnit.MILLISECONDS);
240   }
241 
242   @Override
243   public void setJobState(ApplicationState state,
244       long applicationAttempt,
245       long desiredSuperstep) {
246     setJobState(state, applicationAttempt, desiredSuperstep, true);
247   }
248 
249   /**
250    * Set the job state.
251    *
252    * @param state State of the application.
253    * @param applicationAttempt Attempt to start on
254    * @param desiredSuperstep Superstep to restart from (if applicable)
255    * @param killJobOnFailure if true, and the desired state is FAILED,
256    *                         then kill this job.
257    */
258   private void setJobState(ApplicationState state,
259       long applicationAttempt,
260       long desiredSuperstep,
261       boolean killJobOnFailure) {
262     JSONObject jobState = new JSONObject();
263     try {
264       jobState.put(JSONOBJ_STATE_KEY, state.toString());
265       jobState.put(JSONOBJ_APPLICATION_ATTEMPT_KEY, applicationAttempt);
266       jobState.put(JSONOBJ_SUPERSTEP_KEY, desiredSuperstep);
267     } catch (JSONException e) {
268       throw new RuntimeException("setJobState: Couldn't put " +
269           state.toString());
270     }
271     if (LOG.isInfoEnabled()) {
272       LOG.info("setJobState: " + jobState.toString() + " on superstep " +
273           getSuperstep());
274     }
275     try {
276       getZkExt().createExt(masterJobStatePath + "/jobState",
277           jobState.toString().getBytes(Charset.defaultCharset()),
278           Ids.OPEN_ACL_UNSAFE,
279           CreateMode.PERSISTENT_SEQUENTIAL,
280           true);
281       LOG.info("setJobState: " + jobState);
282     } catch (KeeperException.NodeExistsException e) {
283       throw new IllegalStateException(
284           "setJobState: Imposible that " +
285               masterJobStatePath + " already exists!", e);
286     } catch (KeeperException e) {
287       throw new IllegalStateException(
288           "setJobState: Unknown KeeperException for " +
289               masterJobStatePath, e);
290     } catch (InterruptedException e) {
291       throw new IllegalStateException(
292           "setJobState: Unknown InterruptedException for " +
293               masterJobStatePath, e);
294     }
295     if (state == ApplicationState.FAILED && killJobOnFailure) {
296       failJob(new IllegalStateException("FAILED"));
297     }
298 
299   }
300 
301   /**
302    * Set the job state to FAILED. This will kill the job, and log exceptions to
303    * any observers.
304    *
305    * @param reason The reason the job failed
306    */
307   private void setJobStateFailed(String reason) {
308     getGraphTaskManager().getJobProgressTracker().logFailure(reason);
309     setJobState(ApplicationState.FAILED, -1, -1, false);
310     failJob(new IllegalStateException(reason));
311   }
312 
313   /**
314    * Common method for generating vertex/edge input splits.
315    *
316    * @param inputFormat The vertex/edge input format
317    * @param minSplitCountHint Minimum number of splits to create (hint)
318    * @param inputSplitType Type of input splits (for logging purposes)
319    * @return List of input splits for the given format
320    */
321   private List<InputSplit> generateInputSplits(GiraphInputFormat inputFormat,
322                                                int minSplitCountHint,
323                                                InputType inputSplitType) {
324     String logPrefix = "generate" + inputSplitType + "InputSplits";
325     List<InputSplit> splits;
326     try {
327       splits = inputFormat.getSplits(getContext(), minSplitCountHint);
328     } catch (IOException e) {
329       throw new IllegalStateException(logPrefix + ": Got IOException", e);
330     } catch (InterruptedException e) {
331       throw new IllegalStateException(
332           logPrefix + ": Got InterruptedException", e);
333     }
334     float samplePercent =
335         INPUT_SPLIT_SAMPLE_PERCENT.get(getConfiguration());
336     if (samplePercent != INPUT_SPLIT_SAMPLE_PERCENT.getDefaultValue()) {
337       int lastIndex = (int) (samplePercent * splits.size() / 100f);
338       Collections.shuffle(splits);
339       List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
340       LOG.warn(logPrefix + ": Using sampling - Processing only " +
341           sampleSplits.size() + " instead of " + splits.size() +
342           " expected splits.");
343       return sampleSplits;
344     } else {
345       if (LOG.isInfoEnabled()) {
346         LOG.info(logPrefix + ": Got " + splits.size() +
347             " input splits for " + minSplitCountHint + " input threads");
348       }
349       return splits;
350     }
351   }
352 
  /**
   * When there is no salvaging this job, fail it.
   *
   * On a pure YARN profile the task is failed by throwing; on the
   * MapReduce profile the legacy JobClient is used to kill the running
   * job. In every path, observers are notified via failureCleanup().
   *
   * @param e Exception to log to observers
   */
  private void failJob(Exception e) {
    LOG.fatal("failJob: Killing job " + getJobId());
    LOG.fatal("failJob: exception " + e.toString());
    try {
      if (getConfiguration().isPureYarnJob()) {
        // No MapReduce job to kill under YARN; propagate to end the task
        throw new RuntimeException(
          "BspServiceMaster (YARN profile) is " +
          "FAILING this task, throwing exception to end job run.", e);
      } else {
        // Legacy MapReduce path: look the job up and kill it explicitly
        @SuppressWarnings("deprecation")
        org.apache.hadoop.mapred.JobClient jobClient =
          new org.apache.hadoop.mapred.JobClient(
            (org.apache.hadoop.mapred.JobConf)
            getContext().getConfiguration());
        try {
          @SuppressWarnings("deprecation")
          JobID jobId = JobID.forName(getJobId());
          RunningJob job = jobClient.getJob(jobId);
          if (job != null) {
            job.killJob();
          } else {
            LOG.error("Job not found for jobId=" + getJobId());
          }
        } catch (IllegalArgumentException iae) {
          // getJobId() was not a parsable legacy Hadoop job id; swallow so
          // that failure cleanup in the finally block still runs
          LOG.info("This job (" + getJobId() +
                       ") is not a legacy Hadoop job and will " +
                       "continue with failure cleanup." +
                       e.getMessage(),
                   e);
        }
      }
    } catch (IOException ioe) {
      throw new RuntimeException(ioe);
    } finally {
      // Always notify observers of the failure, whichever kill path ran
      failureCleanup(e);
    }
  }
395 
396   /**
397    * Parse the {@link WorkerInfo} objects from a ZooKeeper path
398    * (and children).
399    *
400    * @param workerInfosPath Path where all the workers are children
401    * @param watch Watch or not?
402    * @return List of workers in that path
403    */
404   private List<WorkerInfo> getWorkerInfosFromPath(String workerInfosPath,
405       boolean watch) {
406     List<WorkerInfo> workerInfoList = new ArrayList<WorkerInfo>();
407     List<String> workerInfoPathList;
408     try {
409       workerInfoPathList =
410           getZkExt().getChildrenExt(workerInfosPath, watch, false, true);
411     } catch (KeeperException e) {
412       throw new IllegalStateException(
413           "getWorkers: Got KeeperException", e);
414     } catch (InterruptedException e) {
415       throw new IllegalStateException(
416           "getWorkers: Got InterruptedStateException", e);
417     }
418     for (String workerInfoPath : workerInfoPathList) {
419       WorkerInfo workerInfo = new WorkerInfo();
420       try {
421         WritableUtils.readFieldsFromZnode(
422             getZkExt(), workerInfoPath, true, null, workerInfo);
423         workerInfoList.add(workerInfo);
424       } catch (IllegalStateException e) {
425         LOG.warn("Can't get info from worker, did it die in between? " +
426             "workerInfoPath=" + workerInfoPath, e);
427       }
428     }
429     return workerInfoList;
430   }
431 
432   /**
433    * Get the healthy and unhealthy {@link WorkerInfo} objects for
434    * a superstep
435    *
436    * @param superstep superstep to check
437    * @param healthyWorkerInfoList filled in with current data
438    * @param unhealthyWorkerInfoList filled in with current data
439    */
440   private void getAllWorkerInfos(
441       long superstep,
442       List<WorkerInfo> healthyWorkerInfoList,
443       List<WorkerInfo> unhealthyWorkerInfoList) {
444     String healthyWorkerInfoPath =
445         getWorkerInfoHealthyPath(getApplicationAttempt(), superstep);
446     String unhealthyWorkerInfoPath =
447         getWorkerInfoUnhealthyPath(getApplicationAttempt(), superstep);
448 
449     try {
450       getZkExt().createOnceExt(healthyWorkerInfoPath,
451           null,
452           Ids.OPEN_ACL_UNSAFE,
453           CreateMode.PERSISTENT,
454           true);
455     } catch (KeeperException e) {
456       throw new IllegalStateException("getWorkers: KeeperException", e);
457     } catch (InterruptedException e) {
458       throw new IllegalStateException("getWorkers: InterruptedException", e);
459     }
460 
461     try {
462       getZkExt().createOnceExt(unhealthyWorkerInfoPath,
463           null,
464           Ids.OPEN_ACL_UNSAFE,
465           CreateMode.PERSISTENT,
466           true);
467     } catch (KeeperException e) {
468       throw new IllegalStateException("getWorkers: KeeperException", e);
469     } catch (InterruptedException e) {
470       throw new IllegalStateException("getWorkers: InterruptedException", e);
471     }
472 
473     List<WorkerInfo> currentHealthyWorkerInfoList =
474         getWorkerInfosFromPath(healthyWorkerInfoPath, true);
475     List<WorkerInfo> currentUnhealthyWorkerInfoList =
476         getWorkerInfosFromPath(unhealthyWorkerInfoPath, false);
477 
478     healthyWorkerInfoList.clear();
479     if (currentHealthyWorkerInfoList != null) {
480       for (WorkerInfo healthyWorkerInfo :
481         currentHealthyWorkerInfoList) {
482         healthyWorkerInfoList.add(healthyWorkerInfo);
483       }
484     }
485 
486     unhealthyWorkerInfoList.clear();
487     if (currentUnhealthyWorkerInfoList != null) {
488       for (WorkerInfo unhealthyWorkerInfo :
489         currentUnhealthyWorkerInfoList) {
490         unhealthyWorkerInfoList.add(unhealthyWorkerInfo);
491       }
492     }
493   }
494 
  @Override
  public List<WorkerInfo> checkWorkers() {
    boolean failJob = true;
    // Absolute deadline for enough workers to report health
    long failWorkerCheckMsecs =
        SystemTime.get().getMilliseconds() + maxSuperstepWaitMsecs;
    List<WorkerInfo> healthyWorkerInfoList = new ArrayList<WorkerInfo>();
    List<WorkerInfo> unhealthyWorkerInfoList = new ArrayList<WorkerInfo>();
    int totalResponses = -1;
    while (SystemTime.get().getMilliseconds() < failWorkerCheckMsecs) {
      getContext().progress();
      getAllWorkerInfos(
          getSuperstep(), healthyWorkerInfoList, unhealthyWorkerInfoList);
      totalResponses = healthyWorkerInfoList.size() +
          unhealthyWorkerInfoList.size();
      // Enough workers (healthy or not) responded relative to maxWorkers
      if ((totalResponses * 100.0f / maxWorkers) >=
          minPercentResponded) {
        failJob = false;
        break;
      }
      getContext().setStatus(getGraphTaskManager().getGraphFunctions() + " " +
          "checkWorkers: Only found " +
          totalResponses +
          " responses of " + maxWorkers +
          " needed to start superstep " +
          getSuperstep());
      // Prefer reacting to a health-registration event over polling;
      // on an event, re-check immediately without logging the poll status
      if (getWorkerHealthRegistrationChangedEvent().waitMsecs(
          eventWaitMsecs)) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("checkWorkers: Got event that health " +
              "registration changed, not using poll attempt");
        }
        getWorkerHealthRegistrationChangedEvent().reset();
        continue;
      }
      if (LOG.isInfoEnabled()) {
        LOG.info("checkWorkers: Only found " + totalResponses +
            " responses of " + maxWorkers +
            " needed to start superstep " +
            getSuperstep() + ".  Reporting every " +
            eventWaitMsecs + " msecs, " +
            (failWorkerCheckMsecs - SystemTime.get().getMilliseconds()) +
            " more msecs left before giving up.");
        // Find the missing workers if there are only a few
        if ((maxWorkers - totalResponses) <=
            partitionLongTailMinPrint) {
          logMissingWorkersOnSuperstep(healthyWorkerInfoList,
              unhealthyWorkerInfoList);
        }
      }
    }
    if (failJob) {
      // NOTE(review): the message reports minWorkers as "required" but the
      // success check above uses minPercentResponded of maxWorkers —
      // confirm which threshold the message should cite
      LOG.error("checkWorkers: Did not receive enough processes in " +
          "time (only " + totalResponses + " of " +
          minWorkers + " required) after waiting " + maxSuperstepWaitMsecs +
          "msecs).  This occurs if you do not have enough map tasks " +
          "available simultaneously on your Hadoop instance to fulfill " +
          "the number of requested workers.");
      return null;
    }

    // Even if enough responded, there must be at least minWorkers healthy
    if (healthyWorkerInfoList.size() < minWorkers) {
      LOG.error("checkWorkers: Only " + healthyWorkerInfoList.size() +
          " available when " + minWorkers + " are required.");
      logMissingWorkersOnSuperstep(healthyWorkerInfoList,
          unhealthyWorkerInfoList);
      return null;
    }

    getContext().setStatus(getGraphTaskManager().getGraphFunctions() + " " +
        "checkWorkers: Done - Found " + totalResponses +
        " responses of " + maxWorkers + " needed to start superstep " +
        getSuperstep());

    return healthyWorkerInfoList;
  }
570 
571   /**
572    * Log info level of the missing workers on the superstep
573    *
574    * @param healthyWorkerInfoList Healthy worker list
575    * @param unhealthyWorkerInfoList Unhealthy worker list
576    */
577   private void logMissingWorkersOnSuperstep(
578       List<WorkerInfo> healthyWorkerInfoList,
579       List<WorkerInfo> unhealthyWorkerInfoList) {
580     if (LOG.isInfoEnabled()) {
581       Set<Integer> partitionSet = new TreeSet<Integer>();
582       for (WorkerInfo workerInfo : healthyWorkerInfoList) {
583         partitionSet.add(workerInfo.getTaskId() % maxWorkers);
584       }
585       for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
586         partitionSet.add(workerInfo.getTaskId() % maxWorkers);
587       }
588       for (int i = 1; i <= maxWorkers; ++i) {
589         if (partitionSet.contains(Integer.valueOf(i))) {
590           continue;
591         } else if (i == getTaskId() % maxWorkers) {
592           continue;
593         } else {
594           LOG.info("logMissingWorkersOnSuperstep: No response from " +
595               "partition " + i + " (could be master)");
596         }
597       }
598     }
599   }
600 
  /**
   * Common method for creating vertex/edge input splits.
   *
   * @param inputFormat The vertex/edge input format
   * @param inputSplitType Type of input split (for logging purposes)
   * @return Number of splits. Returns -1 on failure to create
   *         valid input splits.
   */
  private int createInputSplits(GiraphInputFormat inputFormat,
                                InputType inputSplitType) {
    ImmutableClassesGiraphConfiguration conf = getConfiguration();
    String logPrefix = "create" + inputSplitType + "InputSplits";
    // Only the 'master' should be doing this.  Wait until the number of
    // processes that have reported health exceeds the minimum percentage.
    // If the minimum percentage is not met, fail the job.  Otherwise
    // generate the input splits
    List<WorkerInfo> healthyWorkerInfoList = checkWorkers();
    if (healthyWorkerInfoList == null) {
      setJobStateFailed("Not enough healthy workers to create input splits");
      return -1;
    }
    globalCommHandler.getInputSplitsHandler().initialize(masterClient,
        healthyWorkerInfoList);

    // Create at least as many splits as the total number of input threads.
    int minSplitCountHint = healthyWorkerInfoList.size() *
        conf.getNumInputSplitsThreads();

    // Note that the input splits may only be a sample if
    // INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100
    List<InputSplit> splitList = generateInputSplits(inputFormat,
        minSplitCountHint, inputSplitType);

    if (splitList.isEmpty()) {
      // NOTE(review): after failing the job here the method still falls
      // through, registers the empty split list, and returns 0 (not -1) —
      // confirm this fall-through is intentional
      LOG.fatal(logPrefix + ": Failing job due to 0 input splits, " +
          "check input of " + inputFormat.getClass().getName() + "!");
      getContext().setStatus("Failing job due to 0 input splits, " +
          "check input of " + inputFormat.getClass().getName() + "!");
      setJobStateFailed("******* PLEASE CHECK YOUR INPUT TABLES - PARTITIONS " +
          "WHICH YOU SPECIFIED ARE MISSING (for " + inputSplitType +
          " input). FAILING THE JOB *******");
    }
    if (minSplitCountHint > splitList.size()) {
      LOG.warn(logPrefix + ": Number of inputSplits=" +
          splitList.size() + " < " +
          minSplitCountHint +
          "=total number of input threads, " +
          "some threads will be not used");
    }

    // Hand the splits to the handler that serves them to workers
    globalCommHandler.getInputSplitsHandler().addSplits(inputSplitType,
        splitList, inputFormat);

    return splitList.size();
  }
656 
657   @Override
658   public int createMappingInputSplits() {
659     if (!getConfiguration().hasMappingInputFormat()) {
660       return 0;
661     }
662     MappingInputFormat<I, V, E, ? extends Writable> mappingInputFormat =
663       getConfiguration().createWrappedMappingInputFormat();
664     return createInputSplits(mappingInputFormat, InputType.MAPPING);
665   }
666 
667   @Override
668   public int createVertexInputSplits() {
669     int splits = 0;
670     if (getConfiguration().hasVertexInputFormat()) {
671       VertexInputFormat<I, V, E> vertexInputFormat =
672           getConfiguration().createWrappedVertexInputFormat();
673       splits = createInputSplits(vertexInputFormat, InputType.VERTEX);
674     }
675     MasterProgress.get().setVertexInputSplitCount(splits);
676     getJobProgressTracker().updateMasterProgress(MasterProgress.get());
677     return splits;
678   }
679 
680   @Override
681   public int createEdgeInputSplits() {
682     int splits = 0;
683     if (getConfiguration().hasEdgeInputFormat()) {
684       EdgeInputFormat<I, E> edgeInputFormat =
685           getConfiguration().createWrappedEdgeInputFormat();
686       splits = createInputSplits(edgeInputFormat, InputType.EDGE);
687     }
688     MasterProgress.get().setEdgeInputSplitsCount(splits);
689     getJobProgressTracker().updateMasterProgress(MasterProgress.get());
690     return splits;
691   }
692 
693   @Override
694   public List<WorkerInfo> getWorkerInfoList() {
695     return chosenWorkerInfoList;
696   }
697 
698   @Override
699   public MasterGlobalCommHandler getGlobalCommHandler() {
700     return globalCommHandler;
701   }
702 
703   @Override
704   public AggregatorToGlobalCommTranslation getAggregatorTranslationHandler() {
705     return aggregatorTranslation;
706   }
707 
708   @Override
709   public MasterCompute getMasterCompute() {
710     return masterCompute;
711   }
712 
713   /**
714    * Read the finalized checkpoint file and associated metadata files for the
715    * checkpoint.  Modifies the {@link PartitionOwner} objects to get the
716    * checkpoint prefixes.  It is an optimization to prevent all workers from
717    * searching all the files.  Also read in the aggregator data from the
718    * finalized checkpoint file and setting it.
719    *
720    * @param superstep Checkpoint set to examine.
721    * @throws IOException
722    * @throws InterruptedException
723    * @throws KeeperException
724    * @return Collection of generated partition owners.
725    */
726   private Collection<PartitionOwner> prepareCheckpointRestart(long superstep)
727     throws IOException, KeeperException, InterruptedException {
728     List<PartitionOwner> partitionOwners = new ArrayList<>();
729     FileSystem fs = getFs();
730     String finalizedCheckpointPath = getSavedCheckpointBasePath(superstep) +
731         CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX;
732     LOG.info("Loading checkpoint from " + finalizedCheckpointPath);
733     DataInputStream finalizedStream =
734         fs.open(new Path(finalizedCheckpointPath));
735     GlobalStats globalStats = new GlobalStats();
736     globalStats.readFields(finalizedStream);
737     updateCounters(globalStats);
738     SuperstepClasses superstepClasses =
739         SuperstepClasses.createToRead(getConfiguration());
740     superstepClasses.readFields(finalizedStream);
741     getConfiguration().updateSuperstepClasses(superstepClasses);
742     int prefixFileCount = finalizedStream.readInt();
743 
744     String checkpointFile =
745         finalizedStream.readUTF();
746     for (int i = 0; i < prefixFileCount; ++i) {
747       int mrTaskId = finalizedStream.readInt();
748 
749       DataInputStream metadataStream = fs.open(new Path(checkpointFile +
750           "." + mrTaskId + CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX));
751       long partitions = metadataStream.readInt();
752       WorkerInfo worker = getWorkerInfoById(mrTaskId);
753       for (long p = 0; p < partitions; ++p) {
754         int partitionId = metadataStream.readInt();
755         PartitionOwner partitionOwner = new BasicPartitionOwner(partitionId,
756             worker);
757         partitionOwners.add(partitionOwner);
758         LOG.info("prepareCheckpointRestart partitionId=" + partitionId +
759             " assigned to " + partitionOwner);
760       }
761       metadataStream.close();
762     }
763     //Ordering appears to be important as of right now we rely on this ordering
764     //in WorkerGraphPartitioner
765     Collections.sort(partitionOwners, new Comparator<PartitionOwner>() {
766       @Override
767       public int compare(PartitionOwner p1, PartitionOwner p2) {
768         return Integer.compare(p1.getPartitionId(), p2.getPartitionId());
769       }
770     });
771 
772 
773     globalCommHandler.getAggregatorHandler().readFields(finalizedStream);
774     aggregatorTranslation.readFields(finalizedStream);
775     masterCompute.readFields(finalizedStream);
776     finalizedStream.close();
777 
778     return partitionOwners;
779   }
780 
781   @Override
782   public void setup() {
783     // Might have to manually load a checkpoint.
784     // In that case, the input splits are not set, they will be faked by
785     // the checkpoint files.  Each checkpoint file will be an input split
786     // and the input split
787 
788     if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
789       GiraphStats.getInstance().getSuperstepCounter().
790         setValue(getRestartedSuperstep());
791     }
792     for (MasterObserver observer : observers) {
793       observer.preApplication();
794       getContext().progress();
795     }
796   }
797 
798   @Override
799   public boolean becomeMaster() {
800     // Create my bid to become the master, then try to become the worker
801     // or return false.
802     String myBid = null;
803     try {
804       myBid =
805           getZkExt().createExt(masterElectionPath +
806               "/" + getHostnameTaskId(),
807               null,
808               Ids.OPEN_ACL_UNSAFE,
809               CreateMode.EPHEMERAL_SEQUENTIAL,
810               true);
811     } catch (KeeperException e) {
812       throw new IllegalStateException(
813           "becomeMaster: KeeperException", e);
814     } catch (InterruptedException e) {
815       throw new IllegalStateException(
816           "becomeMaster: IllegalStateException", e);
817     }
818     while (true) {
819       JSONObject jobState = getJobState();
820       try {
821         if ((jobState != null) &&
822             ApplicationState.valueOf(
823                 jobState.getString(JSONOBJ_STATE_KEY)) ==
824                 ApplicationState.FINISHED) {
825           LOG.info("becomeMaster: Job is finished, " +
826               "give up trying to be the master!");
827           isMaster = false;
828           return isMaster;
829         }
830       } catch (JSONException e) {
831         throw new IllegalStateException(
832             "becomeMaster: Couldn't get state from " + jobState, e);
833       }
834       try {
835         List<String> masterChildArr =
836             getZkExt().getChildrenExt(
837                 masterElectionPath, true, true, true);
838         if (LOG.isInfoEnabled()) {
839           LOG.info("becomeMaster: First child is '" +
840               masterChildArr.get(0) + "' and my bid is '" +
841               myBid + "'");
842         }
843         if (masterChildArr.get(0).equals(myBid)) {
844           GiraphStats.getInstance().getCurrentMasterTaskPartition().
845               setValue(getTaskId());
846 
847           globalCommHandler = new MasterGlobalCommHandler(
848               new MasterAggregatorHandler(getConfiguration(), getContext()),
849               new MasterInputSplitsHandler(
850                   getConfiguration().useInputSplitLocality(), getContext()));
851           aggregatorTranslation = new AggregatorToGlobalCommTranslation(
852               getConfiguration(), globalCommHandler);
853 
854           globalCommHandler.getAggregatorHandler().initialize(this);
855           masterCompute = getConfiguration().createMasterCompute();
856           masterCompute.setMasterService(this);
857 
858           masterInfo = new MasterInfo();
859           masterServer =
860               new NettyMasterServer(getConfiguration(), this, getContext(),
861                   getGraphTaskManager().createUncaughtExceptionHandler());
862           masterInfo.setInetSocketAddress(masterServer.getMyAddress(),
863               masterServer.getLocalHostOrIp());
864           masterInfo.setTaskId(getTaskId());
865           masterClient =
866               new NettyMasterClient(getContext(), getConfiguration(), this,
867                   getGraphTaskManager().createUncaughtExceptionHandler());
868           masterServer.setFlowControl(masterClient.getFlowControl());
869 
870           if (LOG.isInfoEnabled()) {
871             LOG.info("becomeMaster: I am now the master!");
872           }
873           isMaster = true;
874           return isMaster;
875         }
876         LOG.info("becomeMaster: Waiting to become the master...");
877         getMasterElectionChildrenChangedEvent().waitForTimeoutOrFail(
878             GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(
879                 getConfiguration()));
880         getMasterElectionChildrenChangedEvent().reset();
881       } catch (KeeperException e) {
882         throw new IllegalStateException(
883             "becomeMaster: KeeperException", e);
884       } catch (InterruptedException e) {
885         throw new IllegalStateException(
886             "becomeMaster: IllegalStateException", e);
887       }
888     }
889   }
890 
891   @Override
892   public MasterInfo getMasterInfo() {
893     return masterInfo;
894   }
895 
896   /**
897    * Collect and aggregate the worker statistics for a particular superstep.
898    *
899    * @param superstep Superstep to aggregate on
900    * @return Global statistics aggregated on all worker statistics
901    */
902   private GlobalStats aggregateWorkerStats(long superstep) {
903     ImmutableClassesGiraphConfiguration conf = getConfiguration();
904 
905     GlobalStats globalStats = new GlobalStats();
906     // Get the stats from the all the worker selected nodes
907     String workerFinishedPath =
908         getWorkerFinishedPath(getApplicationAttempt(), superstep);
909     List<String> workerFinishedPathList = null;
910     try {
911       workerFinishedPathList =
912           getZkExt().getChildrenExt(
913               workerFinishedPath, false, false, true);
914     } catch (KeeperException e) {
915       throw new IllegalStateException(
916           "aggregateWorkerStats: KeeperException", e);
917     } catch (InterruptedException e) {
918       throw new IllegalStateException(
919           "aggregateWorkerStats: InterruptedException", e);
920     }
921 
922     AggregatedMetrics aggregatedMetrics = new AggregatedMetrics();
923 
924     for (String finishedPath : workerFinishedPathList) {
925       String hostnamePartitionId = FilenameUtils.getName(finishedPath);
926       JSONObject workerFinishedInfoObj = null;
927       try {
928         byte [] zkData =
929             getZkExt().getData(finishedPath, false, null);
930         workerFinishedInfoObj = new JSONObject(new String(zkData,
931             Charset.defaultCharset()));
932         globalStats.addMessageCount(
933             workerFinishedInfoObj.getLong(
934                 JSONOBJ_NUM_MESSAGES_KEY));
935         globalStats.addMessageBytesCount(
936           workerFinishedInfoObj.getLong(
937               JSONOBJ_NUM_MESSAGE_BYTES_KEY));
938         if (conf.metricsEnabled() &&
939             workerFinishedInfoObj.has(JSONOBJ_METRICS_KEY)) {
940           WorkerSuperstepMetrics workerMetrics = new WorkerSuperstepMetrics();
941           WritableUtils.readFieldsFromByteArray(
942               Base64.decode(
943                   workerFinishedInfoObj.getString(
944                       JSONOBJ_METRICS_KEY)),
945               workerMetrics);
946           globalStats.addOocLoadBytesCount(
947               workerMetrics.getBytesLoadedFromDisk());
948           globalStats.addOocStoreBytesCount(
949               workerMetrics.getBytesStoredOnDisk());
950           // Find the lowest percentage of graph in memory across all workers
951           // for one superstep
952           globalStats.setLowestGraphPercentageInMemory(
953               Math.min(globalStats.getLowestGraphPercentageInMemory(),
954                   (int) Math.round(
955                       workerMetrics.getGraphPercentageInMemory())));
956           aggregatedMetrics.add(workerMetrics, hostnamePartitionId);
957         }
958       } catch (JSONException e) {
959         throw new IllegalStateException(
960             "aggregateWorkerStats: JSONException", e);
961       } catch (KeeperException e) {
962         throw new IllegalStateException(
963             "aggregateWorkerStats: KeeperException", e);
964       } catch (InterruptedException e) {
965         throw new IllegalStateException(
966             "aggregateWorkerStats: InterruptedException", e);
967       } catch (IOException e) {
968         throw new IllegalStateException(
969             "aggregateWorkerStats: IOException", e);
970       }
971     }
972 
973     allPartitionStatsList.clear();
974     Iterable<PartitionStats> statsList = globalCommHandler.getAllPartitionStats(
975         workerFinishedPathList.size(), getContext());
976     for (PartitionStats partitionStats : statsList) {
977       globalStats.addPartitionStats(partitionStats);
978       allPartitionStatsList.add(partitionStats);
979     }
980 
981     if (conf.metricsEnabled()) {
982       if (GiraphConstants.METRICS_DIRECTORY.isDefaultValue(conf)) {
983         aggregatedMetrics.print(superstep, System.err);
984       } else {
985         printAggregatedMetricsToHDFS(superstep, aggregatedMetrics);
986       }
987       for (MasterObserver observer : observers) {
988         observer.superstepMetricsUpdate(
989             superstep, aggregatedMetrics, allPartitionStatsList);
990       }
991     }
992 
993     if (LOG.isInfoEnabled()) {
994       LOG.info("aggregateWorkerStats: Aggregation found " + globalStats +
995           " on superstep = " + getSuperstep());
996     }
997     return globalStats;
998   }
999 
1000   /**
1001    * Write superstep metrics to own file in HDFS
1002    * @param superstep the current superstep
1003    * @param aggregatedMetrics the aggregated metrics to write
1004    */
1005   private void printAggregatedMetricsToHDFS(
1006       long superstep, AggregatedMetrics aggregatedMetrics) {
1007     ImmutableClassesGiraphConfiguration conf = getConfiguration();
1008     PrintStream out = null;
1009     Path dir = new Path(GiraphConstants.METRICS_DIRECTORY.get(conf));
1010     Path outFile = new Path(GiraphConstants.METRICS_DIRECTORY.get(conf) +
1011         Path.SEPARATOR_CHAR + "superstep_" + superstep + ".metrics");
1012     try {
1013       FileSystem fs;
1014       fs = FileSystem.get(conf);
1015       if (!fs.exists(dir)) {
1016         fs.mkdirs(dir);
1017       }
1018       if (fs.exists(outFile)) {
1019         throw new RuntimeException(
1020             "printAggregatedMetricsToHDFS: metrics file exists");
1021       }
1022       out = new PrintStream(fs.create(outFile), false,
1023           Charset.defaultCharset().name());
1024       aggregatedMetrics.print(superstep, out);
1025     } catch (IOException e) {
1026       throw new RuntimeException(
1027           "printAggregatedMetricsToHDFS: error creating metrics file", e);
1028     } finally {
1029       if (out != null) {
1030         out.close();
1031       }
1032     }
1033   }
1034 
1035   /**
1036    * Finalize the checkpoint file prefixes by taking the chosen workers and
1037    * writing them to a finalized file.  Also write out the master
1038    * aggregated aggregator array from the previous superstep.
1039    *
1040    * @param superstep superstep to finalize
1041    * @param chosenWorkerInfoList list of chosen workers that will be finalized
1042    * @throws IOException
1043    * @throws InterruptedException
1044    * @throws KeeperException
1045    */
1046   private void finalizeCheckpoint(long superstep,
1047     List<WorkerInfo> chosenWorkerInfoList)
1048     throws IOException, KeeperException, InterruptedException {
1049     Path finalizedCheckpointPath =
1050         new Path(getCheckpointBasePath(superstep) +
1051             CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX);
1052     try {
1053       getFs().delete(finalizedCheckpointPath, false);
1054     } catch (IOException e) {
1055       LOG.warn("finalizedValidCheckpointPrefixes: Removed old file " +
1056           finalizedCheckpointPath);
1057     }
1058 
1059     // Format:
1060     // <global statistics>
1061     // <superstep classes>
1062     // <number of files>
1063     // <used file prefix 0><used file prefix 1>...
1064     // <aggregator data>
1065     // <masterCompute data>
1066     FSDataOutputStream finalizedOutputStream =
1067         getFs().create(finalizedCheckpointPath);
1068 
1069     String superstepFinishedNode =
1070         getSuperstepFinishedPath(getApplicationAttempt(), superstep - 1);
1071     finalizedOutputStream.write(
1072         getZkExt().getData(superstepFinishedNode, false, null));
1073 
1074     finalizedOutputStream.writeInt(chosenWorkerInfoList.size());
1075     finalizedOutputStream.writeUTF(getCheckpointBasePath(superstep));
1076     for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
1077       finalizedOutputStream.writeInt(getWorkerId(chosenWorkerInfo));
1078     }
1079     globalCommHandler.getAggregatorHandler().write(finalizedOutputStream);
1080     aggregatorTranslation.write(finalizedOutputStream);
1081     masterCompute.write(finalizedOutputStream);
1082     finalizedOutputStream.close();
1083     lastCheckpointedSuperstep = superstep;
1084     GiraphStats.getInstance().
1085         getLastCheckpointedSuperstep().setValue(superstep);
1086   }
1087 
1088   /**
1089    * Assign the partitions for this superstep.  If there are changes,
1090    * the workers will know how to do the exchange.  If this was a restarted
1091    * superstep, then make sure to provide information on where to find the
1092    * checkpoint file.
1093    */
1094   private void assignPartitionOwners() {
1095     Collection<PartitionOwner> partitionOwners;
1096     if (getSuperstep() == INPUT_SUPERSTEP) {
1097       partitionOwners =
1098           masterGraphPartitioner.createInitialPartitionOwners(
1099               chosenWorkerInfoList, maxWorkers);
1100       if (partitionOwners.isEmpty()) {
1101         throw new IllegalStateException(
1102             "assignAndExchangePartitions: No partition owners set");
1103       }
1104     } else if (getRestartedSuperstep() == getSuperstep()) {
1105       // If restarted, prepare the checkpoint restart
1106       try {
1107         partitionOwners = prepareCheckpointRestart(getSuperstep());
1108       } catch (IOException e) {
1109         throw new IllegalStateException(
1110             "assignPartitionOwners: IOException on preparing", e);
1111       } catch (KeeperException e) {
1112         throw new IllegalStateException(
1113             "assignPartitionOwners: KeeperException on preparing", e);
1114       } catch (InterruptedException e) {
1115         throw new IllegalStateException(
1116             "assignPartitionOwners: InteruptedException on preparing",
1117             e);
1118       }
1119       masterGraphPartitioner.setPartitionOwners(partitionOwners);
1120     } else {
1121       partitionOwners =
1122           masterGraphPartitioner.generateChangedPartitionOwners(
1123               allPartitionStatsList,
1124               chosenWorkerInfoList,
1125               maxWorkers,
1126               getSuperstep());
1127 
1128       PartitionUtils.analyzePartitionStats(partitionOwners,
1129           allPartitionStatsList);
1130     }
1131     checkPartitions(masterGraphPartitioner.getCurrentPartitionOwners());
1132 
1133 
1134 
1135     // There will be some exchange of partitions
1136     if (!partitionOwners.isEmpty()) {
1137       String vertexExchangePath =
1138           getPartitionExchangePath(getApplicationAttempt(),
1139               getSuperstep());
1140       try {
1141         getZkExt().createOnceExt(vertexExchangePath,
1142             null,
1143             Ids.OPEN_ACL_UNSAFE,
1144             CreateMode.PERSISTENT,
1145             true);
1146       } catch (KeeperException e) {
1147         throw new IllegalStateException(
1148             "assignPartitionOwners: KeeperException creating " +
1149                 vertexExchangePath);
1150       } catch (InterruptedException e) {
1151         throw new IllegalStateException(
1152             "assignPartitionOwners: InterruptedException creating " +
1153                 vertexExchangePath);
1154       }
1155     }
1156 
1157     AddressesAndPartitionsWritable addressesAndPartitions =
1158         new AddressesAndPartitionsWritable(masterInfo, chosenWorkerInfoList,
1159             partitionOwners);
1160     // Send assignments to every worker
1161     // TODO for very large number of partitions we might want to split this
1162     // across multiple requests
1163     for (WorkerInfo workerInfo : chosenWorkerInfoList) {
1164       masterClient.sendWritableRequest(workerInfo.getTaskId(),
1165           new AddressesAndPartitionsRequest(addressesAndPartitions));
1166     }
1167   }
1168 
1169   /**
1170    * Check if partition ids are valid
1171    *
1172    * @param partitionOwners List of partition ids for current superstep
1173    */
1174   private void checkPartitions(Collection<PartitionOwner> partitionOwners) {
1175     for (PartitionOwner partitionOwner : partitionOwners) {
1176       int partitionId = partitionOwner.getPartitionId();
1177       if (partitionId < 0 || partitionId >= partitionOwners.size()) {
1178         throw new IllegalStateException("checkPartitions: " +
1179             "Invalid partition id " + partitionId +
1180             " - partition ids must be values from 0 to (numPartitions - 1)");
1181       }
1182     }
1183   }
1184 
1185   /**
1186    * Check whether the workers chosen for this superstep are still alive
1187    *
1188    * @param chosenWorkerInfoHealthPath Path to the healthy workers in ZooKeeper
1189    * @param chosenWorkerInfoList List of the healthy workers
1190    * @return a list of dead workers. Empty list if all workers are alive.
1191    * @throws InterruptedException
1192    * @throws KeeperException
1193    */
1194   private Collection<WorkerInfo> superstepChosenWorkerAlive(
1195     String chosenWorkerInfoHealthPath,
1196     List<WorkerInfo> chosenWorkerInfoList)
1197     throws KeeperException, InterruptedException {
1198     List<WorkerInfo> chosenWorkerInfoHealthyList =
1199         getWorkerInfosFromPath(chosenWorkerInfoHealthPath, false);
1200     Set<WorkerInfo> chosenWorkerInfoHealthySet =
1201         new HashSet<WorkerInfo>(chosenWorkerInfoHealthyList);
1202     List<WorkerInfo> deadWorkers = new ArrayList<>();
1203     for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
1204       if (!chosenWorkerInfoHealthySet.contains(chosenWorkerInfo)) {
1205         deadWorkers.add(chosenWorkerInfo);
1206       }
1207     }
1208     return deadWorkers;
1209   }
1210 
1211   @Override
1212   public void restartFromCheckpoint(long checkpoint) {
1213     // Process:
1214     // 1. Increase the application attempt and set to the correct checkpoint
1215     // 2. Send command to all workers to restart their tasks
1216     setApplicationAttempt(getApplicationAttempt() + 1);
1217     setCachedSuperstep(checkpoint);
1218     setRestartedSuperstep(checkpoint);
1219     checkpointStatus = CheckpointStatus.NONE;
1220     setJobState(ApplicationState.START_SUPERSTEP,
1221         getApplicationAttempt(),
1222         checkpoint);
1223   }
1224 
1225   /**
1226    * Safely removes node from zookeeper.
1227    * Ignores if node is already removed. Can only throw runtime exception if
1228    * anything wrong.
1229    * @param path path to the node to be removed.
1230    */
1231   private void zkDeleteNode(String path) {
1232     try {
1233       getZkExt().deleteExt(path, -1, true);
1234     } catch (KeeperException.NoNodeException e) {
1235       LOG.info("zkDeleteNode: node has already been removed " + path);
1236     } catch (InterruptedException e) {
1237       throw new RuntimeException(
1238           "zkDeleteNode: InterruptedException", e);
1239     } catch (KeeperException e) {
1240       throw new RuntimeException(
1241           "zkDeleteNode: KeeperException", e);
1242     }
1243   }
1244 
1245   @Override
1246   public long getLastGoodCheckpoint() throws IOException {
1247     // Find the last good checkpoint if none have been written to the
1248     // knowledge of this master
1249     if (lastCheckpointedSuperstep == -1) {
1250       try {
1251         lastCheckpointedSuperstep = getLastCheckpointedSuperstep();
1252       } catch (IOException e) {
1253         LOG.fatal("getLastGoodCheckpoint: No last good checkpoints can be " +
1254             "found, killing the job.", e);
1255         failJob(e);
1256       }
1257     }
1258 
1259     return lastCheckpointedSuperstep;
1260   }
1261 
1262   /**
1263    * Wait for a set of workers to signal that they are done with the
1264    * barrier.
1265    *
1266    * @param finishedWorkerPath Path to where the workers will register their
1267    *        hostname and id
1268    * @param workerInfoList List of the workers to wait for
1269    * @param event Event to wait on for a chance to be done.
1270    * @param ignoreDeath In case if worker died after making it through
1271    *                    barrier, we will ignore death if set to true.
1272    * @return True if barrier was successful, false if there was a worker
1273    *         failure
1274    */
1275   private boolean barrierOnWorkerList(String finishedWorkerPath,
1276       List<WorkerInfo> workerInfoList,
1277       BspEvent event,
1278       boolean ignoreDeath) {
1279     try {
1280       getZkExt().createOnceExt(finishedWorkerPath,
1281           null,
1282           Ids.OPEN_ACL_UNSAFE,
1283           CreateMode.PERSISTENT,
1284           true);
1285     } catch (KeeperException e) {
1286       throw new IllegalStateException(
1287           "barrierOnWorkerList: KeeperException - Couldn't create " +
1288               finishedWorkerPath, e);
1289     } catch (InterruptedException e) {
1290       throw new IllegalStateException(
1291           "barrierOnWorkerList: InterruptedException - Couldn't create " +
1292               finishedWorkerPath, e);
1293     }
1294     List<String> hostnameIdList =
1295         new ArrayList<String>(workerInfoList.size());
1296     for (WorkerInfo workerInfo : workerInfoList) {
1297       hostnameIdList.add(workerInfo.getHostnameId());
1298     }
1299     String workerInfoHealthyPath =
1300         getWorkerInfoHealthyPath(getApplicationAttempt(), getSuperstep());
1301     List<String> finishedHostnameIdList = new ArrayList<>();
1302     List<String> tmpFinishedHostnameIdList;
1303     long nextInfoMillis = System.currentTimeMillis();
1304     final int defaultTaskTimeoutMsec = 10 * 60 * 1000;  // from TaskTracker
1305     final int waitBetweenLogInfoMsec = 30 * 1000;
1306     final int taskTimeoutMsec = getContext().getConfiguration().getInt(
1307         "mapred.task.timeout", defaultTaskTimeoutMsec) / 2;
1308     long lastRegularRunTimeMsec = 0;
1309     int eventLoopTimeout =  Math.min(taskTimeoutMsec, waitBetweenLogInfoMsec);
1310     boolean logInfoOnlyRun = false;
1311     List<WorkerInfo> deadWorkers = new ArrayList<>();
1312     while (true) {
1313       if (! logInfoOnlyRun) {
1314         try {
1315           tmpFinishedHostnameIdList =
1316               getZkExt().getChildrenExt(finishedWorkerPath,
1317                                         true,
1318                                         false,
1319                                         false);
1320         } catch (KeeperException e) {
1321           throw new IllegalStateException(
1322               "barrierOnWorkerList: KeeperException - Couldn't get " +
1323                   "children of " + finishedWorkerPath, e);
1324         } catch (InterruptedException e) {
1325           throw new IllegalStateException(
1326               "barrierOnWorkerList: IllegalException - Couldn't get " +
1327                   "children of " + finishedWorkerPath, e);
1328         }
1329         if (LOG.isDebugEnabled()) {
1330           // Log the names of the new workers that have finished since last time
1331           Set<String> newFinishedHostnames = Sets.difference(
1332             Sets.newHashSet(tmpFinishedHostnameIdList),
1333             Sets.newHashSet(finishedHostnameIdList));
1334           LOG.debug("barrierOnWorkerList: Got new finished worker list = " +
1335                         newFinishedHostnames + ", size = " +
1336                         newFinishedHostnames.size() +
1337                         " from " + finishedWorkerPath);
1338         }
1339         finishedHostnameIdList = tmpFinishedHostnameIdList;
1340       }
1341 
1342       if (LOG.isInfoEnabled() &&
1343           (System.currentTimeMillis() > nextInfoMillis)) {
1344         nextInfoMillis = System.currentTimeMillis() + waitBetweenLogInfoMsec;
1345         LOG.info("barrierOnWorkerList: " +
1346             finishedHostnameIdList.size() +
1347             " out of " + workerInfoList.size() +
1348             " workers finished on superstep " +
1349             getSuperstep() + " on path " + finishedWorkerPath);
1350         if (workerInfoList.size() - finishedHostnameIdList.size() <
1351             MAX_PRINTABLE_REMAINING_WORKERS) {
1352           Set<String> remainingWorkers = Sets.newHashSet(hostnameIdList);
1353           remainingWorkers.removeAll(finishedHostnameIdList);
1354           LOG.info("barrierOnWorkerList: Waiting on " + remainingWorkers);
1355         }
1356       }
1357 
1358       if (! logInfoOnlyRun) {
1359         getContext().setStatus(getGraphTaskManager().getGraphFunctions() +
1360                                    " - " +
1361                                    finishedHostnameIdList.size() +
1362                                    " finished out of " +
1363                                    workerInfoList.size() +
1364                                    " on superstep " + getSuperstep());
1365         if (finishedHostnameIdList.containsAll(hostnameIdList)) {
1366           break;
1367         }
1368 
1369         for (WorkerInfo deadWorker : deadWorkers) {
1370           if (!finishedHostnameIdList.contains(deadWorker.getHostnameId())) {
1371             LOG.error("barrierOnWorkerList: no results arived from " +
1372                           "worker that was pronounced dead: " + deadWorker +
1373                           " on superstep " + getSuperstep());
1374             return false;
1375           }
1376         }
1377 
1378         // wall-clock time skew is ignored
1379         lastRegularRunTimeMsec = System.currentTimeMillis();
1380       }
1381 
1382       // Wait for a signal or timeout
1383       boolean eventTriggered = event.waitMsecs(eventLoopTimeout);
1384       long elapsedTimeSinceRegularRunMsec = System.currentTimeMillis() -
1385           lastRegularRunTimeMsec;
1386       event.reset();
1387       getContext().progress();
1388 
1389       if (eventTriggered ||
1390           taskTimeoutMsec == eventLoopTimeout ||
1391           elapsedTimeSinceRegularRunMsec >= taskTimeoutMsec) {
1392         logInfoOnlyRun = false;
1393       } else {
1394         logInfoOnlyRun = true;
1395         continue;
1396       }
1397 
1398       // Did a worker die?
1399       try {
1400         deadWorkers.addAll(superstepChosenWorkerAlive(
1401                 workerInfoHealthyPath,
1402                 workerInfoList));
1403         if (!ignoreDeath && deadWorkers.size() > 0) {
1404           String errorMessage = "******* WORKERS " + deadWorkers +
1405               " FAILED *******";
1406           // If checkpointing is not used, we should fail the job
1407           if (!getConfiguration().useCheckpointing()) {
1408             setJobStateFailed(errorMessage);
1409           } else {
1410             LOG.error("barrierOnWorkerList: Missing chosen " +
1411                 "workers " + deadWorkers +
1412                 " on superstep " + getSuperstep());
1413             // Log worker failure to command line
1414             getGraphTaskManager().getJobProgressTracker().logInfo(errorMessage);
1415           }
1416           return false;
1417         }
1418       } catch (KeeperException e) {
1419         throw new IllegalStateException(
1420             "barrierOnWorkerList: KeeperException - " +
1421                 "Couldn't get " + workerInfoHealthyPath, e);
1422       } catch (InterruptedException e) {
1423         throw new IllegalStateException(
1424             "barrierOnWorkerList: InterruptedException - " +
1425                 "Couldn't get " + workerInfoHealthyPath, e);
1426       }
1427     }
1428 
1429     return true;
1430   }
1431 
1432   /**
1433    * Clean up old superstep data from Zookeeper
1434    *
1435    * @param removeableSuperstep Supersteo to clean up
1436    * @throws InterruptedException
1437    */
1438   private void cleanUpOldSuperstep(long removeableSuperstep) throws
1439       InterruptedException {
1440     if (KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration()) &&
1441         (removeableSuperstep >= 0)) {
1442       String oldSuperstepPath =
1443           getSuperstepPath(getApplicationAttempt()) + "/" +
1444               removeableSuperstep;
1445       try {
1446         if (LOG.isInfoEnabled()) {
1447           LOG.info("coordinateSuperstep: Cleaning up old Superstep " +
1448               oldSuperstepPath);
1449         }
1450         getZkExt().deleteExt(oldSuperstepPath,
1451             -1,
1452             true);
1453       } catch (KeeperException.NoNodeException e) {
1454         LOG.warn("coordinateBarrier: Already cleaned up " +
1455             oldSuperstepPath);
1456       } catch (KeeperException e) {
1457         throw new IllegalStateException(
1458             "coordinateSuperstep: KeeperException on " +
1459                 "finalizing checkpoint", e);
1460       }
1461     }
1462   }
1463 
1464   /**
1465    * Coordinate the exchange of vertex/edge input splits among workers.
1466    */
1467   private void coordinateInputSplits() {
1468     // Coordinate the workers finishing sending their vertices/edges to the
1469     // correct workers and signal when everything is done.
1470     if (!barrierOnWorkerList(inputSplitsWorkerDonePath,
1471         chosenWorkerInfoList,
1472         getInputSplitsWorkerDoneEvent(),
1473         false)) {
1474       throw new IllegalStateException("coordinateInputSplits: Worker failed " +
1475           "during input split (currently not supported)");
1476     }
1477     try {
1478       getZkExt().createExt(inputSplitsAllDonePath,
1479           null,
1480           Ids.OPEN_ACL_UNSAFE,
1481           CreateMode.PERSISTENT,
1482           false);
1483     } catch (KeeperException.NodeExistsException e) {
1484       LOG.info("coordinateInputSplits: Node " +
1485           inputSplitsAllDonePath + " already exists.");
1486     } catch (KeeperException e) {
1487       throw new IllegalStateException(
1488           "coordinateInputSplits: KeeperException", e);
1489     } catch (InterruptedException e) {
1490       throw new IllegalStateException(
1491           "coordinateInputSplits: IllegalStateException", e);
1492     }
1493   }
1494 
1495   /**
1496    * Initialize aggregator at the master side
1497    * before vertex/edge loading.
1498    * This methods cooperates with other code
1499    * to enables aggregation usage at INPUT_SUPERSTEP
1500    * Other codes are:
1501    *  BSPServiceWorker:
1502    *  aggregatorHandler.prepareSuperstep in
1503    *  setup
1504    *  set aggregator usage in vertexReader and
1505    *  edgeReader
1506    *
1507    * @throws InterruptedException
1508    */
1509   private void initializeAggregatorInputSuperstep()
1510     throws InterruptedException {
1511     globalCommHandler.getAggregatorHandler().prepareSuperstep();
1512 
1513     prepareMasterCompute(getSuperstep());
1514     try {
1515       masterCompute.initialize();
1516     } catch (InstantiationException e) {
1517       LOG.fatal(
1518         "initializeAggregatorInputSuperstep: Failed in instantiation", e);
1519       throw new RuntimeException(
1520         "initializeAggregatorInputSuperstep: Failed in instantiation", e);
1521     } catch (IllegalAccessException e) {
1522       LOG.fatal("initializeAggregatorInputSuperstep: Failed in access", e);
1523       throw new RuntimeException(
1524         "initializeAggregatorInputSuperstep: Failed in access", e);
1525     }
1526     aggregatorTranslation.postMasterCompute();
1527     globalCommHandler.getAggregatorHandler().finishSuperstep();
1528 
1529     globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
1530   }
1531 
1532   /**
1533    * This is required before initialization
1534    * and run of MasterCompute
1535    *
1536    * @param superstep superstep for which to run masterCompute
1537    * @return Superstep classes set by masterCompute
1538    */
1539   private SuperstepClasses prepareMasterCompute(long superstep) {
1540     GraphState graphState = new GraphState(superstep ,
1541         GiraphStats.getInstance().getVertices().getValue(),
1542         GiraphStats.getInstance().getEdges().getValue(),
1543         getContext());
1544     SuperstepClasses superstepClasses =
1545         SuperstepClasses.createAndExtractTypes(getConfiguration());
1546     masterCompute.setGraphState(graphState);
1547     masterCompute.setSuperstepClasses(superstepClasses);
1548     return superstepClasses;
1549   }
1550 
  /**
   * Run the master side of one superstep: choose and watch workers, assign
   * partitions, wait on the worker barriers, run MasterCompute, and publish
   * the aggregated application state through ZooKeeper.
   *
   * @return State of this superstep
   * @throws KeeperException
   * @throws InterruptedException
   */
  @Override
  public SuperstepState coordinateSuperstep() throws
  KeeperException, InterruptedException {
    // 1. Get chosen workers and set up watches on them.
    // 2. Assign partitions to the workers
    //    (possibly reloading from a superstep)
    // 3. Wait for all workers to complete
    // 4. Collect and process aggregators
    // 5. Create superstep finished node
    // 6. If the checkpoint frequency is met, finalize the checkpoint

    for (MasterObserver observer : observers) {
      observer.preSuperstep(getSuperstep());
      getContext().progress();
    }

    chosenWorkerInfoList = checkWorkers();
    if (chosenWorkerInfoList == null) {
      // Not enough healthy workers -- mark the whole job as failed
      setJobStateFailed("coordinateSuperstep: Not enough healthy workers for " +
                    "superstep " + getSuperstep());
    } else {
      // Sort this list, so order stays the same over supersteps
      Collections.sort(chosenWorkerInfoList, new Comparator<WorkerInfo>() {
        @Override
        public int compare(WorkerInfo wi1, WorkerInfo wi2) {
          return Integer.compare(wi1.getTaskId(), wi2.getTaskId());
        }
      });
      for (WorkerInfo workerInfo : chosenWorkerInfoList) {
        String workerInfoHealthyPath =
            getWorkerInfoHealthyPath(getApplicationAttempt(),
                getSuperstep()) + "/" +
                workerInfo.getHostnameId();
        // exists() with watch=true registers a watch on the worker's
        // health znode so processEvent() is notified if it disappears.
        // NOTE(review): a missing znode is only warned about here; the
        // actual failure is detected via the barrier/watch machinery.
        if (getZkExt().exists(workerInfoHealthyPath, true) == null) {
          LOG.warn("coordinateSuperstep: Chosen worker " +
              workerInfoHealthyPath +
              " is no longer valid, failing superstep");
        }
      }
    }

    // We need to finalize aggregators from previous superstep
    if (getSuperstep() >= 0) {
      aggregatorTranslation.postMasterCompute();
      globalCommHandler.getAggregatorHandler().finishSuperstep();
    }

    masterClient.openConnections();

    GiraphStats.getInstance().
        getCurrentWorkers().setValue(chosenWorkerInfoList.size());
    assignPartitionOwners();

    // Finalize the valid checkpoint file prefixes and possibly
    // the aggregators.  checkpointStatus was decided at the end of the
    // previous superstep (see getCheckpointStatus()).
    if (checkpointStatus != CheckpointStatus.NONE) {
      String workerWroteCheckpointPath =
          getWorkerWroteCheckpointPath(getApplicationAttempt(),
              getSuperstep());
      // first wait for all the workers to write their checkpoint data
      if (!barrierOnWorkerList(workerWroteCheckpointPath,
          chosenWorkerInfoList,
          getWorkerWroteCheckpointEvent(),
          checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT)) {
        return SuperstepState.WORKER_FAILURE;
      }
      try {
        finalizeCheckpoint(getSuperstep(), chosenWorkerInfoList);
      } catch (IOException e) {
        throw new IllegalStateException(
            "coordinateSuperstep: IOException on finalizing checkpoint",
            e);
      }
      if (checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT) {
        return SuperstepState.CHECKPOINT_AND_HALT;
      }
    }

    // We need to send aggregators to worker owners after new worker assignments
    if (getSuperstep() >= 0) {
      globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
    }

    if (getSuperstep() == INPUT_SUPERSTEP) {
      // Initialize aggregators before coordinating
      initializeAggregatorInputSuperstep();
      coordinateInputSplits();
    }

    // Wait for every chosen worker to report it finished this superstep
    String finishedWorkerPath =
        getWorkerFinishedPath(getApplicationAttempt(), getSuperstep());
    if (!barrierOnWorkerList(finishedWorkerPath,
        chosenWorkerInfoList,
        getSuperstepStateChangedEvent(),
        false)) {
      return SuperstepState.WORKER_FAILURE;
    }

    // Collect aggregator values, then run the master.compute() and
    // finally save the aggregator values
    globalCommHandler.getAggregatorHandler().prepareSuperstep();
    aggregatorTranslation.prepareSuperstep();

    SuperstepClasses superstepClasses =
      prepareMasterCompute(getSuperstep() + 1);
    doMasterCompute();

    // If the master is halted or all the vertices voted to halt and there
    // are no more messages in the system, stop the computation
    GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
    if (masterCompute.isHalted() ||
        (globalStats.getFinishedVertexCount() ==
        globalStats.getVertexCount() &&
        globalStats.getMessageCount() == 0)) {
      globalStats.setHaltComputation(true);
    } else if (getZkExt().exists(haltComputationPath, false) != null) {
      // External halt request via the halt znode
      if (LOG.isInfoEnabled()) {
        LOG.info("Halting computation because halt zookeeper node was created");
      }
      globalStats.setHaltComputation(true);
    }

    // If we have completed the maximum number of supersteps, stop
    // the computation
    if (maxNumberOfSupersteps !=
        GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS.getDefaultValue() &&
        (getSuperstep() == maxNumberOfSupersteps - 1)) {
      if (LOG.isInfoEnabled()) {
        LOG.info("coordinateSuperstep: Finished " + maxNumberOfSupersteps +
            " supersteps (max specified by the user), halting");
      }
      globalStats.setHaltComputation(true);
    }

    // Superstep 0 doesn't need to have matching types (Message types may not
    // match) and if the computation is halted, no need to check any of
    // the types.
    if (!globalStats.getHaltComputation()) {
      superstepClasses.verifyTypesMatch(getSuperstep() > 0);
    }
    getConfiguration().updateSuperstepClasses(superstepClasses);

    //Signal workers that we want to checkpoint
    checkpointStatus = getCheckpointStatus(getSuperstep() + 1);
    globalStats.setCheckpointStatus(checkpointStatus);
    // Let everyone know the aggregated application state through the
    // superstep finishing znode.
    String superstepFinishedNode =
        getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());

    WritableUtils.writeToZnode(
        getZkExt(), superstepFinishedNode, -1, globalStats, superstepClasses);
    updateCounters(globalStats);

    // Previous superstep's znodes are no longer needed
    cleanUpOldSuperstep(getSuperstep() - 1);
    incrCachedSuperstep();
    // Counter starts at zero, so no need to increment
    if (getSuperstep() > 0) {
      GiraphStats.getInstance().getSuperstepCounter().increment();
    }
    SuperstepState superstepState;
    if (globalStats.getHaltComputation()) {
      superstepState = SuperstepState.ALL_SUPERSTEPS_DONE;
    } else {
      superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
    }
    globalCommHandler.getAggregatorHandler().writeAggregators(
        getSuperstep(), superstepState);

    return superstepState;
  }
1722 
1723   /**
1724    * Should checkpoint on this superstep?  If checkpointing, always
1725    * checkpoint the first user superstep.  If restarting, the first
1726    * checkpoint is after the frequency has been met.
1727    *
1728    * @param superstep Decide if checkpointing no this superstep
1729    * @return True if this superstep should be checkpointed, false otherwise
1730    */
1731   private CheckpointStatus getCheckpointStatus(long superstep) {
1732     try {
1733       if (getZkExt().
1734           exists(basePath + FORCE_CHECKPOINT_USER_FLAG, false) != null) {
1735         if (isCheckpointingSupported(getConfiguration(), masterCompute)) {
1736           return CheckpointStatus.CHECKPOINT_AND_HALT;
1737         } else {
1738           LOG.warn("Attempted to manually checkpoint the job that " +
1739               "does not support checkpoints. Ignoring");
1740         }
1741       }
1742     } catch (KeeperException e) {
1743       throw new IllegalStateException(
1744           "cleanupZooKeeper: Got KeeperException", e);
1745     } catch (InterruptedException e) {
1746       throw new IllegalStateException(
1747           "cleanupZooKeeper: Got IllegalStateException", e);
1748     }
1749     if (checkpointFrequency == 0) {
1750       return CheckpointStatus.NONE;
1751     }
1752     long firstCheckpoint = INPUT_SUPERSTEP + 1;
1753     if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
1754       firstCheckpoint = getRestartedSuperstep() + checkpointFrequency;
1755     }
1756     if (superstep < firstCheckpoint) {
1757       return CheckpointStatus.NONE;
1758     }
1759     if (((superstep - firstCheckpoint) % checkpointFrequency) == 0) {
1760       if (isCheckpointingSupported(getConfiguration(), masterCompute)) {
1761         return CheckpointStatus.CHECKPOINT;
1762       }
1763     }
1764     return CheckpointStatus.NONE;
1765   }
1766 
1767   /**
1768    * Returns false if job doesn't support checkpoints.
1769    * Job may not support checkpointing if it does output during
1770    * computation, uses static variables to keep data between supersteps,
1771    * starts new threads etc.
1772    * @param conf Immutable configuration of the job
1773    * @param masterCompute instance of master compute
1774    * @return true if it is safe to checkpoint the job
1775    */
1776   private boolean isCheckpointingSupported(
1777       GiraphConfiguration conf, MasterCompute masterCompute) {
1778     return checkpointSupportedChecker.isCheckpointSupported(
1779         conf, masterCompute);
1780   }
1781 
1782 
1783   /**
1784    * This doMasterCompute is only called
1785    * after masterCompute is initialized
1786    */
1787   private void doMasterCompute() {
1788     GiraphTimerContext timerContext = masterComputeTimer.time();
1789     masterCompute.compute();
1790     timerContext.stop();
1791   }
1792 
1793   /**
1794    * Need to clean up ZooKeeper nicely.  Make sure all the masters and workers
1795    * have reported ending their ZooKeeper connections.
1796    */
1797   private void cleanUpZooKeeper() {
1798     try {
1799       getZkExt().createExt(cleanedUpPath,
1800           null,
1801           Ids.OPEN_ACL_UNSAFE,
1802           CreateMode.PERSISTENT,
1803           true);
1804     } catch (KeeperException.NodeExistsException e) {
1805       if (LOG.isInfoEnabled()) {
1806         LOG.info("cleanUpZooKeeper: Node " + cleanedUpPath +
1807             " already exists, no need to create.");
1808       }
1809     } catch (KeeperException e) {
1810       throw new IllegalStateException(
1811           "cleanupZooKeeper: Got KeeperException", e);
1812     } catch (InterruptedException e) {
1813       throw new IllegalStateException(
1814           "cleanupZooKeeper: Got IllegalStateException", e);
1815     }
1816     // Need to wait for the number of workers and masters to complete
1817     int maxTasks = BspInputFormat.getMaxTasks(getConfiguration());
1818     if ((getGraphTaskManager().getGraphFunctions() == GraphFunctions.ALL) ||
1819         (getGraphTaskManager().getGraphFunctions() ==
1820         GraphFunctions.ALL_EXCEPT_ZOOKEEPER)) {
1821       maxTasks *= 2;
1822     }
1823     List<String> cleanedUpChildrenList = null;
1824     while (true) {
1825       try {
1826         cleanedUpChildrenList =
1827             getZkExt().getChildrenExt(
1828                 cleanedUpPath, true, false, true);
1829         if (LOG.isInfoEnabled()) {
1830           LOG.info("cleanUpZooKeeper: Got " +
1831               cleanedUpChildrenList.size() + " of " +
1832               maxTasks  +  " desired children from " +
1833               cleanedUpPath);
1834         }
1835         if (cleanedUpChildrenList.size() == maxTasks) {
1836           break;
1837         }
1838         if (LOG.isInfoEnabled()) {
1839           LOG.info("cleanedUpZooKeeper: Waiting for the " +
1840               "children of " + cleanedUpPath +
1841               " to change since only got " +
1842               cleanedUpChildrenList.size() + " nodes.");
1843         }
1844       } catch (KeeperException e) {
1845         // We are in the cleanup phase -- just log the error
1846         LOG.error("cleanUpZooKeeper: Got KeeperException, " +
1847             "but will continue", e);
1848         return;
1849       } catch (InterruptedException e) {
1850         // We are in the cleanup phase -- just log the error
1851         LOG.error("cleanUpZooKeeper: Got InterruptedException, " +
1852             "but will continue", e);
1853         return;
1854       }
1855 
1856       getCleanedUpChildrenChangedEvent().waitForTimeoutOrFail(
1857           GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
1858               getConfiguration()));
1859       getCleanedUpChildrenChangedEvent().reset();
1860     }
1861 
1862     // At this point, all processes have acknowledged the cleanup,
1863     // and the master can do any final cleanup if the ZooKeeper service was
1864     // provided (not dynamically started) and we don't want to keep the data
1865     try {
1866       if (getConfiguration().isZookeeperExternal() &&
1867           KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration())) {
1868         if (LOG.isInfoEnabled()) {
1869           LOG.info("cleanupZooKeeper: Removing the following path " +
1870               "and all children - " + basePath + " from ZooKeeper list " +
1871               getConfiguration().getZookeeperList());
1872         }
1873         getZkExt().deleteExt(basePath, -1, true);
1874       }
1875     } catch (KeeperException e) {
1876       LOG.error("cleanupZooKeeper: Failed to do cleanup of " +
1877           basePath + " due to KeeperException", e);
1878     } catch (InterruptedException e) {
1879       LOG.error("cleanupZooKeeper: Failed to do cleanup of " +
1880           basePath + " due to InterruptedException", e);
1881     }
1882   }
1883 
1884   @Override
1885   public void postApplication() {
1886     for (MasterObserver observer : observers) {
1887       observer.postApplication();
1888       getContext().progress();
1889     }
1890   }
1891 
1892   @Override
1893   public void postSuperstep() {
1894     for (MasterObserver observer : observers) {
1895       observer.postSuperstep(getSuperstep());
1896       getContext().progress();
1897     }
1898   }
1899 
1900   @Override
1901   public void failureCleanup(Exception e) {
1902     for (MasterObserver observer : observers) {
1903       try {
1904         observer.applicationFailed(e);
1905         // CHECKSTYLE: stop IllegalCatchCheck
1906       } catch (RuntimeException re) {
1907         // CHECKSTYLE: resume IllegalCatchCheck
1908         LOG.error(re.getClass().getName() + " from observer " +
1909             observer.getClass().getName(), re);
1910       }
1911       getContext().progress();
1912     }
1913   }
1914 
  /**
   * Register this master's "cleaned up" znode and, if this task is the
   * elected master, tear down ZooKeeper state, checkpoints and connections.
   *
   * @param superstepState State the computation finished in
   * @throws IOException
   */
  @Override
  public void cleanup(SuperstepState superstepState) throws IOException {
    ImmutableClassesGiraphConfiguration conf = getConfiguration();

    // All master processes should denote they are done by adding special
    // znode.  Once the number of znodes equals the number of partitions
    // for workers and masters, the master will clean up the ZooKeeper
    // znodes associated with this job.
    String masterCleanedUpPath = cleanedUpPath  + "/" +
        getTaskId() + MASTER_SUFFIX;
    try {
      String finalFinishedPath =
          getZkExt().createExt(masterCleanedUpPath,
              null,
              Ids.OPEN_ACL_UNSAFE,
              CreateMode.PERSISTENT,
              true);
      if (LOG.isInfoEnabled()) {
        LOG.info("cleanup: Notifying master its okay to cleanup with " +
            finalFinishedPath);
      }
    } catch (KeeperException.NodeExistsException e) {
      // Benign: znode was created by an earlier attempt
      if (LOG.isInfoEnabled()) {
        LOG.info("cleanup: Couldn't create finished node '" +
            masterCleanedUpPath);
      }
    } catch (KeeperException e) {
      // Cleanup phase -- log and continue rather than fail the job
      LOG.error("cleanup: Got KeeperException, continuing", e);
    } catch (InterruptedException e) {
      LOG.error("cleanup: Got InterruptedException, continuing", e);
    }

    if (isMaster) {
      getGraphTaskManager().setIsMaster(true);
      // Wait for all tasks, then remove this job's ZooKeeper state
      cleanUpZooKeeper();
      // If desired, cleanup the checkpoint directory
      if (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE &&
          GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(conf)) {
        boolean success =
            getFs().delete(new Path(checkpointBasePath), true);
        if (LOG.isInfoEnabled()) {
          LOG.info("cleanup: Removed HDFS checkpoint directory (" +
              checkpointBasePath + ") with return = " +
              success + " since the job " + getContext().getJobName() +
              " succeeded ");
        }
      }
      if (superstepState == SuperstepState.CHECKPOINT_AND_HALT) {
        // Create the checkpoint mark file (presumably read on restart to
        // resume from the checkpoint -- confirm against restart logic),
        // then deliberately fail this halted job
        getFs().create(CheckpointingUtils.getCheckpointMarkPath(conf,
            getJobId()), true);
        failJob(new Exception("Checkpoint and halt requested. " +
            "Killing this job."));
      }
      // Release network resources held by the master
      globalCommHandler.getAggregatorHandler().close();
      masterClient.closeConnections();
      masterServer.close();
    }

    try {
      getZkExt().close();
    } catch (InterruptedException e) {
      // cleanup phase -- just log the error
      LOG.error("cleanup: Zookeeper failed to close", e);
    }
  }
1980 
1981   /**
1982    * Event that the master watches that denotes when a worker wrote checkpoint
1983    *
1984    * @return Event that denotes when a worker wrote checkpoint
1985    */
1986   public final BspEvent getWorkerWroteCheckpointEvent() {
1987     return workerWroteCheckpoint;
1988   }
1989 
1990   /**
1991    * Event that the master watches that denotes if a worker has done something
1992    * that changes the state of a superstep (either a worker completed or died)
1993    *
1994    * @return Event that denotes a superstep state change
1995    */
1996   public final BspEvent getSuperstepStateChangedEvent() {
1997     return superstepStateChanged;
1998   }
1999 
2000   /**
2001    * Should this worker failure cause the current superstep to fail?
2002    *
2003    * @param failedWorkerPath Full path to the failed worker
2004    */
2005   private void checkHealthyWorkerFailure(String failedWorkerPath) {
2006     if (getSuperstepFromPath(failedWorkerPath) < getSuperstep()) {
2007       return;
2008     }
2009 
2010     Collection<PartitionOwner> partitionOwners =
2011         masterGraphPartitioner.getCurrentPartitionOwners();
2012     String hostnameId =
2013         getHealthyHostnameIdFromPath(failedWorkerPath);
2014     for (PartitionOwner partitionOwner : partitionOwners) {
2015       WorkerInfo workerInfo = partitionOwner.getWorkerInfo();
2016       WorkerInfo previousWorkerInfo =
2017           partitionOwner.getPreviousWorkerInfo();
2018       if (workerInfo.getHostnameId().equals(hostnameId) ||
2019           ((previousWorkerInfo != null) &&
2020               previousWorkerInfo.getHostnameId().equals(hostnameId))) {
2021         LOG.warn("checkHealthyWorkerFailure: " +
2022             "at least one healthy worker went down " +
2023             "for superstep " + getSuperstep() + " - " +
2024             hostnameId + ", will try to restart from " +
2025             "checkpointed superstep " +
2026             lastCheckpointedSuperstep);
2027         superstepStateChanged.signal();
2028       }
2029     }
2030   }
2031 
2032   @Override
2033   public boolean processEvent(WatchedEvent event) {
2034     boolean foundEvent = false;
2035     if (event.getPath().contains(WORKER_HEALTHY_DIR) &&
2036         (event.getType() == EventType.NodeDeleted)) {
2037       if (LOG.isDebugEnabled()) {
2038         LOG.debug("processEvent: Healthy worker died (node deleted) " +
2039             "in " + event.getPath());
2040       }
2041       checkHealthyWorkerFailure(event.getPath());
2042       superstepStateChanged.signal();
2043       foundEvent = true;
2044     } else if (event.getPath().contains(WORKER_FINISHED_DIR) &&
2045         event.getType() == EventType.NodeChildrenChanged) {
2046       if (LOG.isDebugEnabled()) {
2047         LOG.debug("processEvent: Worker finished (node change) " +
2048             "event - superstepStateChanged signaled");
2049       }
2050       superstepStateChanged.signal();
2051       foundEvent = true;
2052     } else if (event.getPath().contains(WORKER_WROTE_CHECKPOINT_DIR) &&
2053         event.getType() == EventType.NodeChildrenChanged) {
2054       if (LOG.isDebugEnabled()) {
2055         LOG.debug("processEvent: Worker wrote checkpoint (node change) " +
2056             "event - workerWroteCheckpoint signaled");
2057       }
2058       workerWroteCheckpoint.signal();
2059       foundEvent = true;
2060     }
2061 
2062     return foundEvent;
2063   }
2064 
2065   /**
2066    * Set values of counters to match the ones from {@link GlobalStats}
2067    *
2068    * @param globalStats Global statistics which holds new counter values
2069    */
2070   private void updateCounters(GlobalStats globalStats) {
2071     GiraphStats gs = GiraphStats.getInstance();
2072     gs.getVertices().setValue(globalStats.getVertexCount());
2073     gs.getFinishedVertexes().setValue(globalStats.getFinishedVertexCount());
2074     gs.getEdges().setValue(globalStats.getEdgeCount());
2075     gs.getSentMessages().setValue(globalStats.getMessageCount());
2076     gs.getSentMessageBytes().setValue(globalStats.getMessageBytesCount());
2077     gs.getAggregateSentMessages().increment(globalStats.getMessageCount());
2078     gs.getAggregateSentMessageBytes()
2079       .increment(globalStats.getMessageBytesCount());
2080     gs.getAggregateOOCBytesLoaded()
2081       .increment(globalStats.getOocLoadBytesCount());
2082     gs.getAggregateOOCBytesStored()
2083       .increment(globalStats.getOocStoreBytesCount());
2084     // Updating the lowest percentage of graph in memory throughout the
2085     // execution across all the supersteps
2086     int percentage = (int) gs.getLowestGraphPercentageInMemory().getValue();
2087     gs.getLowestGraphPercentageInMemory().setValue(
2088         Math.min(percentage, globalStats.getLowestGraphPercentageInMemory()));
2089   }
2090 }