1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.graph;
20  
21  import java.io.IOException;
22  import java.lang.management.GarbageCollectorMXBean;
23  import java.lang.management.ManagementFactory;
24  import java.net.URL;
25  import java.net.URLDecoder;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.Enumeration;
29  import java.util.List;
30  import java.util.concurrent.Callable;
31  import java.util.concurrent.ExecutionException;
32  import java.util.concurrent.TimeUnit;
33  
34  import com.sun.management.GarbageCollectionNotificationInfo;
35  import com.yammer.metrics.core.Counter;
36  
37  import org.apache.commons.lang3.exception.ExceptionUtils;
38  import org.apache.giraph.bsp.BspService;
39  import org.apache.giraph.bsp.CentralizedServiceMaster;
40  import org.apache.giraph.bsp.CentralizedServiceWorker;
41  import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
42  import org.apache.giraph.comm.messages.MessageStore;
43  import org.apache.giraph.conf.ClassConfOption;
44  import org.apache.giraph.conf.GiraphConstants;
45  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
46  import org.apache.giraph.job.JobProgressTracker;
47  import org.apache.giraph.master.BspServiceMaster;
48  import org.apache.giraph.master.MasterThread;
49  import org.apache.giraph.metrics.GiraphMetrics;
50  import org.apache.giraph.metrics.GiraphMetricsRegistry;
51  import org.apache.giraph.metrics.GiraphTimer;
52  import org.apache.giraph.metrics.GiraphTimerContext;
53  import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
54  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
55  import org.apache.giraph.ooc.OutOfCoreEngine;
56  import org.apache.giraph.partition.PartitionOwner;
57  import org.apache.giraph.partition.PartitionStats;
58  import org.apache.giraph.partition.PartitionStore;
59  import org.apache.giraph.scripting.ScriptLoader;
60  import org.apache.giraph.utils.CallableFactory;
61  import org.apache.giraph.utils.MemoryUtils;
62  import org.apache.giraph.utils.ProgressableUtils;
63  import org.apache.giraph.worker.BspServiceWorker;
64  import org.apache.giraph.worker.InputSplitsCallable;
65  import org.apache.giraph.worker.WorkerContext;
66  import org.apache.giraph.worker.WorkerObserver;
67  import org.apache.giraph.worker.WorkerProgress;
68  import org.apache.giraph.zk.ZooKeeperManager;
69  import org.apache.hadoop.conf.Configuration;
70  import org.apache.hadoop.fs.Path;
71  import org.apache.hadoop.io.Writable;
72  import org.apache.hadoop.io.WritableComparable;
73  import org.apache.hadoop.mapreduce.Mapper;
74  import org.apache.log4j.Appender;
75  import org.apache.log4j.Level;
76  import org.apache.log4j.LogManager;
77  import org.apache.log4j.Logger;
78  import org.apache.log4j.PatternLayout;
79  
80  import javax.management.Notification;
81  import javax.management.NotificationEmitter;
82  import javax.management.NotificationListener;
83  import javax.management.openmbean.CompositeData;
84  
85  /**
86   * The Giraph-specific business logic for a single BSP
87   * compute node in whatever underlying type of cluster
88   * our Giraph job will run on. The owning object will provide
89   * the glue into the underlying cluster framework
90   * and will call this object to perform Giraph work.
91   *
92   * @param <I> Vertex id
93   * @param <V> Vertex data
94   * @param <E> Edge data
95   */
96  @SuppressWarnings("rawtypes")
97  public class GraphTaskManager<I extends WritableComparable, V extends Writable,
98    E extends Writable> implements
99    ResetSuperstepMetricsObserver {
100 /*if_not[PURE_YARN]
101   static { // Eliminate this? Even MRv1 tasks should not need it here.
102     Configuration.addDefaultResource("giraph-site.xml");
103   }
104 end[PURE_YARN]*/
105   /**
106    * Class which checks if an exception on some thread should cause worker
107    * to fail
108    */
109   public static final ClassConfOption<CheckerIfWorkerShouldFailAfterException>
110   CHECKER_IF_WORKER_SHOULD_FAIL_AFTER_EXCEPTION_CLASS = ClassConfOption.create(
111       "giraph.checkerIfWorkerShouldFailAfterExceptionClass",
112       FailWithEveryExceptionOccurred.class,
113       CheckerIfWorkerShouldFailAfterException.class,
114       "Class which checks if an exception on some thread should cause worker " +
115           "to fail, by default all exceptions cause failure");
116   /** Name of metric for superstep time in msec */
117   public static final String TIMER_SUPERSTEP_TIME = "superstep-time-ms";
118   /** Name of metric for compute on all vertices in msec */
119   public static final String TIMER_COMPUTE_ALL = "compute-all-ms";
120   /** Name of metric for time from begin compute to first message sent */
121   public static final String TIMER_TIME_TO_FIRST_MSG =
122       "time-to-first-message-ms";
123   /** Name of metric for time from first message till last message flushed */
124   public static final String TIMER_COMMUNICATION_TIME = "communication-time-ms";
125   /** Name of metric for time spent doing GC per superstep in msec */
126   public static final String TIMER_SUPERSTEP_GC_TIME = "superstep-gc-time-ms";
127 
128   /** Class logger */
129   private static final Logger LOG = Logger.getLogger(GraphTaskManager.class);
130   /** Coordination service worker */
131   private CentralizedServiceWorker<I, V, E> serviceWorker;
132   /** Coordination service master */
133   private CentralizedServiceMaster<I, V, E> serviceMaster;
134   /** Coordination service master thread */
135   private Thread masterThread = null;
136   /** The worker should be run exactly once, or else there is a problem. */
137   private boolean alreadyRun = false;
138   /** Manages the ZooKeeper servers if necessary (dynamic startup) */
139   private ZooKeeperManager zkManager;
140   /** Configuration */
141   private ImmutableClassesGiraphConfiguration<I, V, E> conf;
142   /** Already complete? */
143   private boolean done = false;
144   /** What kind of functions is this mapper doing? */
145   private GraphFunctions graphFunctions = GraphFunctions.UNKNOWN;
146   /** Superstep stats */
147   private FinishedSuperstepStats finishedSuperstepStats =
148       new FinishedSuperstepStats(0, false, 0, 0, false, CheckpointStatus.NONE);
149   /** Job progress tracker */
150   private JobProgressTrackerClient jobProgressTracker;
151 
152   // Per-Job Metrics
153   /** Timer for WorkerContext#preApplication() */
154   private GiraphTimer wcPreAppTimer;
155   /** Timer for WorkerContext#postApplication() */
156   private GiraphTimer wcPostAppTimer;
157 
158   // Per-Superstep Metrics
159   /** Time for how long superstep took */
160   private GiraphTimer superstepTimer;
161   /** Time for all compute() calls in a superstep */
162   private GiraphTimer computeAll;
163   /** Time from starting compute to sending first message */
164   private GiraphTimer timeToFirstMessage;
165   /** Context for timing time to first message above */
166   private GiraphTimerContext timeToFirstMessageTimerContext;
167   /** Time from first sent message till last message flushed. */
168   private GiraphTimer communicationTimer;
169   /** Context for timing communication time above */
170   private GiraphTimerContext communicationTimerContext;
171   /** Timer for WorkerContext#preSuperstep() */
172   private GiraphTimer wcPreSuperstepTimer;
173   /** Timer to keep aggregated time spent in GC in a superstep */
174   private Counter gcTimeMetric;
175   /** The Hadoop Mapper#Context for this job */
176   private final Mapper<?, ?, ?, ?>.Context context;
177   /** is this GraphTaskManager the master? */
178   private boolean isMaster;
179   /** Mapper observers */
180   private MapperObserver[] mapperObservers;
181 
182   /**
183    * Default constructor for GraphTaskManager.
184    * @param context a handle to the underlying cluster framework.
185    *                For Hadoop clusters, this is a Mapper#Context.
186    */
187   public GraphTaskManager(Mapper<?, ?, ?, ?>.Context context) {
188     this.context = context;
189     this.isMaster = false;
190   }
191 
192   /**
193    * Run the user's input checking code.
194    */
195   private void checkInput() {
196     if (conf.hasEdgeInputFormat()) {
197       conf.createWrappedEdgeInputFormat().checkInputSpecs(conf);
198     }
199     if (conf.hasVertexInputFormat()) {
200       conf.createWrappedVertexInputFormat().checkInputSpecs(conf);
201     }
202   }
203 
204   /**
205    * In order for the job client to know which ZooKeeper the job is using,
206    * we create a counter with server:port as its name inside of
207    * ZOOKEEPER_SERVER_PORT_COUNTER_GROUP.
208    *
209    * @param serverPortList Server:port list for ZooKeeper used
210    */
211   private void createZooKeeperCounter(String serverPortList) {
212     // Getting the counter will actually create it.
213     context.getCounter(GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP,
214         serverPortList);
215   }
216 
217   /**
218    * Called by owner of this GraphTaskManager on each compute node
219    *
220    * @param zkPathList the paths to the ZK jars we need to run the job
221    */
222   public void setup(Path[] zkPathList)
223     throws IOException, InterruptedException {
224     context.setStatus("setup: Beginning worker setup.");
225     Configuration hadoopConf = context.getConfiguration();
226     conf = new ImmutableClassesGiraphConfiguration<I, V, E>(hadoopConf);
227     setupMapperObservers();
228     initializeJobProgressTracker();
229     // Write user's graph types (I,V,E,M) back to configuration parameters so
230     // that they are set for quicker access later. These types are often
231     // inferred from the Computation class used.
232     conf.getGiraphTypes().writeIfUnset(conf);
233     // configure global logging level for Giraph job
234     initializeAndConfigureLogging();
235     // init the metrics objects
236     setupAndInitializeGiraphMetrics();
237     // Check input
238     checkInput();
239     // Load any scripts that were deployed
240     ScriptLoader.loadScripts(conf);
241     // One time setup for computation factory
242     conf.createComputationFactory().initialize(conf);
243     // Do some task setup (possibly starting up a Zookeeper service)
244     context.setStatus("setup: Initializing Zookeeper services.");
245     String serverPortList = conf.getZookeeperList();
246     if (serverPortList.isEmpty()) {
247       if (startZooKeeperManager()) {
248         return; // ZK connect/startup failed
249       }
250     } else {
251       createZooKeeperCounter(serverPortList);
252     }
253     if (zkManager != null && zkManager.runsZooKeeper()) {
254       if (LOG.isInfoEnabled()) {
255         LOG.info("setup: Chosen to run ZooKeeper...");
256       }
257     }
258     context
259         .setStatus("setup: Connected to Zookeeper service " + serverPortList);
260     this.graphFunctions = determineGraphFunctions(conf, zkManager);
261     try {
262       instantiateBspService();
263     } catch (IOException e) {
264       LOG.error("setup: Caught exception just before end of setup", e);
265       if (zkManager != null) {
266         zkManager.offlineZooKeeperServers(ZooKeeperManager.State.FAILED);
267       }
268       throw new RuntimeException(
269         "setup: Offlining servers due to exception...", e);
270     }
271     context.setStatus(getGraphFunctions().toString() + " starting...");
272   }
273 
274   /**
275    * Create and connect a client to JobProgressTrackerService,
276    * or a no-op implementation if progress shouldn't be tracked or something
277    * goes wrong
278    */
279   private void initializeJobProgressTracker() {
280     if (!conf.trackJobProgressOnClient()) {
281       jobProgressTracker = new JobProgressTrackerClientNoOp();
282     } else {
283       try {
284         jobProgressTracker = new RetryableJobProgressTrackerClient(conf);
285       } catch (InterruptedException | ExecutionException e) {
286         LOG.warn("createJobProgressClient: Exception occurred while trying to" +
287             " connect to JobProgressTracker - not reporting progress", e);
288         jobProgressTracker = new JobProgressTrackerClientNoOp();
289       }
290     }
291     jobProgressTracker.mapperStarted();
292   }
293 
294   /**
295    * Perform the work assigned to this compute node for this job run.
296    * 1) Run checkpoint per frequency policy.
297    * 2) For every vertex on this mapper, run the compute() function.
298    * 3) Wait until all messaging is done.
299    * 4) Check if all vertices are done. If not, go to 2).
300    * 5) Dump output.
301    */
302   public void execute() throws IOException, InterruptedException {
303     if (checkTaskState()) {
304       return;
305     }
306     preLoadOnWorkerObservers();
307     GiraphTimerContext superstepTimerContext = superstepTimer.time();
308     finishedSuperstepStats = serviceWorker.setup();
309     superstepTimerContext.stop();
310     if (collectInputSuperstepStats(finishedSuperstepStats)) {
311       return;
312     }
313     prepareGraphStateAndWorkerContext();
314     List<PartitionStats> partitionStatsList = new ArrayList<PartitionStats>();
315     int numComputeThreads = conf.getNumComputeThreads();
316 
317     // main superstep processing loop
318     while (!finishedSuperstepStats.allVerticesHalted()) {
319       final long superstep = serviceWorker.getSuperstep();
320       superstepTimerContext = getTimerForThisSuperstep(superstep);
321       GraphState graphState = new GraphState(superstep,
322           finishedSuperstepStats.getVertexCount(),
323           finishedSuperstepStats.getEdgeCount(),
324           context);
325       Collection<? extends PartitionOwner> masterAssignedPartitionOwners =
326         serviceWorker.startSuperstep();
327       if (LOG.isDebugEnabled()) {
328         LOG.debug("execute: " + MemoryUtils.getRuntimeMemoryStats());
329       }
330       context.progress();
331       serviceWorker.exchangeVertexPartitions(masterAssignedPartitionOwners);
332       context.progress();
333       boolean hasBeenRestarted = checkSuperstepRestarted(superstep);
334 
335       GlobalStats globalStats = serviceWorker.getGlobalStats();
336 
337       if (hasBeenRestarted) {
338         graphState = new GraphState(superstep,
339             finishedSuperstepStats.getVertexCount(),
340             finishedSuperstepStats.getEdgeCount(),
341             context);
342       } else if (storeCheckpoint(globalStats.getCheckpointStatus())) {
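            // storeCheckpoint() returns true only for CHECKPOINT_AND_HALT, in which
            // case we leave the superstep loop right after the checkpoint is stored.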
343         break;
344       }
345       serviceWorker.getServerData().prepareResolveMutations();
346       context.progress();
347       prepareForSuperstep(graphState);
348       context.progress();
349       MessageStore<I, Writable> messageStore =
350           serviceWorker.getServerData().getCurrentMessageStore();
351       int numPartitions = serviceWorker.getPartitionStore().getNumPartitions();
352       int numThreads = Math.min(numComputeThreads, numPartitions);
353       if (LOG.isInfoEnabled()) {
354         LOG.info("execute: " + numPartitions + " partitions to process with " +
355           numThreads + " compute thread(s), originally " +
356           numComputeThreads + " thread(s) on superstep " + superstep);
357       }
358       partitionStatsList.clear();
359       // execute the current superstep
360       if (numPartitions > 0) {
361         processGraphPartitions(context, partitionStatsList, graphState,
362           messageStore, numThreads);
363       }
364       finishedSuperstepStats = completeSuperstepAndCollectStats(
365         partitionStatsList, superstepTimerContext);
366 
367       // END of superstep compute loop
368     }
369 
370     if (LOG.isInfoEnabled()) {
371       LOG.info("execute: BSP application done (global vertices marked done)");
372     }
373     updateSuperstepGraphState();
374     postApplication();
375   }
376 
377   /**
378    * Handle post-application callbacks.
379    */
380   private void postApplication() throws IOException, InterruptedException {
381     GiraphTimerContext postAppTimerContext = wcPostAppTimer.time();
382     serviceWorker.getWorkerContext().postApplication();
383     serviceWorker.getSuperstepOutput().postApplication();
384     postAppTimerContext.stop();
385     context.progress();
386 
387     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
388       obs.postApplication();
389       context.progress();
390     }
391   }
392 
393   /**
394    * Sets the "isMaster" flag for final output commit to happen on master.
395    * @param im the boolean input to set isMaster. Applies to pure YARN only.
396    */
397   public void setIsMaster(final boolean im) {
398     this.isMaster = im;
399   }
400 
401   /**
402    * Get "isMaster" status flag -- we need to know if we're the master in the
403    * "finally" block of our GiraphYarnTask#execute() to commit final job output.
404    * @return true if this task IS the master.
405    */
406   public boolean isMaster() {
407     return isMaster;
408   }
409 
410   /**
411    * Produce a reference to the "start" superstep timer for the current
412    * superstep.
413    * @param superstep the current superstep count
414    * @return a GiraphTimerContext representing the "start" of the superstep
415    */
416   private GiraphTimerContext getTimerForThisSuperstep(long superstep) {
417     GiraphMetrics.get().resetSuperstepMetrics(superstep);
418     return superstepTimer.time();
419   }
420 
421   /**
422    * Utility to encapsulate Giraph metrics setup calls
423    */
424   private void setupAndInitializeGiraphMetrics() {
425     GiraphMetrics.init(conf);
426     GiraphMetrics.get().addSuperstepResetObserver(this);
427     initJobMetrics();
428     MemoryUtils.initMetrics();
429     InputSplitsCallable.initMetrics();
430   }
431 
432   /**
433    * Instantiate and configure ZooKeeperManager for this job. This will
434    * result in a Giraph-owned Zookeeper instance, a connection to an
435    * existing quorum as specified in the job configuration, or task failure
436    * @return true if this task should terminate
437    */
438   private boolean startZooKeeperManager()
439     throws IOException, InterruptedException {
440     zkManager = new ZooKeeperManager(context, conf);
441     context.setStatus("setup: Setting up Zookeeper manager.");
442     zkManager.setup();
443     if (zkManager.computationDone()) {
444       done = true;
445       return true;
446     }
447     zkManager.onlineZooKeeperServer();
448     String serverPortList = zkManager.getZooKeeperServerPortString();
449     conf.setZookeeperList(serverPortList);
450     createZooKeeperCounter(serverPortList);
451     return false;
452   }
453 
454   /**
455    * Utility to place a new, updated GraphState object into the serviceWorker.
456    */
457   private void updateSuperstepGraphState() {
458     serviceWorker.getWorkerContext().setGraphState(
459         new GraphState(serviceWorker.getSuperstep(),
460             finishedSuperstepStats.getVertexCount(),
461             finishedSuperstepStats.getEdgeCount(), context));
462   }
463 
464   /**
465    * Utility function for boilerplate updates and cleanup done at the
466    * end of each superstep processing loop in the <code>execute</code> method.
467    * @param partitionStatsList list of stats for each superstep to append to
468    * @param superstepTimerContext for job metrics
469    * @return the collected stats at the close of the current superstep.
470    */
471   private FinishedSuperstepStats completeSuperstepAndCollectStats(
472     List<PartitionStats> partitionStatsList,
473     GiraphTimerContext superstepTimerContext) {
474 
475     // the superstep timer is stopped inside the finishSuperstep function
476     // (otherwise metrics are not available at the end of the computation
477     //  using giraph.metrics.enable=true).
478     finishedSuperstepStats =
479       serviceWorker.finishSuperstep(partitionStatsList, superstepTimerContext);
480     if (conf.metricsEnabled()) {
481       GiraphMetrics.get().perSuperstep().printSummary(System.err);
482     }
483     return finishedSuperstepStats;
484   }
485 
486   /**
487    * Utility function to prepare various objects managing BSP superstep
488    * operations for the next superstep.
489    * @param graphState graph state metadata object
490    */
491   private void prepareForSuperstep(GraphState graphState) {
492     serviceWorker.prepareSuperstep();
493 
494     serviceWorker.getWorkerContext().setGraphState(graphState);
495     serviceWorker.getWorkerContext().setupSuperstep(serviceWorker);
496     GiraphTimerContext preSuperstepTimer = wcPreSuperstepTimer.time();
497     serviceWorker.getWorkerContext().preSuperstep();
498     preSuperstepTimer.stop();
499     context.progress();
500 
501     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
502       obs.preSuperstep(graphState.getSuperstep());
503       context.progress();
504     }
505   }
506 
507   /**
508    * Prepare graph state and worker context for superstep cycles.
509    */
510   private void prepareGraphStateAndWorkerContext() {
511     updateSuperstepGraphState();
512     workerContextPreApp();
513   }
514 
515   /**
516    * Get the worker function enum.
517    *
518    * @return an enum detailing the roles assigned to this
519    *         compute node for this Giraph job.
520    */
521   public GraphFunctions getGraphFunctions() {
522     return graphFunctions;
523   }
524 
525   public final WorkerContext getWorkerContext() {
526     return serviceWorker.getWorkerContext();
527   }
528 
529   public JobProgressTracker getJobProgressTracker() {
530     return jobProgressTracker;
531   }
532 
533   /**
534    * Copied from JobConf to get the location of this jar.  Workaround for
535    * things like Oozie map-reduce jobs. NOTE: Pure YARN profile cannot
536    * make use of this, as the jars are unpacked at each container site.
537    *
538    * @param myClass Class to search the class loader path for to locate
539    *        the relevant jar file
540    * @return Location of the jar file containing myClass
541    */
542   private static String findContainingJar(Class<?> myClass) {
543     ClassLoader loader = myClass.getClassLoader();
544     String classFile =
545         myClass.getName().replaceAll("\\.", "/") + ".class";
546     try {
547       for (Enumeration<?> itr = loader.getResources(classFile);
548           itr.hasMoreElements();) {
549         URL url = (URL) itr.nextElement();
550         if ("jar".equals(url.getProtocol())) {
551           String toReturn = url.getPath();
552           if (toReturn.startsWith("file:")) {
553             toReturn = toReturn.substring("file:".length());
554           }
555           toReturn = URLDecoder.decode(toReturn, "UTF-8");
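              // url.getPath() for a jar resource looks like file:/path/to/foo.jar!/com/Bar.class;
              // strip everything from the '!' onward to keep only the jar file location.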
556           return toReturn.replaceAll("!.*$", "");
557         }
558       }
559     } catch (IOException e) {
560       throw new RuntimeException(e);
561     }
562     return null;
563   }
564 
565   /**
566    * Figure out what roles this BSP compute node should take on in the job.
567    * Basic logic is as follows:
568    * 1) If not split master/worker, every node does everything and may also
569    *    run ZooKeeper.
570    * 2) If split master/worker, masters also run ZooKeeper.
571    *
572    * 3) If split master/worker == true and <code>giraph.zkList</code> is
573    *    externally provided, the master will not instantiate a ZK instance, but
574    *    will assume a quorum is already active on the cluster for Giraph to use.
575    *
576    * @param conf Configuration to use
577    * @param zkManager ZooKeeper manager to help determine whether to run
578    *        ZooKeeper.
579    * @return Functions that this mapper should do.
580    */
581   private static GraphFunctions determineGraphFunctions(
582       ImmutableClassesGiraphConfiguration conf,
583       ZooKeeperManager zkManager) {
584     boolean splitMasterWorker = conf.getSplitMasterWorker();
585     int taskPartition = conf.getTaskPartition();
586     boolean zkAlreadyProvided = conf.isZookeeperExternal();
587     GraphFunctions functions = GraphFunctions.UNKNOWN;
588     // What functions should this mapper do?
589     if (!splitMasterWorker) {
590       if ((zkManager != null) && zkManager.runsZooKeeper()) {
591         functions = GraphFunctions.ALL;
592       } else {
593         functions = GraphFunctions.ALL_EXCEPT_ZOOKEEPER;
594       }
595     } else {
596       if (zkAlreadyProvided) {
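            // An external ZooKeeper quorum is configured, so no task hosts ZooKeeper:
            // task partition 0 becomes the master, every other task is a worker.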
597         if (taskPartition == 0) {
598           functions = GraphFunctions.MASTER_ONLY;
599         } else {
600           functions = GraphFunctions.WORKER_ONLY;
601         }
602       } else {
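            // Giraph manages ZooKeeper itself: tasks that host a ZooKeeper server also
            // run the master, and the remaining tasks are plain workers.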
603         if ((zkManager != null) && zkManager.runsZooKeeper()) {
604           functions = GraphFunctions.MASTER_ZOOKEEPER_ONLY;
605         } else {
606           functions = GraphFunctions.WORKER_ONLY;
607         }
608       }
609     }
610     return functions;
611   }
612 
613   /**
614    * Instantiate the appropriate BspService object (Master or Worker)
615    * for this compute node.
616    */
617   private void instantiateBspService()
618     throws IOException, InterruptedException {
619     if (graphFunctions.isMaster()) {
620       if (LOG.isInfoEnabled()) {
621         LOG.info("setup: Starting up BspServiceMaster " +
622           "(master thread)...");
623       }
624       serviceMaster = new BspServiceMaster<I, V, E>(context, this);
625       masterThread = new MasterThread<I, V, E>(serviceMaster, context);
626       masterThread.start();
627     }
628     if (graphFunctions.isWorker()) {
629       if (LOG.isInfoEnabled()) {
630         LOG.info("setup: Starting up BspServiceWorker...");
631       }
632       serviceWorker = new BspServiceWorker<I, V, E>(context, this);
633       installGCMonitoring();
634       if (LOG.isInfoEnabled()) {
635         LOG.info("setup: Registering health of this worker...");
636       }
637     }
638   }
639 
640   /**
641    * Install GC monitoring. This method intercepts all GC events, logs them,
642    * and notifies the out-of-core engine (if any is used) about the GC.
643    */
644   private void installGCMonitoring() {
645     List<GarbageCollectorMXBean> mxBeans = ManagementFactory
646         .getGarbageCollectorMXBeans();
647     final OutOfCoreEngine oocEngine =
648         serviceWorker.getServerData().getOocEngine();
649     for (GarbageCollectorMXBean gcBean : mxBeans) {
650       NotificationEmitter emitter = (NotificationEmitter) gcBean;
651       NotificationListener listener = new NotificationListener() {
652         @Override
653         public void handleNotification(Notification notification,
654                                        Object handle) {
655           if (notification.getType().equals(GarbageCollectionNotificationInfo
656               .GARBAGE_COLLECTION_NOTIFICATION)) {
657             GarbageCollectionNotificationInfo info =
658                 GarbageCollectionNotificationInfo.from(
659                     (CompositeData) notification.getUserData());
660 
661             if (LOG.isInfoEnabled()) {
662               LOG.info("installGCMonitoring: name = " + info.getGcName() +
663                   ", action = " + info.getGcAction() + ", cause = " +
664                   info.getGcCause() + ", duration = " +
665                   info.getGcInfo().getDuration() + "ms");
666             }
667             gcTimeMetric.inc(info.getGcInfo().getDuration());
668             if (oocEngine != null) {
669               oocEngine.gcCompleted(info);
670             }
671           }
672         }
673       };
674       //Add the listener
675       emitter.addNotificationListener(listener, null, null);
676     }
677   }
678 
679   /**
680    * Initialize the root logger and appender to the settings in conf.
681    */
682   private void initializeAndConfigureLogging() {
683     // Set the log level
684     String logLevel = conf.getLocalLevel();
685     if (!Logger.getRootLogger().getLevel().equals(Level.toLevel(logLevel))) {
686       Logger.getRootLogger().setLevel(Level.toLevel(logLevel));
687       if (LOG.isInfoEnabled()) {
688         LOG.info("setup: Set log level to " + logLevel);
689       }
690     } else {
691       if (LOG.isInfoEnabled()) {
692         LOG.info("setup: Log level remains at " + logLevel);
693       }
694     }
695     // Sets pattern layout for all appenders
696     if (conf.useLogThreadLayout()) {
697       PatternLayout layout =
698         new PatternLayout("%-7p %d [%t] %c %x - %m%n");
699       Enumeration<Appender> appenderEnum =
700         Logger.getRootLogger().getAllAppenders();
701       while (appenderEnum.hasMoreElements()) {
702         appenderEnum.nextElement().setLayout(layout);
703       }
704     }
705     // Change ZooKeeper logging level to error (info is quite verbose) for
706     // testing only
707     if (conf.getLocalTestMode()) {
708       LogManager.getLogger(org.apache.zookeeper.server.PrepRequestProcessor.
709           class.getName()).setLevel(Level.ERROR);
710     }
711   }
712 
713   /**
714    * Initialize job-level metrics used by this class.
715    */
716   private void initJobMetrics() {
717     GiraphMetricsRegistry jobMetrics = GiraphMetrics.get().perJobOptional();
718     wcPreAppTimer = new GiraphTimer(jobMetrics, "worker-context-pre-app",
719         TimeUnit.MILLISECONDS);
720     wcPostAppTimer = new GiraphTimer(jobMetrics, "worker-context-post-app",
721         TimeUnit.MILLISECONDS);
722   }
723 
724   @Override
725   public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
726     superstepTimer = new GiraphTimer(superstepMetrics,
727         TIMER_SUPERSTEP_TIME, TimeUnit.MILLISECONDS);
728     computeAll = new GiraphTimer(superstepMetrics,
729         TIMER_COMPUTE_ALL, TimeUnit.MILLISECONDS);
730     timeToFirstMessage = new GiraphTimer(superstepMetrics,
731         TIMER_TIME_TO_FIRST_MSG, TimeUnit.MICROSECONDS);
732     communicationTimer = new GiraphTimer(superstepMetrics,
733         TIMER_COMMUNICATION_TIME, TimeUnit.MILLISECONDS);
734     gcTimeMetric = superstepMetrics.getCounter(TIMER_SUPERSTEP_GC_TIME);
735     wcPreSuperstepTimer = new GiraphTimer(superstepMetrics,
736         "worker-context-pre-superstep", TimeUnit.MILLISECONDS);
737   }
738 
739   /**
740    * Notification from Vertex that a message has been sent.
741    */
742   public void notifySentMessages() {
743     // We are tracking the time between when the compute started and the first
744     // message is sent. We use null to flag that we have already recorded it.
745     GiraphTimerContext tmp = timeToFirstMessageTimerContext;
746     if (tmp != null) {
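          // Double-checked pattern: re-test under the lock so that only the first
          // sent message in this superstep stops the time-to-first-message timer.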
747       synchronized (timeToFirstMessage) {
748         if (timeToFirstMessageTimerContext != null) {
749           timeToFirstMessageTimerContext.stop();
750           timeToFirstMessageTimerContext = null;
751           communicationTimerContext = communicationTimer.time();
752         }
753       }
754     }
755   }
756 
757   /**
758    * Notification of last message flushed. Comes when we finish the superstep
759    * and are done waiting for all messages to be sent.
760    */
761   public void notifyFinishedCommunication() {
762     GiraphTimerContext tmp = communicationTimerContext;
763     if (tmp != null) {
764       synchronized (communicationTimer) {
765         if (communicationTimerContext != null) {
766           communicationTimerContext.stop();
767           communicationTimerContext = null;
768         }
769       }
770     }
771   }
772 
773   /**
774    * Process graph data partitions active in this superstep.
775    * @param context handle to the underlying cluster framework
776    * @param partitionStatsList to pick up this superstep's processing stats
777    * @param graphState the BSP graph state
778    * @param messageStore the messages to be processed in this superstep
779    * @param numThreads number of concurrent threads to do processing
780    */
781   private void processGraphPartitions(final Mapper<?, ?, ?, ?>.Context context,
782       List<PartitionStats> partitionStatsList,
783       final GraphState graphState,
784       final MessageStore<I, Writable> messageStore,
785       int numThreads) {
786     PartitionStore<I, V, E> partitionStore = serviceWorker.getPartitionStore();
787     long verticesToCompute = 0;
788     for (Integer partitionId : partitionStore.getPartitionIds()) {
789       verticesToCompute += partitionStore.getPartitionVertexCount(partitionId);
790     }
791     WorkerProgress.get().startSuperstep(
792         serviceWorker.getSuperstep(), verticesToCompute,
793         serviceWorker.getPartitionStore().getNumPartitions());
794     partitionStore.startIteration();
795 
796     GiraphTimerContext computeAllTimerContext = computeAll.time();
797     timeToFirstMessageTimerContext = timeToFirstMessage.time();
798 
799     CallableFactory<Collection<PartitionStats>> callableFactory =
800       new CallableFactory<Collection<PartitionStats>>() {
801         @Override
802         public Callable<Collection<PartitionStats>> newCallable(
803             int callableId) {
804           return new ComputeCallable<I, V, E, Writable, Writable>(
805               context,
806               graphState,
807               messageStore,
808               conf,
809               serviceWorker);
810         }
811       };
812     List<Collection<PartitionStats>> results =
813         ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
814             "compute-%d", context);
815 
816     for (Collection<PartitionStats> result : results) {
817       partitionStatsList.addAll(result);
818     }
819 
820     computeAllTimerContext.stop();
821   }
822 
823   /**
824    * Handle the event that this superstep is a restart of a failed one.
825    * @param superstep current superstep
826    * @return true if this superstep was restarted and restored from a checkpoint
827    */
828   private boolean checkSuperstepRestarted(long superstep) throws IOException {
829     // Might need to restart from another superstep
830     // (manually or automatic), or store a checkpoint
831     if (serviceWorker.getRestartedSuperstep() == superstep) {
832       if (LOG.isInfoEnabled()) {
833         LOG.info("execute: Loading from checkpoint " + superstep);
834       }
835       VertexEdgeCount vertexEdgeCount = serviceWorker.loadCheckpoint(
836         serviceWorker.getRestartedSuperstep());
837       finishedSuperstepStats = new FinishedSuperstepStats(0, false,
838           vertexEdgeCount.getVertexCount(), vertexEdgeCount.getEdgeCount(),
839           false, CheckpointStatus.NONE);
840       return true;
841     }
842     return false;
843   }
844 
845   /**
846    * Checks if it's time to checkpoint and actually does the checkpointing
847    * if it is.
848    * @param checkpointStatus master's decision
849    * @return true if we need to stop computation after checkpoint
850    * @throws IOException
851    */
852   private boolean storeCheckpoint(CheckpointStatus checkpointStatus)
853     throws IOException {
854     if (checkpointStatus != CheckpointStatus.NONE) {
855       serviceWorker.storeCheckpoint();
856     }
857     return checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT;
858   }
859 
860   /**
861    * Attempt to collect the final statistics on the graph data
862    * processed in this superstep by this compute node
863    * @param inputSuperstepStats the final graph data stats object for the
864    *                            input superstep
865    * @return true if the graph data has no vertices (error?) and
866    *         this node should terminate
867    */
868   private boolean collectInputSuperstepStats(
869     FinishedSuperstepStats inputSuperstepStats) {
870     if (inputSuperstepStats.getVertexCount() == 0 &&
871         !inputSuperstepStats.mustLoadCheckpoint()) {
872       LOG.warn("map: No vertices in the graph, exiting.");
873       return true;
874     }
875     if (conf.metricsEnabled()) {
876       GiraphMetrics.get().perSuperstep().printSummary(System.err);
877     }
878     return false;
879   }
880 
881   /**
882    * Check this task's state to decide whether superstep processing should proceed.
883    * @return true if the processing of supersteps should terminate.
884    */
885   private boolean checkTaskState() {
886     if (done) {
887       return true;
888     }
889     GiraphMetrics.get().resetSuperstepMetrics(BspService.INPUT_SUPERSTEP);
890     if (graphFunctions.isNotAWorker()) {
891       if (LOG.isInfoEnabled()) {
892         LOG.info("map: No need to do anything when not a worker");
893       }
894       return true;
895     }
896     if (alreadyRun) {
897       throw new RuntimeException("map: In BSP, map should have only been" +
898         " run exactly once, (already run)");
899     }
900     alreadyRun = true;
901     return false;
902   }
903 
904   /**
905    * Call to the WorkerContext before application begins.
906    */
907   private void workerContextPreApp() {
908     GiraphTimerContext preAppTimerContext = wcPreAppTimer.time();
909     try {
910       serviceWorker.getWorkerContext().preApplication();
911     } catch (InstantiationException e) {
912       LOG.fatal("execute: preApplication failed in instantiation", e);
913       throw new RuntimeException(
914           "execute: preApplication failed in instantiation", e);
915     } catch (IllegalAccessException e) {
916       LOG.fatal("execute: preApplication failed in access", e);
917       throw new RuntimeException(
918           "execute: preApplication failed in access", e);
919     }
920     preAppTimerContext.stop();
921     context.progress();
922 
923     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
924       obs.preApplication();
925       context.progress();
926     }
927   }
928 
929   /**
930    * Setup mapper observers
931    */
932   public void setupMapperObservers() {
933     mapperObservers = conf.createMapperObservers();
934     for (MapperObserver mapperObserver : mapperObservers) {
935       mapperObserver.setup();
936     }
937   }
938 
939   /**
940    * Executes preLoad() on worker observers.
941    */
942   private void preLoadOnWorkerObservers() {
943     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
944       obs.preLoad();
945       context.progress();
946     }
947   }
948 
949   /**
950    * Executes postSave() on worker observers.
951    */
952   private void postSaveOnWorkerObservers() {
953     for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
954       obs.postSave();
955       context.progress();
956     }
957   }
958 
959   /**
960    * Called by owner of this GraphTaskManager object on each compute node
961    */
962   public void cleanup()
963     throws IOException, InterruptedException {
964     if (LOG.isInfoEnabled()) {
965       LOG.info("cleanup: Starting for " + getGraphFunctions());
966     }
967     jobProgressTracker.cleanup();
968     if (done) {
969       return;
970     }
971 
972     if (serviceWorker != null) {
973       serviceWorker.cleanup(finishedSuperstepStats);
974       postSaveOnWorkerObservers();
975     }
976     try {
977       if (masterThread != null) {
978         masterThread.join();
979         LOG.info("cleanup: Joined with master thread");
980       }
981     } catch (InterruptedException e) {
982       // cleanup phase -- just log the error
983       LOG.error("cleanup: Master thread couldn't join");
984     }
985     if (zkManager != null) {
986       LOG.info("cleanup: Offlining ZooKeeper servers");
987       try {
988         zkManager.offlineZooKeeperServers(ZooKeeperManager.State.FINISHED);
989       // We need this here because apparently exceptions are eaten by Hadoop
990       // when they come from the cleanup lifecycle and it's useful to know
991       // if something is wrong.
992       //
993       // And since it's cleanup nothing too bad should happen if we don't
994       // propagate and just allow the job to finish normally.
995       // CHECKSTYLE: stop IllegalCatch
996       } catch (Throwable e) {
997       // CHECKSTYLE: resume IllegalCatch
998         LOG.error("cleanup: Error offlining zookeeper", e);
999       }
1000     }
1001 
1002     // Stop tracking metrics
1003     GiraphMetrics.get().shutdown();
1004   }
1005 
1006   /**
1007    * Cleanup a ZooKeeper instance managed by this
1008    * GraphTaskManager upon job run failure.
1009    */
1010   public void zooKeeperCleanup() {
1011     if (graphFunctions.isZooKeeper()) {
1012       // ZooKeeper may have had an issue
1013       if (zkManager != null) {
1014         zkManager.cleanup();
1015       }
1016     }
1017   }
1018 
1019   /**
1020    * Cleanup all of Giraph's framework-agnostic resources
1021    * regardless of which type of cluster Giraph is running on.
1022    */
1023   public void workerFailureCleanup() {
1024     try {
1025       if (graphFunctions.isWorker()) {
1026         serviceWorker.failureCleanup();
1027       }
1028       // Stop tracking metrics
1029       GiraphMetrics.get().shutdown();
1030     // Checkstyle exception due to needing to get the original
1031     // exception on failure
1032     // CHECKSTYLE: stop IllegalCatch
1033     } catch (RuntimeException e1) {
1034     // CHECKSTYLE: resume IllegalCatch
1035       LOG.error("run: Worker failure failed on another RuntimeException, " +
1036           "original expection will be rethrown", e1);
1037     }
1038   }
1039 
1040   /**
1041    * Creates an exception handler that will terminate the process gracefully in
1042    * case of any uncaught exception.
1043    * @return new exception handler object.
1044    */
1045   public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler() {
1046     return new OverrideExceptionHandler(
1047         CHECKER_IF_WORKER_SHOULD_FAIL_AFTER_EXCEPTION_CLASS.newInstance(
1048             getConf()), getJobProgressTracker());
1049   }
1050 
1051   public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
1052     return conf;
1053   }
1054 
1055   /**
1056    * @return Time spent in GC recorded by the GC listener
1057    */
1058   public long getSuperstepGCTime() {
1059     return (gcTimeMetric == null) ? 0 : gcTimeMetric.count();
1060   }
1061 
1062   /**
1063    * Returns a list of ZooKeeper servers to connect to.
1064    * If the port is set to 0 and Giraph is starting a single
1065    * ZooKeeper server, then ZooKeeper will pick its own port.
1066    * Otherwise, the ZooKeeper port set by the user will be used.
1067    * @return host:port,host:port for each zookeeper
1068    */
1069   public String getZookeeperList() {
1070     if (zkManager != null) {
1071       return zkManager.getZooKeeperServerPortString();
1072     } else {
1073       return conf.getZookeeperList();
1074     }
1075   }
1076 
1077   /**
1078    * Default handler for uncaught exceptions.
1079    * It will do its best to clean up and then terminate the current Giraph job.
1080    */
1081   class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
1082     /** Checker if worker should fail after a thread gets an exception */
1083     private final CheckerIfWorkerShouldFailAfterException checker;
1084     /** JobProgressTracker to log problems to */
1085     private final JobProgressTracker jobProgressTracker;
1086 
1087     /**
1088      * Constructor
1089      *
1090      * @param checker Checker if worker should fail after a thread gets an
1091      *                exception
1092      * @param jobProgressTracker JobProgressTracker to log problems to
1093      */
1094     public OverrideExceptionHandler(
1095         CheckerIfWorkerShouldFailAfterException checker,
1096         JobProgressTracker jobProgressTracker) {
1097       this.checker = checker;
1098       this.jobProgressTracker = jobProgressTracker;
1099     }
1100 
1101     @Override
1102     public void uncaughtException(final Thread t, final Throwable e) {
1103       if (!checker.checkIfWorkerShouldFail(t, e)) {
1104         return;
1105       }
1106       try {
1107         LOG.fatal(
1108             "uncaughtException: OverrideExceptionHandler on thread " +
1109                 t.getName() + ", msg = " +  e.getMessage() + ", exiting...", e);
1110         jobProgressTracker.logError(ExceptionUtils.getStackTrace(e));
1111 
1112         zooKeeperCleanup();
1113         workerFailureCleanup();
1114       } finally {
1115         System.exit(1);
1116       }
1117     }
1118   }
1119 
1120   /**
1121    * Interface to check if worker should fail after a thread gets an exception
1122    */
1123   public interface CheckerIfWorkerShouldFailAfterException {
1124     /**
1125      * Check if worker should fail after a thread gets an exception
1126      *
1127      * @param thread Thread which raised the exception
1128      * @param exception Exception which occurred
1129      * @return True iff worker should fail after this exception
1130      */
1131     boolean checkIfWorkerShouldFail(Thread thread, Throwable exception);
1132   }
1133 
1134   /**
1135    * Class to use by default, where each exception causes job failure
1136    */
1137   public static class FailWithEveryExceptionOccurred
1138       implements CheckerIfWorkerShouldFailAfterException {
1139     @Override
1140     public boolean checkIfWorkerShouldFail(Thread thread, Throwable exception) {
1141       return true;
1142     }
1143   }
1144 }