View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.bsp;
20  
21  import org.apache.giraph.conf.GiraphConstants;
22  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23  import org.apache.giraph.graph.GraphTaskManager;
24  import org.apache.giraph.job.JobProgressTracker;
25  import org.apache.giraph.partition.GraphPartitionerFactory;
26  import org.apache.giraph.utils.CheckpointingUtils;
27  import org.apache.giraph.worker.WorkerInfo;
28  import org.apache.giraph.writable.kryo.GiraphClassResolver;
29  import org.apache.giraph.zk.BspEvent;
30  import org.apache.giraph.zk.PredicateLock;
31  import org.apache.giraph.zk.ZooKeeperExt;
32  import org.apache.giraph.zk.ZooKeeperManager;
33  import org.apache.hadoop.fs.FileSystem;
34  import org.apache.hadoop.io.Writable;
35  import org.apache.hadoop.io.WritableComparable;
36  import org.apache.hadoop.mapreduce.Mapper;
37  import org.apache.log4j.Logger;
38  import org.apache.zookeeper.CreateMode;
39  import org.apache.zookeeper.KeeperException;
40  import org.apache.zookeeper.WatchedEvent;
41  import org.apache.zookeeper.Watcher;
42  import org.apache.zookeeper.Watcher.Event.EventType;
43  import org.apache.zookeeper.Watcher.Event.KeeperState;
44  import org.apache.zookeeper.ZooDefs.Ids;
45  import org.json.JSONException;
46  import org.json.JSONObject;
47  
48  import java.io.IOException;
49  import java.net.UnknownHostException;
50  import java.nio.charset.Charset;
51  import java.util.ArrayList;
52  import java.util.Collections;
53  import java.util.List;
54  
55  import static org.apache.giraph.conf.GiraphConstants.RESTART_JOB_ID;
56  
57  /**
58   * Zookeeper-based implementation of {@link CentralizedService}.
59   *
60   * @param <I> Vertex id
61   * @param <V> Vertex data
62   * @param <E> Edge data
63   */
64  @SuppressWarnings("rawtypes")
65  public abstract class BspService<I extends WritableComparable,
66      V extends Writable, E extends Writable>
67      implements Watcher, CentralizedService<I, V, E> {
68    /** Unset superstep */
69    public static final long UNSET_SUPERSTEP = Long.MIN_VALUE;
70    /** Input superstep (superstep when loading the vertices happens) */
71    public static final long INPUT_SUPERSTEP = -1;
72    /** Unset application attempt */
73    public static final long UNSET_APPLICATION_ATTEMPT = Long.MIN_VALUE;
74    /** Base ZooKeeper directory */
75    public static final String BASE_DIR = "/_hadoopBsp";
76    /** Master job state znode above base dir */
77    public static final String MASTER_JOB_STATE_NODE = "/_masterJobState";
78  
79    /** Input splits worker done directory */
80    public static final String INPUT_SPLITS_WORKER_DONE_DIR =
81        "/_inputSplitsWorkerDoneDir";
82    /** Input splits all done node*/
83    public static final String INPUT_SPLITS_ALL_DONE_NODE =
84        "/_inputSplitsAllDone";
85    /** Directory to store kryo className-ID assignment */
86    public static final String KRYO_REGISTERED_CLASS_DIR =
87            "/_kryo";
88    /** Directory of attempts of this application */
89    public static final String APPLICATION_ATTEMPTS_DIR =
90        "/_applicationAttemptsDir";
91    /** Where the master election happens */
92    public static final String MASTER_ELECTION_DIR = "/_masterElectionDir";
93    /** Superstep scope */
94    public static final String SUPERSTEP_DIR = "/_superstepDir";
95    /** Healthy workers register here. */
96    public static final String WORKER_HEALTHY_DIR = "/_workerHealthyDir";
97    /** Unhealthy workers register here. */
98    public static final String WORKER_UNHEALTHY_DIR = "/_workerUnhealthyDir";
99    /** Workers which wrote checkpoint notify here */
100   public static final String WORKER_WROTE_CHECKPOINT_DIR =
101       "/_workerWroteCheckpointDir";
102   /** Finished workers notify here */
103   public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir";
  /** Helps coordinate the partition exchanges */
105   public static final String PARTITION_EXCHANGE_DIR =
106       "/_partitionExchangeDir";
107   /** Denotes that the superstep is done */
108   public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished";
109   /** Denotes that computation should be halted */
110   public static final String HALT_COMPUTATION_NODE = "/_haltComputation";
111   /** Memory observer dir */
112   public static final String MEMORY_OBSERVER_DIR = "/_memoryObserver";
113   /** User sets this flag to checkpoint and stop the job */
114   public static final String FORCE_CHECKPOINT_USER_FLAG = "/_checkpointAndStop";
115   /** Denotes which workers have been cleaned up */
116   public static final String CLEANED_UP_DIR = "/_cleanedUpDir";
117   /** JSON message count key */
118   public static final String JSONOBJ_NUM_MESSAGES_KEY = "_numMsgsKey";
119   /** JSON message bytes count key */
120   public static final String JSONOBJ_NUM_MESSAGE_BYTES_KEY = "_numMsgBytesKey";
121   /** JSON metrics key */
122   public static final String JSONOBJ_METRICS_KEY = "_metricsKey";
123 
124   /** JSON state key */
125   public static final String JSONOBJ_STATE_KEY = "_stateKey";
126   /** JSON application attempt key */
127   public static final String JSONOBJ_APPLICATION_ATTEMPT_KEY =
128       "_applicationAttemptKey";
129   /** JSON superstep key */
130   public static final String JSONOBJ_SUPERSTEP_KEY =
131       "_superstepKey";
132   /** Suffix denotes a worker */
133   public static final String WORKER_SUFFIX = "_worker";
134   /** Suffix denotes a master */
135   public static final String MASTER_SUFFIX = "_master";
136 
137   /** Class logger */
138   private static final Logger LOG = Logger.getLogger(BspService.class);
139   /** Path to the job's root */
140   protected final String basePath;
141   /** Path to the job state determined by the master (informative only) */
142   protected final String masterJobStatePath;
143   /** Input splits worker done directory */
144   protected final String inputSplitsWorkerDonePath;
145   /** Input splits all done node */
146   protected final String inputSplitsAllDonePath;
  /** Path to the application attempts */
148   protected final String applicationAttemptsPath;
149   /** Path to the cleaned up notifications */
150   protected final String cleanedUpPath;
151   /** Path to the checkpoint's root (including job id) */
152   protected final String checkpointBasePath;
153   /** Old checkpoint in case we want to restart some job */
154   protected final String savedCheckpointBasePath;
155   /** Path to the master election path */
156   protected final String masterElectionPath;
157   /** If this path exists computation will be halted */
158   protected final String haltComputationPath;
159   /** Path where memory observer stores data */
160   protected final String memoryObserverPath;
161   /** Kryo className-ID mapping directory */
162   protected final String kryoRegisteredClassPath;
163   /** Private ZooKeeper instance that implements the service */
164   private final ZooKeeperExt zk;
165   /** Has the Connection occurred? */
166   private final BspEvent connectedEvent;
167   /** Has worker registration changed (either healthy or unhealthy) */
168   private final BspEvent workerHealthRegistrationChanged;
169   /** Application attempt changed */
170   private final BspEvent applicationAttemptChanged;
171   /** Input splits worker done */
172   private final BspEvent inputSplitsWorkerDoneEvent;
173   /** Input splits all done */
174   private final BspEvent inputSplitsAllDoneEvent;
175   /** Superstep finished synchronization */
176   private final BspEvent superstepFinished;
177   /** Master election changed for any waited on attempt */
178   private final BspEvent masterElectionChildrenChanged;
179   /** Cleaned up directory children changed*/
180   private final BspEvent cleanedUpChildrenChanged;
181   /** Registered list of BspEvents */
182   private final List<BspEvent> registeredBspEvents =
183       new ArrayList<BspEvent>();
184   /** Immutable configuration of the job*/
185   private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
186   /** Job context (mainly for progress) */
187   private final Mapper<?, ?, ?, ?>.Context context;
188   /** Cached superstep (from ZooKeeper) */
189   private long cachedSuperstep = UNSET_SUPERSTEP;
190   /** Restarted from a checkpoint (manual or automatic) */
191   private long restartedSuperstep = UNSET_SUPERSTEP;
192   /** Cached application attempt (from ZooKeeper) */
193   private long cachedApplicationAttempt = UNSET_APPLICATION_ATTEMPT;
194   /** Job id, to ensure uniqueness */
195   private final String jobId;
196   /** Task id, from partition and application attempt to ensure uniqueness */
197   private final int taskId;
198   /** My hostname */
199   private final String hostname;
200   /** Combination of hostname '_' task (unique id) */
201   private final String hostnameTaskId;
202   /** Graph partitioner */
203   private final GraphPartitionerFactory<I, V, E> graphPartitionerFactory;
204   /** Mapper that will do the graph computation */
205   private final GraphTaskManager<I, V, E> graphTaskManager;
206   /** File system */
207   private final FileSystem fs;
208 
209   /**
210    * Constructor.
211    *
212    * @param context Mapper context
213    * @param graphTaskManager GraphTaskManager for this compute node
214    */
215   public BspService(
216       Mapper<?, ?, ?, ?>.Context context,
217       GraphTaskManager<I, V, E> graphTaskManager) {
218     this.connectedEvent = new PredicateLock(context);
219     this.workerHealthRegistrationChanged = new PredicateLock(context);
220     this.applicationAttemptChanged = new PredicateLock(context);
221     this.inputSplitsWorkerDoneEvent = new PredicateLock(context);
222     this.inputSplitsAllDoneEvent = new PredicateLock(context);
223     this.superstepFinished = new PredicateLock(context);
224     this.masterElectionChildrenChanged = new PredicateLock(context);
225     this.cleanedUpChildrenChanged = new PredicateLock(context);
226 
227     registerBspEvent(connectedEvent);
228     registerBspEvent(workerHealthRegistrationChanged);
229     registerBspEvent(inputSplitsWorkerDoneEvent);
230     registerBspEvent(inputSplitsAllDoneEvent);
231     registerBspEvent(applicationAttemptChanged);
232     registerBspEvent(superstepFinished);
233     registerBspEvent(masterElectionChildrenChanged);
234     registerBspEvent(cleanedUpChildrenChanged);
235 
236     this.context = context;
237     this.graphTaskManager = graphTaskManager;
238     this.conf = graphTaskManager.getConf();
239 
240     this.jobId = conf.getJobId();
241     this.restartedSuperstep = conf.getLong(
242         GiraphConstants.RESTART_SUPERSTEP, UNSET_SUPERSTEP);
243     try {
244       this.hostname = conf.getLocalHostname();
245     } catch (UnknownHostException e) {
246       throw new RuntimeException(e);
247     }
248     this.graphPartitionerFactory = conf.createGraphPartitioner();
249 
250     basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
251     getContext().getCounter(GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP,
252         basePath);
253     masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
254     inputSplitsWorkerDonePath = basePath + INPUT_SPLITS_WORKER_DONE_DIR;
255     inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE;
256     applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
257     cleanedUpPath = basePath + CLEANED_UP_DIR;
258     kryoRegisteredClassPath = basePath + KRYO_REGISTERED_CLASS_DIR;
259 
260 
261     String restartJobId = RESTART_JOB_ID.get(conf);
262 
263     savedCheckpointBasePath =
264         CheckpointingUtils.getCheckpointBasePath(getConfiguration(),
265             restartJobId == null ? getJobId() : restartJobId);
266 
267     checkpointBasePath = CheckpointingUtils.
268         getCheckpointBasePath(getConfiguration(), getJobId());
269 
270     masterElectionPath = basePath + MASTER_ELECTION_DIR;
271     String serverPortList = graphTaskManager.getZookeeperList();
272     haltComputationPath = basePath + HALT_COMPUTATION_NODE;
273     memoryObserverPath = basePath + MEMORY_OBSERVER_DIR;
274     getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP,
275         haltComputationPath);
276     if (LOG.isInfoEnabled()) {
277       LOG.info("BspService: Path to create to halt is " + haltComputationPath);
278     }
279     if (LOG.isInfoEnabled()) {
280       LOG.info("BspService: Connecting to ZooKeeper with job " + jobId +
281           ", partition " + conf.getTaskPartition() + " on " + serverPortList);
282     }
283     try {
284       this.zk = new ZooKeeperExt(serverPortList,
285                                  conf.getZooKeeperSessionTimeout(),
286                                  conf.getZookeeperOpsMaxAttempts(),
287                                  conf.getZookeeperOpsRetryWaitMsecs(),
288                                  this,
289                                  context);
290       connectedEvent.waitForTimeoutOrFail(
291           GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(conf));
292       this.fs = FileSystem.get(getConfiguration());
293     } catch (IOException e) {
294       throw new RuntimeException(e);
295     }
296 
297     boolean disableGiraphResolver =
298             GiraphConstants.DISABLE_GIRAPH_CLASS_RESOLVER.get(conf);
299     if (!disableGiraphResolver) {
300       GiraphClassResolver.setZookeeperInfo(zk, kryoRegisteredClassPath);
301     }
302     this.taskId = (int) getApplicationAttempt() * conf.getMaxWorkers() +
303             conf.getTaskPartition();
304     this.hostnameTaskId = hostname + "_" + getTaskId();
305 
306     //Trying to restart from the latest superstep
307     if (restartJobId != null &&
308         restartedSuperstep == UNSET_SUPERSTEP) {
309       try {
310         restartedSuperstep = getLastCheckpointedSuperstep();
311       } catch (IOException e) {
312         throw new RuntimeException(e);
313       }
314     }
315     this.cachedSuperstep = restartedSuperstep;
316     if ((restartedSuperstep != UNSET_SUPERSTEP) &&
317         (restartedSuperstep < 0)) {
318       throw new IllegalArgumentException(
319           "BspService: Invalid superstep to restart - " +
320               restartedSuperstep);
321     }
322   }
323 
324   /**
325    * Get the superstep from a ZooKeeper path
326    *
327    * @param path Path to parse for the superstep
328    * @return Superstep from the path.
329    */
330   public static long getSuperstepFromPath(String path) {
331     int foundSuperstepStart = path.indexOf(SUPERSTEP_DIR);
332     if (foundSuperstepStart == -1) {
333       throw new IllegalArgumentException(
334           "getSuperstepFromPath: Cannot find " + SUPERSTEP_DIR +
335           "from " + path);
336     }
337     foundSuperstepStart += SUPERSTEP_DIR.length() + 1;
338     int endIndex = foundSuperstepStart +
339         path.substring(foundSuperstepStart).indexOf("/");
340     if (endIndex == -1) {
341       throw new IllegalArgumentException(
342           "getSuperstepFromPath: Cannot find end of superstep from " +
343               path);
344     }
345     if (LOG.isTraceEnabled()) {
346       LOG.trace("getSuperstepFromPath: Got path=" + path +
347           ", start=" + foundSuperstepStart + ", end=" + endIndex);
348     }
349     return Long.parseLong(path.substring(foundSuperstepStart, endIndex));
350   }
351 
352   /**
353    * Get the hostname and id from a "healthy" worker path
354    *
355    * @param path Path to check
356    * @return Hostname and id from path
357    */
358   public static String getHealthyHostnameIdFromPath(String path) {
359     int foundWorkerHealthyStart = path.indexOf(WORKER_HEALTHY_DIR);
360     if (foundWorkerHealthyStart == -1) {
361       throw new IllegalArgumentException(
362           "getHealthyHostnameidFromPath: Couldn't find " +
363               WORKER_HEALTHY_DIR + " from " + path);
364     }
365     foundWorkerHealthyStart += WORKER_HEALTHY_DIR.length();
366     return path.substring(foundWorkerHealthyStart);
367   }
368 
369   /**
370    * Generate the base superstep directory path for a given application
371    * attempt
372    *
373    * @param attempt application attempt number
374    * @return directory path based on the an attempt
375    */
376   public final String getSuperstepPath(long attempt) {
377     return applicationAttemptsPath + "/" + attempt + SUPERSTEP_DIR;
378   }
379 
380   /**
381    * Generate the worker information "healthy" directory path for a
382    * superstep
383    *
384    * @param attempt application attempt number
385    * @param superstep superstep to use
386    * @return directory path based on the a superstep
387    */
388   public final String getWorkerInfoHealthyPath(long attempt,
389       long superstep) {
390     return applicationAttemptsPath + "/" + attempt +
391         SUPERSTEP_DIR + "/" + superstep + WORKER_HEALTHY_DIR;
392   }
393 
394   /**
395    * Generate the worker information "unhealthy" directory path for a
396    * superstep
397    *
398    * @param attempt application attempt number
399    * @param superstep superstep to use
400    * @return directory path based on the a superstep
401    */
402   public final String getWorkerInfoUnhealthyPath(long attempt,
403       long superstep) {
404     return applicationAttemptsPath + "/" + attempt +
405         SUPERSTEP_DIR + "/" + superstep + WORKER_UNHEALTHY_DIR;
406   }
407 
408   /**
409    * Generate the worker "wrote checkpoint" directory path for a
410    * superstep
411    *
412    * @param attempt application attempt number
413    * @param superstep superstep to use
414    * @return directory path based on the a superstep
415    */
416   public final String getWorkerWroteCheckpointPath(long attempt,
417       long superstep) {
418     return applicationAttemptsPath + "/" + attempt +
419         SUPERSTEP_DIR + "/" + superstep + WORKER_WROTE_CHECKPOINT_DIR;
420   }
421 
422   /**
423    * Generate the worker "finished" directory path for a
424    * superstep
425    *
426    * @param attempt application attempt number
427    * @param superstep superstep to use
428    * @return directory path based on the a superstep
429    */
430   public final String getWorkerFinishedPath(long attempt, long superstep) {
431     return applicationAttemptsPath + "/" + attempt +
432         SUPERSTEP_DIR + "/" + superstep + WORKER_FINISHED_DIR;
433   }
434 
435   /**
436    * Generate the "partition exchange" directory path for a superstep
437    *
438    * @param attempt application attempt number
439    * @param superstep superstep to use
440    * @return directory path based on the a superstep
441    */
442   public final String getPartitionExchangePath(long attempt,
443       long superstep) {
444     return applicationAttemptsPath + "/" + attempt +
445         SUPERSTEP_DIR + "/" + superstep + PARTITION_EXCHANGE_DIR;
446   }
447 
448   /**
449    * Based on the superstep, worker info, and attempt, get the appropriate
450    * worker path for the exchange.
451    *
452    * @param attempt Application attempt
453    * @param superstep Superstep
454    * @param workerInfo Worker info of the exchange.
455    * @return Path of the desired worker
456    */
457   public final String getPartitionExchangeWorkerPath(long attempt,
458       long superstep,
459       WorkerInfo workerInfo) {
460     return getPartitionExchangePath(attempt, superstep) +
461         "/" + workerInfo.getHostnameId();
462   }
463 
464   /**
465    * Generate the "superstep finished" directory path for a superstep
466    *
467    * @param attempt application attempt number
468    * @param superstep superstep to use
469    * @return directory path based on the a superstep
470    */
471   public final String getSuperstepFinishedPath(long attempt, long superstep) {
472     return applicationAttemptsPath + "/" + attempt +
473         SUPERSTEP_DIR + "/" + superstep + SUPERSTEP_FINISHED_NODE;
474   }
475 
476   /**
477    * Generate the base superstep directory path for a given application
478    * attempt
479    *
480    * @param superstep Superstep to use
481    * @return Directory path based on the a superstep
482    */
483   public final String getCheckpointBasePath(long superstep) {
484     return checkpointBasePath + "/" + superstep;
485   }
486 
487   /**
488    * In case when we restart another job this will give us a path
489    * to saved checkpoint.
490    * @param superstep superstep to use
491    * @return Direcory path for restarted job based on the superstep
492    */
493   public final String getSavedCheckpointBasePath(long superstep) {
494     return savedCheckpointBasePath + "/" + superstep;
495   }
496 
497 
498   /**
499    * Get the ZooKeeperExt instance.
500    *
501    * @return ZooKeeperExt instance.
502    */
503   public final ZooKeeperExt getZkExt() {
504     return zk;
505   }
506 
507   @Override
508   public final long getRestartedSuperstep() {
509     return restartedSuperstep;
510   }
511 
512   /**
513    * Set the restarted superstep
514    *
515    * @param superstep Set the manually restarted superstep
516    */
517   public final void setRestartedSuperstep(long superstep) {
518     if (superstep < INPUT_SUPERSTEP) {
519       throw new IllegalArgumentException(
520           "setRestartedSuperstep: Bad argument " + superstep);
521     }
522     restartedSuperstep = superstep;
523   }
524 
525   /**
526    * Get the file system
527    *
528    * @return file system
529    */
530   public final FileSystem getFs() {
531     return fs;
532   }
533 
534   public final ImmutableClassesGiraphConfiguration<I, V, E>
535   getConfiguration() {
536     return conf;
537   }
538 
539   public final Mapper<?, ?, ?, ?>.Context getContext() {
540     return context;
541   }
542 
543   public final String getHostname() {
544     return hostname;
545   }
546 
547   public final String getHostnameTaskId() {
548     return hostnameTaskId;
549   }
550 
551   public final int getTaskId() {
552     return taskId;
553   }
554 
555   public final GraphTaskManager<I, V, E> getGraphTaskManager() {
556     return graphTaskManager;
557   }
558 
559   public final BspEvent getWorkerHealthRegistrationChangedEvent() {
560     return workerHealthRegistrationChanged;
561   }
562 
563   public final BspEvent getApplicationAttemptChangedEvent() {
564     return applicationAttemptChanged;
565   }
566 
567   public final BspEvent getInputSplitsWorkerDoneEvent() {
568     return inputSplitsWorkerDoneEvent;
569   }
570 
571   public final BspEvent getInputSplitsAllDoneEvent() {
572     return inputSplitsAllDoneEvent;
573   }
574 
575   public final BspEvent getSuperstepFinishedEvent() {
576     return superstepFinished;
577   }
578 
579 
580   public final BspEvent getMasterElectionChildrenChangedEvent() {
581     return masterElectionChildrenChanged;
582   }
583 
584   public final BspEvent getCleanedUpChildrenChangedEvent() {
585     return cleanedUpChildrenChanged;
586   }
587 
588   /**
589    * Get the master commanded job state as a JSONObject.  Also sets the
590    * watches to see if the master commanded job state changes.
591    *
592    * @return Last job state or null if none
593    */
594   public final JSONObject getJobState() {
595     try {
596       getZkExt().createExt(masterJobStatePath,
597           null,
598           Ids.OPEN_ACL_UNSAFE,
599           CreateMode.PERSISTENT,
600           true);
601     } catch (KeeperException.NodeExistsException e) {
602       LOG.info("getJobState: Job state already exists (" +
603           masterJobStatePath + ")");
604     } catch (KeeperException e) {
605       throw new IllegalStateException("Failed to create job state path " +
606           "due to KeeperException", e);
607     } catch (InterruptedException e) {
608       throw new IllegalStateException("Failed to create job state path " +
609           "due to InterruptedException", e);
610     }
611     String jobState = null;
612     try {
613       List<String> childList =
614           getZkExt().getChildrenExt(
615               masterJobStatePath, true, true, true);
616       if (childList.isEmpty()) {
617         return null;
618       }
619       jobState =
620           new String(getZkExt().getData(childList.get(childList.size() - 1),
621               true, null), Charset.defaultCharset());
622     } catch (KeeperException.NoNodeException e) {
623       LOG.info("getJobState: Job state path is empty! - " +
624           masterJobStatePath);
625     } catch (KeeperException e) {
626       throw new IllegalStateException("Failed to get job state path " +
627           "children due to KeeperException", e);
628     } catch (InterruptedException e) {
629       throw new IllegalStateException("Failed to get job state path " +
630           "children due to InterruptedException", e);
631     }
632     try {
633       return new JSONObject(jobState);
634     } catch (JSONException e) {
635       throw new RuntimeException(
636           "getJobState: Failed to parse job state " + jobState);
637     }
638   }
639 
640   /**
641    * Get the job id
642    *
643    * @return job id
644    */
645   public final String getJobId() {
646     return jobId;
647   }
648 
649   /**
650    * Get the latest application attempt and cache it.
651    *
652    * @return the latest application attempt
653    */
654   public final long getApplicationAttempt() {
655     if (cachedApplicationAttempt != UNSET_APPLICATION_ATTEMPT) {
656       return cachedApplicationAttempt;
657     }
658     try {
659       getZkExt().createExt(applicationAttemptsPath,
660           null,
661           Ids.OPEN_ACL_UNSAFE,
662           CreateMode.PERSISTENT,
663           true);
664     } catch (KeeperException.NodeExistsException e) {
665       LOG.info("getApplicationAttempt: Node " +
666           applicationAttemptsPath + " already exists!");
667     } catch (KeeperException e) {
668       throw new IllegalStateException("Couldn't create application " +
669           "attempts path due to KeeperException", e);
670     } catch (InterruptedException e) {
671       throw new IllegalStateException("Couldn't create application " +
672           "attempts path due to InterruptedException", e);
673     }
674     try {
675       List<String> attemptList =
676           getZkExt().getChildrenExt(
677               applicationAttemptsPath, true, false, false);
678       if (attemptList.isEmpty()) {
679         cachedApplicationAttempt = 0;
680       } else {
681         cachedApplicationAttempt =
682             Long.parseLong(Collections.max(attemptList));
683       }
684     } catch (KeeperException e) {
685       throw new IllegalStateException("Couldn't get application " +
686           "attempts to KeeperException", e);
687     } catch (InterruptedException e) {
688       throw new IllegalStateException("Couldn't get application " +
689           "attempts to InterruptedException", e);
690     }
691 
692     return cachedApplicationAttempt;
693   }
694 
695   /**
696    * Get the latest superstep and cache it.
697    *
698    * @return the latest superstep
699    */
700   public final long getSuperstep() {
701     if (cachedSuperstep != UNSET_SUPERSTEP) {
702       return cachedSuperstep;
703     }
704     String superstepPath = getSuperstepPath(getApplicationAttempt());
705     try {
706       getZkExt().createExt(superstepPath,
707           null,
708           Ids.OPEN_ACL_UNSAFE,
709           CreateMode.PERSISTENT,
710           true);
711     } catch (KeeperException.NodeExistsException e) {
712       if (LOG.isInfoEnabled()) {
713         LOG.info("getApplicationAttempt: Node " +
714             applicationAttemptsPath + " already exists!");
715       }
716     } catch (KeeperException e) {
717       throw new IllegalStateException(
718           "getSuperstep: KeeperException", e);
719     } catch (InterruptedException e) {
720       throw new IllegalStateException(
721           "getSuperstep: InterruptedException", e);
722     }
723 
724     List<String> superstepList;
725     try {
726       superstepList =
727           getZkExt().getChildrenExt(superstepPath, true, false, false);
728     } catch (KeeperException e) {
729       throw new IllegalStateException(
730           "getSuperstep: KeeperException", e);
731     } catch (InterruptedException e) {
732       throw new IllegalStateException(
733           "getSuperstep: InterruptedException", e);
734     }
735     if (superstepList.isEmpty()) {
736       cachedSuperstep = INPUT_SUPERSTEP;
737     } else {
738       cachedSuperstep =
739           Long.parseLong(Collections.max(superstepList));
740     }
741 
742     return cachedSuperstep;
743   }
744 
745   /**
746    * Increment the cached superstep.  Shouldn't be the initial value anymore.
747    */
748   public final void incrCachedSuperstep() {
749     if (cachedSuperstep == UNSET_SUPERSTEP) {
750       throw new IllegalStateException(
751           "incrSuperstep: Invalid unset cached superstep " +
752               UNSET_SUPERSTEP);
753     }
754     ++cachedSuperstep;
755   }
756 
757   /**
758    * Set the cached superstep (should only be used for loading checkpoints
759    * or recovering from failure).
760    *
761    * @param superstep will be used as the next superstep iteration
762    */
763   public final void setCachedSuperstep(long superstep) {
764     cachedSuperstep = superstep;
765   }
766 
767   /**
768    * Set the cached application attempt (should only be used for restart from
769    * failure by the master)
770    *
771    * @param applicationAttempt Will denote the new application attempt
772    */
773   public final void setApplicationAttempt(long applicationAttempt) {
774     cachedApplicationAttempt = applicationAttempt;
775     String superstepPath = getSuperstepPath(cachedApplicationAttempt);
776     try {
777       getZkExt().createExt(superstepPath,
778           null,
779           Ids.OPEN_ACL_UNSAFE,
780           CreateMode.PERSISTENT,
781           true);
782     } catch (KeeperException.NodeExistsException e) {
783       throw new IllegalArgumentException(
784           "setApplicationAttempt: Attempt already exists! - " +
785               superstepPath, e);
786     } catch (KeeperException e) {
787       throw new RuntimeException(
788           "setApplicationAttempt: KeeperException - " +
789               superstepPath, e);
790     } catch (InterruptedException e) {
791       throw new RuntimeException(
792           "setApplicationAttempt: InterruptedException - " +
793               superstepPath, e);
794     }
795   }
796 
797   /**
798    * Register a BspEvent.  Ensure that it will be signaled
799    * by catastrophic failure so that threads waiting on an event signal
800    * will be unblocked.
801    *
802    * @param event Event to be registered.
803    */
804   public void registerBspEvent(BspEvent event) {
805     registeredBspEvents.add(event);
806   }
807 
808   /**
809    * Subclasses can use this to instantiate their respective partitioners
810    *
811    * @return Instantiated graph partitioner factory
812    */
813   protected GraphPartitionerFactory<I, V, E> getGraphPartitionerFactory() {
814     return graphPartitionerFactory;
815   }
816 
817   /**
818    * Derived classes that want additional ZooKeeper events to take action
819    * should override this.
820    *
821    * @param event Event that occurred
822    * @return true if the event was processed here, false otherwise
823    */
824   protected boolean processEvent(WatchedEvent event) {
825     return false;
826   }
827 
828   @Override
829   public final void process(WatchedEvent event) {
830     // 1. Process all shared events
831     // 2. Process specific derived class events
832     if (LOG.isDebugEnabled()) {
833       LOG.debug("process: Got a new event, path = " + event.getPath() +
834           ", type = " + event.getType() + ", state = " +
835           event.getState());
836     }
837 
838     if ((event.getPath() == null) && (event.getType() == EventType.None)) {
839       if (event.getState() == KeeperState.Disconnected) {
840         // Watches may not be triggered for some time, so signal all BspEvents
841         for (BspEvent bspEvent : registeredBspEvents) {
842           bspEvent.signal();
843         }
844         LOG.warn("process: Disconnected from ZooKeeper (will automatically " +
845             "try to recover) " + event);
846       } else if (event.getState() == KeeperState.SyncConnected) {
847         if (LOG.isInfoEnabled()) {
848           LOG.info("process: Asynchronous connection complete.");
849         }
850         connectedEvent.signal();
851       } else {
852         LOG.warn("process: Got unknown null path event " + event);
853       }
854       return;
855     }
856 
857     boolean eventProcessed = false;
858     if (event.getPath().startsWith(masterJobStatePath)) {
859       // This will cause all becomeMaster() MasterThreads to notice the
860       // change in job state and quit trying to become the master.
861       masterElectionChildrenChanged.signal();
862       eventProcessed = true;
863     } else if ((event.getPath().contains(WORKER_HEALTHY_DIR) ||
864         event.getPath().contains(WORKER_UNHEALTHY_DIR)) &&
865         (event.getType() == EventType.NodeChildrenChanged)) {
866       if (LOG.isDebugEnabled()) {
867         LOG.debug("process: workerHealthRegistrationChanged " +
868             "(worker health reported - healthy/unhealthy )");
869       }
870       workerHealthRegistrationChanged.signal();
871       eventProcessed = true;
872     } else if (event.getPath().contains(INPUT_SPLITS_ALL_DONE_NODE) &&
873         event.getType() == EventType.NodeCreated) {
874       if (LOG.isInfoEnabled()) {
875         LOG.info("process: all input splits done");
876       }
877       inputSplitsAllDoneEvent.signal();
878       eventProcessed = true;
879     } else if (event.getPath().contains(INPUT_SPLITS_WORKER_DONE_DIR) &&
880         event.getType() == EventType.NodeChildrenChanged) {
881       if (LOG.isDebugEnabled()) {
882         LOG.debug("process: worker done reading input splits");
883       }
884       inputSplitsWorkerDoneEvent.signal();
885       eventProcessed = true;
886     } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) &&
887         event.getType() == EventType.NodeCreated) {
888       if (LOG.isInfoEnabled()) {
889         LOG.info("process: superstepFinished signaled");
890       }
891       superstepFinished.signal();
892       eventProcessed = true;
893     } else if (event.getPath().endsWith(applicationAttemptsPath) &&
894         event.getType() == EventType.NodeChildrenChanged) {
895       if (LOG.isInfoEnabled()) {
896         LOG.info("process: applicationAttemptChanged signaled");
897       }
898       applicationAttemptChanged.signal();
899       eventProcessed = true;
900     } else if (event.getPath().contains(MASTER_ELECTION_DIR) &&
901         event.getType() == EventType.NodeChildrenChanged) {
902       if (LOG.isInfoEnabled()) {
903         LOG.info("process: masterElectionChildrenChanged signaled");
904       }
905       masterElectionChildrenChanged.signal();
906       eventProcessed = true;
907     } else if (event.getPath().equals(cleanedUpPath) &&
908         event.getType() == EventType.NodeChildrenChanged) {
909       if (LOG.isInfoEnabled()) {
910         LOG.info("process: cleanedUpChildrenChanged signaled");
911       }
912       cleanedUpChildrenChanged.signal();
913       eventProcessed = true;
914     }
915 
916     if (!(processEvent(event)) && (!eventProcessed)) {
917       LOG.warn("process: Unknown and unprocessed event (path=" +
918           event.getPath() + ", type=" + event.getType() +
919           ", state=" + event.getState() + ")");
920     }
921   }
922 
923   /**
924    * Get the last saved superstep.
925    *
926    * @return Last good superstep number
927    * @throws IOException
928    */
929   protected long getLastCheckpointedSuperstep() throws IOException {
930     return CheckpointingUtils.getLastCheckpointedSuperstep(getFs(),
931         savedCheckpointBasePath);
932   }
933 
934   @Override
935   public JobProgressTracker getJobProgressTracker() {
936     return getGraphTaskManager().getJobProgressTracker();
937   }
938 
939 
940   /**
941    * For every worker this method returns unique number
942    * between 0 and N, where N is the total number of workers.
943    * This number stays the same throughout the computation.
944    * TaskID may be different from this number and task ID
945    * is not necessarily continuous
946    * @param workerInfo worker info object
947    * @return worker number
948    */
949   protected int getWorkerId(WorkerInfo workerInfo) {
950     return getWorkerInfoList().indexOf(workerInfo);
951   }
952 
953   /**
954    * Returns worker info corresponding to specified worker id.
955    * @param id unique worker id
956    * @return WorkerInfo
957    */
958   protected WorkerInfo getWorkerInfoById(int id) {
959     return getWorkerInfoList().get(id);
960   }
961 }