GraphTaskManager xref

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.graph;
20  
21  import java.io.IOException;
22  import java.lang.management.GarbageCollectorMXBean;
23  import java.lang.management.ManagementFactory;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Enumeration;
27  import java.util.List;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.TimeUnit;
30  
31  import com.sun.management.GarbageCollectionNotificationInfo;
32  import com.yammer.metrics.core.Counter;
33  
34  import org.apache.commons.lang3.exception.ExceptionUtils;
35  import org.apache.giraph.bsp.BspService;
36  import org.apache.giraph.bsp.CentralizedServiceMaster;
37  import org.apache.giraph.bsp.CentralizedServiceWorker;
38  import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
39  import org.apache.giraph.comm.messages.MessageStore;
40  import org.apache.giraph.conf.ClassConfOption;
41  import org.apache.giraph.conf.GiraphConstants;
42  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
43  import org.apache.giraph.job.JobProgressTracker;
44  import org.apache.giraph.master.BspServiceMaster;
45  import org.apache.giraph.master.MasterThread;
46  import org.apache.giraph.metrics.GiraphMetrics;
47  import org.apache.giraph.metrics.GiraphMetricsRegistry;
48  import org.apache.giraph.metrics.GiraphTimer;
49  import org.apache.giraph.metrics.GiraphTimerContext;
50  import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
51  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
52  import org.apache.giraph.ooc.OutOfCoreEngine;
53  import org.apache.giraph.partition.PartitionOwner;
54  import org.apache.giraph.partition.PartitionStats;
55  import org.apache.giraph.partition.PartitionStore;
56  import org.apache.giraph.scripting.ScriptLoader;
57  import org.apache.giraph.utils.CallableFactory;
58  import org.apache.giraph.utils.GcObserver;
59  import org.apache.giraph.utils.MemoryUtils;
60  import org.apache.giraph.utils.ProgressableUtils;
61  import org.apache.giraph.worker.BspServiceWorker;
62  import org.apache.giraph.worker.InputSplitsCallable;
63  import org.apache.giraph.worker.WorkerContext;
64  import org.apache.giraph.worker.WorkerObserver;
65  import org.apache.giraph.worker.WorkerProgress;
66  import org.apache.giraph.writable.kryo.KryoWritableWrapper;
67  import org.apache.giraph.zk.ZooKeeperManager;
68  import org.apache.hadoop.conf.Configuration;
69  import org.apache.hadoop.fs.Path;
70  import org.apache.hadoop.io.Writable;
71  import org.apache.hadoop.io.WritableComparable;
72  import org.apache.hadoop.mapreduce.Mapper;
73  import org.apache.log4j.Appender;
74  import org.apache.log4j.Level;
75  import org.apache.log4j.LogManager;
76  import org.apache.log4j.Logger;
77  import org.apache.log4j.PatternLayout;
78  
79  import javax.management.Notification;
80  import javax.management.NotificationEmitter;
81  import javax.management.NotificationListener;
82  import javax.management.openmbean.CompositeData;
83  
84  /**
85   * The Giraph-specific business logic for a single BSP
86   * compute node in whatever underlying type of cluster
87   * our Giraph job will run on. The owning object provides
88   * the glue into the underlying cluster framework
89   * and calls this object to perform Giraph work.
90   *
91   * @param <I> Vertex id
92   * @param <V> Vertex data
93   * @param <E> Edge data
94   */
95  @SuppressWarnings("rawtypes")
96  public class GraphTaskManager<I extends WritableComparable, V extends Writable,
97    E extends Writable> implements
98    ResetSuperstepMetricsObserver {
99  /*if_not[PURE_YARN]
100   static { // Eliminate this? Even MRv1 tasks should not need it here.
101     Configuration.addDefaultResource("giraph-site.xml");
102   }
103 end[PURE_YARN]*/
104   /**
105    * Class which checks if an exception on some thread should cause worker
106    * to fail
107    */
108   public static final ClassConfOption<CheckerIfWorkerShouldFailAfterException>
109   CHECKER_IF_WORKER_SHOULD_FAIL_AFTER_EXCEPTION_CLASS = ClassConfOption.create(
110       "giraph.checkerIfWorkerShouldFailAfterExceptionClass",
111       FailWithEveryExceptionOccurred.class,
112       CheckerIfWorkerShouldFailAfterException.class,
113       "Class which checks if an exception on some thread should cause worker " +
114           "to fail, by default all exceptions cause failure");
115   /** Name of metric for superstep time in msec */
116   public static final String TIMER_SUPERSTEP_TIME = "superstep-time-ms";
117   /** Name of metric for compute on all vertices in msec */
118   public static final String TIMER_COMPUTE_ALL = "compute-all-ms";
119   /** Name of metric for time from begin compute to first message sent */
120   public static final String TIMER_TIME_TO_FIRST_MSG =
121       "time-to-first-message-ms";
122   /** Name of metric for time from first message till last message flushed */
123   public static final String TIMER_COMMUNICATION_TIME = "communication-time-ms";
124   /** Name of metric for time spent doing GC per superstep in msec */
125   public static final String TIMER_SUPERSTEP_GC_TIME = "superstep-gc-time-ms";
126 
127   /** Class logger */
128   private static final Logger LOG = Logger.getLogger(GraphTaskManager.class);
129   /** Coordination service worker */
130   private CentralizedServiceWorker<I, V, E> serviceWorker;
131   /** Coordination service master */
132   private CentralizedServiceMaster<I, V, E> serviceMaster;
133   /** Coordination service master thread */
134   private Thread masterThread = null;
135   /** The worker should be run exactly once, or else there is a problem. */
136   private boolean alreadyRun = false;
137   /** Manages the ZooKeeper servers if necessary (dynamic startup) */
138   private ZooKeeperManager zkManager;
139   /** Configuration */
140   private ImmutableClassesGiraphConfiguration<I, V, E> conf;
141   /** Already complete? */
142   private boolean done = false;
143   /** What kind of functions is this mapper doing? */
144   private GraphFunctions graphFunctions = GraphFunctions.UNKNOWN;
145   /** Superstep stats */
146   private FinishedSuperstepStats finishedSuperstepStats =
147       new FinishedSuperstepStats(0, false, 0, 0, false, CheckpointStatus.NONE);
148   /** Job progress tracker */
149   private JobProgressTrackerClient jobProgressTracker;
150 
151   // Per-Job Metrics
152   /** Timer for WorkerContext#preApplication() */
153   private GiraphTimer wcPreAppTimer;
154   /** Timer for WorkerContext#postApplication() */
155   private GiraphTimer wcPostAppTimer;
156 
157   // Per-Superstep Metrics
158   /** Time for how long superstep took */
159   private GiraphTimer superstepTimer;
160   /** Time for all compute() calls in a superstep */
161   private GiraphTimer computeAll;
162   /** Time from starting compute to sending first message */
163   private GiraphTimer timeToFirstMessage;
164   /** Context for timing time to first message above */
165   private GiraphTimerContext timeToFirstMessageTimerContext;
166   /** Time from first sent message till last message flushed. */
167   private GiraphTimer communicationTimer;
168   /** Context for timing communication time above */
169   private GiraphTimerContext communicationTimerContext;
170   /** Timer for WorkerContext#preSuperstep() */
171   private GiraphTimer wcPreSuperstepTimer;
172   /** Timer to keep aggregated time spent in GC in a superstep */
173   private Counter gcTimeMetric;
174   /** The Hadoop Mapper#Context for this job */
175   private final Mapper<?, ?, ?, ?>.Context context;
176   /** is this GraphTaskManager the master? */
177   private boolean isMaster;
178   /** Mapper observers */
179   private MapperObserver[] mapperObservers;
180 
181   /**
182    * Default constructor for GraphTaskManager.
183    * @param context a handle to the underlying cluster framework.
184    *                For Hadoop clusters, this is a Mapper#Context.
185    */
186   public GraphTaskManager(Mapper<?, ?, ?, ?>.Context context) {
187     this.context = context;
188     this.isMaster = false;
189   }
190 
191   /**
192    * Run the user's input checking code.
193    */
194   private void checkInput() {
195     if (conf.hasEdgeInputFormat()) {
196       conf.createWrappedEdgeInputFormat().checkInputSpecs(conf);
197     }
198     if (conf.hasVertexInputFormat()) {
199       conf.createWrappedVertexInputFormat().checkInputSpecs(conf);
200     }
201   }
202 
203   /**
204    * In order for the job client to know which ZooKeeper the job is using,
205    * we create a counter with server:port as its name inside of
206    * ZOOKEEPER_SERVER_PORT_COUNTER_GROUP.
207    *
208    * @param serverPortList Server:port list for ZooKeeper used
209    */
210   private void createZooKeeperCounter(String serverPortList) {
211     // Getting the counter will actually create it.
212     context.getCounter(GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP,
213         serverPortList);
214   }
215 
216   /**
217    * Called by owner of this GraphTaskManager on each compute node
218    *
219    * @param zkPathList the path to the ZK jars we need to run the job
220    */
221   public void setup(Path[] zkPathList)
222     throws IOException, InterruptedException {
223     context.setStatus("setup: Beginning worker setup.");
224     Configuration hadoopConf = context.getConfiguration();
225     conf = new ImmutableClassesGiraphConfiguration<I, V, E>(hadoopConf);
226     initializeJobProgressTracker();
227     // Setting the default handler for uncaught exceptions.
228     Thread.setDefaultUncaughtExceptionHandler(createUncaughtExceptionHandler());
229     setupMapperObservers();
230     // Write user's graph types (I,V,E,M) back to configuration parameters so
231     // that they are set for quicker access later. These types are often
232     // inferred from the Computation class used.
233     conf.getGiraphTypes().writeIfUnset(conf);
234     // configure global logging level for Giraph job
235     initializeAndConfigureLogging();
236     // init the metrics objects
237     setupAndInitializeGiraphMetrics();
238     // Check input
239     checkInput();
240     // Load any scripts that were deployed
241     ScriptLoader.loadScripts(conf);
242     // One time setup for computation factory
243     conf.createComputationFactory().initialize(conf);
244     // Do some task setup (possibly starting up a Zookeeper service)
245     context.setStatus("setup: Initializing Zookeeper services.");
246     String serverPortList = conf.getZookeeperList();
247     if (serverPortList.isEmpty()) {
248       if (startZooKeeperManager()) {
249         return; // ZK connect/startup failed
250       }
251     } else {
252       createZooKeeperCounter(serverPortList);
253     }
254     if (zkManager != null && zkManager.runsZooKeeper()) {
255       if (LOG.isInfoEnabled()) {
256         LOG.info("setup: Chosen to run ZooKeeper...");
257       }
258     }
259     context
260         .setStatus("setup: Connected to Zookeeper service " + serverPortList);
261     this.graphFunctions = determineGraphFunctions(conf, zkManager);
262     if (zkManager != null && this.graphFunctions.isMaster()) {
263       zkManager.cleanupOnExit();
264     }
265     try {
266       instantiateBspService();
267     } catch (IOException e) {
268       LOG.error("setup: Caught exception just before end of setup", e);
269       if (zkManager != null) {
270         zkManager.offlineZooKeeperServers(ZooKeeperManager.State.FAILED);
271       }
272       throw new RuntimeException(
273         "setup: Offlining servers due to exception...", e);
274     }
275     context.setStatus(getGraphFunctions().toString() + " starting...");
276   }
277 
278   /**
279    * Create and connect a client to JobProgressTrackerService,
280    * or no-op implementation if progress shouldn't be tracked or something
281    * goes wrong
282    */
283   private void initializeJobProgressTracker() {
284     if (!conf.trackJobProgressOnClient()) {
285       jobProgressTracker = new JobProgressTrackerClientNoOp();
286     } else {
287       jobProgressTracker =
288         GiraphConstants.JOB_PROGRESS_TRACKER_CLIENT_CLASS.newInstance(conf);
289       try {
290         jobProgressTracker.init(conf);
291         // CHECKSTYLE: stop IllegalCatch
292       } catch (Exception e) {
293         // CHECKSTYLE: resume IllegalCatch
294         throw new RuntimeException(
295           "Failed to initialize JobProgressTrackerClient", e);
296       }
297     }
298     jobProgressTracker.mapperStarted();
299   }
300 
301   /**
302    * Perform the work assigned to this compute node for this job run.
303    * 1) Run checkpoint per frequency policy.
304    * 2) For every vertex on this mapper, run the compute() function.
305    * 3) Wait until all messaging is done.
306    * 4) Check if all vertices are done. If not, go to 2).
307    * 5) Dump output.
308    */
309   public void execute() throws IOException, InterruptedException {
310     if (checkTaskState()) {
311       return;
312     }
313     preLoadOnWorkerObservers();
314     GiraphTimerContext superstepTimerContext = superstepTimer.time();
315     finishedSuperstepStats = serviceWorker.setup();
316     superstepTimerContext.stop();
317     if (collectInputSuperstepStats(finishedSuperstepStats)) {
318       return;
319     }
320     prepareGraphStateAndWorkerContext();
321     List<PartitionStats> partitionStatsList = new ArrayList<PartitionStats>();
322     int numComputeThreads = conf.getNumComputeThreads();
323 
324     // main superstep processing loop
325     while (!finishedSuperstepStats.allVerticesHalted()) {
326       final long superstep = serviceWorker.getSuperstep();
327       superstepTimerContext = getTimerForThisSuperstep(superstep);
328       GraphState graphState = new GraphState(superstep,
329           finishedSuperstepStats.getVertexCount(),
330           finishedSuperstepStats.getEdgeCount(),
331           context);
332       Collection<? extends PartitionOwner> masterAssignedPartitionOwners =
333         serviceWorker.startSuperstep();
334       if (LOG.isDebugEnabled()) {
335         LOG.debug("execute: " + MemoryUtils.getRuntimeMemoryStats());
336       }
337       context.progress();
338       serviceWorker.exchangeVertexPartitions(masterAssignedPartitionOwners);
339       context.progress();
340       boolean hasBeenRestarted = checkSuperstepRestarted(superstep);
341 
342       GlobalStats globalStats = serviceWorker.getGlobalStats();
343 
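          // If this superstep is a restart from a checkpoint, rebuild the
          // graph state from the restored vertex/edge counts; otherwise store
          // a checkpoint if the master requested one, and stop if it asked us
          // to checkpoint and halt.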
344       if (hasBeenRestarted) {
345         graphState = new GraphState(superstep,
346             finishedSuperstepStats.getVertexCount(),
347             finishedSuperstepStats.getEdgeCount(),
348             context);
349       } else if (storeCheckpoint(globalStats.getCheckpointStatus())) {
350         break;
351       }
352       serviceWorker.getServerData().prepareResolveMutations();
353       context.progress();
354       prepareForSuperstep(graphState);
355       context.progress();
356       MessageStore<I, Writable> messageStore =
357           serviceWorker.getServerData().getCurrentMessageStore();
358       int numPartitions = serviceWorker.getPartitionStore().getNumPartitions();
359       int numThreads = Math.min(numComputeThreads, numPartitions);
360       if (LOG.isInfoEnabled()) {
361         LOG.info("execute: " + numPartitions + " partitions to process with " +
362           numThreads + " compute thread(s), originally " +
363           numComputeThreads + " thread(s) on superstep " + superstep);
364       }
365       partitionStatsList.clear();
366       // execute the current superstep
367       if (numPartitions > 0) {
368         processGraphPartitions(context, partitionStatsList, graphState,
369           messageStore, numThreads);
370       }
371       finishedSuperstepStats = completeSuperstepAndCollectStats(
372         partitionStatsList, superstepTimerContext);
373 
374       // END of superstep compute loop
375     }
376 
377     if (LOG.isInfoEnabled()) {
378       LOG.info("execute: BSP application done (global vertices marked done)");
379     }
380     updateSuperstepGraphState();
381     postApplication();
382   }
383 
384   /**
385    * Handle post-application callbacks.
386    */
387   private void postApplication() throws IOException, InterruptedException {
388     GiraphTimerContext postAppTimerContext = wcPostAppTimer.time();
389     serviceWorker.getWorkerContext().postApplication();
390     serviceWorker.getSuperstepOutput().postApplication();
391     postAppTimerContext.stop();
392     context.progress();
393 
394     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
395       obs.postApplication();
396       context.progress();
397     }
398   }
399 
400   /**
401    * Sets the "isMaster" flag for final output commit to happen on master.
402    * @param im the boolean input to set isMaster. Applies to pure YARN mode only.
403    */
404   public void setIsMaster(final boolean im) {
405     this.isMaster = im;
406   }
407 
408   /**
409    * Get "isMaster" status flag -- we need to know if we're the master in the
410    * "finally" block of our GiraphYarnTask#execute() to commit final job output.
411    * @return true if this task IS the master.
412    */
413   public boolean isMaster() {
414     return isMaster;
415   }
416 
417   /**
418    * Produce a reference to the "start" superstep timer for the current
419    * superstep.
420    * @param superstep the current superstep count
421    * @return a GiraphTimerContext representing the "start" of the superstep
422    */
423   private GiraphTimerContext getTimerForThisSuperstep(long superstep) {
424     GiraphMetrics.get().resetSuperstepMetrics(superstep);
425     return superstepTimer.time();
426   }
427 
428   /**
429    * Utility to encapsulate Giraph metrics setup calls
430    */
431   private void setupAndInitializeGiraphMetrics() {
432     GiraphMetrics.init(conf);
433     GiraphMetrics.get().addSuperstepResetObserver(this);
434     initJobMetrics();
435     MemoryUtils.initMetrics();
436     InputSplitsCallable.initMetrics();
437   }
438 
439   /**
440    * Instantiate and configure ZooKeeperManager for this job. This will
441    * result in a Giraph-owned Zookeeper instance, a connection to an
442    * existing quorum as specified in the job configuration, or task failure
443    * @return true if this task should terminate
444    */
445   private boolean startZooKeeperManager()
446     throws IOException, InterruptedException {
447     zkManager = new ZooKeeperManager(context, conf);
448     context.setStatus("setup: Setting up Zookeeper manager.");
449     zkManager.setup();
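        // If ZooKeeperManager reports that the computation already finished
        // (e.g. a restarted task attempt), mark this task as done and skip
        // the rest of setup.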
450     if (zkManager.computationDone()) {
451       done = true;
452       return true;
453     }
454     zkManager.onlineZooKeeperServer();
455     String serverPortList = zkManager.getZooKeeperServerPortString();
456     conf.setZookeeperList(serverPortList);
457     createZooKeeperCounter(serverPortList);
458     return false;
459   }
460 
461   /**
462    * Utility to place a new, updated GraphState object into the serviceWorker.
463    */
464   private void updateSuperstepGraphState() {
465     serviceWorker.getWorkerContext().setGraphState(
466         new GraphState(serviceWorker.getSuperstep(),
467             finishedSuperstepStats.getVertexCount(),
468             finishedSuperstepStats.getEdgeCount(), context));
469   }
470 
471   /**
472    * Utility function for boilerplate updates and cleanup done at the
473    * end of each superstep processing loop in the <code>execute</code> method.
474    * @param partitionStatsList list of stats for each superstep to append to
475    * @param superstepTimerContext for job metrics
476    * @return the collected stats at the close of the current superstep.
477    */
478   private FinishedSuperstepStats completeSuperstepAndCollectStats(
479     List<PartitionStats> partitionStatsList,
480     GiraphTimerContext superstepTimerContext) {
481 
482     // the superstep timer is stopped inside the finishSuperstep function
483     // (otherwise metrics are not available at the end of the computation
484     //  using giraph.metrics.enable=true).
485     finishedSuperstepStats =
486       serviceWorker.finishSuperstep(partitionStatsList, superstepTimerContext);
487     if (conf.metricsEnabled()) {
488       GiraphMetrics.get().perSuperstep().printSummary(System.err);
489     }
490     return finishedSuperstepStats;
491   }
492 
493   /**
494    * Utility function to prepare various objects managing BSP superstep
495    * operations for the next superstep.
496    * @param graphState graph state metadata object
497    */
498   private void prepareForSuperstep(GraphState graphState) {
499     serviceWorker.prepareSuperstep();
500 
501     serviceWorker.getWorkerContext().setGraphState(graphState);
502     serviceWorker.getWorkerContext().setupSuperstep(serviceWorker);
503     GiraphTimerContext preSuperstepTimer = wcPreSuperstepTimer.time();
504     serviceWorker.getWorkerContext().preSuperstep();
505     preSuperstepTimer.stop();
506     context.progress();
507 
508     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
509       obs.preSuperstep(graphState.getSuperstep());
510       context.progress();
511     }
512   }
513 
514   /**
515    * Prepare graph state and worker context for superstep cycles.
516    */
517   private void prepareGraphStateAndWorkerContext() {
518     updateSuperstepGraphState();
519     workerContextPreApp();
520   }
521 
522   /**
523    * Get the worker function enum.
524    *
525    * @return an enum detailing the roles assigned to this
526    *         compute node for this Giraph job.
527    */
528   public GraphFunctions getGraphFunctions() {
529     return graphFunctions;
530   }
531 
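      /**
       * Get the WorkerContext from the underlying service worker.
       *
       * @return the WorkerContext for this task
       */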
532   public final WorkerContext getWorkerContext() {
533     return serviceWorker.getWorkerContext();
534   }
535 
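      /**
       * Get the client used to report progress to the JobProgressTracker.
       *
       * @return the JobProgressTracker client
       */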
536   public JobProgressTracker getJobProgressTracker() {
537     return jobProgressTracker;
538   }
539 
540   /**
541    * Figure out what roles this BSP compute node should take on in the job.
542    * Basic logic is as follows:
543    * 1) If master and worker are not split, every task does everything,
544    *    possibly including running ZooKeeper.
545    * 2) If master/worker are split, masters also run ZooKeeper.
546    *
547    * 3) If split master/worker == true and <code>giraph.zkList</code> is
548    *    externally provided, the master will not instantiate a ZK instance, but
549    *    will assume a quorum is already active on the cluster for Giraph to use.
550    *
551    * @param conf Configuration to use
552    * @param zkManager ZooKeeper manager to help determine whether to run
553    *        ZooKeeper.
554    * @return Functions that this mapper should do.
555    */
556   private static GraphFunctions determineGraphFunctions(
557       ImmutableClassesGiraphConfiguration conf,
558       ZooKeeperManager zkManager) {
559     boolean splitMasterWorker = conf.getSplitMasterWorker();
560     int taskPartition = conf.getTaskPartition();
561     boolean zkAlreadyProvided = conf.isZookeeperExternal();
562     GraphFunctions functions = GraphFunctions.UNKNOWN;
563     // What functions should this mapper do?
564     if (!splitMasterWorker) {
565       if ((zkManager != null) && zkManager.runsZooKeeper()) {
566         functions = GraphFunctions.ALL;
567       } else {
568         functions = GraphFunctions.ALL_EXCEPT_ZOOKEEPER;
569       }
570     } else {
571       if (zkAlreadyProvided) {
572         if (taskPartition == 0) {
573           functions = GraphFunctions.MASTER_ONLY;
574         } else {
575           functions = GraphFunctions.WORKER_ONLY;
576         }
577       } else {
578         if ((zkManager != null) && zkManager.runsZooKeeper()) {
579           functions = GraphFunctions.MASTER_ZOOKEEPER_ONLY;
580         } else {
581           functions = GraphFunctions.WORKER_ONLY;
582         }
583       }
584     }
585     return functions;
586   }
587 
588   /**
589    * Instantiate the appropriate BspService object (Master or Worker)
590    * for this compute node.
591    */
592   private void instantiateBspService()
593     throws IOException, InterruptedException {
594     if (graphFunctions.isMaster()) {
595       if (LOG.isInfoEnabled()) {
596         LOG.info("setup: Starting up BspServiceMaster " +
597           "(master thread)...");
598       }
599       serviceMaster = new BspServiceMaster<I, V, E>(context, this);
600       masterThread = new MasterThread<I, V, E>(serviceMaster, context);
601       masterThread.setUncaughtExceptionHandler(
602           createUncaughtExceptionHandler());
603       masterThread.start();
604     }
605     if (graphFunctions.isWorker()) {
606       if (LOG.isInfoEnabled()) {
607         LOG.info("setup: Starting up BspServiceWorker...");
608       }
609       serviceWorker = new BspServiceWorker<I, V, E>(context, this);
610       installGCMonitoring();
611       if (LOG.isInfoEnabled()) {
612         LOG.info("setup: Registering health of this worker...");
613       }
614     }
615   }
616 
617   /**
618    * Install GC monitoring. This method intercepts all GC events, logs each
619    * GC, and notifies the out-of-core engine (if one is used) about it.
620    */
621   private void installGCMonitoring() {
622     final GcObserver[] gcObservers = conf.createGcObservers(context);
623     List<GarbageCollectorMXBean> mxBeans = ManagementFactory
624         .getGarbageCollectorMXBeans();
625     final OutOfCoreEngine oocEngine =
626         serviceWorker.getServerData().getOocEngine();
627     for (GarbageCollectorMXBean gcBean : mxBeans) {
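          // On HotSpot/OpenJDK each GarbageCollectorMXBean also implements
          // NotificationEmitter, so we can subscribe to GC notifications here.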
628       NotificationEmitter emitter = (NotificationEmitter) gcBean;
629       NotificationListener listener = new NotificationListener() {
630         @Override
631         public void handleNotification(Notification notification,
632                                        Object handle) {
633           if (notification.getType().equals(GarbageCollectionNotificationInfo
634               .GARBAGE_COLLECTION_NOTIFICATION)) {
635             GarbageCollectionNotificationInfo info =
636                 GarbageCollectionNotificationInfo.from(
637                     (CompositeData) notification.getUserData());
638 
639             if (LOG.isInfoEnabled()) {
640               LOG.info("installGCMonitoring: name = " + info.getGcName() +
641                   ", action = " + info.getGcAction() + ", cause = " +
642                   info.getGcCause() + ", duration = " +
643                   info.getGcInfo().getDuration() + "ms");
644             }
645             gcTimeMetric.inc(info.getGcInfo().getDuration());
646             GiraphMetrics.get().getGcTracker().gcOccurred(info.getGcInfo());
647             for (GcObserver gcObserver : gcObservers) {
648               gcObserver.gcOccurred(info);
649             }
650             if (oocEngine != null) {
651               oocEngine.gcCompleted(info);
652             }
653           }
654         }
655       };
656       //Add the listener
657       emitter.addNotificationListener(listener, null, null);
658     }
659   }
660 
661   /**
662    * Initialize the root logger and appender to the settings in conf.
663    */
664   private void initializeAndConfigureLogging() {
665     // Set the log level
666     String logLevel = conf.getLocalLevel();
667     if (!Logger.getRootLogger().getLevel().equals(Level.toLevel(logLevel))) {
668       Logger.getRootLogger().setLevel(Level.toLevel(logLevel));
669       if (LOG.isInfoEnabled()) {
670         LOG.info("setup: Set log level to " + logLevel);
671       }
672     } else {
673       if (LOG.isInfoEnabled()) {
674         LOG.info("setup: Log level remains at " + logLevel);
675       }
676     }
677     // Sets pattern layout for all appenders
678     if (conf.useLogThreadLayout()) {
679       PatternLayout layout =
680         new PatternLayout("%-7p %d [%t] %c %x - %m%n");
681       Enumeration<Appender> appenderEnum =
682         Logger.getRootLogger().getAllAppenders();
683       while (appenderEnum.hasMoreElements()) {
684         appenderEnum.nextElement().setLayout(layout);
685       }
686     }
687     // Change ZooKeeper logging level to error (info is quite verbose) for
688     // testing only
689     if (conf.getLocalTestMode()) {
690       LogManager.getLogger(org.apache.zookeeper.server.PrepRequestProcessor.
691           class.getName()).setLevel(Level.ERROR);
692     }
693   }
694 
695   /**
696    * Initialize job-level metrics used by this class.
697    */
698   private void initJobMetrics() {
699     GiraphMetricsRegistry jobMetrics = GiraphMetrics.get().perJobOptional();
700     wcPreAppTimer = new GiraphTimer(jobMetrics, "worker-context-pre-app",
701         TimeUnit.MILLISECONDS);
702     wcPostAppTimer = new GiraphTimer(jobMetrics, "worker-context-post-app",
703         TimeUnit.MILLISECONDS);
704   }
705 
706   @Override
707   public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
708     superstepTimer = new GiraphTimer(superstepMetrics,
709         TIMER_SUPERSTEP_TIME, TimeUnit.MILLISECONDS);
710     computeAll = new GiraphTimer(superstepMetrics,
711         TIMER_COMPUTE_ALL, TimeUnit.MILLISECONDS);
712     timeToFirstMessage = new GiraphTimer(superstepMetrics,
713         TIMER_TIME_TO_FIRST_MSG, TimeUnit.MICROSECONDS);
714     communicationTimer = new GiraphTimer(superstepMetrics,
715         TIMER_COMMUNICATION_TIME, TimeUnit.MILLISECONDS);
716     gcTimeMetric = superstepMetrics.getCounter(TIMER_SUPERSTEP_GC_TIME);
717     wcPreSuperstepTimer = new GiraphTimer(superstepMetrics,
718         "worker-context-pre-superstep", TimeUnit.MILLISECONDS);
719   }
720 
721   /**
722    * Notification from Vertex that a message has been sent.
723    */
724   public void notifySentMessages() {
725     // We are tracking the time between when the compute started and the first
726     // message gets sent. We use null to flag that we have already recorded it.
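        // Double-checked locking: the unsynchronized read avoids taking the
        // lock on every sent message, while the synchronized block ensures the
        // first-message timer is stopped exactly once per superstep.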
727     GiraphTimerContext tmp = timeToFirstMessageTimerContext;
728     if (tmp != null) {
729       synchronized (timeToFirstMessage) {
730         if (timeToFirstMessageTimerContext != null) {
731           timeToFirstMessageTimerContext.stop();
732           timeToFirstMessageTimerContext = null;
733           communicationTimerContext = communicationTimer.time();
734         }
735       }
736     }
737   }
738 
739   /**
740    * Notification of last message flushed. Comes when we finish the superstep
741    * and are done waiting for all messages to send.
742    */
743   public void notifyFinishedCommunication() {
744     GiraphTimerContext tmp = communicationTimerContext;
745     if (tmp != null) {
746       synchronized (communicationTimer) {
747         if (communicationTimerContext != null) {
748           communicationTimerContext.stop();
749           communicationTimerContext = null;
750         }
751       }
752     }
753   }
754 
755   /**
756    * Process graph data partitions active in this superstep.
757    * @param context handle to the underlying cluster framework
758    * @param partitionStatsList to pick up this superstep's processing stats
759    * @param graphState the BSP graph state
760    * @param messageStore the messages to be processed in this superstep
761    * @param numThreads number of concurrent threads to do processing
762    */
763   private void processGraphPartitions(final Mapper<?, ?, ?, ?>.Context context,
764       List<PartitionStats> partitionStatsList,
765       final GraphState graphState,
766       final MessageStore<I, Writable> messageStore,
767       int numThreads) {
768     PartitionStore<I, V, E> partitionStore = serviceWorker.getPartitionStore();
769     long verticesToCompute = 0;
770     for (Integer partitionId : partitionStore.getPartitionIds()) {
771       verticesToCompute += partitionStore.getPartitionVertexCount(partitionId);
772     }
773     WorkerProgress.get().startSuperstep(
774         serviceWorker.getSuperstep(), verticesToCompute,
775         serviceWorker.getPartitionStore().getNumPartitions());
776     partitionStore.startIteration();
777 
778     GiraphTimerContext computeAllTimerContext = computeAll.time();
779     timeToFirstMessageTimerContext = timeToFirstMessage.time();
780 
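        // Run numThreads ComputeCallables in parallel; each callable claims
        // partitions from the partition store (iteration started above) and
        // runs compute() on their vertices, returning per-partition stats.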
781     CallableFactory<Collection<PartitionStats>> callableFactory =
782       new CallableFactory<Collection<PartitionStats>>() {
783         @Override
784         public Callable<Collection<PartitionStats>> newCallable(
785             int callableId) {
786           return new ComputeCallable<I, V, E, Writable, Writable>(
787               context,
788               graphState,
789               messageStore,
790               conf,
791               serviceWorker);
792         }
793       };
794     List<Collection<PartitionStats>> results =
795         ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
796             "compute-%d", context);
797 
798     for (Collection<PartitionStats> result : results) {
799       partitionStatsList.addAll(result);
800     }
801 
802     computeAllTimerContext.stop();
803   }
804 
805   /**
806    * Handle the event that this superstep is a restart of a failed one.
807    * @param superstep current superstep
808    * @return true if this superstep was restarted from a checkpoint
809    */
810   private boolean checkSuperstepRestarted(long superstep) throws IOException {
811     // Might need to restart from another superstep
812     // (manually or automatic), or store a checkpoint
813     if (serviceWorker.getRestartedSuperstep() == superstep) {
814       if (LOG.isInfoEnabled()) {
815         LOG.info("execute: Loading from checkpoint " + superstep);
816       }
817       VertexEdgeCount vertexEdgeCount = serviceWorker.loadCheckpoint(
818         serviceWorker.getRestartedSuperstep());
819       finishedSuperstepStats = new FinishedSuperstepStats(0, false,
820           vertexEdgeCount.getVertexCount(), vertexEdgeCount.getEdgeCount(),
821           false, CheckpointStatus.NONE);
822       return true;
823     }
824     return false;
825   }
826 
827   /**
828    * Checks if it's time to checkpoint and actually does the checkpointing
829    * if it is.
830    * @param checkpointStatus master's decision
831    * @return true if we need to stop computation after checkpoint
832    * @throws IOException
833    */
834   private boolean storeCheckpoint(CheckpointStatus checkpointStatus)
835     throws IOException {
836     if (checkpointStatus != CheckpointStatus.NONE) {
837       serviceWorker.storeCheckpoint();
838     }
839     return checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT;
840   }
841 
842   /**
843    * Attempt to collect the final statistics on the graph data
844    * processed in this superstep by this compute node
845    * @param inputSuperstepStats the final graph data stats object for the
846    *                            input superstep
847    * @return true if the graph data has no vertices (error?) and
848    *         this node should terminate
849    */
850   private boolean collectInputSuperstepStats(
851     FinishedSuperstepStats inputSuperstepStats) {
852     if (inputSuperstepStats.getVertexCount() == 0 &&
853         !inputSuperstepStats.mustLoadCheckpoint()) {
854       LOG.warn("map: No vertices in the graph, exiting.");
855       return true;
856     }
857     if (conf.metricsEnabled()) {
858       GiraphMetrics.get().perSuperstep().printSummary(System.err);
859     }
860     return false;
861   }
862 
863   /**
864    * Check whether this compute node should skip superstep processing.
865    * @return true if the processing of supersteps should terminate.
866    */
867   private boolean checkTaskState() {
868     if (done) {
869       return true;
870     }
871     GiraphMetrics.get().resetSuperstepMetrics(BspService.INPUT_SUPERSTEP);
872     if (graphFunctions.isNotAWorker()) {
873       if (LOG.isInfoEnabled()) {
874         LOG.info("map: No need to do anything when not a worker");
875       }
876       return true;
877     }
878     if (alreadyRun) {
879       throw new RuntimeException("map: In BSP, map should have only been" +
880         " run exactly once, (already run)");
881     }
882     alreadyRun = true;
883     return false;
884   }
885 
886   /**
887    * Call to the WorkerContext before application begins.
888    */
889   private void workerContextPreApp() {
890     GiraphTimerContext preAppTimerContext = wcPreAppTimer.time();
891     try {
892       serviceWorker.getWorkerContext().preApplication();
893     } catch (InstantiationException e) {
894       LOG.fatal("execute: preApplication failed in instantiation", e);
895       throw new RuntimeException(
896           "execute: preApplication failed in instantiation", e);
897     } catch (IllegalAccessException e) {
898       LOG.fatal("execute: preApplication failed in access", e);
899       throw new RuntimeException(
900           "execute: preApplication failed in access", e);
901     }
902     preAppTimerContext.stop();
903     context.progress();
904 
905     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
906       obs.preApplication();
907       context.progress();
908     }
909   }
910 
911   /**
912    * Setup mapper observers
913    */
914   public void setupMapperObservers() {
915     mapperObservers = conf.createMapperObservers(context);
916     for (MapperObserver mapperObserver : mapperObservers) {
917       mapperObserver.setup();
918     }
919   }
920 
921   /**
922    * Executes preLoad() on worker observers.
923    */
924   private void preLoadOnWorkerObservers() {
925     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
926       obs.preLoad();
927       context.progress();
928     }
929   }
930 
931   /**
932    * Executes postSave() on worker observers.
933    */
934   private void postSaveOnWorkerObservers() {
935     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
936       obs.postSave();
937       context.progress();
938     }
939   }
940 
941   /**
942    * Called by owner of this GraphTaskManager object on each compute node
943    */
944   public void cleanup()
945     throws IOException, InterruptedException {
946     if (LOG.isInfoEnabled()) {
947       LOG.info("cleanup: Starting for " + getGraphFunctions());
948     }
949     jobProgressTracker.cleanup();
950     if (done) {
951       return;
952     }
953 
954     if (serviceWorker != null) {
955       serviceWorker.cleanup(finishedSuperstepStats);
956     }
957   }
958 
959   /**
960    * Method to send the counter values from the worker to the master,
961    * after all supersteps are done, and finish cleanup
962    */
963   public void sendWorkerCountersAndFinishCleanup() {
964     if (serviceWorker != null) {
965       postSaveOnWorkerObservers();
966       serviceWorker.storeCountersInZooKeeper(true);
967       serviceWorker.closeZooKeeper();
968     }
969     try {
970       if (masterThread != null) {
971         masterThread.join();
972         LOG.info("cleanup: Joined with master thread");
973       }
974     } catch (InterruptedException e) {
975       // cleanup phase -- just log the error
976       LOG.error("cleanup: Master thread couldn't join");
977     }
978     if (zkManager != null) {
979       LOG.info("cleanup: Offlining ZooKeeper servers");
980       try {
981         zkManager.offlineZooKeeperServers(ZooKeeperManager.State.FINISHED);
982         // We need this here because apparently exceptions are eaten by Hadoop
983         // when they come from the cleanup lifecycle and it's useful to know
984         // if something is wrong.
985         //
986         // And since it's cleanup nothing too bad should happen if we don't
987         // propagate and just allow the job to finish normally.
988         // CHECKSTYLE: stop IllegalCatch
989       } catch (Throwable e) {
990         // CHECKSTYLE: resume IllegalCatch
991         LOG.error("cleanup: Error offlining zookeeper", e);
992       }
993     }
994 
995     // Stop tracking metrics
996     GiraphMetrics.get().shutdown();
997   }
998 
999   /**
1000    * Cleanup a ZooKeeper instance managed by this
1001    * Giraph worker upon job run failure.
1002    */
1003   public void zooKeeperCleanup() {
1004     if (graphFunctions.isZooKeeper()) {
1005       // ZooKeeper may have had an issue
1006       if (zkManager != null) {
1007         zkManager.cleanup();
1008       }
1009     }
1010   }
1011 
1012   /**
1013    * Cleanup all of Giraph's framework-agnostic resources
1014    * regardless of which type of cluster Giraph is running on.
1015    */
1016   public void workerFailureCleanup() {
1017     try {
1018       if (graphFunctions.isWorker()) {
1019         serviceWorker.failureCleanup();
1020       }
1021       // Stop tracking metrics
1022       GiraphMetrics.get().shutdown();
1023     // Checkstyle exception due to needing to get the original
1024     // exception on failure
1025     // CHECKSTYLE: stop IllegalCatch
1026     } catch (RuntimeException e1) {
1027     // CHECKSTYLE: resume IllegalCatch
1028       LOG.error("run: Worker failure failed on another RuntimeException, " +
1029           "original exception will be rethrown", e1);
1030     }
1031   }
1032 
1033   /**
1034    * Creates exception handler that will terminate process gracefully in case
1035    * of any uncaught exception.
1036    * @return new exception handler object.
1037    */
1038   public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler() {
1039     return new OverrideExceptionHandler(
1040         CHECKER_IF_WORKER_SHOULD_FAIL_AFTER_EXCEPTION_CLASS.newInstance(
1041             getConf()), getJobProgressTracker());
1042   }
1043 
1044   /**
1045    * Creates exception handler with the passed implementation of
1046    * {@link CheckerIfWorkerShouldFailAfterException}.
1047    *
1048    * @param checker Instance that checks whether the job should fail.
1049    * @return Exception handler.
1050    */
1051   public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler(
1052     CheckerIfWorkerShouldFailAfterException checker) {
1053     return new OverrideExceptionHandler(checker, getJobProgressTracker());
1054   }
1055 
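      /**
       * Get the Giraph configuration for this job.
       *
       * @return the immutable Giraph configuration
       */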
1056   public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
1057     return conf;
1058   }
1059 
1060   /**
1061    * @return Time (in milliseconds) spent in GC as recorded by the GC listener
1062    */
1063   public long getSuperstepGCTime() {
1064     return (gcTimeMetric == null) ? 0 : gcTimeMetric.count();
1065   }
1066 
1067   /**
1068    * Returns a list of zookeeper servers to connect to.
1069    * If the port is set to 0 and Giraph is starting a single
1070    * ZooKeeper server, then Zookeeper will pick its own port.
1071    * Otherwise, the ZooKeeper port set by the user will be used.
1072    * @return host:port,host:port for each zookeeper
1073    */
1074   public String getZookeeperList() {
1075     if (zkManager != null) {
1076       return zkManager.getZooKeeperServerPortString();
1077     } else {
1078       return conf.getZookeeperList();
1079     }
1080   }
1081 
1082   /**
1083    * Default handler for uncaught exceptions.
1084    * It will do its best to clean up and then terminate the current Giraph job.
1085    */
1086   class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
1087     /** Checker if worker should fail after a thread gets an exception */
1088     private final CheckerIfWorkerShouldFailAfterException checker;
1089     /** JobProgressTracker to log problems to */
1090     private final JobProgressTracker jobProgressTracker;
1091 
1092     /**
1093      * Constructor
1094      *
1095      * @param checker Checker if worker should fail after a thread gets an
1096      *                exception
1097      * @param jobProgressTracker JobProgressTracker to log problems to
1098      */
1099     public OverrideExceptionHandler(
1100         CheckerIfWorkerShouldFailAfterException checker,
1101         JobProgressTracker jobProgressTracker) {
1102       this.checker = checker;
1103       this.jobProgressTracker = jobProgressTracker;
1104     }
1105 
1106     @Override
1107     public void uncaughtException(final Thread t, final Throwable e) {
1108       if (!checker.checkIfWorkerShouldFail(t, e)) {
1109         LOG.error(
1110           "uncaughtException: OverrideExceptionHandler on thread " +
1111             t.getName() + ", msg = " +  e.getMessage(), e);
1112         return;
1113       }
1114       try {
1115         LOG.fatal(
1116             "uncaughtException: OverrideExceptionHandler on thread " +
1117                 t.getName() + ", msg = " +  e.getMessage() + ", exiting...", e);
1118         byte [] exByteArray = KryoWritableWrapper.convertToByteArray(e);
1119         jobProgressTracker.logError(ExceptionUtils.getStackTrace(e),
1120                 exByteArray);
1121         zooKeeperCleanup();
1122         workerFailureCleanup();
1123       } finally {
1124         System.exit(1);
1125       }
1126     }
1127   }
1128 
1129   /**
1130    * Interface to check if worker should fail after a thread gets an exception
1131    */
1132   public interface CheckerIfWorkerShouldFailAfterException {
1133     /**
1134      * Check if worker should fail after a thread gets an exception
1135      *
1136      * @param thread Thread which raised the exception
1137      * @param exception Exception which occurred
1138      * @return True iff worker should fail after this exception
1139      */
1140     boolean checkIfWorkerShouldFail(Thread thread, Throwable exception);
1141   }
1142 
1143   /**
1144    * Class to use by default, where each exception causes job failure
1145    */
1146   public static class FailWithEveryExceptionOccurred
1147       implements CheckerIfWorkerShouldFailAfterException {
1148     @Override
1149     public boolean checkIfWorkerShouldFail(Thread thread, Throwable exception) {
1150       return true;
1151     }
1152   }
1153 
1154   /**
1155    * Checks the message of a throwable to determine whether it is a
1156    * "connection reset by peer" type of exception.
1157    *
1158    * @param throwable Throwable
1159    * @return True if the throwable is a "connection reset by peer",
1160    * false otherwise.
1161    */
1162   public static boolean isConnectionResetByPeer(Throwable throwable) {
1163     String message = throwable.getMessage();
1164     return message != null && message.startsWith("Connection reset by peer");
1165   }
1166 }