BspServiceMaster xref

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.master;
20  
21  import com.google.common.collect.Lists;
22  import com.google.common.collect.Sets;
23  import net.iharder.Base64;
24  import org.apache.commons.io.FilenameUtils;
25  import org.apache.giraph.bsp.ApplicationState;
26  import org.apache.giraph.bsp.BspInputFormat;
27  import org.apache.giraph.bsp.BspService;
28  import org.apache.giraph.bsp.CentralizedServiceMaster;
29  import org.apache.giraph.bsp.SuperstepState;
30  import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
31  import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
32  import org.apache.giraph.comm.MasterClient;
33  import org.apache.giraph.comm.MasterServer;
34  import org.apache.giraph.comm.netty.NettyClient;
35  import org.apache.giraph.comm.netty.NettyMasterClient;
36  import org.apache.giraph.comm.netty.NettyMasterServer;
37  import org.apache.giraph.comm.requests.AddressesAndPartitionsRequest;
38  import org.apache.giraph.conf.GiraphConfiguration;
39  import org.apache.giraph.conf.GiraphConstants;
40  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
41  import org.apache.giraph.counters.CustomCounter;
42  import org.apache.giraph.counters.CustomCounters;
43  import org.apache.giraph.counters.GiraphCountersThriftStruct;
44  import org.apache.giraph.counters.GiraphStats;
45  import org.apache.giraph.counters.GiraphTimers;
46  import org.apache.giraph.graph.AddressesAndPartitionsWritable;
47  import org.apache.giraph.graph.GlobalStats;
48  import org.apache.giraph.graph.GraphFunctions;
49  import org.apache.giraph.graph.GraphState;
50  import org.apache.giraph.graph.GraphTaskManager;
51  import org.apache.giraph.io.EdgeInputFormat;
52  import org.apache.giraph.io.GiraphInputFormat;
53  import org.apache.giraph.io.InputType;
54  import org.apache.giraph.io.MappingInputFormat;
55  import org.apache.giraph.io.VertexInputFormat;
56  import org.apache.giraph.master.input.MasterInputSplitsHandler;
57  import org.apache.giraph.metrics.AggregatedMetrics;
58  import org.apache.giraph.metrics.GiraphMetrics;
59  import org.apache.giraph.metrics.GiraphTimer;
60  import org.apache.giraph.metrics.GiraphTimerContext;
61  import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
62  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
63  import org.apache.giraph.metrics.WorkerSuperstepMetrics;
64  import org.apache.giraph.partition.BasicPartitionOwner;
65  import org.apache.giraph.partition.MasterGraphPartitioner;
66  import org.apache.giraph.partition.PartitionOwner;
67  import org.apache.giraph.partition.PartitionStats;
68  import org.apache.giraph.partition.PartitionUtils;
69  import org.apache.giraph.time.SystemTime;
70  import org.apache.giraph.time.Time;
71  import org.apache.giraph.utils.CheckpointingUtils;
72  import org.apache.giraph.utils.JMapHistoDumper;
73  import org.apache.giraph.utils.ReactiveJMapHistoDumper;
74  import org.apache.giraph.utils.ReflectionUtils;
75  import org.apache.giraph.utils.WritableUtils;
76  import org.apache.giraph.worker.WorkerInfo;
77  import org.apache.giraph.zk.BspEvent;
78  import org.apache.giraph.zk.PredicateLock;
79  import org.apache.hadoop.fs.FSDataOutputStream;
80  import org.apache.hadoop.fs.FileSystem;
81  import org.apache.hadoop.fs.Path;
82  import org.apache.hadoop.io.Writable;
83  import org.apache.hadoop.io.WritableComparable;
84  import org.apache.hadoop.mapred.JobID;
85  import org.apache.hadoop.mapred.RunningJob;
86  import org.apache.hadoop.mapreduce.Counter;
87  import org.apache.hadoop.mapreduce.InputSplit;
88  import org.apache.hadoop.mapreduce.Mapper;
89  import org.apache.log4j.Logger;
90  import org.apache.zookeeper.CreateMode;
91  import org.apache.zookeeper.KeeperException;
92  import org.apache.zookeeper.WatchedEvent;
93  import org.apache.zookeeper.Watcher.Event.EventType;
94  import org.apache.zookeeper.ZooDefs.Ids;
95  import org.json.JSONArray;
96  import org.json.JSONException;
97  import org.json.JSONObject;
98  
99  import java.io.DataInputStream;
100 import java.io.IOException;
101 import java.io.PrintStream;
102 import java.nio.charset.Charset;
103 import java.util.ArrayList;
104 import java.util.Collection;
105 import java.util.Collections;
106 import java.util.Comparator;
107 import java.util.HashSet;
108 import java.util.List;
109 import java.util.Map;
110 import java.util.Set;
111 import java.util.TreeSet;
112 import java.util.concurrent.TimeUnit;
113 
114 import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
115 import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
116 import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
117 
118 /**
119  * ZooKeeper-based implementation of {@link CentralizedServiceMaster}.
120  *
121  * @param <I> Vertex id
122  * @param <V> Vertex data
123  * @param <E> Edge data
124  */
125 @SuppressWarnings({ "rawtypes", "unchecked" })
126 public class BspServiceMaster<I extends WritableComparable,
127     V extends Writable, E extends Writable>
128     extends BspService<I, V, E>
129     implements CentralizedServiceMaster<I, V, E>,
130     ResetSuperstepMetricsObserver {
131   /** Print worker names only if 10 or fewer workers are left */
132   public static final int MAX_PRINTABLE_REMAINING_WORKERS = 10;
133   /** How many threads to use when writing input splits to zookeeper*/
134   public static final String NUM_MASTER_ZK_INPUT_SPLIT_THREADS =
135       "giraph.numMasterZkInputSplitThreads";
136   /** Default number of threads to use when writing input splits to zookeeper */
137   public static final int DEFAULT_INPUT_SPLIT_THREAD_COUNT = 1;
138   /** Time instance to use for timing */
139   private static final Time TIME = SystemTime.get();
140   /** Class logger */
141   private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
142   /** Am I the master? */
143   private boolean isMaster = false;
144   /** Max number of workers */
145   private final int maxWorkers;
146   /** Min number of workers */
147   private final int minWorkers;
148   /** Max number of supersteps */
149   private final int maxNumberOfSupersteps;
150   /** Min % responded workers */
151   private final float minPercentResponded;
152   /** Msecs to wait for an event */
153   private final int eventWaitMsecs;
154   /** Max msecs to wait for a superstep to get enough workers */
155   private final int maxSuperstepWaitMsecs;
156   /** Max msecs to wait for the workers to write their counters for a
157    * superstep*/
158   private final int maxCounterWaitMsecs;
159   /** Min number of long tails before printing */
160   private final int partitionLongTailMinPrint;
161   /** Last finalized checkpoint */
162   private long lastCheckpointedSuperstep = -1;
163   /** Worker wrote checkpoint */
164   private final BspEvent workerWroteCheckpoint;
165   /** State of the superstep changed */
166   private final BspEvent superstepStateChanged;
167   /** Master graph partitioner */
168   private final MasterGraphPartitioner<I, V, E> masterGraphPartitioner;
169   /** All the partition stats from the last superstep */
170   private final List<PartitionStats> allPartitionStatsList =
171       new ArrayList<PartitionStats>();
172   /** Handler for global communication */
173   private MasterGlobalCommHandler globalCommHandler;
174   /** Handler for aggregators to reduce/broadcast translation */
175   private AggregatorToGlobalCommTranslation aggregatorTranslation;
176   /** Master class */
177   private MasterCompute masterCompute;
178   /** IPC Client */
179   private MasterClient masterClient;
180   /** IPC Server */
181   private MasterServer masterServer;
182   /** Master info */
183   private MasterInfo masterInfo;
184   /** List of workers in current superstep, sorted by task id */
185   private List<WorkerInfo> chosenWorkerInfoList = Lists.newArrayList();
186   /** Observers over master lifecycle. */
187   private final MasterObserver[] observers;
188 
189   // Per-Superstep Metrics
190   /** MasterCompute time */
191   private GiraphTimer masterComputeTimer;
192 
193   /** Checkpoint frequency */
194   private final int checkpointFrequency;
195   /** Current checkpoint status */
196   private CheckpointStatus checkpointStatus;
197   /** Checks if checkpointing supported */
198   private final CheckpointSupportedChecker checkpointSupportedChecker;
199   /** Thrift struct to store the aggregate counters */
200   private final GiraphCountersThriftStruct giraphCountersThriftStruct;
201 
202   /**
203    * Constructor for setting up the master.
204    *
205    * @param context Mapper context
206    * @param graphTaskManager GraphTaskManager for this compute node
207    */
208   public BspServiceMaster(
209       Mapper<?, ?, ?, ?>.Context context,
210       GraphTaskManager<I, V, E> graphTaskManager) {
211     super(context, graphTaskManager);
212     workerWroteCheckpoint = new PredicateLock(context);
213     registerBspEvent(workerWroteCheckpoint);
214     superstepStateChanged = new PredicateLock(context);
215     registerBspEvent(superstepStateChanged);
216 
217     ImmutableClassesGiraphConfiguration<I, V, E> conf =
218         getConfiguration();
219 
220     maxWorkers = conf.getMaxWorkers();
221     minWorkers = conf.getMinWorkers();
222     maxNumberOfSupersteps = conf.getMaxNumberOfSupersteps();
223     minPercentResponded = GiraphConstants.MIN_PERCENT_RESPONDED.get(conf);
224     eventWaitMsecs = conf.getEventWaitMsecs();
225     maxSuperstepWaitMsecs = conf.getMaxMasterSuperstepWaitMsecs();
226     maxCounterWaitMsecs = conf.getMaxCounterWaitMsecs();
227     partitionLongTailMinPrint = PARTITION_LONG_TAIL_MIN_PRINT.get(conf);
228     masterGraphPartitioner =
229         getGraphPartitionerFactory().createMasterGraphPartitioner();
230     if (conf.isJMapHistogramDumpEnabled()) {
231       conf.addMasterObserverClass(JMapHistoDumper.class);
232     }
233     if (conf.isReactiveJmapHistogramDumpEnabled()) {
234       conf.addMasterObserverClass(ReactiveJMapHistoDumper.class);
235     }
236     observers = conf.createMasterObservers(context);
237 
238     this.checkpointFrequency = conf.getCheckpointFrequency();
239     this.checkpointStatus = CheckpointStatus.NONE;
240     this.checkpointSupportedChecker =
241         ReflectionUtils.newInstance(
242             GiraphConstants.CHECKPOINT_SUPPORTED_CHECKER.get(conf));
243     this.giraphCountersThriftStruct = new GiraphCountersThriftStruct();
244 
245     GiraphMetrics.get().addSuperstepResetObserver(this);
246     GiraphStats.init(context);
247   }
248 
249   @Override
250   public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
251     masterComputeTimer = new GiraphTimer(superstepMetrics,
252         "master-compute-call", TimeUnit.MILLISECONDS);
253   }
254 
255   @Override
256   public void setJobState(ApplicationState state,
257       long applicationAttempt,
258       long desiredSuperstep) {
259     setJobState(state, applicationAttempt, desiredSuperstep, true);
260   }
261 
262   /**
263    * Set the job state.
264    *
265    * @param state State of the application.
266    * @param applicationAttempt Attempt to start on
267    * @param desiredSuperstep Superstep to restart from (if applicable)
268    * @param killJobOnFailure if true, and the desired state is FAILED,
269    *                         then kill this job.
270    */
271   private void setJobState(ApplicationState state,
272       long applicationAttempt,
273       long desiredSuperstep,
274       boolean killJobOnFailure) {
275     JSONObject jobState = new JSONObject();
276     try {
277       jobState.put(JSONOBJ_STATE_KEY, state.toString());
278       jobState.put(JSONOBJ_APPLICATION_ATTEMPT_KEY, applicationAttempt);
279       jobState.put(JSONOBJ_SUPERSTEP_KEY, desiredSuperstep);
280     } catch (JSONException e) {
281       throw new RuntimeException("setJobState: Couldn't put " +
282           state.toString());
283     }
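        // Illustrative payload (the actual key strings come from the
        // JSONOBJ_* constants used in the put() calls above):
        //   {"state":"FAILED","applicationAttempt":-1,"superstep":-1}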
284     if (LOG.isInfoEnabled()) {
285       LOG.info("setJobState: " + jobState.toString() + " on superstep " +
286           getSuperstep());
287     }
288     try {
289       getZkExt().createExt(masterJobStatePath + "/jobState",
290           jobState.toString().getBytes(Charset.defaultCharset()),
291           Ids.OPEN_ACL_UNSAFE,
292           CreateMode.PERSISTENT_SEQUENTIAL,
293           true);
294       LOG.info("setJobState: " + jobState);
295     } catch (KeeperException.NodeExistsException e) {
296       throw new IllegalStateException(
297           "setJobState: Impossible that " +
298               masterJobStatePath + " already exists!", e);
299     } catch (KeeperException e) {
300       throw new IllegalStateException(
301           "setJobState: Unknown KeeperException for " +
302               masterJobStatePath, e);
303     } catch (InterruptedException e) {
304       throw new IllegalStateException(
305           "setJobState: Unknown InterruptedException for " +
306               masterJobStatePath, e);
307     }
308     if (state == ApplicationState.FAILED && killJobOnFailure) {
309       failJob(new IllegalStateException("FAILED"));
310     }
311 
312   }
313 
314   /**
315    * Set the job state to FAILED. This will kill the job, and log exceptions to
316    * any observers.
317    *
318    * @param reason The reason the job failed
319    */
320   private void setJobStateFailed(String reason) {
321     getGraphTaskManager().getJobProgressTracker().logFailure(reason);
322     setJobState(ApplicationState.FAILED, -1, -1, false);
323     failJob(new IllegalStateException(reason));
324   }
325 
326   /**
327    * Common method for generating vertex/edge input splits.
328    *
329    * @param inputFormat The vertex/edge input format
330    * @param minSplitCountHint Minimum number of splits to create (hint)
331    * @param inputSplitType Type of input splits (for logging purposes)
332    * @return List of input splits for the given format
333    */
334   private List<InputSplit> generateInputSplits(GiraphInputFormat inputFormat,
335                                                int minSplitCountHint,
336                                                InputType inputSplitType) {
337     String logPrefix = "generate" + inputSplitType + "InputSplits";
338     List<InputSplit> splits;
339     try {
340       splits = inputFormat.getSplits(getContext(), minSplitCountHint);
341     } catch (IOException e) {
342       throw new IllegalStateException(logPrefix + ": Got IOException", e);
343     } catch (InterruptedException e) {
344       throw new IllegalStateException(
345           logPrefix + ": Got InterruptedException", e);
346     }
347     float samplePercent =
348         INPUT_SPLIT_SAMPLE_PERCENT.get(getConfiguration());
349     if (samplePercent != INPUT_SPLIT_SAMPLE_PERCENT.getDefaultValue()) {
350       int lastIndex = (int) (samplePercent * splits.size() / 100f);
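          // Illustrative numbers: with samplePercent = 20 and 50 generated
          // splits, lastIndex = (int) (20 * 50 / 100f) = 10, so a random
          // 10-split sample is processed instead of all 50.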
351       Collections.shuffle(splits);
352       List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
353       LOG.warn(logPrefix + ": Using sampling - Processing only " +
354           sampleSplits.size() + " instead of " + splits.size() +
355           " expected splits.");
356       return sampleSplits;
357     } else {
358       if (LOG.isInfoEnabled()) {
359         LOG.info(logPrefix + ": Got " + splits.size() +
360             " input splits for " + minSplitCountHint + " input threads");
361       }
362       return splits;
363     }
364   }
365 
366   /**
367    * When there is no salvaging this job, fail it.
368    *
369    * @param e Exception to log to observers
370    */
371   private void failJob(Exception e) {
372     LOG.fatal("failJob: Killing job " + getJobId());
373     LOG.fatal("failJob: exception " + e.toString());
374     try {
375       if (getConfiguration().isPureYarnJob()) {
376         throw new RuntimeException(
377           "BspServiceMaster (YARN profile) is " +
378           "FAILING this task, throwing exception to end job run.", e);
379       } else {
380         @SuppressWarnings("deprecation")
381         org.apache.hadoop.mapred.JobClient jobClient =
382           new org.apache.hadoop.mapred.JobClient(
383             (org.apache.hadoop.mapred.JobConf)
384             getContext().getConfiguration());
385         try {
386           @SuppressWarnings("deprecation")
387           JobID jobId = JobID.forName(getJobId());
388           RunningJob job = jobClient.getJob(jobId);
389           if (job != null) {
390             job.killJob();
391           } else {
392             LOG.error("Job not found for jobId=" + getJobId());
393           }
394         } catch (IllegalArgumentException iae) {
395           LOG.info("This job (" + getJobId() +
396                        ") is not a legacy Hadoop job and will " +
397                        "continue with failure cleanup. " +
398                        e.getMessage(),
399                    e);
400         }
401       }
402     } catch (IOException ioe) {
403       throw new RuntimeException(ioe);
404     } finally {
405       failureCleanup(e);
406     }
407   }
408 
409   /**
410    * Parse the {@link WorkerInfo} objects from a ZooKeeper path
411    * (and children).
412    *
413    * @param workerInfosPath Path where all the workers are children
414    * @param watch Watch or not?
415    * @return List of workers in that path
416    */
417   private List<WorkerInfo> getWorkerInfosFromPath(String workerInfosPath,
418       boolean watch) {
419     List<WorkerInfo> workerInfoList = new ArrayList<WorkerInfo>();
420     List<String> workerInfoPathList;
421     try {
422       workerInfoPathList =
423           getZkExt().getChildrenExt(workerInfosPath, watch, false, true);
424     } catch (KeeperException e) {
425       throw new IllegalStateException(
426           "getWorkers: Got KeeperException", e);
427     } catch (InterruptedException e) {
428       throw new IllegalStateException(
429           "getWorkers: Got InterruptedException", e);
430     }
431     for (String workerInfoPath : workerInfoPathList) {
432       WorkerInfo workerInfo = new WorkerInfo();
433       try {
434         WritableUtils.readFieldsFromZnode(
435             getZkExt(), workerInfoPath, true, null, workerInfo);
436         workerInfoList.add(workerInfo);
437       } catch (IllegalStateException e) {
438         LOG.warn("Can't get info from worker, did it die in between? " +
439             "workerInfoPath=" + workerInfoPath, e);
440       }
441     }
442     return workerInfoList;
443   }
444 
445   /**
446    * Get the healthy and unhealthy {@link WorkerInfo} objects for
447    * a superstep
448    *
449    * @param superstep superstep to check
450    * @param healthyWorkerInfoList filled in with current data
451    * @param unhealthyWorkerInfoList filled in with current data
452    */
453   private void getAllWorkerInfos(
454       long superstep,
455       List<WorkerInfo> healthyWorkerInfoList,
456       List<WorkerInfo> unhealthyWorkerInfoList) {
457     String healthyWorkerInfoPath =
458         getWorkerInfoHealthyPath(getApplicationAttempt(), superstep);
459     String unhealthyWorkerInfoPath =
460         getWorkerInfoUnhealthyPath(getApplicationAttempt(), superstep);
461 
462     try {
463       getZkExt().createOnceExt(healthyWorkerInfoPath,
464           null,
465           Ids.OPEN_ACL_UNSAFE,
466           CreateMode.PERSISTENT,
467           true);
468     } catch (KeeperException e) {
469       throw new IllegalStateException("getWorkers: KeeperException", e);
470     } catch (InterruptedException e) {
471       throw new IllegalStateException("getWorkers: InterruptedException", e);
472     }
473 
474     try {
475       getZkExt().createOnceExt(unhealthyWorkerInfoPath,
476           null,
477           Ids.OPEN_ACL_UNSAFE,
478           CreateMode.PERSISTENT,
479           true);
480     } catch (KeeperException e) {
481       throw new IllegalStateException("getWorkers: KeeperException", e);
482     } catch (InterruptedException e) {
483       throw new IllegalStateException("getWorkers: InterruptedException", e);
484     }
485 
486     List<WorkerInfo> currentHealthyWorkerInfoList =
487         getWorkerInfosFromPath(healthyWorkerInfoPath, true);
488     List<WorkerInfo> currentUnhealthyWorkerInfoList =
489         getWorkerInfosFromPath(unhealthyWorkerInfoPath, false);
490 
491     healthyWorkerInfoList.clear();
492     if (currentHealthyWorkerInfoList != null) {
493       for (WorkerInfo healthyWorkerInfo :
494         currentHealthyWorkerInfoList) {
495         healthyWorkerInfoList.add(healthyWorkerInfo);
496       }
497     }
498 
499     unhealthyWorkerInfoList.clear();
500     if (currentUnhealthyWorkerInfoList != null) {
501       for (WorkerInfo unhealthyWorkerInfo :
502         currentUnhealthyWorkerInfoList) {
503         unhealthyWorkerInfoList.add(unhealthyWorkerInfo);
504       }
505     }
506   }
507 
508   @Override
509   public List<WorkerInfo> checkWorkers() {
510     boolean failJob = true;
511     long failWorkerCheckMsecs =
512         SystemTime.get().getMilliseconds() + maxSuperstepWaitMsecs;
513     List<WorkerInfo> healthyWorkerInfoList = new ArrayList<WorkerInfo>();
514     List<WorkerInfo> unhealthyWorkerInfoList = new ArrayList<WorkerInfo>();
515     int totalResponses = -1;
516     while (SystemTime.get().getMilliseconds() < failWorkerCheckMsecs) {
517       getContext().progress();
518       getAllWorkerInfos(
519           getSuperstep(), healthyWorkerInfoList, unhealthyWorkerInfoList);
520       totalResponses = healthyWorkerInfoList.size() +
521           unhealthyWorkerInfoList.size();
522       if ((totalResponses * 100.0f / maxWorkers) >=
523           minPercentResponded) {
524         failJob = false;
525         break;
526       }
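          // Illustrative numbers: with maxWorkers = 100 and
          // minPercentResponded = 95.0f, the check above passes and the
          // wait ends once 95 tasks (healthy or unhealthy) have reported.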
527       getContext().setStatus(getGraphTaskManager().getGraphFunctions() + " " +
528           "checkWorkers: Only found " +
529           totalResponses +
530           " responses of " + maxWorkers +
531           " needed to start superstep " +
532           getSuperstep());
533       if (getWorkerHealthRegistrationChangedEvent().waitMsecs(
534           eventWaitMsecs)) {
535         if (LOG.isDebugEnabled()) {
536           LOG.debug("checkWorkers: Got event that health " +
537               "registration changed, not using poll attempt");
538         }
539         getWorkerHealthRegistrationChangedEvent().reset();
540         continue;
541       }
542       if (LOG.isInfoEnabled()) {
543         LOG.info("checkWorkers: Only found " + totalResponses +
544             " responses of " + maxWorkers +
545             " needed to start superstep " +
546             getSuperstep() + ".  Reporting every " +
547             eventWaitMsecs + " msecs, " +
548             (failWorkerCheckMsecs - SystemTime.get().getMilliseconds()) +
549             " more msecs left before giving up.");
550         // Find the missing workers if there are only a few
551         if ((maxWorkers - totalResponses) <=
552             partitionLongTailMinPrint) {
553           logMissingWorkersOnSuperstep(healthyWorkerInfoList,
554               unhealthyWorkerInfoList);
555         }
556       }
557     }
558     if (failJob) {
559       LOG.error("checkWorkers: Did not receive enough processes in " +
560           "time (only " + totalResponses + " of " +
561           minWorkers + " required) after waiting " + maxSuperstepWaitMsecs +
562           " msecs.  This occurs if you do not have enough map tasks " +
563           "available simultaneously on your Hadoop instance to fulfill " +
564           "the number of requested workers.");
565       return null;
566     }
567 
568     if (healthyWorkerInfoList.size() < minWorkers) {
569       LOG.error("checkWorkers: Only " + healthyWorkerInfoList.size() +
570           " available when " + minWorkers + " are required.");
571       logMissingWorkersOnSuperstep(healthyWorkerInfoList,
572           unhealthyWorkerInfoList);
573       return null;
574     }
575 
576     getContext().setStatus(getGraphTaskManager().getGraphFunctions() + " " +
577         "checkWorkers: Done - Found " + totalResponses +
578         " responses of " + maxWorkers + " needed to start superstep " +
579         getSuperstep());
580 
581     return healthyWorkerInfoList;
582   }
583 
584   /**
585    * Log, at info level, the workers that are missing on the superstep
586    *
587    * @param healthyWorkerInfoList Healthy worker list
588    * @param unhealthyWorkerInfoList Unhealthy worker list
589    */
590   private void logMissingWorkersOnSuperstep(
591       List<WorkerInfo> healthyWorkerInfoList,
592       List<WorkerInfo> unhealthyWorkerInfoList) {
593     if (LOG.isInfoEnabled()) {
594       Set<Integer> partitionSet = new TreeSet<Integer>();
595       for (WorkerInfo workerInfo : healthyWorkerInfoList) {
596         partitionSet.add(workerInfo.getTaskId() % maxWorkers);
597       }
598       for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
599         partitionSet.add(workerInfo.getTaskId() % maxWorkers);
600       }
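          // Task ids were folded into [0, maxWorkers) above; any slot
          // missing from partitionSet below did not respond (the master's
          // own slot is skipped, since it never registers as a worker).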
601       for (int i = 1; i <= maxWorkers; ++i) {
602         if (partitionSet.contains(Integer.valueOf(i))) {
603           continue;
604         } else if (i == getTaskId() % maxWorkers) {
605           continue;
606         } else {
607           LOG.info("logMissingWorkersOnSuperstep: No response from " +
608               "partition " + i + " (could be master)");
609         }
610       }
611     }
612   }
613 
614   /**
615    * Common method for creating vertex/edge input splits.
616    *
617    * @param inputFormat The vertex/edge input format
618    * @param inputSplitType Type of input split (for logging purposes)
619    * @return Number of splits. Returns -1 on failure to create
620    *         valid input splits.
621    */
622   private int createInputSplits(GiraphInputFormat inputFormat,
623                                 InputType inputSplitType) {
624     ImmutableClassesGiraphConfiguration conf = getConfiguration();
625     String logPrefix = "create" + inputSplitType + "InputSplits";
626     // Only the 'master' should be doing this.  Wait until the number of
627     // processes that have reported health exceeds the minimum percentage.
628     // If the minimum percentage is not met, fail the job.  Otherwise
629     // generate the input splits
630     List<WorkerInfo> healthyWorkerInfoList = checkWorkers();
631     if (healthyWorkerInfoList == null) {
632       setJobStateFailed("Not enough healthy workers to create input splits");
633       return -1;
634     }
635     globalCommHandler.getInputSplitsHandler().initialize(masterClient,
636         healthyWorkerInfoList);
637 
638     // Create at least as many splits as the total number of input threads.
639     int minSplitCountHint = healthyWorkerInfoList.size() *
640         conf.getNumInputSplitsThreads();
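        // Illustrative numbers: 10 healthy workers with 4 input threads
        // each yield a hint of 40 splits.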
641 
642     // Note that the input splits may only be a sample if
643     // INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100
644     List<InputSplit> splitList = generateInputSplits(inputFormat,
645         minSplitCountHint, inputSplitType);
646 
647     if (splitList.isEmpty() && GiraphConstants.FAIL_ON_EMPTY_INPUT.get(conf)) {
648       LOG.fatal(logPrefix + ": Failing job due to 0 input splits, " +
649           "check input of " + inputFormat.getClass().getName() + "!");
650       getContext().setStatus("Failing job due to 0 input splits, " +
651           "check input of " + inputFormat.getClass().getName() + "!");
652       setJobStateFailed("******* PLEASE CHECK YOUR INPUT TABLES - PARTITIONS " +
653           "WHICH YOU SPECIFIED ARE MISSING (for " + inputSplitType +
654           " input). FAILING THE JOB *******");
655     }
656     if (minSplitCountHint > splitList.size()) {
657       LOG.warn(logPrefix + ": Number of inputSplits=" +
658           splitList.size() + " < " +
659           minSplitCountHint +
660           "=total number of input threads, " +
661           "some threads will not be used");
662     }
663 
664     globalCommHandler.getInputSplitsHandler().addSplits(inputSplitType,
665         splitList, inputFormat);
666 
667     return splitList.size();
668   }
669 
670   @Override
671   public int createMappingInputSplits() {
672     if (!getConfiguration().hasMappingInputFormat()) {
673       return 0;
674     }
675     MappingInputFormat<I, V, E, ? extends Writable> mappingInputFormat =
676       getConfiguration().createWrappedMappingInputFormat();
677     return createInputSplits(mappingInputFormat, InputType.MAPPING);
678   }
679 
680   @Override
681   public int createVertexInputSplits() {
682     int splits = 0;
683     if (getConfiguration().hasVertexInputFormat()) {
684       VertexInputFormat<I, V, E> vertexInputFormat =
685           getConfiguration().createWrappedVertexInputFormat();
686       splits = createInputSplits(vertexInputFormat, InputType.VERTEX);
687     }
688     MasterProgress.get().setVertexInputSplitCount(splits);
689     getJobProgressTracker().updateMasterProgress(MasterProgress.get());
690     return splits;
691   }
692 
693   @Override
694   public int createEdgeInputSplits() {
695     int splits = 0;
696     if (getConfiguration().hasEdgeInputFormat()) {
697       EdgeInputFormat<I, E> edgeInputFormat =
698           getConfiguration().createWrappedEdgeInputFormat();
699       splits = createInputSplits(edgeInputFormat, InputType.EDGE);
700     }
701     MasterProgress.get().setEdgeInputSplitsCount(splits);
702     getJobProgressTracker().updateMasterProgress(MasterProgress.get());
703     return splits;
704   }
705 
706   @Override
707   public List<WorkerInfo> getWorkerInfoList() {
708     return chosenWorkerInfoList;
709   }
710 
711   @Override
712   public MasterGlobalCommHandler getGlobalCommHandler() {
713     return globalCommHandler;
714   }
715 
716   @Override
717   public AggregatorToGlobalCommTranslation getAggregatorTranslationHandler() {
718     return aggregatorTranslation;
719   }
720 
721   @Override
722   public MasterCompute getMasterCompute() {
723     return masterCompute;
724   }
725 
726   /**
727    * Read the finalized checkpoint file and associated metadata files for the
728    * checkpoint.  Modifies the {@link PartitionOwner} objects to get the
729    * checkpoint prefixes.  It is an optimization to prevent all workers from
730    * searching all the files.  Also read in the aggregator data from the
731    * finalized checkpoint file and set it.
732    *
733    * @param superstep Checkpoint set to examine.
734    * @throws IOException
735    * @throws InterruptedException
736    * @throws KeeperException
737    * @return Collection of generated partition owners.
738    */
739   private Collection<PartitionOwner> prepareCheckpointRestart(long superstep)
740     throws IOException, KeeperException, InterruptedException {
741     List<PartitionOwner> partitionOwners = new ArrayList<>();
742     FileSystem fs = getFs();
743     String finalizedCheckpointPath = getSavedCheckpointBasePath(superstep) +
744         CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX;
745     LOG.info("Loading checkpoint from " + finalizedCheckpointPath);
746     DataInputStream finalizedStream =
747         fs.open(new Path(finalizedCheckpointPath));
748     GlobalStats globalStats = new GlobalStats();
749     globalStats.readFields(finalizedStream);
750     updateCounters(globalStats);
751     SuperstepClasses superstepClasses =
752         SuperstepClasses.createToRead(getConfiguration());
753     superstepClasses.readFields(finalizedStream);
754     getConfiguration().updateSuperstepClasses(superstepClasses);
755     int prefixFileCount = finalizedStream.readInt();
756 
757     String checkpointFile =
758         finalizedStream.readUTF();
759     for (int i = 0; i < prefixFileCount; ++i) {
760       int mrTaskId = finalizedStream.readInt();
761 
762       DataInputStream metadataStream = fs.open(new Path(checkpointFile +
763           "." + mrTaskId + CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX));
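          // Each worker's metadata file is found by appending its task id
          // and the metadata postfix to the shared checkpoint prefix read
          // above.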
764       long partitions = metadataStream.readInt();
765       WorkerInfo worker = getWorkerInfoById(mrTaskId);
766       for (long p = 0; p < partitions; ++p) {
767         int partitionId = metadataStream.readInt();
768         PartitionOwner partitionOwner = new BasicPartitionOwner(partitionId,
769             worker);
770         partitionOwners.add(partitionOwner);
771         LOG.info("prepareCheckpointRestart partitionId=" + partitionId +
772             " assigned to " + partitionOwner);
773       }
774       metadataStream.close();
775     }
776     // Ordering appears to be important: as of right now we rely on this
777     // ordering in WorkerGraphPartitioner
778     Collections.sort(partitionOwners, new Comparator<PartitionOwner>() {
779       @Override
780       public int compare(PartitionOwner p1, PartitionOwner p2) {
781         return Integer.compare(p1.getPartitionId(), p2.getPartitionId());
782       }
783     });
784 
785 
786     globalCommHandler.getAggregatorHandler().readFields(finalizedStream);
787     aggregatorTranslation.readFields(finalizedStream);
788     masterCompute.readFields(finalizedStream);
789     finalizedStream.close();
790 
791     return partitionOwners;
792   }
793 
794   @Override
795   public void setup() {
796     // Might have to manually load a checkpoint.
797     // In that case, the input splits are not set; they will be faked by
798     // the checkpoint files.  Each checkpoint file will act as an
799     // input split.
800 
801     if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
802       GiraphStats.getInstance().getSuperstepCounter().
803         setValue(getRestartedSuperstep());
804     }
805     for (MasterObserver observer : observers) {
806       observer.preApplication();
807       getContext().progress();
808     }
809   }
810 
811   @Override
812   public boolean becomeMaster() {
813     // Create my bid to become the master, then try to become the master
814     // or return false.
815     String myBid = null;
816     try {
817       myBid =
818           getZkExt().createExt(masterElectionPath +
819               "/" + getHostnameTaskId(),
820               null,
821               Ids.OPEN_ACL_UNSAFE,
822               CreateMode.EPHEMERAL_SEQUENTIAL,
823               true);
824     } catch (KeeperException e) {
825       throw new IllegalStateException(
826           "becomeMaster: KeeperException", e);
827     } catch (InterruptedException e) {
828       throw new IllegalStateException(
829           "becomeMaster: InterruptedException", e);
830     }
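        // Standard ZooKeeper leader election: every task has created an
        // EPHEMERAL_SEQUENTIAL bid under masterElectionPath above, and the
        // bid that sorts first among the children claims mastership below.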
831     while (true) {
832       JSONObject jobState = getJobState();
833       try {
834         if ((jobState != null) &&
835             ApplicationState.valueOf(
836                 jobState.getString(JSONOBJ_STATE_KEY)) ==
837                 ApplicationState.FINISHED) {
838           LOG.info("becomeMaster: Job is finished, " +
839               "give up trying to be the master!");
840           isMaster = false;
841           return isMaster;
842         }
843       } catch (JSONException e) {
844         throw new IllegalStateException(
845             "becomeMaster: Couldn't get state from " + jobState, e);
846       }
847       try {
848         List<String> masterChildArr =
849             getZkExt().getChildrenExt(
850                 masterElectionPath, true, true, true);
851         if (LOG.isInfoEnabled()) {
852           LOG.info("becomeMaster: First child is '" +
853               masterChildArr.get(0) + "' and my bid is '" +
854               myBid + "'");
855         }
856         if (masterChildArr.get(0).equals(myBid)) {
857           GiraphStats.getInstance().getCurrentMasterTaskPartition().
858               setValue(getTaskId());
859 
860           globalCommHandler = new MasterGlobalCommHandler(
861               new MasterAggregatorHandler(getConfiguration(), getContext()),
862               new MasterInputSplitsHandler(
863                   getConfiguration().useInputSplitLocality(), getContext()));
864           aggregatorTranslation = new AggregatorToGlobalCommTranslation(
865               getConfiguration(), globalCommHandler);
866 
867           globalCommHandler.getAggregatorHandler().initialize(this);
868           masterCompute = getConfiguration().createMasterCompute();
869           masterCompute.setMasterService(this);
870 
871           masterInfo = new MasterInfo();
872           masterServer =
873               new NettyMasterServer(getConfiguration(), this, getContext(),
874                   getGraphTaskManager().createUncaughtExceptionHandler());
875           masterInfo.setInetSocketAddress(masterServer.getMyAddress(),
876               masterServer.getLocalHostOrIp());
877           masterInfo.setTaskId(getTaskId());
878           masterClient =
879               new NettyMasterClient(getContext(), getConfiguration(), this,
880                   getGraphTaskManager().createUncaughtExceptionHandler());
881           masterServer.setFlowControl(masterClient.getFlowControl());
882 
883           if (LOG.isInfoEnabled()) {
884             LOG.info("becomeMaster: I am now the master!");
885           }
886           isMaster = true;
887           return isMaster;
888         }
889         LOG.info("becomeMaster: Waiting to become the master...");
890         getMasterElectionChildrenChangedEvent().waitForTimeoutOrFail(
891             GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(
892                 getConfiguration()));
893         getMasterElectionChildrenChangedEvent().reset();
894       } catch (KeeperException e) {
895         throw new IllegalStateException(
896             "becomeMaster: KeeperException", e);
897       } catch (InterruptedException e) {
898         throw new IllegalStateException(
899             "becomeMaster: InterruptedException", e);
900       }
901     }
902   }
903 
904   @Override
905   public MasterInfo getMasterInfo() {
906     return masterInfo;
907   }
908 
909   /**
910    * Collect and aggregate the worker statistics for a particular superstep.
911    *
912    * @param superstep Superstep to aggregate on
913    * @return Global statistics aggregated on all worker statistics
914    */
915   private GlobalStats aggregateWorkerStats(long superstep) {
916     ImmutableClassesGiraphConfiguration conf = getConfiguration();
917 
918     GlobalStats globalStats = new GlobalStats();
919     // Get the stats from all the selected worker nodes
920     String workerFinishedPath =
921         getWorkerMetricsFinishedPath(getApplicationAttempt(), superstep);
922     List<String> workerFinishedPathList = null;
923     try {
924       workerFinishedPathList =
925           getZkExt().getChildrenExt(
926               workerFinishedPath, false, false, true);
927     } catch (KeeperException e) {
928       throw new IllegalStateException(
929           "aggregateWorkerStats: KeeperException", e);
930     } catch (InterruptedException e) {
931       throw new IllegalStateException(
932           "aggregateWorkerStats: InterruptedException", e);
933     }
934 
935     AggregatedMetrics aggregatedMetrics = new AggregatedMetrics();
936 
937     for (String finishedPath : workerFinishedPathList) {
938       String hostnamePartitionId = FilenameUtils.getName(finishedPath);
939       JSONObject workerFinishedInfoObj = null;
940       try {
941         byte [] zkData =
942             getZkExt().getData(finishedPath, false, null);
943         workerFinishedInfoObj = new JSONObject(new String(zkData,
944             Charset.defaultCharset()));
945         globalStats.addMessageCount(
946             workerFinishedInfoObj.getLong(
947                 JSONOBJ_NUM_MESSAGES_KEY));
948         globalStats.addMessageBytesCount(
949           workerFinishedInfoObj.getLong(
950               JSONOBJ_NUM_MESSAGE_BYTES_KEY));
951         if (conf.metricsEnabled() &&
952             workerFinishedInfoObj.has(JSONOBJ_METRICS_KEY)) {
953           WorkerSuperstepMetrics workerMetrics = new WorkerSuperstepMetrics();
954           WritableUtils.readFieldsFromByteArray(
955               Base64.decode(
956                   workerFinishedInfoObj.getString(
957                       JSONOBJ_METRICS_KEY)),
958               workerMetrics);
959           globalStats.addOocLoadBytesCount(
960               workerMetrics.getBytesLoadedFromDisk());
961           globalStats.addOocStoreBytesCount(
962               workerMetrics.getBytesStoredOnDisk());
963           // Find the lowest percentage of graph in memory across all workers
964           // for one superstep
965           globalStats.setLowestGraphPercentageInMemory(
966               Math.min(globalStats.getLowestGraphPercentageInMemory(),
967                   (int) Math.round(
968                       workerMetrics.getGraphPercentageInMemory())));
969           aggregatedMetrics.add(workerMetrics, hostnamePartitionId);
970         }
971       } catch (JSONException e) {
972         throw new IllegalStateException(
973             "aggregateWorkerStats: JSONException", e);
974       } catch (KeeperException e) {
975         throw new IllegalStateException(
976             "aggregateWorkerStats: KeeperException", e);
977       } catch (InterruptedException e) {
978         throw new IllegalStateException(
979             "aggregateWorkerStats: InterruptedException", e);
980       } catch (IOException e) {
981         throw new IllegalStateException(
982             "aggregateWorkerStats: IOException", e);
983       }
984     }
985 
986     allPartitionStatsList.clear();
987     Iterable<PartitionStats> statsList = globalCommHandler.getAllPartitionStats(
988         workerFinishedPathList.size(), getContext());
989     for (PartitionStats partitionStats : statsList) {
990       globalStats.addPartitionStats(partitionStats);
991       allPartitionStatsList.add(partitionStats);
992     }
993 
994     if (conf.metricsEnabled()) {
995       if (GiraphConstants.METRICS_DIRECTORY.isDefaultValue(conf)) {
996         aggregatedMetrics.print(superstep, System.err);
997       } else {
998         printAggregatedMetricsToHDFS(superstep, aggregatedMetrics);
999       }
1000       for (MasterObserver observer : observers) {
1001         observer.superstepMetricsUpdate(
1002             superstep, aggregatedMetrics, allPartitionStatsList);
1003       }
1004     }
1005 
1006     if (LOG.isInfoEnabled()) {
1007       LOG.info("aggregateWorkerStats: Aggregation found " + globalStats +
1008           " on superstep = " + getSuperstep());
1009     }
1010     return globalStats;
1011   }
1012 
1013   /**
1014    * Write superstep metrics to own file in HDFS
1015    * @param superstep the current superstep
1016    * @param aggregatedMetrics the aggregated metrics to write
1017    */
1018   private void printAggregatedMetricsToHDFS(
1019       long superstep, AggregatedMetrics aggregatedMetrics) {
1020     ImmutableClassesGiraphConfiguration conf = getConfiguration();
1021     PrintStream out = null;
1022     Path dir = new Path(GiraphConstants.METRICS_DIRECTORY.get(conf));
1023     Path outFile = new Path(GiraphConstants.METRICS_DIRECTORY.get(conf) +
1024         Path.SEPARATOR_CHAR + "superstep_" + superstep + ".metrics");
1025     try {
1026       FileSystem fs;
1027       fs = FileSystem.get(conf);
1028       if (!fs.exists(dir)) {
1029         fs.mkdirs(dir);
1030       }
1031       if (fs.exists(outFile)) {
1032         throw new RuntimeException(
1033             "printAggregatedMetricsToHDFS: metrics file exists");
1034       }
1035       out = new PrintStream(fs.create(outFile), false,
1036           Charset.defaultCharset().name());
1037       aggregatedMetrics.print(superstep, out);
1038     } catch (IOException e) {
1039       throw new RuntimeException(
1040           "printAggregatedMetricsToHDFS: error creating metrics file", e);
1041     } finally {
1042       if (out != null) {
1043         out.close();
1044       }
1045     }
1046   }
1047 
1048   /**
1049    * Finalize the checkpoint file prefixes by taking the chosen workers and
1050    * writing them to a finalized file.  Also write out the master
1051    * aggregated aggregator array from the previous superstep.
1052    *
1053    * @param superstep superstep to finalize
1054    * @param chosenWorkerInfoList list of chosen workers that will be finalized
1055    * @throws IOException
1056    * @throws InterruptedException
1057    * @throws KeeperException
1058    */
1059   private void finalizeCheckpoint(long superstep,
1060     List<WorkerInfo> chosenWorkerInfoList)
1061     throws IOException, KeeperException, InterruptedException {
1062     Path finalizedCheckpointPath =
1063         new Path(getCheckpointBasePath(superstep) +
1064             CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX);
1065     try {
1066       getFs().delete(finalizedCheckpointPath, false);
1067     } catch (IOException e) {
1068       LOG.warn("finalizeCheckpoint: Could not remove old file " +
1069           finalizedCheckpointPath, e);
1070     }
1071 
1072     // Format:
1073     // <global statistics>
1074     // <superstep classes>
1075     // <number of files>
1076     // <used file prefix 0><used file prefix 1>...
1077     // <aggregator data>
1078     // <masterCompute data>
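         // prepareCheckpointRestart() reads these fields back in the same
         // order when the job is restarted from this checkpoint.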
1079     FSDataOutputStream finalizedOutputStream =
1080         getFs().create(finalizedCheckpointPath);
1081 
1082     String superstepFinishedNode =
1083         getSuperstepFinishedPath(getApplicationAttempt(), superstep - 1);
1084     finalizedOutputStream.write(
1085         getZkExt().getData(superstepFinishedNode, false, null));
1086 
1087     finalizedOutputStream.writeInt(chosenWorkerInfoList.size());
1088     finalizedOutputStream.writeUTF(getCheckpointBasePath(superstep));
1089     for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
1090       finalizedOutputStream.writeInt(getWorkerId(chosenWorkerInfo));
1091     }
1092     globalCommHandler.getAggregatorHandler().write(finalizedOutputStream);
1093     aggregatorTranslation.write(finalizedOutputStream);
1094     masterCompute.write(finalizedOutputStream);
1095     finalizedOutputStream.close();
1096     lastCheckpointedSuperstep = superstep;
1097     GiraphStats.getInstance().
1098         getLastCheckpointedSuperstep().setValue(superstep);
1099   }
1100 
1101   /**
1102    * Assign the partitions for this superstep.  If there are changes,
1103    * the workers will know how to do the exchange.  If this was a restarted
1104    * superstep, then make sure to provide information on where to find the
1105    * checkpoint file.
1106    */
1107   private void assignPartitionOwners() {
1108     Collection<PartitionOwner> partitionOwners;
1109     if (getSuperstep() == INPUT_SUPERSTEP) {
1110       partitionOwners =
1111           masterGraphPartitioner.createInitialPartitionOwners(
1112               chosenWorkerInfoList, maxWorkers);
1113       if (partitionOwners.isEmpty()) {
1114         throw new IllegalStateException(
1115             "assignAndExchangePartitions: No partition owners set");
1116       }
1117     } else if (getRestartedSuperstep() == getSuperstep()) {
1118       // If restarted, prepare the checkpoint restart
1119       try {
1120         partitionOwners = prepareCheckpointRestart(getSuperstep());
1121       } catch (IOException e) {
1122         throw new IllegalStateException(
1123             "assignPartitionOwners: IOException on preparing", e);
1124       } catch (KeeperException e) {
1125         throw new IllegalStateException(
1126             "assignPartitionOwners: KeeperException on preparing", e);
1127       } catch (InterruptedException e) {
1128         throw new IllegalStateException(
1129             "assignPartitionOwners: InterruptedException on preparing",
1130             e);
1131       }
1132       masterGraphPartitioner.setPartitionOwners(partitionOwners);
1133     } else {
1134       partitionOwners =
1135           masterGraphPartitioner.generateChangedPartitionOwners(
1136               allPartitionStatsList,
1137               chosenWorkerInfoList,
1138               maxWorkers,
1139               getSuperstep());
1140 
1141       PartitionUtils.analyzePartitionStats(partitionOwners,
1142           allPartitionStatsList);
1143     }
1144     checkPartitions(masterGraphPartitioner.getCurrentPartitionOwners());
1145 
1146 
1147 
1148     // There will be some exchange of partitions
1149     if (!partitionOwners.isEmpty()) {
1150       String vertexExchangePath =
1151           getPartitionExchangePath(getApplicationAttempt(),
1152               getSuperstep());
1153       try {
1154         getZkExt().createOnceExt(vertexExchangePath,
1155             null,
1156             Ids.OPEN_ACL_UNSAFE,
1157             CreateMode.PERSISTENT,
1158             true);
1159       } catch (KeeperException e) {
1160         throw new IllegalStateException(
1161             "assignPartitionOwners: KeeperException creating " +
1162                 vertexExchangePath);
1163       } catch (InterruptedException e) {
1164         throw new IllegalStateException(
1165             "assignPartitionOwners: InterruptedException creating " +
1166                 vertexExchangePath);
1167       }
1168     }
1169 
1170     AddressesAndPartitionsWritable addressesAndPartitions =
1171         new AddressesAndPartitionsWritable(masterInfo, chosenWorkerInfoList,
1172             partitionOwners);
1173     // Send assignments to every worker
1174     // TODO for very large number of partitions we might want to split this
1175     // across multiple requests
1176     for (WorkerInfo workerInfo : chosenWorkerInfoList) {
1177       masterClient.sendWritableRequest(workerInfo.getTaskId(),
1178           new AddressesAndPartitionsRequest(addressesAndPartitions));
1179     }
1180   }
1181 
1182   /**
1183    * Check if partition ids are valid
1184    *
1185    * @param partitionOwners List of partition ids for current superstep
1186    */
1187   private void checkPartitions(Collection<PartitionOwner> partitionOwners) {
1188     for (PartitionOwner partitionOwner : partitionOwners) {
1189       int partitionId = partitionOwner.getPartitionId();
1190       if (partitionId < 0 || partitionId >= partitionOwners.size()) {
1191         throw new IllegalStateException("checkPartitions: " +
1192             "Invalid partition id " + partitionId +
1193             " - partition ids must be values from 0 to (numPartitions - 1)");
1194       }
1195     }
1196   }
1197 
1198   /**
1199    * Check whether the workers chosen for this superstep are still alive
1200    *
1201    * @param chosenWorkerInfoHealthPath Path to the healthy workers in ZooKeeper
1202    * @param chosenWorkerInfoList List of the healthy workers
1203    * @return a list of dead workers. Empty list if all workers are alive.
1204    * @throws InterruptedException
1205    * @throws KeeperException
1206    */
1207   private Collection<WorkerInfo> superstepChosenWorkerAlive(
1208     String chosenWorkerInfoHealthPath,
1209     List<WorkerInfo> chosenWorkerInfoList)
1210     throws KeeperException, InterruptedException {
1211     List<WorkerInfo> chosenWorkerInfoHealthyList =
1212         getWorkerInfosFromPath(chosenWorkerInfoHealthPath, false);
1213     Set<WorkerInfo> chosenWorkerInfoHealthySet =
1214         new HashSet<WorkerInfo>(chosenWorkerInfoHealthyList);
1215     List<WorkerInfo> deadWorkers = new ArrayList<>();
1216     for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
1217       if (!chosenWorkerInfoHealthySet.contains(chosenWorkerInfo)) {
1218         deadWorkers.add(chosenWorkerInfo);
1219       }
1220     }
1221     return deadWorkers;
1222   }
1223 
1224   @Override
1225   public void restartFromCheckpoint(long checkpoint) {
1226     // Process:
1227     // 1. Increase the application attempt and set to the correct checkpoint
1228     // 2. Send command to all workers to restart their tasks
1229     setApplicationAttempt(getApplicationAttempt() + 1);
1230     setCachedSuperstep(checkpoint);
1231     setRestartedSuperstep(checkpoint);
1232     checkpointStatus = CheckpointStatus.NONE;
1233     setJobState(ApplicationState.START_SUPERSTEP,
1234         getApplicationAttempt(),
1235         checkpoint);
1236   }
1237 
1238   /**
1239    * Safely removes node from zookeeper.
1240    * Ignores nodes that have already been removed, and throws a runtime
1241    * exception if anything else goes wrong.
1242    * @param path path to the node to be removed.
1243    */
1244   private void zkDeleteNode(String path) {
1245     try {
1246       getZkExt().deleteExt(path, -1, true);
1247     } catch (KeeperException.NoNodeException e) {
1248       LOG.info("zkDeleteNode: node has already been removed " + path);
1249     } catch (InterruptedException e) {
1250       throw new RuntimeException(
1251           "zkDeleteNode: InterruptedException", e);
1252     } catch (KeeperException e) {
1253       throw new RuntimeException(
1254           "zkDeleteNode: KeeperException", e);
1255     }
1256   }
1257 
1258   @Override
1259   public long getLastGoodCheckpoint() throws IOException {
1260     // Find the last good checkpoint on disk if this master does not
1261     // know of one it has finalized itself
1262     if (lastCheckpointedSuperstep == -1) {
1263       try {
1264         lastCheckpointedSuperstep = getLastCheckpointedSuperstep();
1265       } catch (IOException e) {
1266         LOG.fatal("getLastGoodCheckpoint: No last good checkpoints can be " +
1267             "found, killing the job.", e);
1268         failJob(e);
1269       }
1270     }
1271 
1272     return lastCheckpointedSuperstep;
1273   }
1274 
1275   /**
1276    * Wait for a set of workers to signal that they are done with the
1277    * barrier.
1278    *
1279    * @param finishedWorkerPath Path to where the workers will register their
1280    *        hostname and id
1281    * @param workerInfoList List of the workers to wait for
1282    * @param event Event to wait on for a chance to be done.
1283    * @param ignoreDeath If true, ignore the death of a worker that has
1284    *                    already made it through the barrier.
1285    * @return True if barrier was successful, false if there was a worker
1286    *         failure
1287    */
1288   private boolean barrierOnWorkerList(String finishedWorkerPath,
1289       List<WorkerInfo> workerInfoList,
1290       BspEvent event,
1291       boolean ignoreDeath) {
1292     try {
1293       getZkExt().createOnceExt(finishedWorkerPath,
1294           null,
1295           Ids.OPEN_ACL_UNSAFE,
1296           CreateMode.PERSISTENT,
1297           true);
1298     } catch (KeeperException e) {
1299       throw new IllegalStateException(
1300           "barrierOnWorkerList: KeeperException - Couldn't create " +
1301               finishedWorkerPath, e);
1302     } catch (InterruptedException e) {
1303       throw new IllegalStateException(
1304           "barrierOnWorkerList: InterruptedException - Couldn't create " +
1305               finishedWorkerPath, e);
1306     }
1307     List<String> hostnameIdList =
1308         new ArrayList<String>(workerInfoList.size());
1309     for (WorkerInfo workerInfo : workerInfoList) {
1310       hostnameIdList.add(workerInfo.getHostnameId());
1311     }
1312     String workerInfoHealthyPath =
1313         getWorkerInfoHealthyPath(getApplicationAttempt(), getSuperstep());
1314     List<String> finishedHostnameIdList = new ArrayList<>();
1315     List<String> tmpFinishedHostnameIdList;
1316     long nextInfoMillis = System.currentTimeMillis();
1317     final int defaultTaskTimeoutMsec = 10 * 60 * 1000;  // from TaskTracker
1318     final int waitBetweenLogInfoMsec = 30 * 1000;
1319     final int taskTimeoutMsec = getContext().getConfiguration().getInt(
1320         "mapred.task.timeout", defaultTaskTimeoutMsec) / 2;
1321     long lastRegularRunTimeMsec = 0;
1322     int eventLoopTimeout =  Math.min(taskTimeoutMsec, waitBetweenLogInfoMsec);
1323     boolean logInfoOnlyRun = false;
1324     List<WorkerInfo> deadWorkers = new ArrayList<>();
1325     while (true) {
1326       if (! logInfoOnlyRun) {
1327         try {
1328           tmpFinishedHostnameIdList =
1329               getZkExt().getChildrenExt(finishedWorkerPath,
1330                                         true,
1331                                         false,
1332                                         false);
1333         } catch (KeeperException e) {
1334           throw new IllegalStateException(
1335               "barrierOnWorkerList: KeeperException - Couldn't get " +
1336                   "children of " + finishedWorkerPath, e);
1337         } catch (InterruptedException e) {
1338           throw new IllegalStateException(
1339               "barrierOnWorkerList: InterruptedException - Couldn't get " +
1340                   "children of " + finishedWorkerPath, e);
1341         }
1342         if (LOG.isDebugEnabled()) {
1343           // Log the names of the new workers that have finished since last time
1344           Set<String> newFinishedHostnames = Sets.difference(
1345             Sets.newHashSet(tmpFinishedHostnameIdList),
1346             Sets.newHashSet(finishedHostnameIdList));
1347           LOG.debug("barrierOnWorkerList: Got new finished worker list = " +
1348                         newFinishedHostnames + ", size = " +
1349                         newFinishedHostnames.size() +
1350                         " from " + finishedWorkerPath);
1351         }
1352         finishedHostnameIdList = tmpFinishedHostnameIdList;
1353       }
1354 
1355       if (LOG.isInfoEnabled() &&
1356           (System.currentTimeMillis() > nextInfoMillis)) {
1357         nextInfoMillis = System.currentTimeMillis() + waitBetweenLogInfoMsec;
1358         LOG.info("barrierOnWorkerList: " +
1359             finishedHostnameIdList.size() +
1360             " out of " + workerInfoList.size() +
1361             " workers finished on superstep " +
1362             getSuperstep() + " on path " + finishedWorkerPath);
1363         if (workerInfoList.size() - finishedHostnameIdList.size() <
1364             MAX_PRINTABLE_REMAINING_WORKERS) {
1365           Set<String> remainingWorkers = Sets.newHashSet(hostnameIdList);
1366           remainingWorkers.removeAll(finishedHostnameIdList);
1367           LOG.info("barrierOnWorkerList: Waiting on " + remainingWorkers);
1368         }
1369       }
1370 
1371       if (! logInfoOnlyRun) {
1372         getContext().setStatus(getGraphTaskManager().getGraphFunctions() +
1373                                    " - " +
1374                                    finishedHostnameIdList.size() +
1375                                    " finished out of " +
1376                                    workerInfoList.size() +
1377                                    " on superstep " + getSuperstep());
1378         if (finishedHostnameIdList.containsAll(hostnameIdList)) {
1379           break;
1380         }
1381 
1382         for (WorkerInfo deadWorker : deadWorkers) {
1383           if (!finishedHostnameIdList.contains(deadWorker.getHostnameId())) {
1384             LOG.error("barrierOnWorkerList: no results arrived from " +
1385                           "worker that was pronounced dead: " + deadWorker +
1386                           " on superstep " + getSuperstep());
1387             return false;
1388           }
1389         }
1390 
1391         // wall-clock time skew is ignored
1392         lastRegularRunTimeMsec = System.currentTimeMillis();
1393       }
1394 
1395       // Wait for a signal or timeout
1396       boolean eventTriggered = event.waitMsecs(eventLoopTimeout);
1397 
1398       // If the event was triggered, we reset it. In the next loop run, we will
1399       // read ZK to get the new hosts.
1400       if (eventTriggered) {
1401         event.reset();
1402       }
1403 
1404       long elapsedTimeSinceRegularRunMsec = System.currentTimeMillis() -
1405           lastRegularRunTimeMsec;
1406       getContext().progress();
1407 
1408       if (eventTriggered ||
1409           taskTimeoutMsec == eventLoopTimeout ||
1410           elapsedTimeSinceRegularRunMsec >= taskTimeoutMsec) {
1411         logInfoOnlyRun = false;
1412       } else {
1413         logInfoOnlyRun = true;
1414         continue;
1415       }
1416 
1417       // Did a worker die?
1418       try {
1419         deadWorkers.addAll(superstepChosenWorkerAlive(
1420                 workerInfoHealthyPath,
1421                 workerInfoList));
1422         if (!ignoreDeath && deadWorkers.size() > 0) {
1423           String errorMessage = "******* WORKERS " + deadWorkers +
1424               " FAILED *******";
1425           // If checkpointing is not used, we should fail the job
1426           if (!getConfiguration().useCheckpointing()) {
1427             setJobStateFailed(errorMessage);
1428           } else {
1429             LOG.error("barrierOnWorkerList: Missing chosen " +
1430                 "workers " + deadWorkers +
1431                 " on superstep " + getSuperstep());
1432             // Log worker failure to command line
1433             getGraphTaskManager().getJobProgressTracker().logInfo(errorMessage);
1434           }
1435           return false;
1436         }
1437       } catch (KeeperException e) {
1438         throw new IllegalStateException(
1439             "barrierOnWorkerList: KeeperException - " +
1440                 "Couldn't get " + workerInfoHealthyPath, e);
1441       } catch (InterruptedException e) {
1442         throw new IllegalStateException(
1443             "barrierOnWorkerList: InterruptedException - " +
1444                 "Couldn't get " + workerInfoHealthyPath, e);
1445       }
1446     }
1447 
1448     return true;
1449   }
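
The method above implements a ZooKeeper "children barrier": every worker announces completion by creating a child znode named after its hostnameId under finishedWorkerPath, and the master re-reads the children (on watch events or timeouts) until all expected names are present, while also watching the healthy-workers path for deaths. A stripped-down sketch of that protocol against a raw ZooKeeper handle follows; the class and method names are illustrative only, and the real code above waits on a BspEvent rather than sleeping.

import java.util.HashSet;
import java.util.Set;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/** Illustrative children barrier: block until all expected children exist. */
final class ChildrenBarrierSketch {
  private ChildrenBarrierSketch() { }

  /** Worker side: announce completion by creating our own child node. */
  static void announce(ZooKeeper zk, String barrierPath, String hostnameId)
      throws KeeperException, InterruptedException {
    zk.create(barrierPath + "/" + hostnameId, new byte[0],
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  }

  /** Master side: poll the barrier children until everyone has reported. */
  static void awaitAll(ZooKeeper zk, String barrierPath,
      Set<String> expectedHostnameIds, long pollMsec)
      throws KeeperException, InterruptedException {
    while (true) {
      Set<String> present =
          new HashSet<>(zk.getChildren(barrierPath, false));
      if (present.containsAll(expectedHostnameIds)) {
        return;
      }
      Thread.sleep(pollMsec);
    }
  }
}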
1450 
1451   /**
1452    * Clean up old superstep data from ZooKeeper.
1453    *
1454    * @param removeableSuperstep Superstep to clean up
1455    * @throws InterruptedException
1456    */
1457   private void cleanUpOldSuperstep(long removeableSuperstep) throws
1458       InterruptedException {
1459     if (KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration()) &&
1460         (removeableSuperstep >= 0)) {
1461       String oldSuperstepPath =
1462           getSuperstepPath(getApplicationAttempt()) + "/" +
1463               removeableSuperstep;
1464       try {
1465         if (LOG.isInfoEnabled()) {
1466           LOG.info("coordinateSuperstep: Cleaning up old Superstep " +
1467               oldSuperstepPath);
1468         }
1469         getZkExt().deleteExt(oldSuperstepPath,
1470             -1,
1471             true);
1472       } catch (KeeperException.NoNodeException e) {
1473         LOG.warn("cleanUpOldSuperstep: Already cleaned up " +
1474             oldSuperstepPath);
1475       } catch (KeeperException e) {
1476         throw new IllegalStateException(
1477             "cleanUpOldSuperstep: KeeperException on " +
1478                 "removing old superstep " + oldSuperstepPath, e);
1479       }
1480     }
1481   }
1482 
1483   /**
1484    * Coordinate the exchange of vertex/edge input splits among workers.
1485    */
1486   private void coordinateInputSplits() {
1487     // Coordinate the workers finishing sending their vertices/edges to the
1488     // correct workers and signal when everything is done.
1489     if (!barrierOnWorkerList(inputSplitsWorkerDonePath,
1490         chosenWorkerInfoList,
1491         getInputSplitsWorkerDoneEvent(),
1492         false)) {
1493       throw new IllegalStateException("coordinateInputSplits: Worker failed " +
1494           "during input split (currently not supported)");
1495     }
1496     try {
1497       getZkExt().createExt(inputSplitsAllDonePath,
1498           null,
1499           Ids.OPEN_ACL_UNSAFE,
1500           CreateMode.PERSISTENT,
1501           false);
1502     } catch (KeeperException.NodeExistsException e) {
1503       LOG.info("coordinateInputSplits: Node " +
1504           inputSplitsAllDonePath + " already exists.");
1505     } catch (KeeperException e) {
1506       throw new IllegalStateException(
1507           "coordinateInputSplits: KeeperException", e);
1508     } catch (InterruptedException e) {
1509       throw new IllegalStateException(
1510           "coordinateInputSplits: InterruptedException", e);
1511     }
1512   }
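
The create-and-ignore-NodeExists step above is the create-side twin of zkDeleteNode: marking inputSplitsAllDonePath is idempotent, so a retried master (or a later application attempt) does not fail merely because the flag already exists. A minimal sketch of that pattern, again with hypothetical names and a raw ZooKeeper handle:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/** Illustrative sketch: idempotently set a persistent "flag" znode. */
final class ZkFlagSketch {
  private ZkFlagSketch() { }

  static void setFlag(ZooKeeper zk, String path)
      throws KeeperException, InterruptedException {
    try {
      zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT);
    } catch (KeeperException.NodeExistsException e) {
      // The flag was already set, e.g. by a previous attempt - that's fine.
    }
  }
}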
1513 
1514   /**
1515    * Initialize aggregators on the master side
1516    * before vertex/edge loading.
1517    * This method cooperates with other code
1518    * to enable aggregator usage at
1519    * INPUT_SUPERSTEP. The other pieces are in
1520    * BspServiceWorker:
1521    * aggregatorHandler.prepareSuperstep called in
1522    * setup, and
1523    * aggregator usage set up in the vertexReader
1524    * and edgeReader.
1525    *
1526    * @throws InterruptedException
1527    */
1528   private void initializeAggregatorInputSuperstep()
1529     throws InterruptedException {
1530     globalCommHandler.getAggregatorHandler().prepareSuperstep();
1531 
1532     prepareMasterCompute(getSuperstep());
1533     try {
1534       masterCompute.initialize();
1535     } catch (InstantiationException e) {
1536       LOG.fatal(
1537         "initializeAggregatorInputSuperstep: Failed in instantiation", e);
1538       throw new RuntimeException(
1539         "initializeAggregatorInputSuperstep: Failed in instantiation", e);
1540     } catch (IllegalAccessException e) {
1541       LOG.fatal("initializeAggregatorInputSuperstep: Failed in access", e);
1542       throw new RuntimeException(
1543         "initializeAggregatorInputSuperstep: Failed in access", e);
1544     }
1545     aggregatorTranslation.postMasterCompute();
1546     globalCommHandler.getAggregatorHandler().finishSuperstep();
1547 
1548     globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
1549   }
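
The masterCompute.initialize() call above is where user code normally registers its aggregators, which is what makes them usable from INPUT_SUPERSTEP onwards. A hedged sketch of such a MasterCompute is shown below; the class name, aggregator name, and halting condition are examples only, not something prescribed by this file.

import org.apache.giraph.aggregators.LongSumAggregator;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.hadoop.io.LongWritable;

/** Illustrative MasterCompute that registers an aggregator in initialize(). */
public class ExampleMasterCompute extends DefaultMasterCompute {
  /** Example aggregator name. */
  private static final String EDGE_COUNT_AGG = "example.edgeCount";

  @Override
  public void initialize() throws InstantiationException,
      IllegalAccessException {
    // Registered here so workers can aggregate into it during input loading.
    registerAggregator(EDGE_COUNT_AGG, LongSumAggregator.class);
  }

  @Override
  public void compute() {
    // Example use of the value aggregated during the previous superstep.
    LongWritable total = getAggregatedValue(EDGE_COUNT_AGG);
    if (getSuperstep() > 1 && total.get() == 0) {
      haltComputation();
    }
  }
}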
1550 
1551   /**
1552    * This is required before MasterCompute
1553    * is initialized and run.
1554    *
1555    * @param superstep superstep for which to run masterCompute
1556    * @return Superstep classes set by masterCompute
1557    */
1558   private SuperstepClasses prepareMasterCompute(long superstep) {
1559     GraphState graphState = new GraphState(superstep,
1560         GiraphStats.getInstance().getVertices().getValue(),
1561         GiraphStats.getInstance().getEdges().getValue(),
1562         getContext());
1563     SuperstepClasses superstepClasses =
1564         SuperstepClasses.createAndExtractTypes(getConfiguration());
1565     masterCompute.setGraphState(graphState);
1566     masterCompute.setSuperstepClasses(superstepClasses);
1567     return superstepClasses;
1568   }
1569 
1570   @Override
1571   public SuperstepState coordinateSuperstep() throws
1572   KeeperException, InterruptedException {
1573     // 1. Get chosen workers and set up watches on them.
1574     // 2. Assign partitions to the workers
1575     //    (possibly reloading from a superstep)
1576     // 3. Wait for all workers to complete
1577     // 4. Collect and process aggregators
1578     // 5. Create superstep finished node
1579     // 6. If the checkpoint frequency is met, finalize the checkpoint
1580 
1581     for (MasterObserver observer : observers) {
1582       observer.preSuperstep(getSuperstep());
1583       getContext().progress();
1584     }
1585 
1586     chosenWorkerInfoList = checkWorkers();
1587     if (chosenWorkerInfoList == null) {
1588       setJobStateFailed("coordinateSuperstep: Not enough healthy workers for " +
1589                     "superstep " + getSuperstep());
1590     } else {
1591       // Sort this list, so order stays the same over supersteps
1592       Collections.sort(chosenWorkerInfoList, new Comparator<WorkerInfo>() {
1593         @Override
1594         public int compare(WorkerInfo wi1, WorkerInfo wi2) {
1595           return Integer.compare(wi1.getTaskId(), wi2.getTaskId());
1596         }
1597       });
1598       for (WorkerInfo workerInfo : chosenWorkerInfoList) {
1599         String workerInfoHealthyPath =
1600             getWorkerInfoHealthyPath(getApplicationAttempt(),
1601                 getSuperstep()) + "/" +
1602                 workerInfo.getHostnameId();
1603         if (getZkExt().exists(workerInfoHealthyPath, true) == null) {
1604           LOG.warn("coordinateSuperstep: Chosen worker " +
1605               workerInfoHealthyPath +
1606               " is no longer valid, failing superstep");
1607         }
1608       }
1609     }
1610 
1611     // We need to finalize aggregators from previous superstep
1612     if (getSuperstep() >= 0) {
1613       aggregatorTranslation.postMasterCompute();
1614       globalCommHandler.getAggregatorHandler().finishSuperstep();
1615     }
1616 
1617     masterClient.openConnections();
1618 
1619     GiraphStats.getInstance().
1620         getCurrentWorkers().setValue(chosenWorkerInfoList.size());
1621     assignPartitionOwners();
1622 
1623     // Finalize the valid checkpoint file prefixes and possibly
1624     // the aggregators.
1625     if (checkpointStatus != CheckpointStatus.NONE) {
1626       String workerWroteCheckpointPath =
1627           getWorkerWroteCheckpointPath(getApplicationAttempt(),
1628               getSuperstep());
1629       // first wait for all the workers to write their checkpoint data
1630       if (!barrierOnWorkerList(workerWroteCheckpointPath,
1631           chosenWorkerInfoList,
1632           getWorkerWroteCheckpointEvent(),
1633           checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT)) {
1634         return SuperstepState.WORKER_FAILURE;
1635       }
1636       try {
1637         finalizeCheckpoint(getSuperstep(), chosenWorkerInfoList);
1638       } catch (IOException e) {
1639         throw new IllegalStateException(
1640             "coordinateSuperstep: IOException on finalizing checkpoint",
1641             e);
1642       }
1643       if (checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT) {
1644         return SuperstepState.CHECKPOINT_AND_HALT;
1645       }
1646     }
1647 
1648     // We need to send aggregators to worker owners after new worker assignments
1649     if (getSuperstep() >= 0) {
1650       globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
1651     }
1652 
1653     if (getSuperstep() == INPUT_SUPERSTEP) {
1654       // Initialize aggregators before coordinating
1655       initializeAggregatorInputSuperstep();
1656       coordinateInputSplits();
1657     }
1658 
1659     String finishedWorkerPath =
1660         getWorkerMetricsFinishedPath(getApplicationAttempt(), getSuperstep());
1661     if (!barrierOnWorkerList(finishedWorkerPath,
1662         chosenWorkerInfoList,
1663         getSuperstepStateChangedEvent(),
1664         false)) {
1665       return SuperstepState.WORKER_FAILURE;
1666     }
1667 
1668     // Collect aggregator values, then run the master.compute() and
1669     // finally save the aggregator values
1670     globalCommHandler.getAggregatorHandler().prepareSuperstep();
1671     aggregatorTranslation.prepareSuperstep();
1672 
1673     SuperstepClasses superstepClasses =
1674       prepareMasterCompute(getSuperstep() + 1);
1675     doMasterCompute();
1676 
1677     // If the master is halted or all the vertices voted to halt and there
1678     // are no more messages in the system, stop the computation
1679     GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
1680     aggregateCountersFromWorkersAndMaster();
1681     if (masterCompute.isHalted() ||
1682         (globalStats.getFinishedVertexCount() ==
1683         globalStats.getVertexCount() &&
1684         globalStats.getMessageCount() == 0)) {
1685       globalStats.setHaltComputation(true);
1686     } else if (getZkExt().exists(haltComputationPath, false) != null) {
1687       if (LOG.isInfoEnabled()) {
1688         LOG.info("Halting computation because halt zookeeper node was created");
1689       }
1690       globalStats.setHaltComputation(true);
1691     }
1692 
1693     // If we have completed the maximum number of supersteps, stop
1694     // the computation
1695     if (maxNumberOfSupersteps !=
1696         GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS.getDefaultValue() &&
1697         (getSuperstep() == maxNumberOfSupersteps - 1)) {
1698       if (LOG.isInfoEnabled()) {
1699         LOG.info("coordinateSuperstep: Finished " + maxNumberOfSupersteps +
1700             " supersteps (max specified by the user), halting");
1701       }
1702       globalStats.setHaltComputation(true);
1703     }
1704 
1705     // Superstep 0 doesn't need to have matching types (Message types may not
1706     // match) and if the computation is halted, no need to check any of
1707     // the types.
1708     if (!globalStats.getHaltComputation()) {
1709       superstepClasses.verifyTypesMatch(getSuperstep() > 0);
1710     }
1711     getConfiguration().updateSuperstepClasses(superstepClasses);
1712 
1713     // Signal workers that we want to checkpoint
1714     checkpointStatus = getCheckpointStatus(getSuperstep() + 1);
1715     globalStats.setCheckpointStatus(checkpointStatus);
1716     // Let everyone know the aggregated application state through the
1717     // superstep finishing znode.
1718     String superstepFinishedNode =
1719         getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
1720 
1721     WritableUtils.writeToZnode(
1722         getZkExt(), superstepFinishedNode, -1, globalStats, superstepClasses);
1723     updateCounters(globalStats);
1724 
1725     cleanUpOldSuperstep(getSuperstep() - 1);
1726     incrCachedSuperstep();
1727     // Counter starts at zero, so no need to increment it for superstep 0
1728     if (getSuperstep() > 0) {
1729       GiraphStats.getInstance().getSuperstepCounter().increment();
1730     }
1731     SuperstepState superstepState;
1732     if (globalStats.getHaltComputation()) {
1733       superstepState = SuperstepState.ALL_SUPERSTEPS_DONE;
1734     } else {
1735       superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
1736     }
1737     globalCommHandler.getAggregatorHandler().writeAggregators(
1738         getSuperstep(), superstepState);
1739 
1740     return superstepState;
1741   }
1742 
1743   /**
1744    * Should we checkpoint on this superstep?  If checkpointing is enabled,
1745    * always checkpoint the first user superstep.  If restarting, the first
1746    * checkpoint is taken once the checkpoint frequency has been met again.
1747    *
1748    * @param superstep Superstep to decide checkpointing for
1749    * @return Checkpoint status for this superstep
1750    */
1751   private CheckpointStatus getCheckpointStatus(long superstep) {
1752     try {
1753       if (getZkExt().
1754           exists(basePath + FORCE_CHECKPOINT_USER_FLAG, false) != null) {
1755         if (isCheckpointingSupported(getConfiguration(), masterCompute)) {
1756           return CheckpointStatus.CHECKPOINT_AND_HALT;
1757         } else {
1758           LOG.warn("Attempted to manually checkpoint a job that " +
1759               "does not support checkpoints. Ignoring");
1760         }
1761       }
1762     } catch (KeeperException e) {
1763       throw new IllegalStateException(
1764           "getCheckpointStatus: Got KeeperException", e);
1765     } catch (InterruptedException e) {
1766       throw new IllegalStateException(
1767           "getCheckpointStatus: Got InterruptedException", e);
1768     }
1769     if (checkpointFrequency == 0) {
1770       return CheckpointStatus.NONE;
1771     }
1772     long firstCheckpoint = INPUT_SUPERSTEP + 1;
1773     if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
1774       firstCheckpoint = getRestartedSuperstep() + checkpointFrequency;
1775     }
1776     if (superstep < firstCheckpoint) {
1777       return CheckpointStatus.NONE;
1778     }
1779     if (((superstep - firstCheckpoint) % checkpointFrequency) == 0) {
1780       if (isCheckpointingSupported(getConfiguration(), masterCompute)) {
1781         return CheckpointStatus.CHECKPOINT;
1782       }
1783     }
1784     return CheckpointStatus.NONE;
1785   }
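
Reading the arithmetic above with concrete numbers: taking INPUT_SUPERSTEP as -1 and checkpointFrequency as 3 on a fresh run, firstCheckpoint is 0, so supersteps 0, 3, 6, ... get CHECKPOINT (when checkpointing is supported); after restarting from superstep 6, firstCheckpoint becomes 6 + 3 = 9, so the next checkpoint lands on superstep 9. The following self-contained sketch restates just that scheduling rule; it is illustrative, not part of Giraph.

/** Illustrative restatement of the checkpoint scheduling rule above. */
final class CheckpointScheduleSketch {
  private CheckpointScheduleSketch() { }

  /**
   * @param superstep superstep being considered
   * @param frequency checkpoint frequency; 0 disables checkpointing
   * @param firstCheckpoint first superstep eligible for a checkpoint
   * @return true if this superstep would be checkpointed
   */
  static boolean shouldCheckpoint(long superstep, int frequency,
      long firstCheckpoint) {
    if (frequency == 0 || superstep < firstCheckpoint) {
      return false;
    }
    return (superstep - firstCheckpoint) % frequency == 0;
  }

  public static void main(String[] args) {
    // Fresh run: firstCheckpoint = INPUT_SUPERSTEP + 1 = 0, frequency = 3.
    System.out.println(shouldCheckpoint(0, 3, 0));   // true
    System.out.println(shouldCheckpoint(2, 3, 0));   // false
    System.out.println(shouldCheckpoint(3, 3, 0));   // true
    // Restarted from superstep 6: firstCheckpoint = 6 + 3 = 9.
    System.out.println(shouldCheckpoint(9, 3, 9));   // true
  }
}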
1786 
1787   /**
1788    * Returns false if the job doesn't support checkpoints.
1789    * A job may not support checkpointing if it writes output during
1790    * computation, uses static variables to keep data between supersteps,
1791    * starts new threads, etc.
1792    * @param conf Immutable configuration of the job
1793    * @param masterCompute instance of master compute
1794    * @return true if it is safe to checkpoint the job
1795    */
1796   private boolean isCheckpointingSupported(
1797       GiraphConfiguration conf, MasterCompute masterCompute) {
1798     return checkpointSupportedChecker.isCheckpointSupported(
1799         conf, masterCompute);
1800   }
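
The decision is delegated to a pluggable CheckpointSupportedChecker, whose isCheckpointSupported(GiraphConfiguration, MasterCompute) signature can be read off the call above. As an illustration (the class name and the reasoning in the comment are examples, not the shipped default), a checker that declares a job non-checkpointable could look like this:

import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.master.MasterCompute;

/** Illustrative checker that declares the job non-checkpointable. */
public class NeverCheckpointChecker implements CheckpointSupportedChecker {
  @Override
  public boolean isCheckpointSupported(GiraphConfiguration conf,
      MasterCompute masterCompute) {
    // For example: a job that streams output during computation cannot be
    // restarted safely from a checkpoint, so refuse to checkpoint it.
    return false;
  }
}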
1801 
1802 
1803   /**
1804    * Run the user's master computation. Only called
1805    * after masterCompute has been initialized.
1806    */
1807   private void doMasterCompute() {
1808     GiraphTimerContext timerContext = masterComputeTimer.time();
1809     masterCompute.compute();
1810     timerContext.stop();
1811   }
1812 
1813   /**
1814    * Use the counterGroupAndNames and the context to get the counter values,
1815    * create a CustomCounter out of each, and add them to the set of counters.
1816    * @param context Job context
1817    * @param counterGroupAndNames List of counter names
1818    * @param counters Set of CustomCounter which will be populated
1819    */
1820   private void populateCountersFromContext(Mapper.Context context,
1821            Map<String, Set<String>> counterGroupAndNames,
1822            Set<CustomCounter> counters) {
1823     Counter counter;
1824     for (Map.Entry<String, Set<String>> entry :
1825             counterGroupAndNames.entrySet()) {
1826       String groupName = entry.getKey();
1827       for (String counterName: entry.getValue()) {
1828         CustomCounter customCounter = new CustomCounter(groupName, counterName,
1829                 CustomCounter.Aggregation.SUM);
1830         counter = context.getCounter(groupName, counterName);
1831         customCounter.setValue(counter.getValue());
1832         counters.add(customCounter);
1833       }
1834     }
1835   }
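
As a usage sketch, a caller inside this class could collect a couple of counters with the helper above as shown next. The group and counter names are made up for illustration (the real callers below pass the Netty and input-split counter maps), and java.util.HashMap, java.util.HashSet and java.util.Arrays are assumed to be imported.

  /**
   * Hypothetical example of using populateCountersFromContext; the group and
   * counter names here do not correspond to real Giraph counters.
   */
  private void populateExampleCounters(Set<CustomCounter> collected) {
    Map<String, Set<String>> wanted = new HashMap<>();
    wanted.put("Example group",
        new HashSet<>(Arrays.asList("rows-read", "rows-skipped")));
    populateCountersFromContext(getContext(), wanted, collected);
  }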
1836 
1837   /**
1838    * Receive the counters from the workers, and aggregate them with the
1839    * master counters.
1840    * This method is called at the end of each superstep and after the job
1841    * finishes successfully; calling it every superstep gives best-effort
1842    * counter values even if the job later fails.
1843    * The aggregated counters are stored in a thrift struct.
1844    */
1845   private void aggregateCountersFromWorkersAndMaster() {
1846     CustomCounters customCounters = new CustomCounters();
1847     long superstep = getSuperstep();
1848     // Get the counters from all the workers' finished znodes
1849     String workerFinishedPath = getWorkerCountersFinishedPath(
1850             getApplicationAttempt(), superstep);
1851     try {
1852       getZkExt().createOnceExt(workerFinishedPath,
1853               null,
1854               Ids.OPEN_ACL_UNSAFE,
1855               CreateMode.PERSISTENT,
1856               true);
1857     } catch (KeeperException e) {
1858       LOG.warn("aggregateCounters: KeeperException - " +
1859               "Couldn't create " + workerFinishedPath, e);
1860     } catch (InterruptedException e) {
1861       LOG.warn("aggregateCounters: InterruptedException - " +
1862               "Couldn't create " + workerFinishedPath, e);
1863     }
1864     List<String> workerFinishedPathList = new ArrayList<>();
1865     long waitForCountersTimeout =
1866             SystemTime.get().getMilliseconds() + maxCounterWaitMsecs;
1867     // Subtract 1 for the master
1868     int numWorkers = BspInputFormat.getMaxTasks(getConfiguration()) - 1;
1869     if (numWorkers == 0) {
1870       // When the job is run with a single worker, numWorkers would be 0,
1871       // so bump it to 1 in order to still collect the worker counters
1872       numWorkers += 1;
1873     }
1874     // Get the counter values from the zookeeper, written by the workers
1875     // We keep retrying until all the workers have written
1876     while (SystemTime.get().getMilliseconds() < waitForCountersTimeout) {
1877       try {
1878         workerFinishedPathList = getZkExt().getChildrenExt(
1879                 workerFinishedPath, true,
1880                 false, true);
1881         LOG.info(String.format("Fetching counter values from " +
1882                         "workers for superstep %d. Got %d out of %d",
1883                 superstep, workerFinishedPathList.size(), numWorkers));
1884         if (workerFinishedPathList.size() == numWorkers) {
1885           break;
1886         }
1887       } catch (KeeperException e) {
1888         LOG.warn("Got Keeper exception, but will retry: ", e);
1889       } catch (InterruptedException e) {
1890         LOG.warn("aggregateCounters: InterruptedException", e);
1891       }
1892       getWrittenCountersToZKEvent().waitMsecs(eventWaitMsecs);
1893       getWrittenCountersToZKEvent().reset();
1894     }
1895     for (String finishedPath : workerFinishedPathList) {
1896       JSONArray jsonCounters = null;
1897       try {
1898         byte [] zkData =
1899                 getZkExt().getData(finishedPath, false, null);
1900         jsonCounters = new JSONArray(new String(zkData,
1901                 Charset.defaultCharset()));
1902         Set<CustomCounter> workerCounters = new HashSet<>();
1903         for (int i = 0; i < jsonCounters.length(); i++) {
1904           CustomCounter customCounter = new CustomCounter();
1905           WritableUtils.readFieldsFromByteArray(Base64.decode(
1906                   jsonCounters.getString(i)), customCounter);
1907           workerCounters.add(customCounter);
1908         }
1909         customCounters.mergeCounters(workerCounters);
1910       } catch (JSONException e) {
1911         LOG.warn("aggregateCounters: JSONException", e);
1912       } catch (KeeperException e) {
1913         LOG.warn("aggregateCounters: KeeperException", e);
1914       } catch (InterruptedException e) {
1915         LOG.warn("aggregateCounters: InterruptedException", e);
1916       } catch (IOException e) {
1917         LOG.warn("aggregateCounters: IOException", e);
1918       }
1919     }
1920     Mapper.Context context = getContext();
1921     Set<CustomCounter> masterCounters = new HashSet<>();
1922     // Add master counters too
1923     if (numWorkers != 1) {
1924       // If numWorkers=1, then the master and worker share the counters
1925       // Since we have already added the counters from the worker,
1926       // we should not add them again here.
1927       Counter counter;
1928       Set<CustomCounter> masterCounterNames =
1929               CustomCounters.getAndClearCustomCounters();
1930       for (CustomCounter customCounter : masterCounterNames) {
1931         String groupName = customCounter.getGroupName();
1932         String counterName = customCounter.getCounterName();
1933         counter = context.getCounter(groupName, counterName);
1934         customCounter.setValue(counter.getValue());
1935         masterCounters.add(customCounter);
1936       }
1937       // Adding Netty related counters
1938       Map<String, Set<String>> nettyCounters =
1939               NettyClient.getCounterGroupsAndNames();
1940       populateCountersFromContext(context, nettyCounters, masterCounters);
1941     }
1942     // Adding counters from MasterInputSplitsHandler
1943     // Even if number of workers = 1, we still need to add these counters
1944     // explicitly because they exist only on the master
1945     Map<String, Set<String>> inputSplitCounter =
1946             MasterInputSplitsHandler.getCounterGroupAndNames();
1947     populateCountersFromContext(context, inputSplitCounter, masterCounters);
1948     customCounters.mergeCounters(masterCounters);
1949     // Add GiraphStats
1950     List<CustomCounter> allCounters = new ArrayList<>();
1951     allCounters.addAll(GiraphStats.getInstance().getCounterList());
1952     // Custom counters
1953     allCounters.addAll(customCounters.getCounterList());
1954     // Store in Thrift Struct
1955     giraphCountersThriftStruct.setCounters(allCounters);
1956   }
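
For context on the format decoded above: each worker is expected to publish its counters as a JSON array whose elements are Base64-encoded serialized CustomCounter objects, which is what readFieldsFromByteArray unpacks. The sketch below shows the matching producer side; it is an illustration of the format only, and whether the worker builds its payload with exactly this helper is an assumption.

import java.nio.charset.Charset;
import java.util.Set;
import net.iharder.Base64;
import org.apache.giraph.counters.CustomCounter;
import org.apache.giraph.utils.WritableUtils;
import org.json.JSONArray;

/** Illustrative producer of the counter payload the master decodes above. */
final class CounterPayloadSketch {
  private CounterPayloadSketch() { }

  /** Encode counters as a JSON array of Base64-encoded Writable bytes. */
  static byte[] encode(Set<CustomCounter> counters) {
    JSONArray json = new JSONArray();
    for (CustomCounter counter : counters) {
      json.put(Base64.encodeBytes(
          WritableUtils.writeToByteArray(counter)));
    }
    return json.toString().getBytes(Charset.defaultCharset());
  }
}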
1957 
1958   /**
1959    * We add the Giraph Timers separately because we need to include
1960    * the time required for shutdown and cleanup.
1961    * This fetches the final Giraph Timers and sends all the counters
1962    * to the job client.
1963    *
1964    * @param superstep superstep for which the Giraph Timers will be sent
1965    */
1966   public void addGiraphTimersAndSendCounters(long superstep) {
1967     List<CustomCounter> giraphCounters =
1968             giraphCountersThriftStruct.getCounters();
1969     giraphCounters.addAll(GiraphTimers.getInstance().getCounterList(superstep));
1970     giraphCountersThriftStruct.setCounters(giraphCounters);
1971     getJobProgressTracker().sendMasterCounters(giraphCountersThriftStruct);
1972   }
1973 
1974   /**
1975    * Need to clean up ZooKeeper nicely.  This method waits until all the
1976    * masters and workers have reported ending their ZooKeeper connections.
1977    */
1978   private void waitForWorkersToCleanup() {
1979     try {
1980       getZkExt().createExt(cleanedUpPath,
1981           null,
1982           Ids.OPEN_ACL_UNSAFE,
1983           CreateMode.PERSISTENT,
1984           true);
1985     } catch (KeeperException.NodeExistsException e) {
1986       if (LOG.isInfoEnabled()) {
1987         LOG.info("cleanUpZooKeeper: Node " + cleanedUpPath +
1988             " already exists, no need to create.");
1989       }
1990     } catch (KeeperException e) {
1991       throw new IllegalStateException(
1992           "cleanupZooKeeper: Got KeeperException", e);
1993     } catch (InterruptedException e) {
1994       throw new IllegalStateException(
1995           "cleanupZooKeeper: Got InterruptedException", e);
1996     }
1997     // Need to wait for the number of workers and masters to complete
1998     int maxTasks = BspInputFormat.getMaxTasks(getConfiguration());
1999     if ((getGraphTaskManager().getGraphFunctions() == GraphFunctions.ALL) ||
2000         (getGraphTaskManager().getGraphFunctions() ==
2001         GraphFunctions.ALL_EXCEPT_ZOOKEEPER)) {
2002       maxTasks *= 2;
2003     }
2004     List<String> cleanedUpChildrenList = null;
2005     while (true) {
2006       try {
2007         cleanedUpChildrenList =
2008             getZkExt().getChildrenExt(
2009                 cleanedUpPath, true, false, true);
2010         if (LOG.isInfoEnabled()) {
2011           LOG.info("cleanUpZooKeeper: Got " +
2012               cleanedUpChildrenList.size() + " of " +
2013               maxTasks  +  " desired children from " +
2014               cleanedUpPath);
2015         }
2016         if (cleanedUpChildrenList.size() == maxTasks) {
2017           break;
2018         }
2019         if (LOG.isInfoEnabled()) {
2020           LOG.info("cleanUpZooKeeper: Waiting for the " +
2021               "children of " + cleanedUpPath +
2022               " to change since only got " +
2023               cleanedUpChildrenList.size() + " nodes.");
2024         }
2025       } catch (KeeperException e) {
2026         // We are in the cleanup phase -- just log the error
2027         LOG.error("cleanUpZooKeeper: Got KeeperException, " +
2028             "but will continue", e);
2029         return;
2030       } catch (InterruptedException e) {
2031         // We are in the cleanup phase -- just log the error
2032         LOG.error("cleanUpZooKeeper: Got InterruptedException, " +
2033             "but will continue", e);
2034         return;
2035       }
2036 
2037       getCleanedUpChildrenChangedEvent().waitForTimeoutOrFail(
2038           GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
2039               getConfiguration()));
2040       getCleanedUpChildrenChangedEvent().reset();
2041     }
2042   }
2043 
2044   /**
2045    * This will perform the final cleanup of ZooKeeper, i.e.
2046    * delete the basePath directory.
2047    */
2048   private void cleanUpZooKeeper() {
2049     // At this point, all processes have acknowledged the cleanup,
2050     // and the master can do any final cleanup if the ZooKeeper service was
2051     // provided (not dynamically started) and we don't want to keep the data
2052     try {
2053       if (getConfiguration().isZookeeperExternal() &&
2054               KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration())) {
2055         if (LOG.isInfoEnabled()) {
2056           LOG.info("cleanupZooKeeper: Removing the following path " +
2057                   "and all children - " + basePath + " from ZooKeeper list " +
2058                   getConfiguration().getZookeeperList());
2059         }
2060         getZkExt().deleteExt(basePath, -1, true);
2061       }
2062     } catch (KeeperException e) {
2063       LOG.error("cleanupZooKeeper: Failed to do cleanup of " +
2064               basePath + " due to KeeperException", e);
2065     } catch (InterruptedException e) {
2066       LOG.error("cleanupZooKeeper: Failed to do cleanup of " +
2067               basePath + " due to InterruptedException", e);
2068     }
2069   }
2070 
2071   @Override
2072   public void postApplication() {
2073     for (MasterObserver observer : observers) {
2074       observer.postApplication();
2075       getContext().progress();
2076     }
2077   }
2078 
2079   @Override
2080   public void postSuperstep() {
2081     for (MasterObserver observer : observers) {
2082       observer.postSuperstep(getSuperstep());
2083       getContext().progress();
2084     }
2085   }
2086 
2087   @Override
2088   public void failureCleanup(Exception e) {
2089     for (MasterObserver observer : observers) {
2090       try {
2091         observer.applicationFailed(e);
2092         // CHECKSTYLE: stop IllegalCatchCheck
2093       } catch (RuntimeException re) {
2094         // CHECKSTYLE: resume IllegalCatchCheck
2095         LOG.error(re.getClass().getName() + " from observer " +
2096             observer.getClass().getName(), re);
2097       }
2098       getContext().progress();
2099     }
2100   }
2101 
2102   @Override
2103   public void cleanup(SuperstepState superstepState) throws IOException {
2104     ImmutableClassesGiraphConfiguration conf = getConfiguration();
2105 
2106     // All master processes should denote they are done by adding special
2107     // znode.  Once the number of znodes equals the number of partitions
2108     // for workers and masters, the master will clean up the ZooKeeper
2109     // znodes associated with this job.
2110     String masterCleanedUpPath = cleanedUpPath  + "/" +
2111         getTaskId() + MASTER_SUFFIX;
2112     try {
2113       String finalFinishedPath =
2114           getZkExt().createExt(masterCleanedUpPath,
2115               null,
2116               Ids.OPEN_ACL_UNSAFE,
2117               CreateMode.PERSISTENT,
2118               true);
2119       if (LOG.isInfoEnabled()) {
2120         LOG.info("cleanup: Notifying master it's okay to cleanup with " +
2121             finalFinishedPath);
2122       }
2123     } catch (KeeperException.NodeExistsException e) {
2124       if (LOG.isInfoEnabled()) {
2125         LOG.info("cleanup: Couldn't create finished node '" +
2126             masterCleanedUpPath + "'");
2127       }
2128     } catch (KeeperException e) {
2129       LOG.error("cleanup: Got KeeperException, continuing", e);
2130     } catch (InterruptedException e) {
2131       LOG.error("cleanup: Got InterruptedException, continuing", e);
2132     }
2133 
2134     if (isMaster) {
2135       getGraphTaskManager().setIsMaster(true);
2136       waitForWorkersToCleanup();
2137       aggregateCountersFromWorkersAndMaster();
2138       cleanUpZooKeeper();
2139       // If desired, cleanup the checkpoint directory
2140       if (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE &&
2141           GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(conf)) {
2142         boolean success =
2143             getFs().delete(new Path(checkpointBasePath), true);
2144         if (LOG.isInfoEnabled()) {
2145           LOG.info("cleanup: Removed HDFS checkpoint directory (" +
2146               checkpointBasePath + ") with return = " +
2147               success + " since the job " + getContext().getJobName() +
2148               " succeeded ");
2149         }
2150       }
2151       if (superstepState == SuperstepState.CHECKPOINT_AND_HALT) {
2152         getFs().create(CheckpointingUtils.getCheckpointMarkPath(conf,
2153             getJobId()), true);
2154         failJob(new Exception("Checkpoint and halt requested. " +
2155             "Killing this job."));
2156       }
2157       globalCommHandler.getAggregatorHandler().close();
2158       masterClient.closeConnections();
2159       masterServer.close();
2160     }
2161 
2162     try {
2163       getZkExt().close();
2164     } catch (InterruptedException e) {
2165       // cleanup phase -- just log the error
2166       LOG.error("cleanup: Zookeeper failed to close", e);
2167     }
2168   }
2169 
2170   /**
2171    * Event that the master watches that denotes when a worker wrote checkpoint
2172    *
2173    * @return Event that denotes when a worker wrote checkpoint
2174    */
2175   public final BspEvent getWorkerWroteCheckpointEvent() {
2176     return workerWroteCheckpoint;
2177   }
2178 
2179   /**
2180    * Event that the master watches that denotes if a worker has done something
2181    * that changes the state of a superstep (either a worker completed or died)
2182    *
2183    * @return Event that denotes a superstep state change
2184    */
2185   public final BspEvent getSuperstepStateChangedEvent() {
2186     return superstepStateChanged;
2187   }
2188 
2189   /**
2190    * Should this worker failure cause the current superstep to fail?
2191    *
2192    * @param failedWorkerPath Full path to the failed worker
2193    */
2194   private void checkHealthyWorkerFailure(String failedWorkerPath) {
2195     if (getSuperstepFromPath(failedWorkerPath) < getSuperstep()) {
2196       return;
2197     }
2198 
2199     Collection<PartitionOwner> partitionOwners =
2200         masterGraphPartitioner.getCurrentPartitionOwners();
2201     String hostnameId =
2202         getHealthyHostnameIdFromPath(failedWorkerPath);
2203     for (PartitionOwner partitionOwner : partitionOwners) {
2204       WorkerInfo workerInfo = partitionOwner.getWorkerInfo();
2205       WorkerInfo previousWorkerInfo =
2206           partitionOwner.getPreviousWorkerInfo();
2207       if (workerInfo.getHostnameId().equals(hostnameId) ||
2208           ((previousWorkerInfo != null) &&
2209               previousWorkerInfo.getHostnameId().equals(hostnameId))) {
2210         LOG.warn("checkHealthyWorkerFailure: " +
2211             "at least one healthy worker went down " +
2212             "for superstep " + getSuperstep() + " - " +
2213             hostnameId + ", will try to restart from " +
2214             "checkpointed superstep " +
2215             lastCheckpointedSuperstep);
2216         superstepStateChanged.signal();
2217       }
2218     }
2219   }
2220 
2221   @Override
2222   public boolean processEvent(WatchedEvent event) {
2223     boolean foundEvent = false;
2224     if (event.getPath().contains(WORKER_HEALTHY_DIR) &&
2225         (event.getType() == EventType.NodeDeleted)) {
2226       if (LOG.isDebugEnabled()) {
2227         LOG.debug("processEvent: Healthy worker died (node deleted) " +
2228             "in " + event.getPath());
2229       }
2230       checkHealthyWorkerFailure(event.getPath());
2231       superstepStateChanged.signal();
2232       foundEvent = true;
2233     } else if (event.getPath().endsWith(METRICS_DIR) &&
2234         event.getType() == EventType.NodeChildrenChanged) {
2235       if (LOG.isDebugEnabled()) {
2236         LOG.debug("processEvent: Worker finished (node change) " +
2237             "event - superstepStateChanged signaled");
2238       }
2239       superstepStateChanged.signal();
2240       foundEvent = true;
2241     } else if (event.getPath().contains(WORKER_WROTE_CHECKPOINT_DIR) &&
2242         event.getType() == EventType.NodeChildrenChanged) {
2243       if (LOG.isDebugEnabled()) {
2244         LOG.debug("processEvent: Worker wrote checkpoint (node change) " +
2245             "event - workerWroteCheckpoint signaled");
2246       }
2247       workerWroteCheckpoint.signal();
2248       foundEvent = true;
2249     }
2250 
2251     return foundEvent;
2252   }
2253 
2254   /**
2255    * Set values of counters to match the ones from {@link GlobalStats}
2256    *
2257    * @param globalStats Global statistics which holds new counter values
2258    */
2259   private void updateCounters(GlobalStats globalStats) {
2260     GiraphStats gs = GiraphStats.getInstance();
2261     gs.getVertices().setValue(globalStats.getVertexCount());
2262     gs.getFinishedVertexes().setValue(globalStats.getFinishedVertexCount());
2263     gs.getEdges().setValue(globalStats.getEdgeCount());
2264     gs.getSentMessages().setValue(globalStats.getMessageCount());
2265     gs.getSentMessageBytes().setValue(globalStats.getMessageBytesCount());
2266     gs.getAggregateSentMessages().increment(globalStats.getMessageCount());
2267     gs.getAggregateSentMessageBytes()
2268       .increment(globalStats.getMessageBytesCount());
2269     gs.getAggregateOOCBytesLoaded()
2270       .increment(globalStats.getOocLoadBytesCount());
2271     gs.getAggregateOOCBytesStored()
2272       .increment(globalStats.getOocStoreBytesCount());
2273     // Updating the lowest percentage of graph in memory throughout the
2274     // execution across all the supersteps
2275     int percentage = (int) gs.getLowestGraphPercentageInMemory().getValue();
2276     gs.getLowestGraphPercentageInMemory().setValue(
2277         Math.min(percentage, globalStats.getLowestGraphPercentageInMemory()));
2278   }
2279 }