This project has retired. For details please refer to its Attic page.
BspService xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.bsp;
20  
21  import org.apache.giraph.conf.GiraphConstants;
22  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23  import org.apache.giraph.graph.GraphTaskManager;
24  import org.apache.giraph.job.JobProgressTracker;
25  import org.apache.giraph.partition.GraphPartitionerFactory;
26  import org.apache.giraph.utils.CheckpointingUtils;
27  import org.apache.giraph.worker.WorkerInfo;
28  import org.apache.giraph.writable.kryo.GiraphClassResolver;
29  import org.apache.giraph.zk.BspEvent;
30  import org.apache.giraph.zk.PredicateLock;
31  import org.apache.giraph.zk.ZooKeeperExt;
32  import org.apache.giraph.zk.ZooKeeperManager;
33  import org.apache.hadoop.fs.FileSystem;
34  import org.apache.hadoop.io.Writable;
35  import org.apache.hadoop.io.WritableComparable;
36  import org.apache.hadoop.mapreduce.Mapper;
37  import org.apache.log4j.Logger;
38  import org.apache.zookeeper.CreateMode;
39  import org.apache.zookeeper.KeeperException;
40  import org.apache.zookeeper.WatchedEvent;
41  import org.apache.zookeeper.Watcher;
42  import org.apache.zookeeper.Watcher.Event.EventType;
43  import org.apache.zookeeper.Watcher.Event.KeeperState;
44  import org.apache.zookeeper.ZooDefs.Ids;
45  import org.json.JSONException;
46  import org.json.JSONObject;
47  
48  import java.io.IOException;
49  import java.net.UnknownHostException;
50  import java.nio.charset.Charset;
51  import java.util.ArrayList;
52  import java.util.Collections;
53  import java.util.List;
54  
55  import static org.apache.giraph.conf.GiraphConstants.RESTART_JOB_ID;
56  
57  /**
58   * Zookeeper-based implementation of {@link CentralizedService}.
59   *
60   * @param <I> Vertex id
61   * @param <V> Vertex data
62   * @param <E> Edge data
63   */
64  @SuppressWarnings("rawtypes")
65  public abstract class BspService<I extends WritableComparable,
66      V extends Writable, E extends Writable>
67      implements Watcher, CentralizedService<I, V, E> {
  /** Sentinel meaning "no superstep has been determined yet" */
  public static final long UNSET_SUPERSTEP = Long.MIN_VALUE;
  /** Input superstep (superstep when loading the vertices happens) */
  public static final long INPUT_SUPERSTEP = -1;
  /** Sentinel meaning "no application attempt has been determined yet" */
  public static final long UNSET_APPLICATION_ATTEMPT = Long.MIN_VALUE;
  /** Base ZooKeeper directory (appended to the configured ZooKeeper base) */
  public static final String BASE_DIR = "/_hadoopBsp";
  /** Master job state znode, relative to the job's base path */
  public static final String MASTER_JOB_STATE_NODE = "/_masterJobState";

  /** Input splits worker done directory */
  public static final String INPUT_SPLITS_WORKER_DONE_DIR =
      "/_inputSplitsWorkerDoneDir";
  /** Input splits all done node */
  public static final String INPUT_SPLITS_ALL_DONE_NODE =
      "/_inputSplitsAllDone";
  /** Directory to store kryo className-ID assignment */
  public static final String KRYO_REGISTERED_CLASS_DIR =
          "/_kryo";
  /** Directory of attempts of this application */
  public static final String APPLICATION_ATTEMPTS_DIR =
      "/_applicationAttemptsDir";
  /** Where the master election happens */
  public static final String MASTER_ELECTION_DIR = "/_masterElectionDir";
  /** Superstep scope (per-attempt directory holding one child per superstep) */
  public static final String SUPERSTEP_DIR = "/_superstepDir";
  /** Counter sub directory (under the worker finished directory) */
  public static final String COUNTERS_DIR = "/_counters";
  /** Metrics sub directory (under the worker finished directory) */
  public static final String METRICS_DIR = "/_metrics";
  /** Healthy workers register here. */
  public static final String WORKER_HEALTHY_DIR = "/_workerHealthyDir";
  /** Unhealthy workers register here. */
  public static final String WORKER_UNHEALTHY_DIR = "/_workerUnhealthyDir";
  /** Workers which wrote checkpoint notify here */
  public static final String WORKER_WROTE_CHECKPOINT_DIR =
      "/_workerWroteCheckpointDir";
  /** Finished workers notify here */
  public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir";
  /** Helps coordinate the partition exchanges */
  public static final String PARTITION_EXCHANGE_DIR =
      "/_partitionExchangeDir";
  /** Denotes that the superstep is done */
  public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished";
  /** Denotes that computation should be halted */
  public static final String HALT_COMPUTATION_NODE = "/_haltComputation";
  /** Memory observer dir */
  public static final String MEMORY_OBSERVER_DIR = "/_memoryObserver";
  /** User sets this flag to checkpoint and stop the job */
  public static final String FORCE_CHECKPOINT_USER_FLAG = "/_checkpointAndStop";
  /** Denotes which workers have been cleaned up */
  public static final String CLEANED_UP_DIR = "/_cleanedUpDir";
  /** JSON message count key */
  public static final String JSONOBJ_NUM_MESSAGES_KEY = "_numMsgsKey";
  /** JSON message bytes count key */
  public static final String JSONOBJ_NUM_MESSAGE_BYTES_KEY = "_numMsgBytesKey";
  /** JSON metrics key */
  public static final String JSONOBJ_METRICS_KEY = "_metricsKey";

  /** JSON state key */
  public static final String JSONOBJ_STATE_KEY = "_stateKey";
  /** JSON application attempt key */
  public static final String JSONOBJ_APPLICATION_ATTEMPT_KEY =
      "_applicationAttemptKey";
  /** JSON superstep key */
  public static final String JSONOBJ_SUPERSTEP_KEY =
      "_superstepKey";
  /** Suffix denotes a worker */
  public static final String WORKER_SUFFIX = "_worker";
  /** Suffix denotes a master */
  public static final String MASTER_SUFFIX = "_master";
140 
  /** Class logger */
  private static final Logger LOG = Logger.getLogger(BspService.class);
  /** Path to the job's root in ZooKeeper */
  protected final String basePath;
  /** Path to the job state determined by the master (informative only) */
  protected final String masterJobStatePath;
  /** Input splits worker done directory */
  protected final String inputSplitsWorkerDonePath;
  /** Input splits all done node */
  protected final String inputSplitsAllDonePath;
  /** Path to the application attempts */
  protected final String applicationAttemptsPath;
  /** Path to the cleaned up notifications */
  protected final String cleanedUpPath;
  /** Path to the checkpoint's root (including job id) */
  protected final String checkpointBasePath;
  /** Old checkpoint root, in case we want to restart from another job */
  protected final String savedCheckpointBasePath;
  /** Path to the master election directory */
  protected final String masterElectionPath;
  /** If this path exists computation will be halted */
  protected final String haltComputationPath;
  /** Path where memory observer stores data */
  protected final String memoryObserverPath;
  /** Kryo className-ID mapping directory */
  protected final String kryoRegisteredClassPath;
  /** Private ZooKeeper instance that implements the service */
  private final ZooKeeperExt zk;
  /** Has the Connection occurred? */
  private final BspEvent connectedEvent;
  /** Has worker registration changed (either healthy or unhealthy) */
  private final BspEvent workerHealthRegistrationChanged;
  /** Application attempt changed */
  private final BspEvent applicationAttemptChanged;
  /** Input splits worker done */
  private final BspEvent inputSplitsWorkerDoneEvent;
  /** Input splits all done */
  private final BspEvent inputSplitsAllDoneEvent;
  /** Superstep finished synchronization */
  private final BspEvent superstepFinished;
  /** Master election changed for any waited on attempt */
  private final BspEvent masterElectionChildrenChanged;
  /** Cleaned up directory children changed */
  private final BspEvent cleanedUpChildrenChanged;
  /** Event to synchronize when workers have written their counters to
   * ZooKeeper */
  private final BspEvent writtenCountersToZK;
  /** Registered list of BspEvents (filled via registerBspEvent) */
  private final List<BspEvent> registeredBspEvents =
      new ArrayList<BspEvent>();
  /** Immutable configuration of the job */
  private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
  /** Job context (mainly for progress) */
  private final Mapper<?, ?, ?, ?>.Context context;
  /** Cached superstep (from ZooKeeper) */
  private long cachedSuperstep = UNSET_SUPERSTEP;
  /** Superstep restarted from a checkpoint (manual or automatic) */
  private long restartedSuperstep = UNSET_SUPERSTEP;
  /** Cached application attempt (from ZooKeeper) */
  private long cachedApplicationAttempt = UNSET_APPLICATION_ATTEMPT;
  /** Job id, to ensure uniqueness */
  private final String jobId;
  /** Task id, from partition and application attempt to ensure uniqueness */
  private final int taskId;
  /** My hostname */
  private final String hostname;
  /** Combination of hostname '_' task (unique id) */
  private final String hostnameTaskId;
  /** Graph partitioner */
  private final GraphPartitionerFactory<I, V, E> graphPartitionerFactory;
  /** GraphTaskManager driving the graph computation on this compute node */
  private final GraphTaskManager<I, V, E> graphTaskManager;
  /** File system */
  private final FileSystem fs;
215 
  /**
   * Constructor.
   *
   * <p>Creates and registers one {@link PredicateLock}-backed
   * {@link BspEvent} per coordination event. Derives all ZooKeeper paths
   * from the configured ZooKeeper base path and the job id. Connects to
   * ZooKeeper and waits on the connected event via
   * {@code waitForTimeoutOrFail} with
   * {@link GiraphConstants#WAIT_ZOOKEEPER_TIMEOUT_MSEC}. Finally resolves
   * the task id and the superstep to restart from, if any.
   *
   * @param context Mapper context
   * @param graphTaskManager GraphTaskManager for this compute node
   * @throws RuntimeException wrapping an {@link UnknownHostException} if the
   *     local hostname cannot be determined, or an {@link IOException} from
   *     ZooKeeper/FileSystem setup or from looking up the last checkpoint
   * @throws IllegalArgumentException if the superstep to restart from is
   *     set but negative
   */
  public BspService(
      Mapper<?, ?, ?, ?>.Context context,
      GraphTaskManager<I, V, E> graphTaskManager) {
    this.connectedEvent = new PredicateLock(context);
    this.workerHealthRegistrationChanged = new PredicateLock(context);
    this.applicationAttemptChanged = new PredicateLock(context);
    this.inputSplitsWorkerDoneEvent = new PredicateLock(context);
    this.inputSplitsAllDoneEvent = new PredicateLock(context);
    this.superstepFinished = new PredicateLock(context);
    this.masterElectionChildrenChanged = new PredicateLock(context);
    this.cleanedUpChildrenChanged = new PredicateLock(context);
    this.writtenCountersToZK = new PredicateLock(context);

    // NOTE(review): registerBspEvent is public and non-final, so a subclass
    // override would run here before the subclass's own fields are set.
    registerBspEvent(connectedEvent);
    registerBspEvent(workerHealthRegistrationChanged);
    registerBspEvent(inputSplitsWorkerDoneEvent);
    registerBspEvent(inputSplitsAllDoneEvent);
    registerBspEvent(applicationAttemptChanged);
    registerBspEvent(superstepFinished);
    registerBspEvent(masterElectionChildrenChanged);
    registerBspEvent(cleanedUpChildrenChanged);
    registerBspEvent(writtenCountersToZK);

    this.context = context;
    this.graphTaskManager = graphTaskManager;
    this.conf = graphTaskManager.getConf();

    this.jobId = conf.getJobId();
    // An explicitly configured restart superstep takes precedence over the
    // "latest checkpoint" lookup performed near the end of the constructor.
    this.restartedSuperstep = conf.getLong(
        GiraphConstants.RESTART_SUPERSTEP, UNSET_SUPERSTEP);
    try {
      this.hostname = conf.getLocalHostname();
    } catch (UnknownHostException e) {
      throw new RuntimeException(e);
    }
    this.graphPartitionerFactory = conf.createGraphPartitioner();

    basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
    if (LOG.isInfoEnabled()) {
      LOG.info(String.format("%s: %s",
              GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP, basePath));
    }
    masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
    inputSplitsWorkerDonePath = basePath + INPUT_SPLITS_WORKER_DONE_DIR;
    inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE;
    applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
    cleanedUpPath = basePath + CLEANED_UP_DIR;
    kryoRegisteredClassPath = basePath + KRYO_REGISTERED_CLASS_DIR;


    String restartJobId = RESTART_JOB_ID.get(conf);

    // When restarting from another job, checkpoints are read from that job's
    // checkpoint root; new checkpoints are always written under this job's.
    savedCheckpointBasePath =
        CheckpointingUtils.getCheckpointBasePath(getConfiguration(),
            restartJobId == null ? getJobId() : restartJobId);

    checkpointBasePath = CheckpointingUtils.
        getCheckpointBasePath(getConfiguration(), getJobId());

    masterElectionPath = basePath + MASTER_ELECTION_DIR;
    String serverPortList = graphTaskManager.getZookeeperList();
    haltComputationPath = basePath + HALT_COMPUTATION_NODE;
    memoryObserverPath = basePath + MEMORY_OBSERVER_DIR;
    // Uses the halt path as a counter name in the halt-node counter group;
    // presumably so users can find the path in the job's counters.
    getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP,
        haltComputationPath);
    if (LOG.isInfoEnabled()) {
      LOG.info("BspService: Path to create to halt is " + haltComputationPath);
    }
    if (LOG.isInfoEnabled()) {
      LOG.info("BspService: Connecting to ZooKeeper with job " + jobId +
          ", partition " + conf.getTaskPartition() + " on " + serverPortList);
    }
    try {
      // This object is registered as the ZooKeeper watcher.
      this.zk = new ZooKeeperExt(serverPortList,
                                 conf.getZooKeeperSessionTimeout(),
                                 conf.getZookeeperOpsMaxAttempts(),
                                 conf.getZookeeperOpsRetryWaitMsecs(),
                                 this,
                                 context);
      connectedEvent.waitForTimeoutOrFail(
          GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(conf));
      this.fs = FileSystem.get(getConfiguration());
    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    boolean disableGiraphResolver =
            GiraphConstants.DISABLE_GIRAPH_CLASS_RESOLVER.get(conf);
    if (!disableGiraphResolver) {
      GiraphClassResolver.setZookeeperInfo(zk, kryoRegisteredClassPath);
    }
    // getApplicationAttempt() queries ZooKeeper, so it must follow the
    // connection above. The (int) cast applies to the attempt only; the
    // product is then computed in int arithmetic.
    this.taskId = (int) getApplicationAttempt() * conf.getMaxWorkers() +
            conf.getTaskPartition();
    this.hostnameTaskId = hostname + "_" + getTaskId();

    //Trying to restart from the latest superstep
    if (restartJobId != null &&
        restartedSuperstep == UNSET_SUPERSTEP) {
      try {
        restartedSuperstep = getLastCheckpointedSuperstep();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
    this.cachedSuperstep = restartedSuperstep;
    if ((restartedSuperstep != UNSET_SUPERSTEP) &&
        (restartedSuperstep < 0)) {
      throw new IllegalArgumentException(
          "BspService: Invalid superstep to restart - " +
              restartedSuperstep);
    }
  }
334 
335   /**
336    * Get the superstep from a ZooKeeper path
337    *
338    * @param path Path to parse for the superstep
339    * @return Superstep from the path.
340    */
341   public static long getSuperstepFromPath(String path) {
342     int foundSuperstepStart = path.indexOf(SUPERSTEP_DIR);
343     if (foundSuperstepStart == -1) {
344       throw new IllegalArgumentException(
345           "getSuperstepFromPath: Cannot find " + SUPERSTEP_DIR +
346           "from " + path);
347     }
348     foundSuperstepStart += SUPERSTEP_DIR.length() + 1;
349     int endIndex = foundSuperstepStart +
350         path.substring(foundSuperstepStart).indexOf("/");
351     if (endIndex == -1) {
352       throw new IllegalArgumentException(
353           "getSuperstepFromPath: Cannot find end of superstep from " +
354               path);
355     }
356     if (LOG.isTraceEnabled()) {
357       LOG.trace("getSuperstepFromPath: Got path=" + path +
358           ", start=" + foundSuperstepStart + ", end=" + endIndex);
359     }
360     return Long.parseLong(path.substring(foundSuperstepStart, endIndex));
361   }
362 
363   /**
364    * Get the hostname and id from a "healthy" worker path
365    *
366    * @param path Path to check
367    * @return Hostname and id from path
368    */
369   public static String getHealthyHostnameIdFromPath(String path) {
370     int foundWorkerHealthyStart = path.indexOf(WORKER_HEALTHY_DIR);
371     if (foundWorkerHealthyStart == -1) {
372       throw new IllegalArgumentException(
373           "getHealthyHostnameidFromPath: Couldn't find " +
374               WORKER_HEALTHY_DIR + " from " + path);
375     }
376     foundWorkerHealthyStart += WORKER_HEALTHY_DIR.length();
377     return path.substring(foundWorkerHealthyStart);
378   }
379 
380   /**
381    * Generate the base superstep directory path for a given application
382    * attempt
383    *
384    * @param attempt application attempt number
385    * @return directory path based on the an attempt
386    */
387   public final String getSuperstepPath(long attempt) {
388     return applicationAttemptsPath + "/" + attempt + SUPERSTEP_DIR;
389   }
390 
391   /**
392    * Generate the worker information "healthy" directory path for a
393    * superstep
394    *
395    * @param attempt application attempt number
396    * @param superstep superstep to use
397    * @return directory path based on the a superstep
398    */
399   public final String getWorkerInfoHealthyPath(long attempt,
400       long superstep) {
401     return applicationAttemptsPath + "/" + attempt +
402         SUPERSTEP_DIR + "/" + superstep + WORKER_HEALTHY_DIR;
403   }
404 
405   /**
406    * Generate the worker information "unhealthy" directory path for a
407    * superstep
408    *
409    * @param attempt application attempt number
410    * @param superstep superstep to use
411    * @return directory path based on the a superstep
412    */
413   public final String getWorkerInfoUnhealthyPath(long attempt,
414       long superstep) {
415     return applicationAttemptsPath + "/" + attempt +
416         SUPERSTEP_DIR + "/" + superstep + WORKER_UNHEALTHY_DIR;
417   }
418 
419   /**
420    * Generate the worker "wrote checkpoint" directory path for a
421    * superstep
422    *
423    * @param attempt application attempt number
424    * @param superstep superstep to use
425    * @return directory path based on the a superstep
426    */
427   public final String getWorkerWroteCheckpointPath(long attempt,
428       long superstep) {
429     return applicationAttemptsPath + "/" + attempt +
430         SUPERSTEP_DIR + "/" + superstep + WORKER_WROTE_CHECKPOINT_DIR;
431   }
432 
433   /**
434    * Generate the worker "finished" directory path for a
435    * superstep, for storing the superstep-related metrics
436    *
437    * @param attempt application attempt number
438    * @param superstep superstep to use
439    * @return directory path based on the a superstep
440    */
441   public final String getWorkerMetricsFinishedPath(
442           long attempt, long superstep) {
443     return applicationAttemptsPath + "/" + attempt +
444             SUPERSTEP_DIR + "/" + superstep + WORKER_FINISHED_DIR + METRICS_DIR;
445   }
446 
447   /**
448    * Generate the worker "finished" directory path for a
449    * superstep, for storing the superstep-related counters
450    *
451    * @param attempt application attempt number
452    * @param superstep superstep to use
453    * @return directory path based on the a superstep
454    */
455   public final String getWorkerCountersFinishedPath(
456           long attempt, long superstep) {
457     return applicationAttemptsPath + "/" + attempt +
458             SUPERSTEP_DIR + "/" + superstep +
459             WORKER_FINISHED_DIR + COUNTERS_DIR;
460   }
461 
462   /**
463    * Generate the "partition exchange" directory path for a superstep
464    *
465    * @param attempt application attempt number
466    * @param superstep superstep to use
467    * @return directory path based on the a superstep
468    */
469   public final String getPartitionExchangePath(long attempt,
470       long superstep) {
471     return applicationAttemptsPath + "/" + attempt +
472         SUPERSTEP_DIR + "/" + superstep + PARTITION_EXCHANGE_DIR;
473   }
474 
475   /**
476    * Based on the superstep, worker info, and attempt, get the appropriate
477    * worker path for the exchange.
478    *
479    * @param attempt Application attempt
480    * @param superstep Superstep
481    * @param workerInfo Worker info of the exchange.
482    * @return Path of the desired worker
483    */
484   public final String getPartitionExchangeWorkerPath(long attempt,
485       long superstep,
486       WorkerInfo workerInfo) {
487     return getPartitionExchangePath(attempt, superstep) +
488         "/" + workerInfo.getHostnameId();
489   }
490 
491   /**
492    * Generate the "superstep finished" directory path for a superstep
493    *
494    * @param attempt application attempt number
495    * @param superstep superstep to use
496    * @return directory path based on the a superstep
497    */
498   public final String getSuperstepFinishedPath(long attempt, long superstep) {
499     return applicationAttemptsPath + "/" + attempt +
500         SUPERSTEP_DIR + "/" + superstep + SUPERSTEP_FINISHED_NODE;
501   }
502 
503   /**
504    * Generate the base superstep directory path for a given application
505    * attempt
506    *
507    * @param superstep Superstep to use
508    * @return Directory path based on the a superstep
509    */
510   public final String getCheckpointBasePath(long superstep) {
511     return checkpointBasePath + "/" + superstep;
512   }
513 
514   /**
515    * In case when we restart another job this will give us a path
516    * to saved checkpoint.
517    * @param superstep superstep to use
518    * @return Direcory path for restarted job based on the superstep
519    */
520   public final String getSavedCheckpointBasePath(long superstep) {
521     return savedCheckpointBasePath + "/" + superstep;
522   }
523 
524 
525   /**
526    * Get the ZooKeeperExt instance.
527    *
528    * @return ZooKeeperExt instance.
529    */
530   public final ZooKeeperExt getZkExt() {
531     return zk;
532   }
533 
534   @Override
535   public final long getRestartedSuperstep() {
536     return restartedSuperstep;
537   }
538 
539   /**
540    * Set the restarted superstep
541    *
542    * @param superstep Set the manually restarted superstep
543    */
544   public final void setRestartedSuperstep(long superstep) {
545     if (superstep < INPUT_SUPERSTEP) {
546       throw new IllegalArgumentException(
547           "setRestartedSuperstep: Bad argument " + superstep);
548     }
549     restartedSuperstep = superstep;
550   }
551 
552   /**
553    * Get the file system
554    *
555    * @return file system
556    */
557   public final FileSystem getFs() {
558     return fs;
559   }
560 
561   public final ImmutableClassesGiraphConfiguration<I, V, E>
562   getConfiguration() {
563     return conf;
564   }
565 
566   public final Mapper<?, ?, ?, ?>.Context getContext() {
567     return context;
568   }
569 
570   public final String getHostname() {
571     return hostname;
572   }
573 
574   public final String getHostnameTaskId() {
575     return hostnameTaskId;
576   }
577 
578   public final int getTaskId() {
579     return taskId;
580   }
581 
582   public final GraphTaskManager<I, V, E> getGraphTaskManager() {
583     return graphTaskManager;
584   }
585 
586   public final BspEvent getWorkerHealthRegistrationChangedEvent() {
587     return workerHealthRegistrationChanged;
588   }
589 
590   public final BspEvent getApplicationAttemptChangedEvent() {
591     return applicationAttemptChanged;
592   }
593 
594   public final BspEvent getInputSplitsWorkerDoneEvent() {
595     return inputSplitsWorkerDoneEvent;
596   }
597 
598   public final BspEvent getInputSplitsAllDoneEvent() {
599     return inputSplitsAllDoneEvent;
600   }
601 
602   public final BspEvent getSuperstepFinishedEvent() {
603     return superstepFinished;
604   }
605 
606 
607   public final BspEvent getMasterElectionChildrenChangedEvent() {
608     return masterElectionChildrenChanged;
609   }
610 
611   public final BspEvent getCleanedUpChildrenChangedEvent() {
612     return cleanedUpChildrenChanged;
613   }
614 
615   public final BspEvent getWrittenCountersToZKEvent() {
616     return writtenCountersToZK;
617   }
618 
619   /**
620    * Get the master commanded job state as a JSONObject.  Also sets the
621    * watches to see if the master commanded job state changes.
622    *
623    * @return Last job state or null if none
624    */
625   public final JSONObject getJobState() {
626     try {
627       getZkExt().createExt(masterJobStatePath,
628           null,
629           Ids.OPEN_ACL_UNSAFE,
630           CreateMode.PERSISTENT,
631           true);
632     } catch (KeeperException.NodeExistsException e) {
633       LOG.info("getJobState: Job state already exists (" +
634           masterJobStatePath + ")");
635     } catch (KeeperException e) {
636       throw new IllegalStateException("Failed to create job state path " +
637           "due to KeeperException", e);
638     } catch (InterruptedException e) {
639       throw new IllegalStateException("Failed to create job state path " +
640           "due to InterruptedException", e);
641     }
642     String jobState = null;
643     try {
644       List<String> childList =
645           getZkExt().getChildrenExt(
646               masterJobStatePath, true, true, true);
647       if (childList.isEmpty()) {
648         return null;
649       }
650       jobState =
651           new String(getZkExt().getData(childList.get(childList.size() - 1),
652               true, null), Charset.defaultCharset());
653     } catch (KeeperException.NoNodeException e) {
654       LOG.info("getJobState: Job state path is empty! - " +
655           masterJobStatePath);
656     } catch (KeeperException e) {
657       throw new IllegalStateException("Failed to get job state path " +
658           "children due to KeeperException", e);
659     } catch (InterruptedException e) {
660       throw new IllegalStateException("Failed to get job state path " +
661           "children due to InterruptedException", e);
662     }
663     try {
664       return new JSONObject(jobState);
665     } catch (JSONException e) {
666       throw new RuntimeException(
667           "getJobState: Failed to parse job state " + jobState);
668     }
669   }
670 
671   /**
672    * Get the job id
673    *
674    * @return job id
675    */
676   public final String getJobId() {
677     return jobId;
678   }
679 
680   /**
681    * Get the latest application attempt and cache it.
682    *
683    * @return the latest application attempt
684    */
685   public final long getApplicationAttempt() {
686     if (cachedApplicationAttempt != UNSET_APPLICATION_ATTEMPT) {
687       return cachedApplicationAttempt;
688     }
689     try {
690       getZkExt().createExt(applicationAttemptsPath,
691           null,
692           Ids.OPEN_ACL_UNSAFE,
693           CreateMode.PERSISTENT,
694           true);
695     } catch (KeeperException.NodeExistsException e) {
696       LOG.info("getApplicationAttempt: Node " +
697           applicationAttemptsPath + " already exists!");
698     } catch (KeeperException e) {
699       throw new IllegalStateException("Couldn't create application " +
700           "attempts path due to KeeperException", e);
701     } catch (InterruptedException e) {
702       throw new IllegalStateException("Couldn't create application " +
703           "attempts path due to InterruptedException", e);
704     }
705     try {
706       List<String> attemptList =
707           getZkExt().getChildrenExt(
708               applicationAttemptsPath, true, false, false);
709       if (attemptList.isEmpty()) {
710         cachedApplicationAttempt = 0;
711       } else {
712         cachedApplicationAttempt =
713             Long.parseLong(Collections.max(attemptList));
714       }
715     } catch (KeeperException e) {
716       throw new IllegalStateException("Couldn't get application " +
717           "attempts to KeeperException", e);
718     } catch (InterruptedException e) {
719       throw new IllegalStateException("Couldn't get application " +
720           "attempts to InterruptedException", e);
721     }
722 
723     return cachedApplicationAttempt;
724   }
725 
726   /**
727    * Get the latest superstep and cache it.
728    *
729    * @return the latest superstep
730    */
731   public final long getSuperstep() {
732     if (cachedSuperstep != UNSET_SUPERSTEP) {
733       return cachedSuperstep;
734     }
735     String superstepPath = getSuperstepPath(getApplicationAttempt());
736     try {
737       getZkExt().createExt(superstepPath,
738           null,
739           Ids.OPEN_ACL_UNSAFE,
740           CreateMode.PERSISTENT,
741           true);
742     } catch (KeeperException.NodeExistsException e) {
743       if (LOG.isInfoEnabled()) {
744         LOG.info("getApplicationAttempt: Node " +
745             applicationAttemptsPath + " already exists!");
746       }
747     } catch (KeeperException e) {
748       throw new IllegalStateException(
749           "getSuperstep: KeeperException", e);
750     } catch (InterruptedException e) {
751       throw new IllegalStateException(
752           "getSuperstep: InterruptedException", e);
753     }
754 
755     List<String> superstepList;
756     try {
757       superstepList =
758           getZkExt().getChildrenExt(superstepPath, true, false, false);
759     } catch (KeeperException e) {
760       throw new IllegalStateException(
761           "getSuperstep: KeeperException", e);
762     } catch (InterruptedException e) {
763       throw new IllegalStateException(
764           "getSuperstep: InterruptedException", e);
765     }
766     if (superstepList.isEmpty()) {
767       cachedSuperstep = INPUT_SUPERSTEP;
768     } else {
769       cachedSuperstep =
770           Long.parseLong(Collections.max(superstepList));
771     }
772 
773     return cachedSuperstep;
774   }
775 
776   /**
777    * Increment the cached superstep.  Shouldn't be the initial value anymore.
778    */
779   public final void incrCachedSuperstep() {
780     if (cachedSuperstep == UNSET_SUPERSTEP) {
781       throw new IllegalStateException(
782           "incrSuperstep: Invalid unset cached superstep " +
783               UNSET_SUPERSTEP);
784     }
785     ++cachedSuperstep;
786   }
787 
788   /**
789    * Set the cached superstep (should only be used for loading checkpoints
790    * or recovering from failure).
791    *
792    * @param superstep will be used as the next superstep iteration
793    */
794   public final void setCachedSuperstep(long superstep) {
795     cachedSuperstep = superstep;
796   }
797 
798   /**
799    * Set the cached application attempt (should only be used for restart from
800    * failure by the master)
801    *
802    * @param applicationAttempt Will denote the new application attempt
803    */
804   public final void setApplicationAttempt(long applicationAttempt) {
805     cachedApplicationAttempt = applicationAttempt;
806     String superstepPath = getSuperstepPath(cachedApplicationAttempt);
807     try {
808       getZkExt().createExt(superstepPath,
809           null,
810           Ids.OPEN_ACL_UNSAFE,
811           CreateMode.PERSISTENT,
812           true);
813     } catch (KeeperException.NodeExistsException e) {
814       throw new IllegalArgumentException(
815           "setApplicationAttempt: Attempt already exists! - " +
816               superstepPath, e);
817     } catch (KeeperException e) {
818       throw new RuntimeException(
819           "setApplicationAttempt: KeeperException - " +
820               superstepPath, e);
821     } catch (InterruptedException e) {
822       throw new RuntimeException(
823           "setApplicationAttempt: InterruptedException - " +
824               superstepPath, e);
825     }
826   }
827 
828   /**
829    * Register a BspEvent.  Ensure that it will be signaled
830    * by catastrophic failure so that threads waiting on an event signal
831    * will be unblocked.
832    *
833    * @param event Event to be registered.
834    */
835   public void registerBspEvent(BspEvent event) {
836     registeredBspEvents.add(event);
837   }
838 
839   /**
840    * Subclasses can use this to instantiate their respective partitioners
841    *
842    * @return Instantiated graph partitioner factory
843    */
844   protected GraphPartitionerFactory<I, V, E> getGraphPartitionerFactory() {
845     return graphPartitionerFactory;
846   }
847 
848   /**
849    * Derived classes that want additional ZooKeeper events to take action
850    * should override this.
851    *
852    * @param event Event that occurred
853    * @return true if the event was processed here, false otherwise
854    */
855   protected boolean processEvent(WatchedEvent event) {
856     return false;
857   }
858 
859   @Override
860   public final void process(WatchedEvent event) {
861     // 1. Process all shared events
862     // 2. Process specific derived class events
863     if (LOG.isDebugEnabled()) {
864       LOG.debug("process: Got a new event, path = " + event.getPath() +
865           ", type = " + event.getType() + ", state = " +
866           event.getState());
867     }
868 
869     if ((event.getPath() == null) && (event.getType() == EventType.None)) {
870       if (event.getState() == KeeperState.Disconnected) {
871         // Watches may not be triggered for some time, so signal all BspEvents
872         for (BspEvent bspEvent : registeredBspEvents) {
873           bspEvent.signal();
874         }
875         LOG.warn("process: Disconnected from ZooKeeper (will automatically " +
876             "try to recover) " + event);
877       } else if (event.getState() == KeeperState.SyncConnected) {
878         if (LOG.isInfoEnabled()) {
879           LOG.info("process: Asynchronous connection complete.");
880         }
881         connectedEvent.signal();
882       } else {
883         LOG.warn("process: Got unknown null path event " + event);
884       }
885       return;
886     }
887 
888     boolean eventProcessed = false;
889     if (event.getPath().startsWith(masterJobStatePath)) {
890       // This will cause all becomeMaster() MasterThreads to notice the
891       // change in job state and quit trying to become the master.
892       masterElectionChildrenChanged.signal();
893       eventProcessed = true;
894     } else if ((event.getPath().contains(WORKER_HEALTHY_DIR) ||
895         event.getPath().contains(WORKER_UNHEALTHY_DIR)) &&
896         (event.getType() == EventType.NodeChildrenChanged)) {
897       if (LOG.isDebugEnabled()) {
898         LOG.debug("process: workerHealthRegistrationChanged " +
899             "(worker health reported - healthy/unhealthy )");
900       }
901       workerHealthRegistrationChanged.signal();
902       eventProcessed = true;
903     } else if (event.getPath().contains(INPUT_SPLITS_ALL_DONE_NODE) &&
904         event.getType() == EventType.NodeCreated) {
905       if (LOG.isInfoEnabled()) {
906         LOG.info("process: all input splits done");
907       }
908       inputSplitsAllDoneEvent.signal();
909       eventProcessed = true;
910     } else if (event.getPath().contains(INPUT_SPLITS_WORKER_DONE_DIR) &&
911         event.getType() == EventType.NodeChildrenChanged) {
912       if (LOG.isDebugEnabled()) {
913         LOG.debug("process: worker done reading input splits");
914       }
915       inputSplitsWorkerDoneEvent.signal();
916       eventProcessed = true;
917     } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) &&
918         event.getType() == EventType.NodeCreated) {
919       if (LOG.isInfoEnabled()) {
920         LOG.info("process: superstepFinished signaled");
921       }
922       superstepFinished.signal();
923       eventProcessed = true;
924     } else if (event.getPath().endsWith(applicationAttemptsPath) &&
925         event.getType() == EventType.NodeChildrenChanged) {
926       if (LOG.isInfoEnabled()) {
927         LOG.info("process: applicationAttemptChanged signaled");
928       }
929       applicationAttemptChanged.signal();
930       eventProcessed = true;
931     } else if (event.getPath().contains(MASTER_ELECTION_DIR) &&
932         event.getType() == EventType.NodeChildrenChanged) {
933       if (LOG.isInfoEnabled()) {
934         LOG.info("process: masterElectionChildrenChanged signaled");
935       }
936       masterElectionChildrenChanged.signal();
937       eventProcessed = true;
938     } else if (event.getPath().equals(cleanedUpPath) &&
939         event.getType() == EventType.NodeChildrenChanged) {
940       if (LOG.isInfoEnabled()) {
941         LOG.info("process: cleanedUpChildrenChanged signaled");
942       }
943       cleanedUpChildrenChanged.signal();
944       eventProcessed = true;
945     } else if (event.getPath().endsWith(COUNTERS_DIR) &&
946             event.getType() == EventType.NodeChildrenChanged) {
947       LOG.info("process: writtenCountersToZK signaled");
948       getWrittenCountersToZKEvent().signal();
949       eventProcessed = true;
950     }
951 
952     if (!(processEvent(event)) && (!eventProcessed)) {
953       LOG.warn("process: Unknown and unprocessed event (path=" +
954           event.getPath() + ", type=" + event.getType() +
955           ", state=" + event.getState() + ")");
956     }
957   }
958 
959   /**
960    * Get the last saved superstep.
961    *
962    * @return Last good superstep number
963    * @throws IOException
964    */
965   protected long getLastCheckpointedSuperstep() throws IOException {
966     return CheckpointingUtils.getLastCheckpointedSuperstep(getFs(),
967         savedCheckpointBasePath);
968   }
969 
970   @Override
971   public JobProgressTracker getJobProgressTracker() {
972     return getGraphTaskManager().getJobProgressTracker();
973   }
974 
975 
976   /**
977    * For every worker this method returns unique number
978    * between 0 and N, where N is the total number of workers.
979    * This number stays the same throughout the computation.
980    * TaskID may be different from this number and task ID
981    * is not necessarily continuous
982    * @param workerInfo worker info object
983    * @return worker number
984    */
985   protected int getWorkerId(WorkerInfo workerInfo) {
986     return getWorkerInfoList().indexOf(workerInfo);
987   }
988 
989   /**
990    * Returns worker info corresponding to specified worker id.
991    * @param id unique worker id
992    * @return WorkerInfo
993    */
994   protected WorkerInfo getWorkerInfoById(int id) {
995     return getWorkerInfoList().get(id);
996   }
997 }