View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.bsp;
20  
21  import org.apache.giraph.conf.GiraphConstants;
22  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23  import org.apache.giraph.graph.GraphTaskManager;
24  import org.apache.giraph.job.JobProgressTracker;
25  import org.apache.giraph.partition.GraphPartitionerFactory;
26  import org.apache.giraph.utils.CheckpointingUtils;
27  import org.apache.giraph.worker.WorkerInfo;
28  import org.apache.giraph.zk.BspEvent;
29  import org.apache.giraph.zk.PredicateLock;
30  import org.apache.giraph.zk.ZooKeeperExt;
31  import org.apache.giraph.zk.ZooKeeperManager;
32  import org.apache.hadoop.fs.FileSystem;
33  import org.apache.hadoop.io.Writable;
34  import org.apache.hadoop.io.WritableComparable;
35  import org.apache.hadoop.mapreduce.Mapper;
36  import org.apache.log4j.Logger;
37  import org.apache.zookeeper.CreateMode;
38  import org.apache.zookeeper.KeeperException;
39  import org.apache.zookeeper.WatchedEvent;
40  import org.apache.zookeeper.Watcher;
41  import org.apache.zookeeper.Watcher.Event.EventType;
42  import org.apache.zookeeper.Watcher.Event.KeeperState;
43  import org.apache.zookeeper.ZooDefs.Ids;
44  import org.json.JSONException;
45  import org.json.JSONObject;
46  
47  import java.io.IOException;
48  import java.net.UnknownHostException;
49  import java.nio.charset.Charset;
50  import java.util.ArrayList;
51  import java.util.Collections;
52  import java.util.List;
53  
54  import static org.apache.giraph.conf.GiraphConstants.RESTART_JOB_ID;
55  
56  /**
57   * Zookeeper-based implementation of {@link CentralizedService}.
58   *
59   * @param <I> Vertex id
60   * @param <V> Vertex data
61   * @param <E> Edge data
62   */
63  @SuppressWarnings("rawtypes")
64  public abstract class BspService<I extends WritableComparable,
65      V extends Writable, E extends Writable>
66      implements Watcher, CentralizedService<I, V, E> {
  /** Sentinel meaning that no superstep has been set or cached yet */
  public static final long UNSET_SUPERSTEP = Long.MIN_VALUE;
  /** Input superstep (superstep when loading the vertices happens) */
  public static final long INPUT_SUPERSTEP = -1;
  /** Sentinel meaning that no application attempt has been cached yet */
  public static final long UNSET_APPLICATION_ATTEMPT = Long.MIN_VALUE;
  /** Base ZooKeeper directory (the job id is appended to it) */
  public static final String BASE_DIR = "/_hadoopBsp";
  /** Master job state znode (appended to the job's base path) */
  public static final String MASTER_JOB_STATE_NODE = "/_masterJobState";

  /** Input splits worker done directory */
  public static final String INPUT_SPLITS_WORKER_DONE_DIR =
      "/_inputSplitsWorkerDoneDir";
  /** Input splits all done node */
  public static final String INPUT_SPLITS_ALL_DONE_NODE =
      "/_inputSplitsAllDone";

  /** Directory of attempts of this application */
  public static final String APPLICATION_ATTEMPTS_DIR =
      "/_applicationAttemptsDir";
  /** Where the master election happens */
  public static final String MASTER_ELECTION_DIR = "/_masterElectionDir";
  /** Superstep scope (one child per superstep number) */
  public static final String SUPERSTEP_DIR = "/_superstepDir";
  /** Healthy workers register here. */
  public static final String WORKER_HEALTHY_DIR = "/_workerHealthyDir";
  /** Unhealthy workers register here. */
  public static final String WORKER_UNHEALTHY_DIR = "/_workerUnhealthyDir";
  /** Workers which wrote a checkpoint notify here */
  public static final String WORKER_WROTE_CHECKPOINT_DIR =
      "/_workerWroteCheckpointDir";
  /** Finished workers notify here */
  public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir";
  /** Helps coordinate the partition exchanges */
  public static final String PARTITION_EXCHANGE_DIR =
      "/_partitionExchangeDir";
  /** Denotes that the superstep is done */
  public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished";
  /** Denotes that computation should be halted */
  public static final String HALT_COMPUTATION_NODE = "/_haltComputation";
  /** Memory observer directory */
  public static final String MEMORY_OBSERVER_DIR = "/_memoryObserver";
  /** User sets this flag to checkpoint and stop the job */
  public static final String FORCE_CHECKPOINT_USER_FLAG = "/_checkpointAndStop";
  /** Denotes which workers have been cleaned up */
  public static final String CLEANED_UP_DIR = "/_cleanedUpDir";
  /** JSON message count key */
  public static final String JSONOBJ_NUM_MESSAGES_KEY = "_numMsgsKey";
  /** JSON message bytes count key */
  public static final String JSONOBJ_NUM_MESSAGE_BYTES_KEY = "_numMsgBytesKey";
  /** JSON metrics key */
  public static final String JSONOBJ_METRICS_KEY = "_metricsKey";

  /** JSON state key */
  public static final String JSONOBJ_STATE_KEY = "_stateKey";
  /** JSON application attempt key */
  public static final String JSONOBJ_APPLICATION_ATTEMPT_KEY =
      "_applicationAttemptKey";
  /** JSON superstep key */
  public static final String JSONOBJ_SUPERSTEP_KEY =
      "_superstepKey";
  /** Suffix that denotes a worker */
  public static final String WORKER_SUFFIX = "_worker";
  /** Suffix that denotes a master */
  public static final String MASTER_SUFFIX = "_master";
133 
  /** Class logger */
  private static final Logger LOG = Logger.getLogger(BspService.class);
  /** Path to the job's root (ZooKeeper base path + BASE_DIR + job id) */
  protected final String basePath;
  /** Path to the job state determined by the master (informative only) */
  protected final String masterJobStatePath;
  /** Path to the input splits worker done directory */
  protected final String inputSplitsWorkerDonePath;
  /** Path to the input splits all done node */
  protected final String inputSplitsAllDonePath;
  /** Path to the application attempts directory */
  protected final String applicationAttemptsPath;
  /** Path to the cleaned up notifications */
  protected final String cleanedUpPath;
  /** Path to the checkpoint's root (including job id) */
  protected final String checkpointBasePath;
  /**
   * Checkpoint root of the job being restarted; built from the restart job
   * id when one is configured, otherwise from this job's own id
   */
  protected final String savedCheckpointBasePath;
  /** Path to the master election directory */
  protected final String masterElectionPath;
  /** If this path exists computation will be halted */
  protected final String haltComputationPath;
  /** Path where memory observer stores data */
  protected final String memoryObserverPath;
  /** Private ZooKeeper instance that implements the service */
  private final ZooKeeperExt zk;
  /** Has the connection occurred? */
  private final BspEvent connectedEvent;
  /** Has worker registration changed (either healthy or unhealthy) */
  private final BspEvent workerHealthRegistrationChanged;
  /** Application attempt changed */
  private final BspEvent applicationAttemptChanged;
  /** Input splits worker done */
  private final BspEvent inputSplitsWorkerDoneEvent;
  /** Input splits all done */
  private final BspEvent inputSplitsAllDoneEvent;
  /** Superstep finished synchronization */
  private final BspEvent superstepFinished;
  /** Master election changed for any waited on attempt */
  private final BspEvent masterElectionChildrenChanged;
  /** Cleaned up directory children changed */
  private final BspEvent cleanedUpChildrenChanged;
  /** Registered list of BspEvents (all are signaled on disconnect) */
  private final List<BspEvent> registeredBspEvents =
      new ArrayList<BspEvent>();
  /** Immutable configuration of the job */
  private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
  /** Job context (mainly for progress) */
  private final Mapper<?, ?, ?, ?>.Context context;
  /** Cached superstep (from ZooKeeper) */
  private long cachedSuperstep = UNSET_SUPERSTEP;
  /** Restarted from a checkpoint (manual or automatic) */
  private long restartedSuperstep = UNSET_SUPERSTEP;
  /** Cached application attempt (from ZooKeeper) */
  private long cachedApplicationAttempt = UNSET_APPLICATION_ATTEMPT;
  /** Job id, to ensure uniqueness */
  private final String jobId;
  /** Task partition, to ensure uniqueness */
  private final int taskPartition;
  /** My hostname */
  private final String hostname;
  /** Combination of hostname '_' partition (unique id) */
  private final String hostnamePartitionId;
  /** Graph partitioner factory created from the configuration */
  private final GraphPartitionerFactory<I, V, E> graphPartitionerFactory;
  /** Task manager that will do the graph computation */
  private final GraphTaskManager<I, V, E> graphTaskManager;
  /** File system */
  private final FileSystem fs;
203 
  /**
   * Constructor. Creates and registers the shared events, derives every
   * ZooKeeper and checkpoint path from the job configuration, connects to
   * ZooKeeper (blocking until the connected event is signaled) and
   * resolves the superstep to restart from, if any.
   *
   * @param context Mapper context
   * @param graphTaskManager GraphTaskManager for this compute node
   * @throws RuntimeException if the local hostname cannot be resolved, or
   *         if connecting to ZooKeeper, opening the file system or looking
   *         up the last checkpointed superstep fails with an IOException
   * @throws IllegalArgumentException if the resolved restart superstep is
   *         set but negative
   */
  public BspService(
      Mapper<?, ?, ?, ?>.Context context,
      GraphTaskManager<I, V, E> graphTaskManager) {
    this.connectedEvent = new PredicateLock(context);
    this.workerHealthRegistrationChanged = new PredicateLock(context);
    this.applicationAttemptChanged = new PredicateLock(context);
    this.inputSplitsWorkerDoneEvent = new PredicateLock(context);
    this.inputSplitsAllDoneEvent = new PredicateLock(context);
    this.superstepFinished = new PredicateLock(context);
    this.masterElectionChildrenChanged = new PredicateLock(context);
    this.cleanedUpChildrenChanged = new PredicateLock(context);

    // Registered events are all signaled by process() when ZooKeeper
    // reports a disconnect, so that waiting threads get unblocked
    registerBspEvent(connectedEvent);
    registerBspEvent(workerHealthRegistrationChanged);
    registerBspEvent(inputSplitsWorkerDoneEvent);
    registerBspEvent(inputSplitsAllDoneEvent);
    registerBspEvent(applicationAttemptChanged);
    registerBspEvent(superstepFinished);
    registerBspEvent(masterElectionChildrenChanged);
    registerBspEvent(cleanedUpChildrenChanged);

    this.context = context;
    this.graphTaskManager = graphTaskManager;
    this.conf = graphTaskManager.getConf();
    this.jobId = conf.getJobId();
    this.taskPartition = conf.getTaskPartition();
    // Manually configured restart superstep; stays unset when not given
    this.restartedSuperstep = conf.getLong(
        GiraphConstants.RESTART_SUPERSTEP, UNSET_SUPERSTEP);
    try {
      this.hostname = conf.getLocalHostname();
    } catch (UnknownHostException e) {
      throw new RuntimeException(e);
    }
    this.hostnamePartitionId = hostname + "_" + getTaskPartition();
    this.graphPartitionerFactory = conf.createGraphPartitioner();

    basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
    // The returned counter is not used; presumably the lookup alone
    // publishes the base path as a counter name - TODO confirm
    getContext().getCounter(GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP,
        basePath);
    masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
    inputSplitsWorkerDonePath = basePath + INPUT_SPLITS_WORKER_DONE_DIR;
    inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE;
    applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
    cleanedUpPath = basePath + CLEANED_UP_DIR;

    String restartJobId = RESTART_JOB_ID.get(conf);

    // Saved checkpoints come from the job being restarted when a restart
    // job id is configured, otherwise from this job itself
    savedCheckpointBasePath =
        CheckpointingUtils.getCheckpointBasePath(getConfiguration(),
            restartJobId == null ? getJobId() : restartJobId);

    checkpointBasePath = CheckpointingUtils.
        getCheckpointBasePath(getConfiguration(), getJobId());

    masterElectionPath = basePath + MASTER_ELECTION_DIR;
    String serverPortList = graphTaskManager.getZookeeperList();
    haltComputationPath = basePath + HALT_COMPUTATION_NODE;
    memoryObserverPath = basePath + MEMORY_OBSERVER_DIR;
    // As above, the returned counter is discarded
    getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP,
        haltComputationPath);
    if (LOG.isInfoEnabled()) {
      LOG.info("BspService: Path to create to halt is " + haltComputationPath);
    }
    if (LOG.isInfoEnabled()) {
      LOG.info("BspService: Connecting to ZooKeeper with job " + jobId +
          ", " + getTaskPartition() + " on " + serverPortList);
    }
    try {
      // This object is handed over as the watcher, so process() may be
      // invoked before the constructor has finished
      this.zk = new ZooKeeperExt(serverPortList,
                                 conf.getZooKeeperSessionTimeout(),
                                 conf.getZookeeperOpsMaxAttempts(),
                                 conf.getZookeeperOpsRetryWaitMsecs(),
                                 this,
                                 context);
      // Block until process() signals the connected event
      connectedEvent.waitForever();
      this.fs = FileSystem.get(getConfiguration());
    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    // Restarting another job without an explicit superstep: try to restart
    // from the latest checkpointed superstep
    if (restartJobId != null &&
        restartedSuperstep == UNSET_SUPERSTEP) {
      try {
        restartedSuperstep = getLastCheckpointedSuperstep();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
    // Seeds the cache, so getSuperstep() skips ZooKeeper when restarting
    this.cachedSuperstep = restartedSuperstep;
    if ((restartedSuperstep != UNSET_SUPERSTEP) &&
        (restartedSuperstep < 0)) {
      throw new IllegalArgumentException(
          "BspService: Invalid superstep to restart - " +
              restartedSuperstep);
    }
  }
307 
308   /**
309    * Get the superstep from a ZooKeeper path
310    *
311    * @param path Path to parse for the superstep
312    * @return Superstep from the path.
313    */
314   public static long getSuperstepFromPath(String path) {
315     int foundSuperstepStart = path.indexOf(SUPERSTEP_DIR);
316     if (foundSuperstepStart == -1) {
317       throw new IllegalArgumentException(
318           "getSuperstepFromPath: Cannot find " + SUPERSTEP_DIR +
319           "from " + path);
320     }
321     foundSuperstepStart += SUPERSTEP_DIR.length() + 1;
322     int endIndex = foundSuperstepStart +
323         path.substring(foundSuperstepStart).indexOf("/");
324     if (endIndex == -1) {
325       throw new IllegalArgumentException(
326           "getSuperstepFromPath: Cannot find end of superstep from " +
327               path);
328     }
329     if (LOG.isTraceEnabled()) {
330       LOG.trace("getSuperstepFromPath: Got path=" + path +
331           ", start=" + foundSuperstepStart + ", end=" + endIndex);
332     }
333     return Long.parseLong(path.substring(foundSuperstepStart, endIndex));
334   }
335 
336   /**
337    * Get the hostname and id from a "healthy" worker path
338    *
339    * @param path Path to check
340    * @return Hostname and id from path
341    */
342   public static String getHealthyHostnameIdFromPath(String path) {
343     int foundWorkerHealthyStart = path.indexOf(WORKER_HEALTHY_DIR);
344     if (foundWorkerHealthyStart == -1) {
345       throw new IllegalArgumentException(
346           "getHealthyHostnameidFromPath: Couldn't find " +
347               WORKER_HEALTHY_DIR + " from " + path);
348     }
349     foundWorkerHealthyStart += WORKER_HEALTHY_DIR.length();
350     return path.substring(foundWorkerHealthyStart);
351   }
352 
353   /**
354    * Generate the base superstep directory path for a given application
355    * attempt
356    *
357    * @param attempt application attempt number
358    * @return directory path based on the an attempt
359    */
360   public final String getSuperstepPath(long attempt) {
361     return applicationAttemptsPath + "/" + attempt + SUPERSTEP_DIR;
362   }
363 
364   /**
365    * Generate the worker information "healthy" directory path for a
366    * superstep
367    *
368    * @param attempt application attempt number
369    * @param superstep superstep to use
370    * @return directory path based on the a superstep
371    */
372   public final String getWorkerInfoHealthyPath(long attempt,
373       long superstep) {
374     return applicationAttemptsPath + "/" + attempt +
375         SUPERSTEP_DIR + "/" + superstep + WORKER_HEALTHY_DIR;
376   }
377 
378   /**
379    * Generate the worker information "unhealthy" directory path for a
380    * superstep
381    *
382    * @param attempt application attempt number
383    * @param superstep superstep to use
384    * @return directory path based on the a superstep
385    */
386   public final String getWorkerInfoUnhealthyPath(long attempt,
387       long superstep) {
388     return applicationAttemptsPath + "/" + attempt +
389         SUPERSTEP_DIR + "/" + superstep + WORKER_UNHEALTHY_DIR;
390   }
391 
392   /**
393    * Generate the worker "wrote checkpoint" directory path for a
394    * superstep
395    *
396    * @param attempt application attempt number
397    * @param superstep superstep to use
398    * @return directory path based on the a superstep
399    */
400   public final String getWorkerWroteCheckpointPath(long attempt,
401       long superstep) {
402     return applicationAttemptsPath + "/" + attempt +
403         SUPERSTEP_DIR + "/" + superstep + WORKER_WROTE_CHECKPOINT_DIR;
404   }
405 
406   /**
407    * Generate the worker "finished" directory path for a
408    * superstep
409    *
410    * @param attempt application attempt number
411    * @param superstep superstep to use
412    * @return directory path based on the a superstep
413    */
414   public final String getWorkerFinishedPath(long attempt, long superstep) {
415     return applicationAttemptsPath + "/" + attempt +
416         SUPERSTEP_DIR + "/" + superstep + WORKER_FINISHED_DIR;
417   }
418 
419   /**
420    * Generate the "partition exchange" directory path for a superstep
421    *
422    * @param attempt application attempt number
423    * @param superstep superstep to use
424    * @return directory path based on the a superstep
425    */
426   public final String getPartitionExchangePath(long attempt,
427       long superstep) {
428     return applicationAttemptsPath + "/" + attempt +
429         SUPERSTEP_DIR + "/" + superstep + PARTITION_EXCHANGE_DIR;
430   }
431 
432   /**
433    * Based on the superstep, worker info, and attempt, get the appropriate
434    * worker path for the exchange.
435    *
436    * @param attempt Application attempt
437    * @param superstep Superstep
438    * @param workerInfo Worker info of the exchange.
439    * @return Path of the desired worker
440    */
441   public final String getPartitionExchangeWorkerPath(long attempt,
442       long superstep,
443       WorkerInfo workerInfo) {
444     return getPartitionExchangePath(attempt, superstep) +
445         "/" + workerInfo.getHostnameId();
446   }
447 
448   /**
449    * Generate the "superstep finished" directory path for a superstep
450    *
451    * @param attempt application attempt number
452    * @param superstep superstep to use
453    * @return directory path based on the a superstep
454    */
455   public final String getSuperstepFinishedPath(long attempt, long superstep) {
456     return applicationAttemptsPath + "/" + attempt +
457         SUPERSTEP_DIR + "/" + superstep + SUPERSTEP_FINISHED_NODE;
458   }
459 
460   /**
461    * Generate the base superstep directory path for a given application
462    * attempt
463    *
464    * @param superstep Superstep to use
465    * @return Directory path based on the a superstep
466    */
467   public final String getCheckpointBasePath(long superstep) {
468     return checkpointBasePath + "/" + superstep;
469   }
470 
471   /**
472    * In case when we restart another job this will give us a path
473    * to saved checkpoint.
474    * @param superstep superstep to use
475    * @return Direcory path for restarted job based on the superstep
476    */
477   public final String getSavedCheckpointBasePath(long superstep) {
478     return savedCheckpointBasePath + "/" + superstep;
479   }
480 
481 
482   /**
483    * Get the ZooKeeperExt instance.
484    *
485    * @return ZooKeeperExt instance.
486    */
487   public final ZooKeeperExt getZkExt() {
488     return zk;
489   }
490 
491   @Override
492   public final long getRestartedSuperstep() {
493     return restartedSuperstep;
494   }
495 
496   /**
497    * Set the restarted superstep
498    *
499    * @param superstep Set the manually restarted superstep
500    */
501   public final void setRestartedSuperstep(long superstep) {
502     if (superstep < INPUT_SUPERSTEP) {
503       throw new IllegalArgumentException(
504           "setRestartedSuperstep: Bad argument " + superstep);
505     }
506     restartedSuperstep = superstep;
507   }
508 
509   /**
510    * Get the file system
511    *
512    * @return file system
513    */
514   public final FileSystem getFs() {
515     return fs;
516   }
517 
518   public final ImmutableClassesGiraphConfiguration<I, V, E>
519   getConfiguration() {
520     return conf;
521   }
522 
523   public final Mapper<?, ?, ?, ?>.Context getContext() {
524     return context;
525   }
526 
527   public final String getHostname() {
528     return hostname;
529   }
530 
531   public final String getHostnamePartitionId() {
532     return hostnamePartitionId;
533   }
534 
535   public final int getTaskPartition() {
536     return taskPartition;
537   }
538 
539   public final GraphTaskManager<I, V, E> getGraphTaskManager() {
540     return graphTaskManager;
541   }
542 
543   public final BspEvent getWorkerHealthRegistrationChangedEvent() {
544     return workerHealthRegistrationChanged;
545   }
546 
547   public final BspEvent getApplicationAttemptChangedEvent() {
548     return applicationAttemptChanged;
549   }
550 
551   public final BspEvent getInputSplitsWorkerDoneEvent() {
552     return inputSplitsWorkerDoneEvent;
553   }
554 
555   public final BspEvent getInputSplitsAllDoneEvent() {
556     return inputSplitsAllDoneEvent;
557   }
558 
559   public final BspEvent getSuperstepFinishedEvent() {
560     return superstepFinished;
561   }
562 
563 
564   public final BspEvent getMasterElectionChildrenChangedEvent() {
565     return masterElectionChildrenChanged;
566   }
567 
568   public final BspEvent getCleanedUpChildrenChangedEvent() {
569     return cleanedUpChildrenChanged;
570   }
571 
572   /**
573    * Get the master commanded job state as a JSONObject.  Also sets the
574    * watches to see if the master commanded job state changes.
575    *
576    * @return Last job state or null if none
577    */
578   public final JSONObject getJobState() {
579     try {
580       getZkExt().createExt(masterJobStatePath,
581           null,
582           Ids.OPEN_ACL_UNSAFE,
583           CreateMode.PERSISTENT,
584           true);
585     } catch (KeeperException.NodeExistsException e) {
586       LOG.info("getJobState: Job state already exists (" +
587           masterJobStatePath + ")");
588     } catch (KeeperException e) {
589       throw new IllegalStateException("Failed to create job state path " +
590           "due to KeeperException", e);
591     } catch (InterruptedException e) {
592       throw new IllegalStateException("Failed to create job state path " +
593           "due to InterruptedException", e);
594     }
595     String jobState = null;
596     try {
597       List<String> childList =
598           getZkExt().getChildrenExt(
599               masterJobStatePath, true, true, true);
600       if (childList.isEmpty()) {
601         return null;
602       }
603       jobState =
604           new String(getZkExt().getData(childList.get(childList.size() - 1),
605               true, null), Charset.defaultCharset());
606     } catch (KeeperException.NoNodeException e) {
607       LOG.info("getJobState: Job state path is empty! - " +
608           masterJobStatePath);
609     } catch (KeeperException e) {
610       throw new IllegalStateException("Failed to get job state path " +
611           "children due to KeeperException", e);
612     } catch (InterruptedException e) {
613       throw new IllegalStateException("Failed to get job state path " +
614           "children due to InterruptedException", e);
615     }
616     try {
617       return new JSONObject(jobState);
618     } catch (JSONException e) {
619       throw new RuntimeException(
620           "getJobState: Failed to parse job state " + jobState);
621     }
622   }
623 
624   /**
625    * Get the job id
626    *
627    * @return job id
628    */
629   public final String getJobId() {
630     return jobId;
631   }
632 
633   /**
634    * Get the latest application attempt and cache it.
635    *
636    * @return the latest application attempt
637    */
638   public final long getApplicationAttempt() {
639     if (cachedApplicationAttempt != UNSET_APPLICATION_ATTEMPT) {
640       return cachedApplicationAttempt;
641     }
642     try {
643       getZkExt().createExt(applicationAttemptsPath,
644           null,
645           Ids.OPEN_ACL_UNSAFE,
646           CreateMode.PERSISTENT,
647           true);
648     } catch (KeeperException.NodeExistsException e) {
649       LOG.info("getApplicationAttempt: Node " +
650           applicationAttemptsPath + " already exists!");
651     } catch (KeeperException e) {
652       throw new IllegalStateException("Couldn't create application " +
653           "attempts path due to KeeperException", e);
654     } catch (InterruptedException e) {
655       throw new IllegalStateException("Couldn't create application " +
656           "attempts path due to InterruptedException", e);
657     }
658     try {
659       List<String> attemptList =
660           getZkExt().getChildrenExt(
661               applicationAttemptsPath, true, false, false);
662       if (attemptList.isEmpty()) {
663         cachedApplicationAttempt = 0;
664       } else {
665         cachedApplicationAttempt =
666             Long.parseLong(Collections.max(attemptList));
667       }
668     } catch (KeeperException e) {
669       throw new IllegalStateException("Couldn't get application " +
670           "attempts to KeeperException", e);
671     } catch (InterruptedException e) {
672       throw new IllegalStateException("Couldn't get application " +
673           "attempts to InterruptedException", e);
674     }
675 
676     return cachedApplicationAttempt;
677   }
678 
679   /**
680    * Get the latest superstep and cache it.
681    *
682    * @return the latest superstep
683    */
684   public final long getSuperstep() {
685     if (cachedSuperstep != UNSET_SUPERSTEP) {
686       return cachedSuperstep;
687     }
688     String superstepPath = getSuperstepPath(getApplicationAttempt());
689     try {
690       getZkExt().createExt(superstepPath,
691           null,
692           Ids.OPEN_ACL_UNSAFE,
693           CreateMode.PERSISTENT,
694           true);
695     } catch (KeeperException.NodeExistsException e) {
696       if (LOG.isInfoEnabled()) {
697         LOG.info("getApplicationAttempt: Node " +
698             applicationAttemptsPath + " already exists!");
699       }
700     } catch (KeeperException e) {
701       throw new IllegalStateException(
702           "getSuperstep: KeeperException", e);
703     } catch (InterruptedException e) {
704       throw new IllegalStateException(
705           "getSuperstep: InterruptedException", e);
706     }
707 
708     List<String> superstepList;
709     try {
710       superstepList =
711           getZkExt().getChildrenExt(superstepPath, true, false, false);
712     } catch (KeeperException e) {
713       throw new IllegalStateException(
714           "getSuperstep: KeeperException", e);
715     } catch (InterruptedException e) {
716       throw new IllegalStateException(
717           "getSuperstep: InterruptedException", e);
718     }
719     if (superstepList.isEmpty()) {
720       cachedSuperstep = INPUT_SUPERSTEP;
721     } else {
722       cachedSuperstep =
723           Long.parseLong(Collections.max(superstepList));
724     }
725 
726     return cachedSuperstep;
727   }
728 
729   /**
730    * Increment the cached superstep.  Shouldn't be the initial value anymore.
731    */
732   public final void incrCachedSuperstep() {
733     if (cachedSuperstep == UNSET_SUPERSTEP) {
734       throw new IllegalStateException(
735           "incrSuperstep: Invalid unset cached superstep " +
736               UNSET_SUPERSTEP);
737     }
738     ++cachedSuperstep;
739   }
740 
741   /**
742    * Set the cached superstep (should only be used for loading checkpoints
743    * or recovering from failure).
744    *
745    * @param superstep will be used as the next superstep iteration
746    */
747   public final void setCachedSuperstep(long superstep) {
748     cachedSuperstep = superstep;
749   }
750 
751   /**
752    * Set the cached application attempt (should only be used for restart from
753    * failure by the master)
754    *
755    * @param applicationAttempt Will denote the new application attempt
756    */
757   public final void setApplicationAttempt(long applicationAttempt) {
758     cachedApplicationAttempt = applicationAttempt;
759     String superstepPath = getSuperstepPath(cachedApplicationAttempt);
760     try {
761       getZkExt().createExt(superstepPath,
762           null,
763           Ids.OPEN_ACL_UNSAFE,
764           CreateMode.PERSISTENT,
765           true);
766     } catch (KeeperException.NodeExistsException e) {
767       throw new IllegalArgumentException(
768           "setApplicationAttempt: Attempt already exists! - " +
769               superstepPath, e);
770     } catch (KeeperException e) {
771       throw new RuntimeException(
772           "setApplicationAttempt: KeeperException - " +
773               superstepPath, e);
774     } catch (InterruptedException e) {
775       throw new RuntimeException(
776           "setApplicationAttempt: InterruptedException - " +
777               superstepPath, e);
778     }
779   }
780 
781   /**
782    * Register a BspEvent.  Ensure that it will be signaled
783    * by catastrophic failure so that threads waiting on an event signal
784    * will be unblocked.
785    *
786    * @param event Event to be registered.
787    */
788   public void registerBspEvent(BspEvent event) {
789     registeredBspEvents.add(event);
790   }
791 
792   /**
793    * Subclasses can use this to instantiate their respective partitioners
794    *
795    * @return Instantiated graph partitioner factory
796    */
797   protected GraphPartitionerFactory<I, V, E> getGraphPartitionerFactory() {
798     return graphPartitionerFactory;
799   }
800 
801   /**
802    * Derived classes that want additional ZooKeeper events to take action
803    * should override this.
804    *
805    * @param event Event that occurred
806    * @return true if the event was processed here, false otherwise
807    */
808   protected boolean processEvent(WatchedEvent event) {
809     return false;
810   }
811 
812   @Override
813   public final void process(WatchedEvent event) {
814     // 1. Process all shared events
815     // 2. Process specific derived class events
816     if (LOG.isDebugEnabled()) {
817       LOG.debug("process: Got a new event, path = " + event.getPath() +
818           ", type = " + event.getType() + ", state = " +
819           event.getState());
820     }
821 
822     if ((event.getPath() == null) && (event.getType() == EventType.None)) {
823       if (event.getState() == KeeperState.Disconnected) {
824         // Watches may not be triggered for some time, so signal all BspEvents
825         for (BspEvent bspEvent : registeredBspEvents) {
826           bspEvent.signal();
827         }
828         LOG.warn("process: Disconnected from ZooKeeper (will automatically " +
829             "try to recover) " + event);
830       } else if (event.getState() == KeeperState.SyncConnected) {
831         if (LOG.isInfoEnabled()) {
832           LOG.info("process: Asynchronous connection complete.");
833         }
834         connectedEvent.signal();
835       } else {
836         LOG.warn("process: Got unknown null path event " + event);
837       }
838       return;
839     }
840 
841     boolean eventProcessed = false;
842     if (event.getPath().startsWith(masterJobStatePath)) {
843       // This will cause all becomeMaster() MasterThreads to notice the
844       // change in job state and quit trying to become the master.
845       masterElectionChildrenChanged.signal();
846       eventProcessed = true;
847     } else if ((event.getPath().contains(WORKER_HEALTHY_DIR) ||
848         event.getPath().contains(WORKER_UNHEALTHY_DIR)) &&
849         (event.getType() == EventType.NodeChildrenChanged)) {
850       if (LOG.isDebugEnabled()) {
851         LOG.debug("process: workerHealthRegistrationChanged " +
852             "(worker health reported - healthy/unhealthy )");
853       }
854       workerHealthRegistrationChanged.signal();
855       eventProcessed = true;
856     } else if (event.getPath().contains(INPUT_SPLITS_ALL_DONE_NODE) &&
857         event.getType() == EventType.NodeCreated) {
858       if (LOG.isInfoEnabled()) {
859         LOG.info("process: all input splits done");
860       }
861       inputSplitsAllDoneEvent.signal();
862       eventProcessed = true;
863     } else if (event.getPath().contains(INPUT_SPLITS_WORKER_DONE_DIR) &&
864         event.getType() == EventType.NodeChildrenChanged) {
865       if (LOG.isDebugEnabled()) {
866         LOG.debug("process: worker done reading input splits");
867       }
868       inputSplitsWorkerDoneEvent.signal();
869       eventProcessed = true;
870     } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) &&
871         event.getType() == EventType.NodeCreated) {
872       if (LOG.isInfoEnabled()) {
873         LOG.info("process: superstepFinished signaled");
874       }
875       superstepFinished.signal();
876       eventProcessed = true;
877     } else if (event.getPath().endsWith(applicationAttemptsPath) &&
878         event.getType() == EventType.NodeChildrenChanged) {
879       if (LOG.isInfoEnabled()) {
880         LOG.info("process: applicationAttemptChanged signaled");
881       }
882       applicationAttemptChanged.signal();
883       eventProcessed = true;
884     } else if (event.getPath().contains(MASTER_ELECTION_DIR) &&
885         event.getType() == EventType.NodeChildrenChanged) {
886       if (LOG.isInfoEnabled()) {
887         LOG.info("process: masterElectionChildrenChanged signaled");
888       }
889       masterElectionChildrenChanged.signal();
890       eventProcessed = true;
891     } else if (event.getPath().equals(cleanedUpPath) &&
892         event.getType() == EventType.NodeChildrenChanged) {
893       if (LOG.isInfoEnabled()) {
894         LOG.info("process: cleanedUpChildrenChanged signaled");
895       }
896       cleanedUpChildrenChanged.signal();
897       eventProcessed = true;
898     }
899 
900     if (!(processEvent(event)) && (!eventProcessed)) {
901       LOG.warn("process: Unknown and unprocessed event (path=" +
902           event.getPath() + ", type=" + event.getType() +
903           ", state=" + event.getState() + ")");
904     }
905   }
906 
907   /**
908    * Get the last saved superstep.
909    *
910    * @return Last good superstep number
911    * @throws IOException
912    */
913   protected long getLastCheckpointedSuperstep() throws IOException {
914     return CheckpointingUtils.getLastCheckpointedSuperstep(getFs(),
915         savedCheckpointBasePath);
916   }
917 
918   @Override
919   public JobProgressTracker getJobProgressTracker() {
920     return getGraphTaskManager().getJobProgressTracker();
921   }
922 
923 
924   /**
925    * For every worker this method returns unique number
926    * between 0 and N, where N is the total number of workers.
927    * This number stays the same throughout the computation.
928    * TaskID may be different from this number and task ID
929    * is not necessarily continuous
930    * @param workerInfo worker info object
931    * @return worker number
932    */
933   protected int getWorkerId(WorkerInfo workerInfo) {
934     return getWorkerInfoList().indexOf(workerInfo);
935   }
936 
937   /**
938    * Returns worker info corresponding to specified worker id.
939    * @param id unique worker id
940    * @return WorkerInfo
941    */
942   protected WorkerInfo getWorkerInfoById(int id) {
943     return getWorkerInfoList().get(id);
944   }
945 }