/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.worker;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

import net.iharder.Base64;

import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.WorkerServer;
import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.comm.netty.NettyWorkerAggregatorRequestProcessor;
import org.apache.giraph.comm.netty.NettyWorkerClient;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.comm.netty.NettyWorkerServer;
import org.apache.giraph.comm.requests.PartitionStatsRequest;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.counters.CustomCounter;
import org.apache.giraph.counters.CustomCounters;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.AddressesAndPartitionsWritable;
import org.apache.giraph.graph.FinishedSuperstepStats;
import org.apache.giraph.graph.GlobalStats;
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.EdgeOutputFormat;
import org.apache.giraph.io.EdgeWriter;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.VertexWriter;
import org.apache.giraph.io.superstep_output.SuperstepOutput;
import org.apache.giraph.mapping.translate.TranslateEdge;
import org.apache.giraph.master.MasterInfo;
import org.apache.giraph.master.SuperstepClasses;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.GiraphTimer;
import org.apache.giraph.metrics.GiraphTimerContext;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.metrics.WorkerSuperstepMetrics;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionExchange;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.partition.PartitionStore;
import org.apache.giraph.partition.WorkerGraphPartitioner;
import org.apache.giraph.utils.BlockingElementsSet;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.CheckpointingUtils;
import org.apache.giraph.utils.JMapHistoDumper;
import org.apache.giraph.utils.LoggerUtils;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ReactiveJMapHistoDumper;
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

import com.google.common.collect.Lists;

import static org.apache.giraph.graph.GraphTaskManager.isConnectionResetByPeer;

/**
 * ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
 *
 * @param <I> Vertex id
 * @param <V> Vertex data
 * @param <E> Edge data
 */
@SuppressWarnings("rawtypes")
public class BspServiceWorker<I extends WritableComparable,
    V extends Writable, E extends Writable>
    extends BspService<I, V, E>
    implements CentralizedServiceWorker<I, V, E>,
    ResetSuperstepMetricsObserver {
  /** Name of gauge for time spent waiting on other workers */
  public static final String TIMER_WAIT_REQUESTS = "wait-requests-us";
  /** Class logger */
  private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
  /** My process health znode */
  private String myHealthZnode;
  /** Worker info */
  private final WorkerInfo workerInfo;
  /** Worker graph partitioner */
  private final WorkerGraphPartitioner<I, V, E> workerGraphPartitioner;
  /** Local Data for each worker */
  private final LocalData<I, V, E, ? extends Writable> localData;
  /** Used to translate Edges during vertex input phase based on localData */
  private final TranslateEdge<I, E> translateEdge;
  /** IPC Client */
  private final WorkerClient<I, V, E> workerClient;
  /** IPC Server */
  private final WorkerServer<I, V, E> workerServer;
  /** Request processor for aggregator requests */
  private final WorkerAggregatorRequestProcessor
      workerAggregatorRequestProcessor;
  /** Master info */
  private MasterInfo masterInfo = new MasterInfo();
  /** List of workers */
  private List<WorkerInfo> workerInfoList = Lists.newArrayList();
  /** Have the partition exchange children (workers) changed? */
  private final BspEvent partitionExchangeChildrenChanged;

  /** Addresses and partitions transfer */
  private BlockingElementsSet<AddressesAndPartitionsWritable>
      addressesAndPartitionsHolder = new BlockingElementsSet<>();

  /** Worker Context */
  private final WorkerContext workerContext;

  /** Handler for aggregators */
  private final WorkerAggregatorHandler globalCommHandler;

  /** Superstep output */
  private final SuperstepOutput<I, V, E> superstepOutput;

  /** Array of observers to call back to */
  private final WorkerObserver[] observers;
  /** Writer for worker progress */
  private final WorkerProgressWriter workerProgressWriter;

  // Per-Superstep Metrics
  /** Timer for WorkerContext#postSuperstep */
  private GiraphTimer wcPostSuperstepTimer;
  /** Time spent waiting on requests to finish */
  private GiraphTimer waitRequestsTimer;

  /** InputSplit handlers used in INPUT_SUPERSTEP */
  private final WorkerInputSplitsHandler inputSplitsHandler;

  /** Memory observer */
  private final MemoryObserver memoryObserver;

  /**
   * Constructor for setting up the worker.
   *
   * @param context Mapper context
   * @param graphTaskManager GraphTaskManager for this compute node
   * @throws IOException
   * @throws InterruptedException
   */
  public BspServiceWorker(
    Mapper<?, ?, ?, ?>.Context context,
    GraphTaskManager<I, V, E> graphTaskManager)
    throws IOException, InterruptedException {
    super(context, graphTaskManager);
    ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
    localData = new LocalData<>(conf);
    translateEdge = getConfiguration().edgeTranslationInstance();
    if (translateEdge != null) {
      translateEdge.initialize(this);
    }
    partitionExchangeChildrenChanged = new PredicateLock(context);
    registerBspEvent(partitionExchangeChildrenChanged);
    workerGraphPartitioner =
        getGraphPartitionerFactory().createWorkerGraphPartitioner();
    workerInfo = new WorkerInfo();
    workerServer = new NettyWorkerServer<I, V, E>(conf, this, context,
        graphTaskManager.createUncaughtExceptionHandler(
          (thread, throwable) -> {
            // If the connection was closed by the client, then we just log
            // the error, we do not fail the job, since the client will
            // attempt to reconnect.
            return !isConnectionResetByPeer(throwable);
          }
        )
    );
    workerInfo.setInetSocketAddress(workerServer.getMyAddress(),
        workerServer.getLocalHostOrIp());
    workerInfo.setTaskId(getTaskId());
    workerClient = new NettyWorkerClient<I, V, E>(context, conf, this,
        graphTaskManager.createUncaughtExceptionHandler());
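    // Share the client's flow-control mechanism with the server (and with
    // the out-of-core engine, when one is configured) so backpressure on
    // outstanding requests is applied consistently across the worker.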
    workerServer.setFlowControl(workerClient.getFlowControl());
    OutOfCoreEngine oocEngine = workerServer.getServerData().getOocEngine();
    if (oocEngine != null) {
      oocEngine.setFlowControl(workerClient.getFlowControl());
    }

    workerAggregatorRequestProcessor =
        new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this);

    globalCommHandler = new WorkerAggregatorHandler(this, conf, context);

    workerContext = conf.createWorkerContext();
    workerContext.setWorkerGlobalCommUsage(globalCommHandler);

    superstepOutput = conf.createSuperstepOutput(context);

    if (conf.isJMapHistogramDumpEnabled()) {
      conf.addWorkerObserverClass(JMapHistoDumper.class);
    }
    if (conf.isReactiveJmapHistogramDumpEnabled()) {
      conf.addWorkerObserverClass(ReactiveJMapHistoDumper.class);
    }
    observers = conf.createWorkerObservers(context);

    WorkerProgress.get().setTaskId(getTaskId());
    workerProgressWriter = conf.trackJobProgressOnClient() ?
        new WorkerProgressWriter(graphTaskManager.getJobProgressTracker()) :
        null;

    GiraphMetrics.get().addSuperstepResetObserver(this);

    inputSplitsHandler = new WorkerInputSplitsHandler(
        workerInfo, masterInfo.getTaskId(), workerClient);

    memoryObserver = new MemoryObserver(getZkExt(), memoryObserverPath, conf);
  }

  @Override
  public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
    waitRequestsTimer = new GiraphTimer(superstepMetrics,
        TIMER_WAIT_REQUESTS, TimeUnit.MICROSECONDS);
    wcPostSuperstepTimer = new GiraphTimer(superstepMetrics,
        "worker-context-post-superstep", TimeUnit.MICROSECONDS);
  }

  @Override
  public WorkerContext getWorkerContext() {
    return workerContext;
  }

  @Override
  public WorkerObserver[] getWorkerObservers() {
    return observers;
  }

  @Override
  public WorkerClient<I, V, E> getWorkerClient() {
    return workerClient;
  }

  public LocalData<I, V, E, ? extends Writable> getLocalData() {
    return localData;
  }

  public TranslateEdge<I, E> getTranslateEdge() {
    return translateEdge;
  }
  /**
   * Intended to check the health of the node.  For instance, can it ssh,
   * dmesg, etc. For now, does nothing.
   * TODO: Make this check configurable by the user (i.e. search dmesg for
   * problems).
   *
   * @return True if healthy (always in this case).
   */
  public boolean isHealthy() {
    return true;
  }

  /**
   * Load the vertices/edges from input splits. Do this until all the
   * InputSplits have been processed.
   * All workers will try to do as many InputSplits as they can.  The master
   * will monitor progress and stop this once all the InputSplits have been
   * loaded and check-pointed.  Keep track of the last input split path to
   * ensure the input split cache is flushed prior to marking the last input
   * split complete.
   *
   * Use one or more threads to do the loading.
   *
   * @param inputSplitsCallableFactory Factory for {@link InputSplitsCallable}s
   * @return Statistics of the vertices and edges loaded
   * @throws InterruptedException
   * @throws KeeperException
   */
  private VertexEdgeCount loadInputSplits(
      CallableFactory<VertexEdgeCount> inputSplitsCallableFactory)
    throws KeeperException, InterruptedException {
    VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
    int numThreads = getConfiguration().getNumInputSplitsThreads();
    if (LOG.isInfoEnabled()) {
      LOG.info("loadInputSplits: Using " + numThreads + " thread(s)");
    }

    List<VertexEdgeCount> results =
        ProgressableUtils.getResultsWithNCallables(inputSplitsCallableFactory,
            numThreads, "load-%d", getContext());
    for (VertexEdgeCount result : results) {
      vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(result);
    }

    workerClient.waitAllRequests();
    return vertexEdgeCount;
  }

  /**
   * Load the mapping entries from the user-defined
   * {@link org.apache.giraph.io.MappingReader}
   *
   * @return Count of mapping entries loaded
   */
  private long loadMapping() throws KeeperException,
    InterruptedException {
    MappingInputSplitsCallableFactory<I, V, E, ? extends Writable>
        inputSplitsCallableFactory =
        new MappingInputSplitsCallableFactory<>(
            getConfiguration().createWrappedMappingInputFormat(),
            getContext(),
            getConfiguration(),
            this,
            inputSplitsHandler);

    long mappingsLoaded =
        loadInputSplits(inputSplitsCallableFactory).getMappingCount();

    // after all threads finish loading - call postFilling
    localData.getMappingStore().postFilling();
    return mappingsLoaded;
  }

  /**
   * Load the vertices from the user-defined
   * {@link org.apache.giraph.io.VertexReader}
   *
   * @return Count of vertices and edges loaded
   */
  private VertexEdgeCount loadVertices() throws KeeperException,
      InterruptedException {
    VertexInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
        new VertexInputSplitsCallableFactory<I, V, E>(
            getConfiguration().createWrappedVertexInputFormat(),
            getContext(),
            getConfiguration(),
            this,
            inputSplitsHandler);

    return loadInputSplits(inputSplitsCallableFactory);
  }

  /**
   * Load the edges from the user-defined
   * {@link org.apache.giraph.io.EdgeReader}.
   *
   * @return Number of edges loaded
   */
  private long loadEdges() throws KeeperException, InterruptedException {
    EdgeInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
        new EdgeInputSplitsCallableFactory<I, V, E>(
            getConfiguration().createWrappedEdgeInputFormat(),
            getContext(),
            getConfiguration(),
            this,
            inputSplitsHandler);

    return loadInputSplits(inputSplitsCallableFactory).getEdgeCount();
  }

  @Override
  public MasterInfo getMasterInfo() {
    return masterInfo;
  }

  @Override
  public List<WorkerInfo> getWorkerInfoList() {
    return workerInfoList;
  }

  /**
   * Mark current worker as done and then wait for all workers
   * to finish processing input splits.
   */
  private void markCurrentWorkerDoneReadingThenWaitForOthers() {
    String workerInputSplitsDonePath =
        inputSplitsWorkerDonePath + "/" + getWorkerInfo().getHostnameId();
    try {
      getZkExt().createExt(workerInputSplitsDonePath,
          null,
          Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT,
          true);
    } catch (KeeperException e) {
      throw new IllegalStateException(
          "markCurrentWorkerDoneThenWaitForOthers: " +
              "KeeperException creating worker done splits", e);
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "markCurrentWorkerDoneThenWaitForOthers: " +
              "InterruptedException creating worker done splits", e);
    }
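    // Wait for the master to create the "all input splits done" znode.
    // The exists() call below registers a watch, so the event fires as soon
    // as the znode appears; the timeout bounds how long we wait between
    // re-checks in case a watch notification is missed.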
    while (true) {
      Stat inputSplitsDoneStat;
      try {
        inputSplitsDoneStat =
            getZkExt().exists(inputSplitsAllDonePath, true);
      } catch (KeeperException e) {
        throw new IllegalStateException(
            "markCurrentWorkerDoneThenWaitForOthers: " +
                "KeeperException waiting on worker done splits", e);
      } catch (InterruptedException e) {
        throw new IllegalStateException(
            "markCurrentWorkerDoneThenWaitForOthers: " +
                "InterruptedException waiting on worker done splits", e);
      }
      if (inputSplitsDoneStat != null) {
        break;
      }
      getInputSplitsAllDoneEvent().waitForTimeoutOrFail(
          GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
              getConfiguration()));
      getInputSplitsAllDoneEvent().reset();
    }
  }

  @Override
  public FinishedSuperstepStats setup() {
    // Unless doing a restart, prepare for computation:
    // 1. Start superstep INPUT_SUPERSTEP (no computation)
    // 2. Wait until the INPUT_SPLIT_ALL_READY_PATH node has been created
    // 3. Process input splits until there are no more.
    // 4. Wait until the INPUT_SPLIT_ALL_DONE_PATH node has been created
    // 5. Process any mutations deriving from add edge requests
    // 6. Wait for superstep INPUT_SUPERSTEP to complete.
    if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
      setCachedSuperstep(getRestartedSuperstep());
      return new FinishedSuperstepStats(0, false, 0, 0, true,
          CheckpointStatus.NONE);
    }

    JSONObject jobState = getJobState();
    if (jobState != null) {
      try {
        if ((ApplicationState.valueOf(jobState.getString(JSONOBJ_STATE_KEY)) ==
            ApplicationState.START_SUPERSTEP) &&
            jobState.getLong(JSONOBJ_SUPERSTEP_KEY) ==
            getSuperstep()) {
          if (LOG.isInfoEnabled()) {
            LOG.info("setup: Restarting from an automated " +
                "checkpointed superstep " +
                getSuperstep() + ", attempt " +
                getApplicationAttempt());
          }
          setRestartedSuperstep(getSuperstep());
          return new FinishedSuperstepStats(0, false, 0, 0, true,
              CheckpointStatus.NONE);
        }
      } catch (JSONException e) {
        throw new RuntimeException(
            "setup: Failed to get key-values from " +
                jobState.toString(), e);
      }
    }

    // Add the partitions that this worker owns
    Collection<? extends PartitionOwner> masterSetPartitionOwners =
        startSuperstep();
    workerGraphPartitioner.updatePartitionOwners(
        getWorkerInfo(), masterSetPartitionOwners);
    getPartitionStore().initialize();

/*if[HADOOP_NON_SECURE]
    workerClient.setup();
else[HADOOP_NON_SECURE]*/
    workerClient.setup(getConfiguration().authenticate());
/*end[HADOOP_NON_SECURE]*/
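    // The if/else/end markers above are munge-style preprocessor directives:
    // non-secure Hadoop builds compile the bare setup() call, while secure
    // builds pass the authenticate flag through.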

    // Initialize aggregator at worker side during setup.
    // Do this just before vertex and edge loading.
    globalCommHandler.prepareSuperstep(workerAggregatorRequestProcessor);

    VertexEdgeCount vertexEdgeCount;
    long entriesLoaded;

    if (getConfiguration().hasMappingInputFormat()) {
      getContext().progress();
      try {
        entriesLoaded = loadMapping();
        // successfully loaded mapping
        // now initialize graphPartitionerFactory with this data
        getGraphPartitionerFactory().initialize(localData);
      } catch (InterruptedException e) {
        throw new IllegalStateException(
            "setup: loadMapping failed with InterruptedException", e);
      } catch (KeeperException e) {
        throw new IllegalStateException(
            "setup: loadMapping failed with KeeperException", e);
      }
      getContext().progress();
      if (LOG.isInfoEnabled()) {
        LOG.info("setup: Finally loaded a total of " +
            entriesLoaded + " entries from inputSplits");
      }

      // Print stats for data stored in localData once mapping is fully
      // loaded on all the workers
      localData.printStats();
    }

    if (getConfiguration().hasVertexInputFormat()) {
      getContext().progress();
      try {
        vertexEdgeCount = loadVertices();
      } catch (InterruptedException e) {
        throw new IllegalStateException(
            "setup: loadVertices failed with InterruptedException", e);
      } catch (KeeperException e) {
        throw new IllegalStateException(
            "setup: loadVertices failed with KeeperException", e);
      }
      getContext().progress();
    } else {
      vertexEdgeCount = new VertexEdgeCount();
    }
    WorkerProgress.get().finishLoadingVertices();

    if (getConfiguration().hasEdgeInputFormat()) {
      getContext().progress();
      try {
        vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0, loadEdges());
      } catch (InterruptedException e) {
        throw new IllegalStateException(
            "setup: loadEdges failed with InterruptedException", e);
      } catch (KeeperException e) {
        throw new IllegalStateException(
            "setup: loadEdges failed with KeeperException", e);
      }
      getContext().progress();
    }
    WorkerProgress.get().finishLoadingEdges();

    if (LOG.isInfoEnabled()) {
      LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
    }

    markCurrentWorkerDoneReadingThenWaitForOthers();

    // Create remaining partitions owned by this worker.
    for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
      if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
          !getPartitionStore().hasPartition(
              partitionOwner.getPartitionId())) {
        Partition<I, V, E> partition =
            getConfiguration().createPartition(
                partitionOwner.getPartitionId(), getContext());
        getPartitionStore().addPartition(partition);
      }
    }

    // remove mapping store if possible
    localData.removeMappingStoreIfPossible();

    if (getConfiguration().hasEdgeInputFormat()) {
      // Move edges from temporary storage to their source vertices.
      getServerData().getEdgeStore().moveEdgesToVertices();
    }

    // Generate the partition stats for the input superstep and process
    // if necessary
    List<PartitionStats> partitionStatsList =
        new ArrayList<PartitionStats>();
    PartitionStore<I, V, E> partitionStore = getPartitionStore();
    for (Integer partitionId : partitionStore.getPartitionIds()) {
      PartitionStats partitionStats =
          new PartitionStats(partitionId,
              partitionStore.getPartitionVertexCount(partitionId),
              0,
              partitionStore.getPartitionEdgeCount(partitionId),
              0,
              0,
              workerInfo.getHostnameId());
      partitionStatsList.add(partitionStats);
    }
    workerGraphPartitioner.finalizePartitionStats(
        partitionStatsList, getPartitionStore());

    return finishSuperstep(partitionStatsList, null);
  }

  /**
   * Register the health of this worker for a given superstep
   */
  private void registerHealth() {

    String myHealthPath = null;
    if (isHealthy()) {
      myHealthPath = getWorkerInfoHealthyPath(getApplicationAttempt(),
          getSuperstep());
    } else {
      myHealthPath = getWorkerInfoUnhealthyPath(getApplicationAttempt(),
          getSuperstep());
    }
    myHealthPath = myHealthPath + "/" + workerInfo.getHostnameId();
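    // The health znode is EPHEMERAL, so if this worker process dies the
    // znode disappears with its ZooKeeper session and the master can detect
    // the failure.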
    try {
      myHealthZnode = getZkExt().createExt(
          myHealthPath,
          WritableUtils.writeToByteArray(workerInfo),
          Ids.OPEN_ACL_UNSAFE,
          CreateMode.EPHEMERAL,
          true);
    } catch (KeeperException.NodeExistsException e) {
      LOG.warn("registerHealth: myHealthPath already exists (likely " +
          "from previous failure): " + myHealthPath +
          ".  Waiting for change in attempts " +
          "to re-join the application");
      getApplicationAttemptChangedEvent().waitForTimeoutOrFail(
          GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(
              getConfiguration()));
      if (LOG.isInfoEnabled()) {
        LOG.info("registerHealth: Got application " +
            "attempt changed event, killing self");
      }
      throw new IllegalStateException(
          "registerHealth: Trying " +
              "to get the new application attempt by killing self", e);
    } catch (KeeperException e) {
      throw new IllegalStateException("Creating " + myHealthPath +
          " failed with KeeperException", e);
    } catch (InterruptedException e) {
      throw new IllegalStateException("Creating " + myHealthPath +
          " failed with InterruptedException", e);
    }
    if (LOG.isInfoEnabled()) {
      LOG.info("registerHealth: Created my health node for attempt=" +
          getApplicationAttempt() + ", superstep=" +
          getSuperstep() + " with " + myHealthZnode +
          " and workerInfo= " + workerInfo);
    }
  }

  /**
   * Do this to help notify the master more quickly that this worker has
   * failed.
   */
  private void unregisterHealth() {
    LOG.error("unregisterHealth: Got failure, unregistering health on " +
        myHealthZnode + " on superstep " + getSuperstep());
    try {
      getZkExt().deleteExt(myHealthZnode, -1, false);
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "unregisterHealth: InterruptedException - Couldn't delete " +
              myHealthZnode, e);
    } catch (KeeperException e) {
      throw new IllegalStateException(
          "unregisterHealth: KeeperException - Couldn't delete " +
              myHealthZnode, e);
    }
  }

  @Override
  public void failureCleanup() {
    unregisterHealth();
  }

  @Override
  public Collection<? extends PartitionOwner> startSuperstep() {
    // Algorithm:
    // 1. Communication service will combine message from previous
    //    superstep
    // 2. Register my health for the next superstep.
    // 3. Wait until the partition assignment is complete and get it
    // 4. Get the aggregator values from the previous superstep
    if (getSuperstep() != INPUT_SUPERSTEP) {
      workerServer.prepareSuperstep();
    }

    registerHealth();

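    // Block until the master has published the worker addresses and the
    // partition assignment for this superstep.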
    AddressesAndPartitionsWritable addressesAndPartitions =
        addressesAndPartitionsHolder.getElement(getContext());

    workerInfoList.clear();
    workerInfoList = addressesAndPartitions.getWorkerInfos();
    masterInfo = addressesAndPartitions.getMasterInfo();
    workerServer.resetBytesReceivedPerSuperstep();

    if (LOG.isInfoEnabled()) {
      LOG.info("startSuperstep: " + masterInfo);
    }

    getContext().setStatus("startSuperstep: " +
        getGraphTaskManager().getGraphFunctions().toString() +
        " - Attempt=" + getApplicationAttempt() +
        ", Superstep=" + getSuperstep());

    if (LOG.isDebugEnabled()) {
      LOG.debug("startSuperstep: addressesAndPartitions=" +
          addressesAndPartitions.getWorkerInfos());
      for (PartitionOwner partitionOwner : addressesAndPartitions
          .getPartitionOwners()) {
        LOG.debug(partitionOwner.getPartitionId() + " " +
            partitionOwner.getWorkerInfo());
      }
    }

    return addressesAndPartitions.getPartitionOwners();
  }

  @Override
  public FinishedSuperstepStats finishSuperstep(
      List<PartitionStats> partitionStatsList,
      GiraphTimerContext superstepTimerContext) {
    // This barrier blocks until success (or the master signals it to
    // restart).
    //
    // Master will coordinate the barriers and aggregate "doneness" of all
    // the vertices.  Each worker will:
    // 1. Ensure that the requests are complete
    // 2. Execute user postSuperstep() if necessary.
    // 3. Save aggregator values that are in use.
    // 4. Report the statistics (vertices, edges, messages, etc.)
    //    of this worker
    // 5. Let the master know it is finished.
    // 6. Wait for the master's superstep info, and check if done
    waitForRequestsToFinish();

    getGraphTaskManager().notifyFinishedCommunication();

    long workerSentMessages = 0;
    long workerSentMessageBytes = 0;
    long localVertices = 0;
    for (PartitionStats partitionStats : partitionStatsList) {
      workerSentMessages += partitionStats.getMessagesSentCount();
      workerSentMessageBytes += partitionStats.getMessageBytesSentCount();
      localVertices += partitionStats.getVertexCount();
    }

    if (getSuperstep() != INPUT_SUPERSTEP) {
      postSuperstepCallbacks();
    }

    globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor);

    MessageStore<I, Writable> incomingMessageStore =
        getServerData().getIncomingMessageStore();
    if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
      ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
    }

    if (LOG.isInfoEnabled()) {
      LOG.info("finishSuperstep: Superstep " + getSuperstep() +
          ", messages = " + workerSentMessages +
          ", message bytes = " + workerSentMessageBytes + ", " +
          MemoryUtils.getRuntimeMemoryStats());
    }

    if (superstepTimerContext != null) {
      superstepTimerContext.stop();
    }
    writeFinishedSuperstepInfoToZK(partitionStatsList,
      workerSentMessages, workerSentMessageBytes);
    // Store the counters up to the end of the current superstep in
    // ZooKeeper, as best-effort, so that the master has some counter values
    // in case of a job failure
    storeCountersInZooKeeper(false);

    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
        "finishSuperstep: (waiting for rest " +
            "of workers) " +
            getGraphTaskManager().getGraphFunctions().toString() +
            " - Attempt=" + getApplicationAttempt() +
            ", Superstep=" + getSuperstep());

    String superstepFinishedNode =
        getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());

    waitForOtherWorkers(superstepFinishedNode);

    GlobalStats globalStats = new GlobalStats();
    SuperstepClasses superstepClasses = SuperstepClasses.createToRead(
        getConfiguration());
    WritableUtils.readFieldsFromZnode(
        getZkExt(), superstepFinishedNode, false, null, globalStats,
        superstepClasses);
    if (LOG.isInfoEnabled()) {
      LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
          " with global stats " + globalStats + " and classes " +
          superstepClasses);
    }
    getContext().setStatus("finishSuperstep: (all workers done) " +
        getGraphTaskManager().getGraphFunctions().toString() +
        " - Attempt=" + getApplicationAttempt() +
        ", Superstep=" + getSuperstep());
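    // Advance the locally cached superstep; from here on getSuperstep()
    // reports the superstep that is about to start.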
    incrCachedSuperstep();
    getConfiguration().updateSuperstepClasses(superstepClasses);

    return new FinishedSuperstepStats(
        localVertices,
        globalStats.getHaltComputation(),
        globalStats.getVertexCount(),
        globalStats.getEdgeCount(),
        false,
        globalStats.getCheckpointStatus());
  }

  /**
   * Handle post-superstep callbacks
   */
  private void postSuperstepCallbacks() {
    GiraphTimerContext timerContext = wcPostSuperstepTimer.time();
    getWorkerContext().postSuperstep();
    timerContext.stop();
    getContext().progress();

    for (WorkerObserver obs : getWorkerObservers()) {
      obs.postSuperstep(getSuperstep());
      getContext().progress();
    }
  }

  /**
   * Wait for all the requests to finish.
   */
  private void waitForRequestsToFinish() {
    if (LOG.isInfoEnabled()) {
      LOG.info("finishSuperstep: Waiting on all requests, superstep " +
          getSuperstep() + " " +
          MemoryUtils.getRuntimeMemoryStats());
    }
    GiraphTimerContext timerContext = waitRequestsTimer.time();
    workerClient.waitAllRequests();
    timerContext.stop();
  }

  /**
   * Wait for all the other Workers to finish the superstep.
   *
   * @param superstepFinishedNode ZooKeeper path to wait on.
   */
  private void waitForOtherWorkers(String superstepFinishedNode) {
    try {
      while (getZkExt().exists(superstepFinishedNode, true) == null) {
        getSuperstepFinishedEvent().waitForTimeoutOrFail(
            GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
                getConfiguration()));
        getSuperstepFinishedEvent().reset();
      }
    } catch (KeeperException e) {
      throw new IllegalStateException(
          "finishSuperstep: Failed while waiting for master to " +
              "signal completion of superstep " + getSuperstep(), e);
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "finishSuperstep: Failed while waiting for master to " +
              "signal completion of superstep " + getSuperstep(), e);
    }
  }

  /**
   * Write finished superstep info to ZooKeeper.
   *
   * @param partitionStatsList List of partition stats from superstep.
   * @param workerSentMessages Number of messages sent in superstep.
   * @param workerSentMessageBytes Number of message bytes sent
   *                               in superstep.
   */
  private void writeFinishedSuperstepInfoToZK(
      List<PartitionStats> partitionStatsList, long workerSentMessages,
      long workerSentMessageBytes) {
    Collection<PartitionStats> finalizedPartitionStats =
        workerGraphPartitioner.finalizePartitionStats(
            partitionStatsList, getPartitionStore());
    workerClient.sendWritableRequest(masterInfo.getTaskId(),
        new PartitionStatsRequest(finalizedPartitionStats));
    WorkerSuperstepMetrics metrics = new WorkerSuperstepMetrics();
    metrics.readFromRegistry();
    byte[] metricsBytes = WritableUtils.writeToByteArray(metrics);

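    // The serialized metrics are Base64-encoded so the binary payload can
    // be embedded in the JSON object written to the znode below.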
    JSONObject workerFinishedInfoObj = new JSONObject();
    try {
      workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY, workerSentMessages);
      workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGE_BYTES_KEY,
        workerSentMessageBytes);
      workerFinishedInfoObj.put(JSONOBJ_METRICS_KEY,
          Base64.encodeBytes(metricsBytes));
    } catch (JSONException e) {
      throw new RuntimeException(e);
    }

    String finishedWorkerPath =
        getWorkerMetricsFinishedPath(getApplicationAttempt(), getSuperstep()) +
        "/" + workerInfo.getHostnameId();
    try {
      getZkExt().createExt(finishedWorkerPath,
          workerFinishedInfoObj.toString().getBytes(Charset.defaultCharset()),
          Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT,
          true);
    } catch (KeeperException.NodeExistsException e) {
      LOG.warn("finishSuperstep: finished worker path " +
          finishedWorkerPath + " already exists!");
    } catch (KeeperException e) {
      throw new IllegalStateException("Creating " + finishedWorkerPath +
          " failed with KeeperException", e);
    } catch (InterruptedException e) {
      throw new IllegalStateException("Creating " + finishedWorkerPath +
          " failed with InterruptedException", e);
    }
  }

  /**
   * Save the vertices using the user-defined VertexOutputFormat.
   *
   * @param numLocalVertices Number of local vertices
   * @throws InterruptedException
   */
  private void saveVertices(long numLocalVertices) throws IOException,
      InterruptedException {
    ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();

    if (conf.getVertexOutputFormatClass() == null) {
      LOG.warn("saveVertices: " +
          GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS +
          " not specified -- there will be no saved output");
      return;
    }
    if (conf.doOutputDuringComputation()) {
      if (LOG.isInfoEnabled()) {
        LOG.info("saveVertices: The option for doing output during " +
            "computation is selected, so there will be no saving of the " +
            "output at the end of the application");
      }
      return;
    }

    final int numPartitions = getPartitionStore().getNumPartitions();
    int numThreads = Math.min(getConfiguration().getNumOutputThreads(),
        numPartitions);
    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
        "saveVertices: Starting to save " + numLocalVertices + " vertices " +
            "using " + numThreads + " threads");
    final VertexOutputFormat<I, V, E> vertexOutputFormat =
        getConfiguration().createWrappedVertexOutputFormat();
    vertexOutputFormat.preWriting(getContext());

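    // startIteration() resets the partition store's internal cursor so the
    // saving threads below can each pull partitions one at a time via
    // getNextPartition() until every partition has been handed out.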
    getPartitionStore().startIteration();

    long verticesToStore = 0;
    PartitionStore<I, V, E> partitionStore = getPartitionStore();
    for (int partitionId : partitionStore.getPartitionIds()) {
      verticesToStore += partitionStore.getPartitionVertexCount(partitionId);
    }
    WorkerProgress.get().startStoring(
        verticesToStore, getPartitionStore().getNumPartitions());

    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
      @Override
      public Callable<Void> newCallable(int callableId) {
        return new Callable<Void>() {
          /** How often to update WorkerProgress */
          private static final long VERTICES_TO_UPDATE_PROGRESS = 100000;

          @Override
          public Void call() throws Exception {
            VertexWriter<I, V, E> vertexWriter =
                vertexOutputFormat.createVertexWriter(getContext());
            vertexWriter.setConf(getConfiguration());
            vertexWriter.initialize(getContext());
            long nextPrintVertices = 0;
            long nextUpdateProgressVertices = VERTICES_TO_UPDATE_PROGRESS;
            long nextPrintMsecs = System.currentTimeMillis() + 15000;
            int partitionIndex = 0;
            int numPartitions = getPartitionStore().getNumPartitions();
            while (true) {
              Partition<I, V, E> partition =
                  getPartitionStore().getNextPartition();
              if (partition == null) {
                break;
              }

              long verticesWritten = 0;
              for (Vertex<I, V, E> vertex : partition) {
                vertexWriter.writeVertex(vertex);
                ++verticesWritten;

                // Update status at most every 250k vertices or 15 seconds
                if (verticesWritten > nextPrintVertices &&
                    System.currentTimeMillis() > nextPrintMsecs) {
                  LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
                      "saveVertices: Saved " + verticesWritten + " out of " +
                          partition.getVertexCount() + " partition vertices, " +
                          "on partition " + partitionIndex +
                          " out of " + numPartitions);
                  nextPrintMsecs = System.currentTimeMillis() + 15000;
                  nextPrintVertices = verticesWritten + 250000;
                }

                if (verticesWritten >= nextUpdateProgressVertices) {
                  WorkerProgress.get().addVerticesStored(
                      VERTICES_TO_UPDATE_PROGRESS);
                  nextUpdateProgressVertices += VERTICES_TO_UPDATE_PROGRESS;
                }
              }
              getPartitionStore().putPartition(partition);
              ++partitionIndex;
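              // Flush the remainder that never reached the
              // VERTICES_TO_UPDATE_PROGRESS threshold for this partition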
              WorkerProgress.get().addVerticesStored(
                  verticesWritten % VERTICES_TO_UPDATE_PROGRESS);
              WorkerProgress.get().incrementPartitionsStored();
            }
            vertexWriter.close(getContext()); // the temp results are saved now
            return null;
          }
        };
      }
    };
    ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
        "save-vertices-%d", getContext());

    vertexOutputFormat.postWriting(getContext());

    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
      "saveVertices: Done saving vertices.");
    // YARN: must commit the "task" output ourselves, since Hadoop's
    // MapReduce layer isn't there to do it.
    if (getConfiguration().isPureYarnJob() &&
      getConfiguration().getVertexOutputFormatClass() != null) {
      try {
        OutputCommitter outputCommitter =
          vertexOutputFormat.getOutputCommitter(getContext());
        if (outputCommitter.needsTaskCommit(getContext())) {
          LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
            "OutputCommitter: committing task output.");
          // transfer from temp dirs to "task commit" dirs to prep for
          // the master's OutputCommitter#commitJob(context) call to finish.
          outputCommitter.commitTask(getContext());
        }
      } catch (InterruptedException ie) {
        LOG.error("Interrupted while attempting to obtain " +
          "OutputCommitter.", ie);
      } catch (IOException ioe) {
        LOG.error("Worker's attempt to commit output has " +
          "FAILED.", ioe);
      }
    }
  }

  /**
   * Save the edges using the user-defined EdgeOutputFormat.
   *
   * @throws InterruptedException
   */
  private void saveEdges() throws IOException, InterruptedException {
    final ImmutableClassesGiraphConfiguration<I, V, E> conf =
      getConfiguration();

    if (conf.getEdgeOutputFormatClass() == null) {
      LOG.warn("saveEdges: " +
               GiraphConstants.EDGE_OUTPUT_FORMAT_CLASS +
               " not specified -- there will be no saved edges. " +
               "Make sure that the EdgeOutputFormat is not required.");
      return;
    }

    final int numPartitions = getPartitionStore().getNumPartitions();
    int numThreads = Math.min(conf.getNumOutputThreads(),
        numPartitions);
    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
        "saveEdges: Starting to save the edges using " +
        numThreads + " threads");
    final EdgeOutputFormat<I, V, E> edgeOutputFormat =
        conf.createWrappedEdgeOutputFormat();
    edgeOutputFormat.preWriting(getContext());

    getPartitionStore().startIteration();

    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
      @Override
      public Callable<Void> newCallable(int callableId) {
        return new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            EdgeWriter<I, V, E> edgeWriter =
                edgeOutputFormat.createEdgeWriter(getContext());
            edgeWriter.setConf(conf);
            edgeWriter.initialize(getContext());

            long nextPrintVertices = 0;
            long nextPrintMsecs = System.currentTimeMillis() + 15000;
            int partitionIndex = 0;
            int numPartitions = getPartitionStore().getNumPartitions();
            while (true) {
              Partition<I, V, E> partition =
                  getPartitionStore().getNextPartition();
              if (partition == null) {
                break;
              }

              long vertices = 0;
              long edges = 0;
              long partitionEdgeCount = partition.getEdgeCount();
              for (Vertex<I, V, E> vertex : partition) {
                for (Edge<I, E> edge : vertex.getEdges()) {
                  edgeWriter.writeEdge(vertex.getId(), vertex.getValue(), edge);
                  ++edges;
                }
                ++vertices;

                // Update status at most every 250k vertices or 15 seconds
                if (vertices > nextPrintVertices &&
                    System.currentTimeMillis() > nextPrintMsecs) {
                  LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
                      "saveEdges: Saved " + edges +
                      " edges out of " + partitionEdgeCount +
                      " partition edges, on partition " + partitionIndex +
                      " out of " + numPartitions);
                  nextPrintMsecs = System.currentTimeMillis() + 15000;
                  nextPrintVertices = vertices + 250000;
                }
              }
              getPartitionStore().putPartition(partition);
              ++partitionIndex;
            }
            edgeWriter.close(getContext()); // the temp results are saved now
            return null;
          }
        };
      }
    };
    ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
        "save-edges-%d", getContext());

    edgeOutputFormat.postWriting(getContext());

    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
      "saveEdges: Done saving edges.");
    // YARN: must commit the "task" output ourselves, since Hadoop's
    // MapReduce layer isn't there to do it.
    if (conf.isPureYarnJob() &&
      conf.getEdgeOutputFormatClass() != null) {
      try {
        OutputCommitter outputCommitter =
          edgeOutputFormat.getOutputCommitter(getContext());
        if (outputCommitter.needsTaskCommit(getContext())) {
          LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
            "OutputCommitter: committing task output.");
          // transfer from temp dirs to "task commit" dirs to prep for
          // the master's OutputCommitter#commitJob(context) call to finish.
          outputCommitter.commitTask(getContext());
        }
      } catch (InterruptedException ie) {
        LOG.error("Interrupted while attempting to obtain " +
          "OutputCommitter.", ie);
      } catch (IOException ioe) {
        LOG.error("Worker's attempt to commit output has " +
          "FAILED.", ioe);
      }
    }
  }

  @Override
  public void cleanup(FinishedSuperstepStats finishedSuperstepStats)
    throws IOException, InterruptedException {
    workerClient.closeConnections();
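    // Step the cached superstep back to the last superstep that actually
    // completed (it was advanced at the end of finishSuperstep).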
    setCachedSuperstep(getSuperstep() - 1);
    if (finishedSuperstepStats.getCheckpointStatus() !=
        CheckpointStatus.CHECKPOINT_AND_HALT) {
      saveVertices(finishedSuperstepStats.getLocalVertexCount());
      saveEdges();
    }
    WorkerProgress.get().finishStoring();
    if (workerProgressWriter != null) {
      workerProgressWriter.stop();
    }
    getPartitionStore().shutdown();
    // All worker processes should denote they are done by adding special
    // znode.  Once the number of znodes equals the number of partitions
    // for workers and masters, the master will clean up the ZooKeeper
    // znodes associated with this job.
    String workerCleanedUpPath = cleanedUpPath + "/" +
        getTaskId() + WORKER_SUFFIX;
    try {
      String finalFinishedPath =
          getZkExt().createExt(workerCleanedUpPath,
              null,
              Ids.OPEN_ACL_UNSAFE,
              CreateMode.PERSISTENT,
              true);
      if (LOG.isInfoEnabled()) {
        LOG.info("cleanup: Notifying master it's okay to cleanup with " +
            finalFinishedPath);
      }
    } catch (KeeperException.NodeExistsException e) {
      if (LOG.isInfoEnabled()) {
        LOG.info("cleanup: Couldn't create finished node '" +
            workerCleanedUpPath + "'");
      }
    } catch (KeeperException e) {
      // Cleaning up, it's okay to fail after cleanup is successful
      LOG.error("cleanup: Got KeeperException on notification " +
          "to master about cleanup", e);
    } catch (InterruptedException e) {
      // Cleaning up, it's okay to fail after cleanup is successful
      LOG.error("cleanup: Got InterruptedException on notification " +
          "to master about cleanup", e);
    }
  }

  /**
   * Store the counter values in ZooKeeper. This is called at the end of
   * each superstep and again after all supersteps have finished.
   *
   * @param allSuperstepsDone Whether the job has finished all supersteps.
   *        Needed to ensure the superstep number is the same for master and
   *        worker once all supersteps are finished.
   */
  public void storeCountersInZooKeeper(boolean allSuperstepsDone) {
    Set<CustomCounter> additionalCounters =
        CustomCounters.getAndClearCustomCounters();

    JSONArray jsonCounters = new JSONArray();
    Mapper.Context context = getContext();
    Counter counter;
    for (CustomCounter customCounter : additionalCounters) {
      String groupName = customCounter.getGroupName();
      String counterName = customCounter.getCounterName();
      counter = context.getCounter(groupName, counterName);
      customCounter.setValue(counter.getValue());
      jsonCounters.put(Base64.encodeBytes(
          WritableUtils.writeToByteArray(customCounter)));
    }
    // Add the Netty counters
    Map<String, Set<String>> nettyCounters =
        NettyClient.getCounterGroupsAndNames();
    for (Map.Entry<String, Set<String>> entry : nettyCounters.entrySet()) {
      String groupName = entry.getKey();
      for (String counterName : entry.getValue()) {
        CustomCounter customCounter = new CustomCounter(groupName, counterName,
            CustomCounter.Aggregation.SUM);
        counter = context.getCounter(groupName, counterName);
        customCounter.setValue(counter.getValue());
        jsonCounters.put(Base64.encodeBytes(
            WritableUtils.writeToByteArray(customCounter)));
      }
    }
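    // After the final superstep the master's superstep counter is one ahead
    // of this worker's cached value, so bump it here to keep the znode path
    // used by both sides in sync.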
    long superStep = getSuperstep() + (allSuperstepsDone ? 1 : 0);
    String finishedWorkerPath =
        getWorkerCountersFinishedPath(getApplicationAttempt(), superStep) +
            "/" + workerInfo.getHostnameId();
    LOG.info(String.format("Writing counters to ZooKeeper for superstep: %d",
        superStep));
    try {
      getZkExt().createExt(finishedWorkerPath,
          jsonCounters.toString().getBytes(
              Charset.defaultCharset()),
          Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT,
          true);
    } catch (KeeperException.NodeExistsException e) {
      LOG.warn("storeCountersInZooKeeper: finished worker path " +
          finishedWorkerPath + " already exists!");
    } catch (KeeperException e) {
      LOG.warn("Creating " + finishedWorkerPath +
          " failed with KeeperException", e);
    } catch (InterruptedException e) {
      LOG.warn("Creating " + finishedWorkerPath +
          " failed with InterruptedException", e);
    }
  }

  /**
   * Close the ZooKeeper connection, after the worker has sent its counters
   * to the master.
   */
  public void closeZooKeeper() {
    try {
      getZkExt().close();
    } catch (InterruptedException e) {
      // Cleanup phase -- just log the error
      LOG.error("cleanup: ZooKeeper failed to close with " + e);
    }

    if (getConfiguration().metricsEnabled()) {
      GiraphMetrics.get().dumpToStream(System.err);
    }

    // Preferably would shut down the service only after
    // all clients have disconnected (or the exceptions on the
    // client side ignored).
    workerServer.close();
  }
1338 
1339   @Override
1340   public void storeCheckpoint() throws IOException {
1341     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
1342         "storeCheckpoint: Starting checkpoint " +
1343             getGraphTaskManager().getGraphFunctions().toString() +
1344             " - Attempt=" + getApplicationAttempt() +
1345             ", Superstep=" + getSuperstep());
1346 
1347     // Algorithm:
1348     // For each partition, dump vertices and messages
1349     Path metadataFilePath = createCheckpointFilePathSafe(
1350         CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX);
1351     Path validFilePath = createCheckpointFilePathSafe(
1352         CheckpointingUtils.CHECKPOINT_VALID_POSTFIX);
1353     Path checkpointFilePath = createCheckpointFilePathSafe(
1354         CheckpointingUtils.CHECKPOINT_DATA_POSTFIX);
1355 
1356 
1357     // Metadata is buffered and written at the end since it's small and
1358     // needs to know how many partitions this worker owns
1359     FSDataOutputStream metadataOutputStream =
1360         getFs().create(metadataFilePath);
1361     metadataOutputStream.writeInt(getPartitionStore().getNumPartitions());
1362 
1363     for (Integer partitionId : getPartitionStore().getPartitionIds()) {
1364       metadataOutputStream.writeInt(partitionId);
1365     }
1366     metadataOutputStream.close();
1367 
1368     storeCheckpointVertices();
1369 
1370     FSDataOutputStream checkpointOutputStream =
1371         getFs().create(checkpointFilePath);
1372     workerContext.write(checkpointOutputStream);
1373     getContext().progress();
1374 
1375     // TODO: checkpoint messages along with vertices to avoid multiple loads
1376     //       of a partition when out-of-core is enabled.
1377     for (Integer partitionId : getPartitionStore().getPartitionIds()) {
1378       // write messages
1379       checkpointOutputStream.writeInt(partitionId);
1380       getServerData().getCurrentMessageStore()
1381           .writePartition(checkpointOutputStream, partitionId);
1382       getContext().progress();
1383 
1384     }
1385 
1386     List<Writable> w2wMessages =
1387         getServerData().getCurrentWorkerToWorkerMessages();
1388     WritableUtils.writeList(w2wMessages, checkpointOutputStream);
1389 
1390     checkpointOutputStream.close();
1391 
1392     getFs().createNewFile(validFilePath);
1393 
1394     // Notify master that checkpoint is stored
1395     String workerWroteCheckpoint =
1396         getWorkerWroteCheckpointPath(getApplicationAttempt(),
1397             getSuperstep()) + "/" + workerInfo.getHostnameId();
1398     try {
1399       getZkExt().createExt(workerWroteCheckpoint,
1400           new byte[0],
1401           Ids.OPEN_ACL_UNSAFE,
1402           CreateMode.PERSISTENT,
1403           true);
1404     } catch (KeeperException.NodeExistsException e) {
1405       LOG.warn("storeCheckpoint: wrote checkpoint worker path " +
1406           workerWroteCheckpoint + " already exists!");
1407     } catch (KeeperException e) {
1408       throw new IllegalStateException("Creating " + workerWroteCheckpoint +
1409           " failed with KeeperException", e);
1410     } catch (InterruptedException e) {
1411       throw new IllegalStateException("Creating " +
1412           workerWroteCheckpoint +
1413           " failed with InterruptedException", e);
1414     }
1415   }
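       // For reference, the checkpoint written above consists of, per worker:
       //   metadata file: int number of partitions, then one int id per
       //       partition owned by this worker
       //   data file: the workerContext, then for each partition an int id
       //       followed by its message store, then the list of
       //       worker-to-worker messages
       //   valid file: an empty marker signalling the checkpoint is complete
       // Vertices are written to separate per-partition files by
       // storeCheckpointVertices().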
1416 
1417   /**
1418    * Create checkpoint file path safely; if the file already exists,
1419    * remove it first.
1420    * @param name file extension
1421    * @return full file path to the newly created file
1422    * @throws IOException if removing an existing file fails
1423    */
1423   private Path createCheckpointFilePathSafe(String name) throws IOException {
1424     Path validFilePath = new Path(getCheckpointBasePath(getSuperstep()) + '.' +
1425         getWorkerId(workerInfo) + name);
1426     // Remove the file if it already exists (it shouldn't though, unless
1427     // this worker previously failed)
1428     if (getFs().delete(validFilePath, false)) {
1429       LOG.warn("storeCheckpoint: Removed " + name + " file " +
1430           validFilePath);
1431     }
1432     return validFilePath;
1433   }
1434 
1435   /**
1436    * Returns path to saved checkpoint.
1437    * Doesn't check if file actually exists.
1438    * @param superstep saved superstep.
1439    * @param name extension name
1440    * @return full file path to checkpoint file
1441    */
1442   private Path getSavedCheckpoint(long superstep, String name) {
1443     return new Path(getSavedCheckpointBasePath(superstep) + '.' +
1444         getWorkerId(workerInfo) + name);
1445   }
1446 
1447   /**
1448    * Save partitions. To speed up this operation,
1449    * it runs in multiple threads.
1450    */
1451   private void storeCheckpointVertices() {
1452     final int numPartitions = getPartitionStore().getNumPartitions();
1453     int numThreads = Math.min(
1454         GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()),
1455         numPartitions);
1456 
1457     getPartitionStore().startIteration();
1458 
1459     final CompressionCodec codec =
1460         new CompressionCodecFactory(getConfiguration())
1461             .getCodec(new Path(
1462                 GiraphConstants.CHECKPOINT_COMPRESSION_CODEC
1463                     .get(getConfiguration())));
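         // CompressionCodecFactory#getCodec matches a codec by file-name
         // suffix (e.g. ".gz", ".bz2", ".deflate") and returns null when
         // nothing matches, in which case the callables below fall back to
         // the plain uncompressed stream.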
1464 
1465     long t0 = System.currentTimeMillis();
1466 
1467     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
1468       @Override
1469       public Callable<Void> newCallable(int callableId) {
1470         return new Callable<Void>() {
1471 
1472           @Override
1473           public Void call() throws Exception {
1474             while (true) {
1475               Partition<I, V, E> partition =
1476                   getPartitionStore().getNextPartition();
1477               if (partition == null) {
1478                 break;
1479               }
1480               Path path =
1481                   createCheckpointFilePathSafe("_" + partition.getId() +
1482                       CheckpointingUtils.CHECKPOINT_VERTICES_POSTFIX);
1483 
1484               FSDataOutputStream uncompressedStream =
1485                   getFs().create(path);
1486 
1487 
1488               DataOutputStream stream = codec == null ? uncompressedStream :
1489                   new DataOutputStream(
1490                       codec.createOutputStream(uncompressedStream));
1491 
1492 
1493               partition.write(stream);
1494 
1495               getPartitionStore().putPartition(partition);
1496 
1497               stream.close();
1498               uncompressedStream.close();
1499             }
1500             return null;
1501           }
1502 
1503 
1504         };
1505       }
1506     };
1507 
1508     ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
1509         "checkpoint-vertices-%d", getContext());
1510 
1511     LOG.info("Saved checkpoint in " + (System.currentTimeMillis() - t0) +
1512         " ms, using " + numThreads + " threads");
1513   }
1514 
1515   /**
1516    * Load saved partitions in multiple threads.
1517    * @param superstep superstep to load
1518    * @param partitions list of partitions to load
1519    */
1520   private void loadCheckpointVertices(final long superstep,
1521                                       List<Integer> partitions) {
1522     int numThreads = Math.min(
1523         GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()),
1524         partitions.size());
1525 
1526     final Queue<Integer> partitionIdQueue =
1527         new ConcurrentLinkedQueue<>(partitions);
1528 
1529     final CompressionCodec codec =
1530         new CompressionCodecFactory(getConfiguration())
1531             .getCodec(new Path(
1532                 GiraphConstants.CHECKPOINT_COMPRESSION_CODEC
1533                     .get(getConfiguration())));
1534 
1535     long t0 = System.currentTimeMillis();
1536 
1537     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
1538       @Override
1539       public Callable<Void> newCallable(int callableId) {
1540         return new Callable<Void>() {
1541 
1542           @Override
1543           public Void call() throws Exception {
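                 // Drain the shared queue of partition ids; poll() can still
                 // return null if another thread grabs the last id between
                 // the isEmpty() check and the poll, hence the null check
                 // below.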
1544             while (!partitionIdQueue.isEmpty()) {
1545               Integer partitionId = partitionIdQueue.poll();
1546               if (partitionId == null) {
1547                 break;
1548               }
1549               Path path =
1550                   getSavedCheckpoint(superstep, "_" + partitionId +
1551                       CheckpointingUtils.CHECKPOINT_VERTICES_POSTFIX);
1552 
1553               FSDataInputStream compressedStream =
1554                   getFs().open(path);
1555 
1556               DataInputStream stream = codec == null ? compressedStream :
1557                   new DataInputStream(
1558                       codec.createInputStream(compressedStream));
1559 
1560               Partition<I, V, E> partition =
1561                   getConfiguration().createPartition(partitionId, getContext());
1562 
1563               partition.readFields(stream);
1564 
1565               getPartitionStore().addPartition(partition);
1566 
1567               stream.close();
1568             }
1569             return null;
1570           }
1571 
1572         };
1573       }
1574     };
1575 
1576     ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
1577         "load-vertices-%d", getContext());
1578 
1579     LOG.info("Loaded checkpoint in " + (System.currentTimeMillis() - t0) +
1580         " ms, using " + numThreads + " threads");
1581   }
1582 
1583   @Override
1584   public VertexEdgeCount loadCheckpoint(long superstep) {
1585     Path metadataFilePath = getSavedCheckpoint(
1586         superstep, CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX);
1587 
1588     Path checkpointFilePath = getSavedCheckpoint(
1589         superstep, CheckpointingUtils.CHECKPOINT_DATA_POSTFIX);
1590     // Algorithm:
1591     // Examine all the partition owners and load the ones
1592     // that match my hostname and id from the master designated checkpoint
1593     // prefixes.
1594     try {
1595       DataInputStream metadataStream =
1596           getFs().open(metadataFilePath);
1597 
1598       int partitions = metadataStream.readInt();
1599       List<Integer> partitionIds = new ArrayList<>(partitions);
1600       for (int i = 0; i < partitions; i++) {
1601         int partitionId = metadataStream.readInt();
1602         partitionIds.add(partitionId);
1603       }
1604 
1605       loadCheckpointVertices(superstep, partitionIds);
1606 
1607       getContext().progress();
1608 
1609       metadataStream.close();
1610 
1611       DataInputStream checkpointStream =
1612           getFs().open(checkpointFilePath);
1613       workerContext.readFields(checkpointStream);
1614 
1615       // Load global stats and superstep classes
1616       GlobalStats globalStats = new GlobalStats();
1617       SuperstepClasses superstepClasses = SuperstepClasses.createToRead(
1618           getConfiguration());
1619       String finalizedCheckpointPath = getSavedCheckpointBasePath(superstep) +
1620           CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX;
1621       DataInputStream finalizedStream =
1622           getFs().open(new Path(finalizedCheckpointPath));
1623       globalStats.readFields(finalizedStream);
1624       superstepClasses.readFields(finalizedStream);
1625       getConfiguration().updateSuperstepClasses(superstepClasses);
1626       getServerData().resetMessageStores();
1627 
1628       // TODO: checkpoint messages along with vertices to avoid multiple
1629       //       loads of a partition when out-of-core is enabled.
1630       for (int i = 0; i < partitions; i++) {
1631         int partitionId = checkpointStream.readInt();
1632         getServerData().getCurrentMessageStore()
1633             .readFieldsForPartition(checkpointStream, partitionId);
1634       }
1635 
1636       List<Writable> w2wMessages = (List<Writable>) WritableUtils.readList(
1637           checkpointStream);
1638       getServerData().getCurrentWorkerToWorkerMessages().addAll(w2wMessages);
1639 
1640       checkpointStream.close();
1641 
1642       if (LOG.isInfoEnabled()) {
1643         LOG.info("loadCheckpoint: Loaded " +
1644             workerGraphPartitioner.getPartitionOwners().size() +
1645             " total.");
1646       }
1647 
1648       // Communication service needs to setup the connections prior to
1649       // processing vertices
1650 /*if[HADOOP_NON_SECURE]
1651       workerClient.setup();
1652 else[HADOOP_NON_SECURE]*/
1653       workerClient.setup(getConfiguration().authenticate());
1654 /*end[HADOOP_NON_SECURE]*/
1655       return new VertexEdgeCount(globalStats.getVertexCount(),
1656           globalStats.getEdgeCount(), 0);
1657 
1658     } catch (IOException e) {
1659       throw new RuntimeException(
1660           "loadCheckpoint: Failed for superstep=" + superstep, e);
1661     }
1662   }
1663 
1664   /**
1665    * Send the worker partitions to their destination workers
1666    *
1667    * @param workerPartitionMap Map of worker info to the partitions stored
1668    *        on this worker to be sent
1669    */
1670   private void sendWorkerPartitions(
1671       Map<WorkerInfo, List<Integer>> workerPartitionMap) {
1672     List<Entry<WorkerInfo, List<Integer>>> randomEntryList =
1673         new ArrayList<Entry<WorkerInfo, List<Integer>>>(
1674             workerPartitionMap.entrySet());
1675     Collections.shuffle(randomEntryList);
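         // Shuffling randomizes the send order, presumably so that workers
         // do not all start by sending to the same destination at once.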
1676     WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
1677         new NettyWorkerClientRequestProcessor<I, V, E>(getContext(),
1678             getConfiguration(), this,
1679             false /* useOneMessageToManyIdsEncoding */);
1680     for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
1681       randomEntryList) {
1682       for (Integer partitionId : workerPartitionList.getValue()) {
1683         Partition<I, V, E> partition =
1684             getPartitionStore().removePartition(partitionId);
1685         if (partition == null) {
1686           throw new IllegalStateException(
1687               "sendWorkerPartitions: Couldn't find partition " +
1688                   partitionId + " to send to " +
1689                   workerPartitionList.getKey());
1690         }
1691         if (LOG.isInfoEnabled()) {
1692           LOG.info("sendWorkerPartitions: Sending worker " +
1693               workerPartitionList.getKey() + " partition " +
1694               partitionId);
1695         }
1696         workerClientRequestProcessor.sendPartitionRequest(
1697             workerPartitionList.getKey(),
1698             partition);
1699       }
1700     }
1701 
1702     try {
1703       workerClientRequestProcessor.flush();
1704       workerClient.waitAllRequests();
1705     } catch (IOException e) {
1706       throw new IllegalStateException("sendWorkerPartitions: Flush failed", e);
1707     }
1708     String myPartitionExchangeDonePath =
1709         getPartitionExchangeWorkerPath(
1710             getApplicationAttempt(), getSuperstep(), getWorkerInfo());
1711     try {
1712       getZkExt().createExt(myPartitionExchangeDonePath,
1713           null,
1714           Ids.OPEN_ACL_UNSAFE,
1715           CreateMode.PERSISTENT,
1716           true);
1717     } catch (KeeperException e) {
1718       throw new IllegalStateException(
1719           "sendWorkerPartitions: KeeperException to create " +
1720               myPartitionExchangeDonePath, e);
1721     } catch (InterruptedException e) {
1722       throw new IllegalStateException(
1723           "sendWorkerPartitions: InterruptedException to create " +
1724               myPartitionExchangeDonePath, e);
1725     }
1726     if (LOG.isInfoEnabled()) {
1727       LOG.info("sendWorkerPartitions: Done sending all my partitions.");
1728     }
1729   }
1730 
1731   @Override
1732   public final void exchangeVertexPartitions(
1733       Collection<? extends PartitionOwner> masterSetPartitionOwners) {
1734     // 1. Fix the addresses of the partition ids if they have changed.
1735     // 2. Send all the partitions to their destination workers in a random
1736     //    fashion.
1737     // 3. Notify completion with a ZooKeeper stamp
1738     // 4. Wait for all my dependencies to be done (if any)
1739     // 5. Add the partitions to myself.
1740     PartitionExchange partitionExchange =
1741         workerGraphPartitioner.updatePartitionOwners(
1742             getWorkerInfo(), masterSetPartitionOwners);
1743     workerClient.openConnections();
1744 
1745     Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
1746         partitionExchange.getSendWorkerPartitionMap();
1747     if (!getPartitionStore().isEmpty()) {
1748       sendWorkerPartitions(sendWorkerPartitionMap);
1749     }
1750 
1751     Set<WorkerInfo> myDependencyWorkerSet =
1752         partitionExchange.getMyDependencyWorkerSet();
1753     Set<String> workerIdSet = new HashSet<String>();
1754     for (WorkerInfo tmpWorkerInfo : myDependencyWorkerSet) {
1755       if (!workerIdSet.add(tmpWorkerInfo.getHostnameId())) {
1756         throw new IllegalStateException(
1757             "exchangeVertexPartitions: Duplicate entry " + tmpWorkerInfo);
1758       }
1759     }
1760     if (myDependencyWorkerSet.isEmpty() && getPartitionStore().isEmpty()) {
1761       if (LOG.isInfoEnabled()) {
1762         LOG.info("exchangeVertexPartitions: Nothing to exchange, " +
1763             "exiting early");
1764       }
1765       return;
1766     }
1767 
1768     String vertexExchangePath =
1769         getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
1770     List<String> workerDoneList;
1771     try {
1772       while (true) {
1773         workerDoneList = getZkExt().getChildrenExt(
1774             vertexExchangePath, true, false, false);
1775         workerIdSet.removeAll(workerDoneList);
1776         if (workerIdSet.isEmpty()) {
1777           break;
1778         }
1779         if (LOG.isInfoEnabled()) {
1780           LOG.info("exchangeVertexPartitions: Waiting for workers " +
1781               workerIdSet);
1782         }
1783         getPartitionExchangeChildrenChangedEvent().waitForTimeoutOrFail(
1784             GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
1785                 getConfiguration()));
1786         getPartitionExchangeChildrenChangedEvent().reset();
1787       }
1788     } catch (KeeperException | InterruptedException e) {
1789       throw new RuntimeException(
1790           "exchangeVertexPartitions: Got runtime exception", e);
1791     }
1792 
1793     if (LOG.isInfoEnabled()) {
1794       LOG.info("exchangeVertexPartitions: Done with exchange.");
1795     }
1796   }
1797 
1798   /**
1799    * Get event when the state of a partition exchange has changed.
1800    *
1801    * @return Event to check.
1802    */
1803   public final BspEvent getPartitionExchangeChildrenChangedEvent() {
1804     return partitionExchangeChildrenChanged;
1805   }
1806 
1807   @Override
1808   protected boolean processEvent(WatchedEvent event) {
1809     boolean foundEvent = false;
1810     if (event.getPath().startsWith(masterJobStatePath) &&
1811         (event.getType() == EventType.NodeChildrenChanged)) {
1812       if (LOG.isInfoEnabled()) {
1813         LOG.info("processEvent: Job state changed, checking " +
1814             "to see if it needs to restart");
1815       }
1816       JSONObject jsonObj = getJobState();
1817       // In YARN, we have to manually commit our own output in 2 stages, which
1818       // we do not have to do in Hadoop-based Giraph, so jsonObj can be null.
1819       if (getConfiguration().isPureYarnJob() && null == jsonObj) {
1820         LOG.error("BspServiceWorker#getJobState() came back NULL.");
1821         return false; // the event has been processed.
1822       }
1823       try {
1824         if ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
1825             ApplicationState.START_SUPERSTEP) &&
1826             jsonObj.getLong(JSONOBJ_APPLICATION_ATTEMPT_KEY) !=
1827             getApplicationAttempt()) {
1828           LOG.fatal("processEvent: Worker will restart " +
1829               "from command - " + jsonObj.toString());
1830           System.exit(-1);
1831         }
1832       } catch (JSONException e) {
1833         throw new RuntimeException(
1834             "processEvent: Couldn't properly get job state from " +
1835                 jsonObj.toString(), e);
1836       }
1837       foundEvent = true;
1838     } else if (event.getPath().contains(PARTITION_EXCHANGE_DIR) &&
1839         event.getType() == EventType.NodeChildrenChanged) {
1840       if (LOG.isInfoEnabled()) {
1841         LOG.info("processEvent: partitionExchangeChildrenChanged " +
1842             "(at least one worker is done sending partitions)");
1843       }
1844       partitionExchangeChildrenChanged.signal();
1845       foundEvent = true;
1846     } else if (event.getPath().contains(MEMORY_OBSERVER_DIR) &&
1847         event.getType() == EventType.NodeChildrenChanged) {
1848       memoryObserver.callGc();
1849       foundEvent = true;
1850     }
1851 
1852     return foundEvent;
1853   }
1854 
1855   @Override
1856   public WorkerInfo getWorkerInfo() {
1857     return workerInfo;
1858   }
1859 
1860   @Override
1861   public PartitionStore<I, V, E> getPartitionStore() {
1862     return getServerData().getPartitionStore();
1863   }
1864 
1865   @Override
1866   public PartitionOwner getVertexPartitionOwner(I vertexId) {
1867     return workerGraphPartitioner.getPartitionOwner(vertexId);
1868   }
1869 
1870   @Override
1871   public Iterable<? extends PartitionOwner> getPartitionOwners() {
1872     return workerGraphPartitioner.getPartitionOwners();
1873   }
1874 
1875   @Override
1876   public int getPartitionId(I vertexId) {
1877     PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
1878     return partitionOwner.getPartitionId();
1879   }
1880 
1881   @Override
1882   public boolean hasPartition(Integer partitionId) {
1883     return getPartitionStore().hasPartition(partitionId);
1884   }
1885 
1886   @Override
1887   public Iterable<Integer> getPartitionIds() {
1888     return getPartitionStore().getPartitionIds();
1889   }
1890 
1891   @Override
1892   public long getPartitionVertexCount(Integer partitionId) {
1893     return getPartitionStore().getPartitionVertexCount(partitionId);
1894   }
1895 
1896   @Override
1897   public void startIteration() {
1898     getPartitionStore().startIteration();
1899   }
1900 
1901   @Override
1902   public Partition getNextPartition() {
1903     return getPartitionStore().getNextPartition();
1904   }
1905 
1906   @Override
1907   public void putPartition(Partition partition) {
1908     getPartitionStore().putPartition(partition);
1909   }
1910 
1911   @Override
1912   public ServerData<I, V, E> getServerData() {
1913     return workerServer.getServerData();
1914   }
1915 
1916 
1917   @Override
1918   public WorkerAggregatorHandler getAggregatorHandler() {
1919     return globalCommHandler;
1920   }
1921 
1922   @Override
1923   public void prepareSuperstep() {
1924     if (getSuperstep() != INPUT_SUPERSTEP) {
1925       globalCommHandler.prepareSuperstep(workerAggregatorRequestProcessor);
1926     }
1927   }
1928 
1929   @Override
1930   public SuperstepOutput<I, V, E> getSuperstepOutput() {
1931     return superstepOutput;
1932   }
1933 
1934   @Override
1935   public GlobalStats getGlobalStats() {
1936     GlobalStats globalStats = new GlobalStats();
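         // The master publishes the aggregated stats under the "superstep
         // finished" znode, so read the stats of the previous, i.e. the last
         // completed, superstep.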
1937     if (getSuperstep() > Math.max(INPUT_SUPERSTEP, getRestartedSuperstep())) {
1938       String superstepFinishedNode =
1939           getSuperstepFinishedPath(getApplicationAttempt(),
1940               getSuperstep() - 1);
1941       WritableUtils.readFieldsFromZnode(
1942           getZkExt(), superstepFinishedNode, false, null,
1943           globalStats);
1944     }
1945     return globalStats;
1946   }
1947 
1948   @Override
1949   public WorkerInputSplitsHandler getInputSplitsHandler() {
1950     return inputSplitsHandler;
1951   }
1952 
1953   @Override
1954   public void addressesAndPartitionsReceived(
1955       AddressesAndPartitionsWritable addressesAndPartitions) {
1956     addressesAndPartitionsHolder.offer(addressesAndPartitions);
1957   }
1958 }