View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.graph;
20  
21  import java.io.IOException;
22  import java.lang.management.GarbageCollectorMXBean;
23  import java.lang.management.ManagementFactory;
24  import java.net.URL;
25  import java.net.URLDecoder;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.Enumeration;
29  import java.util.List;
30  import java.util.concurrent.Callable;
31  import java.util.concurrent.ExecutionException;
32  import java.util.concurrent.TimeUnit;
33  
34  import com.sun.management.GarbageCollectionNotificationInfo;
35  import com.yammer.metrics.core.Counter;
36  
37  import org.apache.commons.lang3.exception.ExceptionUtils;
38  import org.apache.giraph.bsp.BspService;
39  import org.apache.giraph.bsp.CentralizedServiceMaster;
40  import org.apache.giraph.bsp.CentralizedServiceWorker;
41  import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
42  import org.apache.giraph.comm.messages.MessageStore;
43  import org.apache.giraph.conf.ClassConfOption;
44  import org.apache.giraph.conf.GiraphConstants;
45  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
46  import org.apache.giraph.job.JobProgressTracker;
47  import org.apache.giraph.master.BspServiceMaster;
48  import org.apache.giraph.master.MasterThread;
49  import org.apache.giraph.metrics.GiraphMetrics;
50  import org.apache.giraph.metrics.GiraphMetricsRegistry;
51  import org.apache.giraph.metrics.GiraphTimer;
52  import org.apache.giraph.metrics.GiraphTimerContext;
53  import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
54  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
55  import org.apache.giraph.ooc.OutOfCoreEngine;
56  import org.apache.giraph.partition.PartitionOwner;
57  import org.apache.giraph.partition.PartitionStats;
58  import org.apache.giraph.partition.PartitionStore;
59  import org.apache.giraph.scripting.ScriptLoader;
60  import org.apache.giraph.utils.CallableFactory;
61  import org.apache.giraph.utils.GcObserver;
62  import org.apache.giraph.utils.MemoryUtils;
63  import org.apache.giraph.utils.ProgressableUtils;
64  import org.apache.giraph.worker.BspServiceWorker;
65  import org.apache.giraph.worker.InputSplitsCallable;
66  import org.apache.giraph.worker.WorkerContext;
67  import org.apache.giraph.worker.WorkerObserver;
68  import org.apache.giraph.worker.WorkerProgress;
69  import org.apache.giraph.writable.kryo.KryoWritableWrapper;
70  import org.apache.giraph.zk.ZooKeeperManager;
71  import org.apache.hadoop.conf.Configuration;
72  import org.apache.hadoop.fs.Path;
73  import org.apache.hadoop.io.Writable;
74  import org.apache.hadoop.io.WritableComparable;
75  import org.apache.hadoop.mapreduce.Mapper;
76  import org.apache.log4j.Appender;
77  import org.apache.log4j.Level;
78  import org.apache.log4j.LogManager;
79  import org.apache.log4j.Logger;
80  import org.apache.log4j.PatternLayout;
81  
82  import javax.management.Notification;
83  import javax.management.NotificationEmitter;
84  import javax.management.NotificationListener;
85  import javax.management.openmbean.CompositeData;
86  
87  /**
88   * The Giraph-specific business logic for a single BSP
89   * compute node in whatever underlying type of cluster
90   * our Giraph job will run on. Owning object will provide
91   * the glue into the underlying cluster framework
92   * and will call this object to perform Giraph work.
93   *
94   * @param <I> Vertex id
95   * @param <V> Vertex data
96   * @param <E> Edge data
97   */
98  @SuppressWarnings("rawtypes")
99  public class GraphTaskManager<I extends WritableComparable, V extends Writable,
100   E extends Writable> implements
101   ResetSuperstepMetricsObserver {
102 /*if_not[PURE_YARN]
103   static { // Eliminate this? Even MRv1 tasks should not need it here.
104     Configuration.addDefaultResource("giraph-site.xml");
105   }
106 end[PURE_YARN]*/
107   /**
108    * Class which checks if an exception on some thread should cause worker
109    * to fail
110    */
111   public static final ClassConfOption<CheckerIfWorkerShouldFailAfterException>
112   CHECKER_IF_WORKER_SHOULD_FAIL_AFTER_EXCEPTION_CLASS = ClassConfOption.create(
113       "giraph.checkerIfWorkerShouldFailAfterExceptionClass",
114       FailWithEveryExceptionOccurred.class,
115       CheckerIfWorkerShouldFailAfterException.class,
116       "Class which checks if an exception on some thread should cause worker " +
117           "to fail, by default all exceptions cause failure");
118   /** Name of metric for superstep time in msec */
119   public static final String TIMER_SUPERSTEP_TIME = "superstep-time-ms";
120   /** Name of metric for compute on all vertices in msec */
121   public static final String TIMER_COMPUTE_ALL = "compute-all-ms";
122   /** Name of metric for time from begin compute to first message sent */
123   public static final String TIMER_TIME_TO_FIRST_MSG =
124       "time-to-first-message-ms";
125   /** Name of metric for time from first message till last message flushed */
126   public static final String TIMER_COMMUNICATION_TIME = "communication-time-ms";
127   /** Name of metric for time spent doing GC per superstep in msec */
128   public static final String TIMER_SUPERSTEP_GC_TIME = "superstep-gc-time-ms";
129 
130   /** Class logger */
131   private static final Logger LOG = Logger.getLogger(GraphTaskManager.class);
132   /** Coordination service worker */
133   private CentralizedServiceWorker<I, V, E> serviceWorker;
134   /** Coordination service master */
135   private CentralizedServiceMaster<I, V, E> serviceMaster;
136   /** Coordination service master thread */
137   private Thread masterThread = null;
138   /** The worker should be run exactly once, or else there is a problem. */
139   private boolean alreadyRun = false;
140   /** Manages the ZooKeeper servers if necessary (dynamic startup) */
141   private ZooKeeperManager zkManager;
142   /** Configuration */
143   private ImmutableClassesGiraphConfiguration<I, V, E> conf;
144   /** Already complete? */
145   private boolean done = false;
146   /** What kind of functions is this mapper doing? */
147   private GraphFunctions graphFunctions = GraphFunctions.UNKNOWN;
148   /** Superstep stats */
149   private FinishedSuperstepStats finishedSuperstepStats =
150       new FinishedSuperstepStats(0, false, 0, 0, false, CheckpointStatus.NONE);
151   /** Job progress tracker */
152   private JobProgressTrackerClient jobProgressTracker;
153 
154   // Per-Job Metrics
155   /** Timer for WorkerContext#preApplication() */
156   private GiraphTimer wcPreAppTimer;
157   /** Timer for WorkerContext#postApplication() */
158   private GiraphTimer wcPostAppTimer;
159 
160   // Per-Superstep Metrics
161   /** Time for how long superstep took */
162   private GiraphTimer superstepTimer;
163   /** Time for all compute() calls in a superstep */
164   private GiraphTimer computeAll;
165   /** Time from starting compute to sending first message */
166   private GiraphTimer timeToFirstMessage;
167   /** Context for timing time to first message above */
168   private GiraphTimerContext timeToFirstMessageTimerContext;
169   /** Time from first sent message till last message flushed. */
170   private GiraphTimer communicationTimer;
171   /** Context for timing communication time above */
172   private GiraphTimerContext communicationTimerContext;
173   /** Timer for WorkerContext#preSuperstep() */
174   private GiraphTimer wcPreSuperstepTimer;
175   /** Timer to keep aggregated time spent in GC in a superstep */
176   private Counter gcTimeMetric;
177   /** The Hadoop Mapper#Context for this job */
178   private final Mapper<?, ?, ?, ?>.Context context;
179   /** is this GraphTaskManager the master? */
180   private boolean isMaster;
181   /** Mapper observers */
182   private MapperObserver[] mapperObservers;
183 
184   /**
185    * Default constructor for GiraphTaskManager.
186    * @param context a handle to the underlying cluster framework.
187    *                For Hadoop clusters, this is a Mapper#Context.
188    */
189   public GraphTaskManager(Mapper<?, ?, ?, ?>.Context context) {
190     this.context = context;
191     this.isMaster = false;
192   }
193 
194   /**
195    * Run the user's input checking code.
196    */
197   private void checkInput() {
198     if (conf.hasEdgeInputFormat()) {
199       conf.createWrappedEdgeInputFormat().checkInputSpecs(conf);
200     }
201     if (conf.hasVertexInputFormat()) {
202       conf.createWrappedVertexInputFormat().checkInputSpecs(conf);
203     }
204   }
205 
206   /**
207    * In order for job client to know which ZooKeeper the job is using,
208    * we create a counter with server:port as its name inside of
209    * ZOOKEEPER_SERVER_PORT_COUNTER_GROUP.
210    *
211    * @param serverPortList Server:port list for ZooKeeper used
212    */
213   private void createZooKeeperCounter(String serverPortList) {
214     // Getting the counter will actually create it.
215     context.getCounter(GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP,
216         serverPortList);
217   }
218 
219   /**
220    * Called by owner of this GraphTaskManager on each compute node
221    *
222    * @param zkPathList the path to the ZK jars we need to run the job
223    */
224   public void setup(Path[] zkPathList)
225     throws IOException, InterruptedException {
226     context.setStatus("setup: Beginning worker setup.");
227     Configuration hadoopConf = context.getConfiguration();
228     conf = new ImmutableClassesGiraphConfiguration<I, V, E>(hadoopConf);
229     initializeJobProgressTracker();
230     // Setting the default handler for uncaught exceptions.
231     Thread.setDefaultUncaughtExceptionHandler(createUncaughtExceptionHandler());
232     setupMapperObservers();
233     // Write user's graph types (I,V,E,M) back to configuration parameters so
234     // that they are set for quicker access later. These types are often
235     // inferred from the Computation class used.
236     conf.getGiraphTypes().writeIfUnset(conf);
237     // configure global logging level for Giraph job
238     initializeAndConfigureLogging();
239     // init the metrics objects
240     setupAndInitializeGiraphMetrics();
241     // Check input
242     checkInput();
243     // Load any scripts that were deployed
244     ScriptLoader.loadScripts(conf);
245     // One time setup for computation factory
246     conf.createComputationFactory().initialize(conf);
247     // Do some task setup (possibly starting up a Zookeeper service)
248     context.setStatus("setup: Initializing Zookeeper services.");
249     String serverPortList = conf.getZookeeperList();
250     if (serverPortList.isEmpty()) {
251       if (startZooKeeperManager()) {
252         return; // ZK connect/startup failed
253       }
254     } else {
255       createZooKeeperCounter(serverPortList);
256     }
257     if (zkManager != null && zkManager.runsZooKeeper()) {
258       if (LOG.isInfoEnabled()) {
259         LOG.info("setup: Chosen to run ZooKeeper...");
260       }
261     }
262     context
263         .setStatus("setup: Connected to Zookeeper service " + serverPortList);
264     this.graphFunctions = determineGraphFunctions(conf, zkManager);
265     if (zkManager != null && this.graphFunctions.isMaster()) {
266       zkManager.cleanupOnExit();
267     }
268     try {
269       instantiateBspService();
270     } catch (IOException e) {
271       LOG.error("setup: Caught exception just before end of setup", e);
272       if (zkManager != null) {
273         zkManager.offlineZooKeeperServers(ZooKeeperManager.State.FAILED);
274       }
275       throw new RuntimeException(
276         "setup: Offlining servers due to exception...", e);
277     }
278     context.setStatus(getGraphFunctions().toString() + " starting...");
279   }
280 
281   /**
282    * Create and connect a client to JobProgressTrackerService,
283    * or no-op implementation if progress shouldn't be tracked or something
284    * goes wrong
285    */
286   private void initializeJobProgressTracker() {
287     if (!conf.trackJobProgressOnClient()) {
288       jobProgressTracker = new JobProgressTrackerClientNoOp();
289     } else {
290       try {
291         jobProgressTracker = new RetryableJobProgressTrackerClient(conf);
292       } catch (InterruptedException | ExecutionException e) {
293         LOG.warn("createJobProgressClient: Exception occurred while trying to" +
294             " connect to JobProgressTracker - not reporting progress", e);
295         jobProgressTracker = new JobProgressTrackerClientNoOp();
296       }
297     }
298     jobProgressTracker.mapperStarted();
299   }
300 
301   /**
302   * Perform the work assigned to this compute node for this job run.
303   * 1) Run checkpoint per frequency policy.
304   * 2) For every vertex on this mapper, run the compute() function
305   * 3) Wait until all messaging is done.
306   * 4) Check if all vertices are done.  If not goto 2).
307   * 5) Dump output.
308   */
309   public void execute() throws IOException, InterruptedException {
310     if (checkTaskState()) {
311       return;
312     }
313     preLoadOnWorkerObservers();
314     GiraphTimerContext superstepTimerContext = superstepTimer.time();
315     finishedSuperstepStats = serviceWorker.setup();
316     superstepTimerContext.stop();
317     if (collectInputSuperstepStats(finishedSuperstepStats)) {
318       return;
319     }
320     prepareGraphStateAndWorkerContext();
321     List<PartitionStats> partitionStatsList = new ArrayList<PartitionStats>();
322     int numComputeThreads = conf.getNumComputeThreads();
323 
324     // main superstep processing loop
325     while (!finishedSuperstepStats.allVerticesHalted()) {
326       final long superstep = serviceWorker.getSuperstep();
327       superstepTimerContext = getTimerForThisSuperstep(superstep);
328       GraphState graphState = new GraphState(superstep,
329           finishedSuperstepStats.getVertexCount(),
330           finishedSuperstepStats.getEdgeCount(),
331           context);
332       Collection<? extends PartitionOwner> masterAssignedPartitionOwners =
333         serviceWorker.startSuperstep();
334       if (LOG.isDebugEnabled()) {
335         LOG.debug("execute: " + MemoryUtils.getRuntimeMemoryStats());
336       }
337       context.progress();
338       serviceWorker.exchangeVertexPartitions(masterAssignedPartitionOwners);
339       context.progress();
340       boolean hasBeenRestarted = checkSuperstepRestarted(superstep);
341 
342       GlobalStats globalStats = serviceWorker.getGlobalStats();
343 
344       if (hasBeenRestarted) {
345         graphState = new GraphState(superstep,
346             finishedSuperstepStats.getVertexCount(),
347             finishedSuperstepStats.getEdgeCount(),
348             context);
349       } else if (storeCheckpoint(globalStats.getCheckpointStatus())) {
350         break;
351       }
352       serviceWorker.getServerData().prepareResolveMutations();
353       context.progress();
354       prepareForSuperstep(graphState);
355       context.progress();
356       MessageStore<I, Writable> messageStore =
357           serviceWorker.getServerData().getCurrentMessageStore();
358       int numPartitions = serviceWorker.getPartitionStore().getNumPartitions();
359       int numThreads = Math.min(numComputeThreads, numPartitions);
360       if (LOG.isInfoEnabled()) {
361         LOG.info("execute: " + numPartitions + " partitions to process with " +
362           numThreads + " compute thread(s), originally " +
363           numComputeThreads + " thread(s) on superstep " + superstep);
364       }
365       partitionStatsList.clear();
366       // execute the current superstep
367       if (numPartitions > 0) {
368         processGraphPartitions(context, partitionStatsList, graphState,
369           messageStore, numThreads);
370       }
371       finishedSuperstepStats = completeSuperstepAndCollectStats(
372         partitionStatsList, superstepTimerContext);
373 
374       // END of superstep compute loop
375     }
376 
377     if (LOG.isInfoEnabled()) {
378       LOG.info("execute: BSP application done (global vertices marked done)");
379     }
380     updateSuperstepGraphState();
381     postApplication();
382   }
383 
384   /**
385    * Handle post-application callbacks.
386    */
387   private void postApplication() throws IOException, InterruptedException {
388     GiraphTimerContext postAppTimerContext = wcPostAppTimer.time();
389     serviceWorker.getWorkerContext().postApplication();
390     serviceWorker.getSuperstepOutput().postApplication();
391     postAppTimerContext.stop();
392     context.progress();
393 
394     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
395       obs.postApplication();
396       context.progress();
397     }
398   }
399 
400   /**
401    * Sets the "isMaster" flag for final output commit to happen on master.
402    * @param im the boolean input to set isMaster. Applies to "pure YARN only"
403    */
404   public void setIsMaster(final boolean im) {
405     this.isMaster = im;
406   }
407 
408   /**
409    * Get "isMaster" status flag -- we need to know if we're the master in the
410    * "finally" block of our GiraphYarnTask#execute() to commit final job output.
411    * @return true if this task IS the master.
412    */
413   public boolean isMaster() {
414     return isMaster;
415   }
416 
417   /**
418    * Produce a reference to the "start" superstep timer for the current
419    * superstep.
420    * @param superstep the current superstep count
421    * @return a GiraphTimerContext representing the "start" of the supestep
422    */
423   private GiraphTimerContext getTimerForThisSuperstep(long superstep) {
424     GiraphMetrics.get().resetSuperstepMetrics(superstep);
425     return superstepTimer.time();
426   }
427 
428   /**
429    * Utility to encapsulate Giraph metrics setup calls
430    */
431   private void setupAndInitializeGiraphMetrics() {
432     GiraphMetrics.init(conf);
433     GiraphMetrics.get().addSuperstepResetObserver(this);
434     initJobMetrics();
435     MemoryUtils.initMetrics();
436     InputSplitsCallable.initMetrics();
437   }
438 
439   /**
440    * Instantiate and configure ZooKeeperManager for this job. This will
441    * result in a Giraph-owned Zookeeper instance, a connection to an
442    * existing quorum as specified in the job configuration, or task failure
443    * @return true if this task should terminate
444    */
445   private boolean startZooKeeperManager()
446     throws IOException, InterruptedException {
447     zkManager = new ZooKeeperManager(context, conf);
448     context.setStatus("setup: Setting up Zookeeper manager.");
449     zkManager.setup();
450     if (zkManager.computationDone()) {
451       done = true;
452       return true;
453     }
454     zkManager.onlineZooKeeperServer();
455     String serverPortList = zkManager.getZooKeeperServerPortString();
456     conf.setZookeeperList(serverPortList);
457     createZooKeeperCounter(serverPortList);
458     return false;
459   }
460 
461   /**
462    * Utility to place a new, updated GraphState object into the serviceWorker.
463    */
464   private void updateSuperstepGraphState() {
465     serviceWorker.getWorkerContext().setGraphState(
466         new GraphState(serviceWorker.getSuperstep(),
467             finishedSuperstepStats.getVertexCount(),
468             finishedSuperstepStats.getEdgeCount(), context));
469   }
470 
471   /**
472    * Utility function for boilerplate updates and cleanup done at the
473    * end of each superstep processing loop in the <code>execute</code> method.
474    * @param partitionStatsList list of stas for each superstep to append to
475    * @param superstepTimerContext for job metrics
476    * @return the collected stats at the close of the current superstep.
477    */
478   private FinishedSuperstepStats completeSuperstepAndCollectStats(
479     List<PartitionStats> partitionStatsList,
480     GiraphTimerContext superstepTimerContext) {
481 
482     // the superstep timer is stopped inside the finishSuperstep function
483     // (otherwise metrics are not available at the end of the computation
484     //  using giraph.metrics.enable=true).
485     finishedSuperstepStats =
486       serviceWorker.finishSuperstep(partitionStatsList, superstepTimerContext);
487     if (conf.metricsEnabled()) {
488       GiraphMetrics.get().perSuperstep().printSummary(System.err);
489     }
490     return finishedSuperstepStats;
491   }
492 
493   /**
494    * Utility function to prepare various objects managing BSP superstep
495    * operations for the next superstep.
496    * @param graphState graph state metadata object
497    */
498   private void prepareForSuperstep(GraphState graphState) {
499     serviceWorker.prepareSuperstep();
500 
501     serviceWorker.getWorkerContext().setGraphState(graphState);
502     serviceWorker.getWorkerContext().setupSuperstep(serviceWorker);
503     GiraphTimerContext preSuperstepTimer = wcPreSuperstepTimer.time();
504     serviceWorker.getWorkerContext().preSuperstep();
505     preSuperstepTimer.stop();
506     context.progress();
507 
508     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
509       obs.preSuperstep(graphState.getSuperstep());
510       context.progress();
511     }
512   }
513 
514   /**
515    * Prepare graph state and worker context for superstep cycles.
516    */
517   private void prepareGraphStateAndWorkerContext() {
518     updateSuperstepGraphState();
519     workerContextPreApp();
520   }
521 
522   /**
523     * Get the worker function enum.
524     *
525     * @return an enum detailing the roles assigned to this
526     *         compute node for this Giraph job.
527     */
528   public GraphFunctions getGraphFunctions() {
529     return graphFunctions;
530   }
531 
532   public final WorkerContext getWorkerContext() {
533     return serviceWorker.getWorkerContext();
534   }
535 
536   public JobProgressTracker getJobProgressTracker() {
537     return jobProgressTracker;
538   }
539 
540   /**
541    * Copied from JobConf to get the location of this jar.  Workaround for
542    * things like Oozie map-reduce jobs. NOTE: Pure YARN profile cannot
543    * make use of this, as the jars are unpacked at each container site.
544    *
545    * @param myClass Class to search the class loader path for to locate
546    *        the relevant jar file
547    * @return Location of the jar file containing myClass
548    */
549   private static String findContainingJar(Class<?> myClass) {
550     ClassLoader loader = myClass.getClassLoader();
551     String classFile =
552         myClass.getName().replaceAll("\\.", "/") + ".class";
553     try {
554       for (Enumeration<?> itr = loader.getResources(classFile);
555           itr.hasMoreElements();) {
556         URL url = (URL) itr.nextElement();
557         if ("jar".equals(url.getProtocol())) {
558           String toReturn = url.getPath();
559           if (toReturn.startsWith("file:")) {
560             toReturn = toReturn.substring("file:".length());
561           }
562           toReturn = URLDecoder.decode(toReturn, "UTF-8");
563           return toReturn.replaceAll("!.*$", "");
564         }
565       }
566     } catch (IOException e) {
567       throw new RuntimeException(e);
568     }
569     return null;
570   }
571 
572   /**
573    * Figure out what roles this BSP compute node should take on in the job.
574    * Basic logic is as follows:
575    * 1) If not split master, everyone does the everything and/or running
576    *    ZooKeeper.
577    * 2) If split master/worker, masters also run ZooKeeper
578    *
579    * 3) If split master/worker == true and <code>giraph.zkList</code> is
580    *    externally provided, the master will not instantiate a ZK instance, but
581    *    will assume a quorum is already active on the cluster for Giraph to use.
582    *
583    * @param conf Configuration to use
584    * @param zkManager ZooKeeper manager to help determine whether to run
585    *        ZooKeeper.
586    * @return Functions that this mapper should do.
587    */
588   private static GraphFunctions determineGraphFunctions(
589       ImmutableClassesGiraphConfiguration conf,
590       ZooKeeperManager zkManager) {
591     boolean splitMasterWorker = conf.getSplitMasterWorker();
592     int taskPartition = conf.getTaskPartition();
593     boolean zkAlreadyProvided = conf.isZookeeperExternal();
594     GraphFunctions functions = GraphFunctions.UNKNOWN;
595     // What functions should this mapper do?
596     if (!splitMasterWorker) {
597       if ((zkManager != null) && zkManager.runsZooKeeper()) {
598         functions = GraphFunctions.ALL;
599       } else {
600         functions = GraphFunctions.ALL_EXCEPT_ZOOKEEPER;
601       }
602     } else {
603       if (zkAlreadyProvided) {
604         if (taskPartition == 0) {
605           functions = GraphFunctions.MASTER_ONLY;
606         } else {
607           functions = GraphFunctions.WORKER_ONLY;
608         }
609       } else {
610         if ((zkManager != null) && zkManager.runsZooKeeper()) {
611           functions = GraphFunctions.MASTER_ZOOKEEPER_ONLY;
612         } else {
613           functions = GraphFunctions.WORKER_ONLY;
614         }
615       }
616     }
617     return functions;
618   }
619 
620   /**
621    * Instantiate the appropriate BspService object (Master or Worker)
622    * for this compute node.
623    */
624   private void instantiateBspService()
625     throws IOException, InterruptedException {
626     if (graphFunctions.isMaster()) {
627       if (LOG.isInfoEnabled()) {
628         LOG.info("setup: Starting up BspServiceMaster " +
629           "(master thread)...");
630       }
631       serviceMaster = new BspServiceMaster<I, V, E>(context, this);
632       masterThread = new MasterThread<I, V, E>(serviceMaster, context);
633       masterThread.setUncaughtExceptionHandler(
634           createUncaughtExceptionHandler());
635       masterThread.start();
636     }
637     if (graphFunctions.isWorker()) {
638       if (LOG.isInfoEnabled()) {
639         LOG.info("setup: Starting up BspServiceWorker...");
640       }
641       serviceWorker = new BspServiceWorker<I, V, E>(context, this);
642       installGCMonitoring();
643       if (LOG.isInfoEnabled()) {
644         LOG.info("setup: Registering health of this worker...");
645       }
646     }
647   }
648 
649   /**
650    * Install GC monitoring. This method intercepts all GC, log the gc, and
651    * notifies an out-of-core engine (if any is used) about the GC.
652    */
653   private void installGCMonitoring() {
654     final GcObserver[] gcObservers = conf.createGcObservers(context);
655     List<GarbageCollectorMXBean> mxBeans = ManagementFactory
656         .getGarbageCollectorMXBeans();
657     final OutOfCoreEngine oocEngine =
658         serviceWorker.getServerData().getOocEngine();
659     for (GarbageCollectorMXBean gcBean : mxBeans) {
660       NotificationEmitter emitter = (NotificationEmitter) gcBean;
661       NotificationListener listener = new NotificationListener() {
662         @Override
663         public void handleNotification(Notification notification,
664                                        Object handle) {
665           if (notification.getType().equals(GarbageCollectionNotificationInfo
666               .GARBAGE_COLLECTION_NOTIFICATION)) {
667             GarbageCollectionNotificationInfo info =
668                 GarbageCollectionNotificationInfo.from(
669                     (CompositeData) notification.getUserData());
670 
671             if (LOG.isInfoEnabled()) {
672               LOG.info("installGCMonitoring: name = " + info.getGcName() +
673                   ", action = " + info.getGcAction() + ", cause = " +
674                   info.getGcCause() + ", duration = " +
675                   info.getGcInfo().getDuration() + "ms");
676             }
677             gcTimeMetric.inc(info.getGcInfo().getDuration());
678             GiraphMetrics.get().getGcTracker().gcOccurred(info.getGcInfo());
679             for (GcObserver gcObserver : gcObservers) {
680               gcObserver.gcOccurred(info);
681             }
682             if (oocEngine != null) {
683               oocEngine.gcCompleted(info);
684             }
685           }
686         }
687       };
688       //Add the listener
689       emitter.addNotificationListener(listener, null, null);
690     }
691   }
692 
693   /**
694    * Initialize the root logger and appender to the settings in conf.
695    */
696   private void initializeAndConfigureLogging() {
697     // Set the log level
698     String logLevel = conf.getLocalLevel();
699     if (!Logger.getRootLogger().getLevel().equals(Level.toLevel(logLevel))) {
700       Logger.getRootLogger().setLevel(Level.toLevel(logLevel));
701       if (LOG.isInfoEnabled()) {
702         LOG.info("setup: Set log level to " + logLevel);
703       }
704     } else {
705       if (LOG.isInfoEnabled()) {
706         LOG.info("setup: Log level remains at " + logLevel);
707       }
708     }
709     // Sets pattern layout for all appenders
710     if (conf.useLogThreadLayout()) {
711       PatternLayout layout =
712         new PatternLayout("%-7p %d [%t] %c %x - %m%n");
713       Enumeration<Appender> appenderEnum =
714         Logger.getRootLogger().getAllAppenders();
715       while (appenderEnum.hasMoreElements()) {
716         appenderEnum.nextElement().setLayout(layout);
717       }
718     }
719     // Change ZooKeeper logging level to error (info is quite verbose) for
720     // testing only
721     if (conf.getLocalTestMode()) {
722       LogManager.getLogger(org.apache.zookeeper.server.PrepRequestProcessor.
723           class.getName()).setLevel(Level.ERROR);
724     }
725   }
726 
727   /**
728    * Initialize job-level metrics used by this class.
729    */
730   private void initJobMetrics() {
731     GiraphMetricsRegistry jobMetrics = GiraphMetrics.get().perJobOptional();
732     wcPreAppTimer = new GiraphTimer(jobMetrics, "worker-context-pre-app",
733         TimeUnit.MILLISECONDS);
734     wcPostAppTimer = new GiraphTimer(jobMetrics, "worker-context-post-app",
735         TimeUnit.MILLISECONDS);
736   }
737 
738   @Override
739   public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
740     superstepTimer = new GiraphTimer(superstepMetrics,
741         TIMER_SUPERSTEP_TIME, TimeUnit.MILLISECONDS);
742     computeAll = new GiraphTimer(superstepMetrics,
743         TIMER_COMPUTE_ALL, TimeUnit.MILLISECONDS);
744     timeToFirstMessage = new GiraphTimer(superstepMetrics,
745         TIMER_TIME_TO_FIRST_MSG, TimeUnit.MICROSECONDS);
746     communicationTimer = new GiraphTimer(superstepMetrics,
747         TIMER_COMMUNICATION_TIME, TimeUnit.MILLISECONDS);
748     gcTimeMetric = superstepMetrics.getCounter(TIMER_SUPERSTEP_GC_TIME);
749     wcPreSuperstepTimer = new GiraphTimer(superstepMetrics,
750         "worker-context-pre-superstep", TimeUnit.MILLISECONDS);
751   }
752 
753   /**
754    * Notification from Vertex that a message has been sent.
755    */
756   public void notifySentMessages() {
757     // We are tracking the time between when the compute started and the first
758     // message get sent. We use null to flag that we have already recorded it.
759     GiraphTimerContext tmp = timeToFirstMessageTimerContext;
760     if (tmp != null) {
761       synchronized (timeToFirstMessage) {
762         if (timeToFirstMessageTimerContext != null) {
763           timeToFirstMessageTimerContext.stop();
764           timeToFirstMessageTimerContext = null;
765           communicationTimerContext = communicationTimer.time();
766         }
767       }
768     }
769   }
770 
771   /**
772    * Notification of last message flushed. Comes when we finish the superstep
773    * and are done waiting for all messages to send.
774    */
775   public void notifyFinishedCommunication() {
776     GiraphTimerContext tmp = communicationTimerContext;
777     if (tmp != null) {
778       synchronized (communicationTimer) {
779         if (communicationTimerContext != null) {
780           communicationTimerContext.stop();
781           communicationTimerContext = null;
782         }
783       }
784     }
785   }
786 
787   /**
788    * Process graph data partitions active in this superstep.
789    * @param context handle to the underlying cluster framework
790    * @param partitionStatsList to pick up this superstep's processing stats
791    * @param graphState the BSP graph state
792    * @param messageStore the messages to be processed in this superstep
793    * @param numThreads number of concurrent threads to do processing
794    */
795   private void processGraphPartitions(final Mapper<?, ?, ?, ?>.Context context,
796       List<PartitionStats> partitionStatsList,
797       final GraphState graphState,
798       final MessageStore<I, Writable> messageStore,
799       int numThreads) {
800     PartitionStore<I, V, E> partitionStore = serviceWorker.getPartitionStore();
801     long verticesToCompute = 0;
802     for (Integer partitionId : partitionStore.getPartitionIds()) {
803       verticesToCompute += partitionStore.getPartitionVertexCount(partitionId);
804     }
805     WorkerProgress.get().startSuperstep(
806         serviceWorker.getSuperstep(), verticesToCompute,
807         serviceWorker.getPartitionStore().getNumPartitions());
808     partitionStore.startIteration();
809 
810     GiraphTimerContext computeAllTimerContext = computeAll.time();
811     timeToFirstMessageTimerContext = timeToFirstMessage.time();
812 
813     CallableFactory<Collection<PartitionStats>> callableFactory =
814       new CallableFactory<Collection<PartitionStats>>() {
815         @Override
816         public Callable<Collection<PartitionStats>> newCallable(
817             int callableId) {
818           return new ComputeCallable<I, V, E, Writable, Writable>(
819               context,
820               graphState,
821               messageStore,
822               conf,
823               serviceWorker);
824         }
825       };
826     List<Collection<PartitionStats>> results =
827         ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
828             "compute-%d", context);
829 
830     for (Collection<PartitionStats> result : results) {
831       partitionStatsList.addAll(result);
832     }
833 
834     computeAllTimerContext.stop();
835   }
836 
837   /**
838    * Handle the event that this superstep is a restart of a failed one.
839    * @param superstep current superstep
840    * @return the graph state, updated if this is a restart superstep
841    */
842   private boolean checkSuperstepRestarted(long superstep) throws IOException {
843     // Might need to restart from another superstep
844     // (manually or automatic), or store a checkpoint
845     if (serviceWorker.getRestartedSuperstep() == superstep) {
846       if (LOG.isInfoEnabled()) {
847         LOG.info("execute: Loading from checkpoint " + superstep);
848       }
849       VertexEdgeCount vertexEdgeCount = serviceWorker.loadCheckpoint(
850         serviceWorker.getRestartedSuperstep());
851       finishedSuperstepStats = new FinishedSuperstepStats(0, false,
852           vertexEdgeCount.getVertexCount(), vertexEdgeCount.getEdgeCount(),
853           false, CheckpointStatus.NONE);
854       return true;
855     }
856     return false;
857   }
858 
859   /**
860    * Check if it's time to checkpoint and actually does checkpointing
861    * if it is.
862    * @param checkpointStatus master's decision
863    * @return true if we need to stop computation after checkpoint
864    * @throws IOException
865    */
866   private boolean storeCheckpoint(CheckpointStatus checkpointStatus)
867     throws IOException {
868     if (checkpointStatus != CheckpointStatus.NONE) {
869       serviceWorker.storeCheckpoint();
870     }
871     return checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT;
872   }
873 
874   /**
875    * Attempt to collect the final statistics on the graph data
876    * processed in this superstep by this compute node
877    * @param inputSuperstepStats the final graph data stats object for the
878    *                            input superstep
879    * @return true if the graph data has no vertices (error?) and
880    *         this node should terminate
881    */
882   private boolean collectInputSuperstepStats(
883     FinishedSuperstepStats inputSuperstepStats) {
884     if (inputSuperstepStats.getVertexCount() == 0 &&
885         !inputSuperstepStats.mustLoadCheckpoint()) {
886       LOG.warn("map: No vertices in the graph, exiting.");
887       return true;
888     }
889     if (conf.metricsEnabled()) {
890       GiraphMetrics.get().perSuperstep().printSummary(System.err);
891     }
892     return false;
893   }
894 
895   /**
896    * Did the state of this compute node change?
897    * @return true if the processing of supersteps should terminate.
898    */
899   private boolean checkTaskState() {
900     if (done) {
901       return true;
902     }
903     GiraphMetrics.get().resetSuperstepMetrics(BspService.INPUT_SUPERSTEP);
904     if (graphFunctions.isNotAWorker()) {
905       if (LOG.isInfoEnabled()) {
906         LOG.info("map: No need to do anything when not a worker");
907       }
908       return true;
909     }
910     if (alreadyRun) {
911       throw new RuntimeException("map: In BSP, map should have only been" +
912         " run exactly once, (already run)");
913     }
914     alreadyRun = true;
915     return false;
916   }
917 
918   /**
919    * Call to the WorkerContext before application begins.
920    */
921   private void workerContextPreApp() {
922     GiraphTimerContext preAppTimerContext = wcPreAppTimer.time();
923     try {
924       serviceWorker.getWorkerContext().preApplication();
925     } catch (InstantiationException e) {
926       LOG.fatal("execute: preApplication failed in instantiation", e);
927       throw new RuntimeException(
928           "execute: preApplication failed in instantiation", e);
929     } catch (IllegalAccessException e) {
930       LOG.fatal("execute: preApplication failed in access", e);
931       throw new RuntimeException(
932           "execute: preApplication failed in access", e);
933     }
934     preAppTimerContext.stop();
935     context.progress();
936 
937     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
938       obs.preApplication();
939       context.progress();
940     }
941   }
942 
943   /**
944    * Setup mapper observers
945    */
946   public void setupMapperObservers() {
947     mapperObservers = conf.createMapperObservers(context);
948     for (MapperObserver mapperObserver : mapperObservers) {
949       mapperObserver.setup();
950     }
951   }
952 
953   /**
954    * Executes preLoad() on worker observers.
955    */
956   private void preLoadOnWorkerObservers() {
957     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
958       obs.preLoad();
959       context.progress();
960     }
961   }
962 
963   /**
964    * Executes postSave() on worker observers.
965    */
966   private void postSaveOnWorkerObservers() {
967     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
968       obs.postSave();
969       context.progress();
970     }
971   }
972 
973   /**
974    * Called by owner of this GraphTaskManager object on each compute node
975    */
976   public void cleanup()
977     throws IOException, InterruptedException {
978     if (LOG.isInfoEnabled()) {
979       LOG.info("cleanup: Starting for " + getGraphFunctions());
980     }
981     jobProgressTracker.cleanup();
982     if (done) {
983       return;
984     }
985 
986     if (serviceWorker != null) {
987       serviceWorker.cleanup(finishedSuperstepStats);
988       postSaveOnWorkerObservers();
989     }
990     try {
991       if (masterThread != null) {
992         masterThread.join();
993         LOG.info("cleanup: Joined with master thread");
994       }
995     } catch (InterruptedException e) {
996       // cleanup phase -- just log the error
997       LOG.error("cleanup: Master thread couldn't join");
998     }
999     if (zkManager != null) {
1000       LOG.info("cleanup: Offlining ZooKeeper servers");
1001       try {
1002         zkManager.offlineZooKeeperServers(ZooKeeperManager.State.FINISHED);
1003       // We need this here cause apparently exceptions are eaten by Hadoop
1004       // when they come from the cleanup lifecycle and it's useful to know
1005       // if something is wrong.
1006       //
1007       // And since it's cleanup nothing too bad should happen if we don't
1008       // propagate and just allow the job to finish normally.
1009       // CHECKSTYLE: stop IllegalCatch
1010       } catch (Throwable e) {
1011       // CHECKSTYLE: resume IllegalCatch
1012         LOG.error("cleanup: Error offlining zookeeper", e);
1013       }
1014     }
1015 
1016     // Stop tracking metrics
1017     GiraphMetrics.get().shutdown();
1018   }
1019 
1020   /**
1021    * Cleanup a ZooKeeper instance managed by this
1022    * GiraphWorker upon job run failure.
1023    */
1024   public void zooKeeperCleanup() {
1025     if (graphFunctions.isZooKeeper()) {
1026       // ZooKeeper may have had an issue
1027       if (zkManager != null) {
1028         zkManager.cleanup();
1029       }
1030     }
1031   }
1032 
1033   /**
1034    * Cleanup all of Giraph's framework-agnostic resources
1035    * regardless of which type of cluster Giraph is running on.
1036    */
1037   public void workerFailureCleanup() {
1038     try {
1039       if (graphFunctions.isWorker()) {
1040         serviceWorker.failureCleanup();
1041       }
1042       // Stop tracking metrics
1043       GiraphMetrics.get().shutdown();
1044     // Checkstyle exception due to needing to get the original
1045     // exception on failure
1046     // CHECKSTYLE: stop IllegalCatch
1047     } catch (RuntimeException e1) {
1048     // CHECKSTYLE: resume IllegalCatch
1049       LOG.error("run: Worker failure failed on another RuntimeException, " +
1050           "original expection will be rethrown", e1);
1051     }
1052   }
1053 
1054   /**
1055    * Creates exception handler that will terminate process gracefully in case
1056    * of any uncaught exception.
1057    * @return new exception handler object.
1058    */
1059   public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler() {
1060     return new OverrideExceptionHandler(
1061         CHECKER_IF_WORKER_SHOULD_FAIL_AFTER_EXCEPTION_CLASS.newInstance(
1062             getConf()), getJobProgressTracker());
1063   }
1064 
1065   public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
1066     return conf;
1067   }
1068 
1069   /**
1070    * @return Time spent in GC recorder by the GC listener
1071    */
1072   public long getSuperstepGCTime() {
1073     return (gcTimeMetric == null) ? 0 : gcTimeMetric.count();
1074   }
1075 
1076   /**
1077    * Returns a list of zookeeper servers to connect to.
1078    * If the port is set to 0 and Giraph is starting a single
1079    * ZooKeeper server, then Zookeeper will pick its own port.
1080    * Otherwise, the ZooKeeper port set by the user will be used.
1081    * @return host:port,host:port for each zookeeper
1082    */
1083   public String getZookeeperList() {
1084     if (zkManager != null) {
1085       return zkManager.getZooKeeperServerPortString();
1086     } else {
1087       return conf.getZookeeperList();
1088     }
1089   }
1090 
1091   /**
1092    * Default handler for uncaught exceptions.
1093    * It will do the best to clean up and then will terminate current giraph job.
1094    */
1095   class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
1096     /** Checker if worker should fail after a thread gets an exception */
1097     private final CheckerIfWorkerShouldFailAfterException checker;
1098     /** JobProgressTracker to log problems to */
1099     private final JobProgressTracker jobProgressTracker;
1100 
1101     /**
1102      * Constructor
1103      *
1104      * @param checker Checker if worker should fail after a thread gets an
1105      *                exception
1106      * @param jobProgressTracker JobProgressTracker to log problems to
1107      */
1108     public OverrideExceptionHandler(
1109         CheckerIfWorkerShouldFailAfterException checker,
1110         JobProgressTracker jobProgressTracker) {
1111       this.checker = checker;
1112       this.jobProgressTracker = jobProgressTracker;
1113     }
1114 
1115     @Override
1116     public void uncaughtException(final Thread t, final Throwable e) {
1117       if (!checker.checkIfWorkerShouldFail(t, e)) {
1118         return;
1119       }
1120       try {
1121         LOG.fatal(
1122             "uncaughtException: OverrideExceptionHandler on thread " +
1123                 t.getName() + ", msg = " +  e.getMessage() + ", exiting...", e);
1124         byte [] exByteArray = KryoWritableWrapper.convertToByteArray(e);
1125         jobProgressTracker.logError(ExceptionUtils.getStackTrace(e),
1126                 exByteArray);
1127         zooKeeperCleanup();
1128         workerFailureCleanup();
1129       } finally {
1130         System.exit(1);
1131       }
1132     }
1133   }
1134 
1135   /**
1136    * Interface to check if worker should fail after a thread gets an exception
1137    */
1138   public interface CheckerIfWorkerShouldFailAfterException {
1139     /**
1140      * Check if worker should fail after a thread gets an exception
1141      *
1142      * @param thread Thread which raised the exception
1143      * @param exception Exception which occurred
1144      * @return True iff worker should fail after this exception
1145      */
1146     boolean checkIfWorkerShouldFail(Thread thread, Throwable exception);
1147   }
1148 
1149   /**
1150    * Class to use by default, where each exception causes job failure
1151    */
1152   public static class FailWithEveryExceptionOccurred
1153       implements CheckerIfWorkerShouldFailAfterException {
1154     @Override
1155     public boolean checkIfWorkerShouldFail(Thread thread, Throwable exception) {
1156       return true;
1157     }
1158   }
1159 }