1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.worker;
20  
21  import java.io.DataInputStream;
22  import java.io.DataOutputStream;
23  import java.io.IOException;
24  import java.nio.charset.Charset;
25  import java.util.ArrayList;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.HashSet;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Map.Entry;
32  import java.util.Queue;
33  import java.util.Set;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.ConcurrentLinkedQueue;
36  import java.util.concurrent.TimeUnit;
37  
38  import net.iharder.Base64;
39  
40  import org.apache.giraph.bsp.ApplicationState;
41  import org.apache.giraph.bsp.BspService;
42  import org.apache.giraph.bsp.CentralizedServiceWorker;
43  import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
44  import org.apache.giraph.comm.ServerData;
45  import org.apache.giraph.comm.WorkerClient;
46  import org.apache.giraph.comm.WorkerClientRequestProcessor;
47  import org.apache.giraph.comm.WorkerServer;
48  import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
49  import org.apache.giraph.comm.messages.MessageStore;
50  import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
51  import org.apache.giraph.comm.netty.NettyWorkerAggregatorRequestProcessor;
52  import org.apache.giraph.comm.netty.NettyWorkerClient;
53  import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
54  import org.apache.giraph.comm.netty.NettyWorkerServer;
55  import org.apache.giraph.comm.requests.PartitionStatsRequest;
56  import org.apache.giraph.conf.GiraphConstants;
57  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
58  import org.apache.giraph.edge.Edge;
59  import org.apache.giraph.graph.AddressesAndPartitionsWritable;
60  import org.apache.giraph.graph.FinishedSuperstepStats;
61  import org.apache.giraph.graph.GlobalStats;
62  import org.apache.giraph.graph.GraphTaskManager;
63  import org.apache.giraph.graph.Vertex;
64  import org.apache.giraph.graph.VertexEdgeCount;
65  import org.apache.giraph.io.EdgeOutputFormat;
66  import org.apache.giraph.io.EdgeWriter;
67  import org.apache.giraph.io.VertexOutputFormat;
68  import org.apache.giraph.io.VertexWriter;
69  import org.apache.giraph.io.superstep_output.SuperstepOutput;
70  import org.apache.giraph.mapping.translate.TranslateEdge;
71  import org.apache.giraph.master.MasterInfo;
72  import org.apache.giraph.master.SuperstepClasses;
73  import org.apache.giraph.metrics.GiraphMetrics;
74  import org.apache.giraph.metrics.GiraphTimer;
75  import org.apache.giraph.metrics.GiraphTimerContext;
76  import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
77  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
78  import org.apache.giraph.metrics.WorkerSuperstepMetrics;
79  import org.apache.giraph.ooc.OutOfCoreEngine;
80  import org.apache.giraph.partition.Partition;
81  import org.apache.giraph.partition.PartitionExchange;
82  import org.apache.giraph.partition.PartitionOwner;
83  import org.apache.giraph.partition.PartitionStats;
84  import org.apache.giraph.partition.PartitionStore;
85  import org.apache.giraph.partition.WorkerGraphPartitioner;
86  import org.apache.giraph.utils.BlockingElementsSet;
87  import org.apache.giraph.utils.CallableFactory;
88  import org.apache.giraph.utils.CheckpointingUtils;
89  import org.apache.giraph.utils.JMapHistoDumper;
90  import org.apache.giraph.utils.LoggerUtils;
91  import org.apache.giraph.utils.MemoryUtils;
92  import org.apache.giraph.utils.ProgressableUtils;
93  import org.apache.giraph.utils.ReactiveJMapHistoDumper;
94  import org.apache.giraph.utils.WritableUtils;
95  import org.apache.giraph.zk.BspEvent;
96  import org.apache.giraph.zk.PredicateLock;
97  import org.apache.hadoop.fs.FSDataInputStream;
98  import org.apache.hadoop.fs.FSDataOutputStream;
99  import org.apache.hadoop.fs.Path;
100 import org.apache.hadoop.io.Writable;
101 import org.apache.hadoop.io.WritableComparable;
102 import org.apache.hadoop.io.compress.CompressionCodec;
103 import org.apache.hadoop.io.compress.CompressionCodecFactory;
104 import org.apache.hadoop.mapreduce.Mapper;
105 import org.apache.hadoop.mapreduce.OutputCommitter;
106 import org.apache.log4j.Level;
107 import org.apache.log4j.Logger;
108 import org.apache.zookeeper.CreateMode;
109 import org.apache.zookeeper.KeeperException;
110 import org.apache.zookeeper.WatchedEvent;
111 import org.apache.zookeeper.Watcher.Event.EventType;
112 import org.apache.zookeeper.ZooDefs.Ids;
113 import org.apache.zookeeper.data.Stat;
114 import org.json.JSONArray;
115 import org.json.JSONException;
116 import org.json.JSONObject;
117 
118 import com.google.common.collect.Lists;
119 
120 /**
121  * ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
122  *
123  * @param <I> Vertex id
124  * @param <V> Vertex data
125  * @param <E> Edge data
126  */
127 @SuppressWarnings("rawtypes")
128 public class BspServiceWorker<I extends WritableComparable,
129     V extends Writable, E extends Writable>
130     extends BspService<I, V, E>
131     implements CentralizedServiceWorker<I, V, E>,
132     ResetSuperstepMetricsObserver {
133   /** Name of gauge for time spent waiting on other workers */
134   public static final String TIMER_WAIT_REQUESTS = "wait-requests-us";
135   /** Class logger */
136   private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
137   /** My process health znode */
138   private String myHealthZnode;
139   /** Worker info */
140   private final WorkerInfo workerInfo;
141   /** Worker graph partitioner */
142   private final WorkerGraphPartitioner<I, V, E> workerGraphPartitioner;
143   /** Local Data for each worker */
144   private final LocalData<I, V, E, ? extends Writable> localData;
145   /** Used to translate Edges during vertex input phase based on localData */
146   private final TranslateEdge<I, E> translateEdge;
147   /** IPC Client */
148   private final WorkerClient<I, V, E> workerClient;
149   /** IPC Server */
150   private final WorkerServer<I, V, E> workerServer;
151   /** Request processor for aggregator requests */
152   private final WorkerAggregatorRequestProcessor
153   workerAggregatorRequestProcessor;
154   /** Master info */
155   private MasterInfo masterInfo = new MasterInfo();
156   /** List of workers */
157   private List<WorkerInfo> workerInfoList = Lists.newArrayList();
158   /** Have the partition exchange children (workers) changed? */
159   private final BspEvent partitionExchangeChildrenChanged;
160 
161   /** Addresses and partitions transfer */
162   private BlockingElementsSet<AddressesAndPartitionsWritable>
163       addressesAndPartitionsHolder = new BlockingElementsSet<>();
164 
165   /** Worker Context */
166   private final WorkerContext workerContext;
167 
168   /** Handler for aggregators */
169   private final WorkerAggregatorHandler globalCommHandler;
170 
171   /** Superstep output */
172   private final SuperstepOutput<I, V, E> superstepOutput;
173 
174   /** array of observers to call back to */
175   private final WorkerObserver[] observers;
176   /** Writer for worker progress */
177   private final WorkerProgressWriter workerProgressWriter;
178 
179   // Per-Superstep Metrics
180   /** Timer for WorkerContext#postSuperstep */
181   private GiraphTimer wcPostSuperstepTimer;
182   /** Time spent waiting on requests to finish */
183   private GiraphTimer waitRequestsTimer;
184 
185   /** InputSplit handlers used in INPUT_SUPERSTEP */
186   private final WorkerInputSplitsHandler inputSplitsHandler;
187 
188   /** Memory observer */
189   private final MemoryObserver memoryObserver;
190 
191   /**
192    * Constructor for setting up the worker.
193    *
194    * @param context Mapper context
195    * @param graphTaskManager GraphTaskManager for this compute node
196    * @throws IOException
197    * @throws InterruptedException
198    */
199   public BspServiceWorker(
200     Mapper<?, ?, ?, ?>.Context context,
201     GraphTaskManager<I, V, E> graphTaskManager)
202     throws IOException, InterruptedException {
203     super(context, graphTaskManager);
204     ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
205     localData = new LocalData<>(conf);
206     translateEdge = getConfiguration().edgeTranslationInstance();
207     if (translateEdge != null) {
208       translateEdge.initialize(this);
209     }
210     partitionExchangeChildrenChanged = new PredicateLock(context);
211     registerBspEvent(partitionExchangeChildrenChanged);
212     workerGraphPartitioner =
213         getGraphPartitionerFactory().createWorkerGraphPartitioner();
214     workerInfo = new WorkerInfo();
215     workerServer = new NettyWorkerServer<I, V, E>(conf, this, context,
216         graphTaskManager.createUncaughtExceptionHandler());
217     workerInfo.setInetSocketAddress(workerServer.getMyAddress(),
218         workerServer.getLocalHostOrIp());
219     workerInfo.setTaskId(getTaskId());
220     workerClient = new NettyWorkerClient<I, V, E>(context, conf, this,
221         graphTaskManager.createUncaughtExceptionHandler());
222     workerServer.setFlowControl(workerClient.getFlowControl());
223     OutOfCoreEngine oocEngine = workerServer.getServerData().getOocEngine();
224     if (oocEngine != null) {
225       oocEngine.setFlowControl(workerClient.getFlowControl());
226     }
227 
228     workerAggregatorRequestProcessor =
229         new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this);
230 
231     globalCommHandler = new WorkerAggregatorHandler(this, conf, context);
232 
233     workerContext = conf.createWorkerContext();
234     workerContext.setWorkerGlobalCommUsage(globalCommHandler);
235 
236     superstepOutput = conf.createSuperstepOutput(context);
237 
238     if (conf.isJMapHistogramDumpEnabled()) {
239       conf.addWorkerObserverClass(JMapHistoDumper.class);
240     }
241     if (conf.isReactiveJmapHistogramDumpEnabled()) {
242       conf.addWorkerObserverClass(ReactiveJMapHistoDumper.class);
243     }
244     observers = conf.createWorkerObservers(context);
245 
246     WorkerProgress.get().setTaskId(getTaskId());
247     workerProgressWriter = conf.trackJobProgressOnClient() ?
248         new WorkerProgressWriter(graphTaskManager.getJobProgressTracker()) :
249         null;
250 
251     GiraphMetrics.get().addSuperstepResetObserver(this);
252 
253     inputSplitsHandler = new WorkerInputSplitsHandler(
254         workerInfo, masterInfo.getTaskId(), workerClient);
255 
256     memoryObserver = new MemoryObserver(getZkExt(), memoryObserverPath, conf);
257   }
258 
259   @Override
260   public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
261     waitRequestsTimer = new GiraphTimer(superstepMetrics,
262         TIMER_WAIT_REQUESTS, TimeUnit.MICROSECONDS);
263     wcPostSuperstepTimer = new GiraphTimer(superstepMetrics,
264         "worker-context-post-superstep", TimeUnit.MICROSECONDS);
265   }
266 
267   @Override
268   public WorkerContext getWorkerContext() {
269     return workerContext;
270   }
271 
272   @Override
273   public WorkerObserver[] getWorkerObservers() {
274     return observers;
275   }
276 
277   @Override
278   public WorkerClient<I, V, E> getWorkerClient() {
279     return workerClient;
280   }
281 
282   public LocalData<I, V, E, ? extends Writable> getLocalData() {
283     return localData;
284   }
285 
286   public TranslateEdge<I, E> getTranslateEdge() {
287     return translateEdge;
288   }
289 
290   /**
291    * Intended to check the health of the node.  For instance, can it ssh,
292    * dmesg, etc. For now, does nothing.
293    * TODO: Make this check configurable by the user (i.e. search dmesg for
294    * problems).
295    *
296    * @return True if healthy (always in this case).
297    */
298   public boolean isHealthy() {
299     return true;
300   }
301 
302   /**
303  * Load the vertices/edges from input splits. Do this until all the
304    * InputSplits have been processed.
305    * All workers will try to do as many InputSplits as they can.  The master
306    * will monitor progress and stop this once all the InputSplits have been
307    * loaded and check-pointed.  Keep track of the last input split path to
308    * ensure the input split cache is flushed prior to marking the last input
309    * split complete.
310    *
311    * Use one or more threads to do the loading.
312    *
313    * @param inputSplitsCallableFactory Factory for {@link InputSplitsCallable}s
314    * @return Statistics of the vertices and edges loaded
315    * @throws InterruptedException
316    * @throws KeeperException
317    */
318   private VertexEdgeCount loadInputSplits(
319       CallableFactory<VertexEdgeCount> inputSplitsCallableFactory)
320     throws KeeperException, InterruptedException {
321     VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
322     int numThreads = getConfiguration().getNumInputSplitsThreads();
323     if (LOG.isInfoEnabled()) {
324       LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
325           "originally " + getConfiguration().getNumInputSplitsThreads() +
326           " thread(s)");
327     }
328 
329     List<VertexEdgeCount> results =
330         ProgressableUtils.getResultsWithNCallables(inputSplitsCallableFactory,
331             numThreads, "load-%d", getContext());
332     for (VertexEdgeCount result : results) {
333       vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(result);
334     }
335 
336     workerClient.waitAllRequests();
337     return vertexEdgeCount;
338   }
339 
340   /**
341    * Load the mapping entries from the user-defined
342    * {@link org.apache.giraph.io.MappingReader}
343    *
344    * @return Count of mapping entries loaded
345    */
346   private long loadMapping() throws KeeperException,
347     InterruptedException {
348     MappingInputSplitsCallableFactory<I, V, E, ? extends Writable>
349         inputSplitsCallableFactory =
350         new MappingInputSplitsCallableFactory<>(
351             getConfiguration().createWrappedMappingInputFormat(),
352             getContext(),
353             getConfiguration(),
354             this,
355             inputSplitsHandler);
356 
357     long mappingsLoaded =
358         loadInputSplits(inputSplitsCallableFactory).getMappingCount();
359 
360     // after all threads finish loading - call postFilling
361     localData.getMappingStore().postFilling();
362     return mappingsLoaded;
363   }
364 
365   /**
366    * Load the vertices from the user-defined
367    * {@link org.apache.giraph.io.VertexReader}
368    *
369    * @return Count of vertices and edges loaded
370    */
371   private VertexEdgeCount loadVertices() throws KeeperException,
372       InterruptedException {
373     VertexInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
374         new VertexInputSplitsCallableFactory<I, V, E>(
375             getConfiguration().createWrappedVertexInputFormat(),
376             getContext(),
377             getConfiguration(),
378             this,
379             inputSplitsHandler);
380 
381     return loadInputSplits(inputSplitsCallableFactory);
382   }
383 
384   /**
385    * Load the edges from the user-defined
386    * {@link org.apache.giraph.io.EdgeReader}.
387    *
388    * @return Number of edges loaded
389    */
390   private long loadEdges() throws KeeperException, InterruptedException {
391     EdgeInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
392         new EdgeInputSplitsCallableFactory<I, V, E>(
393             getConfiguration().createWrappedEdgeInputFormat(),
394             getContext(),
395             getConfiguration(),
396             this,
397             inputSplitsHandler);
398 
399     return loadInputSplits(inputSplitsCallableFactory).getEdgeCount();
400   }
401 
402   @Override
403   public MasterInfo getMasterInfo() {
404     return masterInfo;
405   }
406 
407   @Override
408   public List<WorkerInfo> getWorkerInfoList() {
409     return workerInfoList;
410   }
411 
412   /**
413    * Mark current worker as done and then wait for all workers
414    * to finish processing input splits.
415    */
416   private void markCurrentWorkerDoneReadingThenWaitForOthers() {
417     String workerInputSplitsDonePath =
418         inputSplitsWorkerDonePath + "/" + getWorkerInfo().getHostnameId();
419     try {
420       getZkExt().createExt(workerInputSplitsDonePath,
421           null,
422           Ids.OPEN_ACL_UNSAFE,
423           CreateMode.PERSISTENT,
424           true);
425     } catch (KeeperException e) {
426       throw new IllegalStateException(
427           "markCurrentWorkerDoneReadingThenWaitForOthers: " +
428               "KeeperException creating worker done splits", e);
429     } catch (InterruptedException e) {
430       throw new IllegalStateException(
431           "markCurrentWorkerDoneReadingThenWaitForOthers: " +
432               "InterruptedException creating worker done splits", e);
433     }
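        // Poll until the master creates the "all input splits done" znode,
        // waking up on the all-done event or after the configured timeout.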
434     while (true) {
435       Stat inputSplitsDoneStat;
436       try {
437         inputSplitsDoneStat =
438             getZkExt().exists(inputSplitsAllDonePath, true);
439       } catch (KeeperException e) {
440         throw new IllegalStateException(
441             "markCurrentWorkerDoneReadingThenWaitForOthers: " +
442                 "KeeperException waiting on worker done splits", e);
443       } catch (InterruptedException e) {
444         throw new IllegalStateException(
445             "markCurrentWorkerDoneReadingThenWaitForOthers: " +
446                 "InterruptedException waiting on worker done splits", e);
447       }
448       if (inputSplitsDoneStat != null) {
449         break;
450       }
451       getInputSplitsAllDoneEvent().waitForTimeoutOrFail(
452           GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
453               getConfiguration()));
454       getInputSplitsAllDoneEvent().reset();
455     }
456   }
457 
458   @Override
459   public FinishedSuperstepStats setup() {
460     // Unless doing a restart, prepare for computation:
461     // 1. Start superstep INPUT_SUPERSTEP (no computation)
462     // 2. Wait until the INPUT_SPLIT_ALL_READY_PATH node has been created
463     // 3. Process input splits until there are no more.
464     // 4. Wait until the INPUT_SPLIT_ALL_DONE_PATH node has been created
465     // 5. Process any mutations deriving from add edge requests
466     // 6. Wait for superstep INPUT_SUPERSTEP to complete.
467     if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
468       setCachedSuperstep(getRestartedSuperstep());
469       return new FinishedSuperstepStats(0, false, 0, 0, true,
470           CheckpointStatus.NONE);
471     }
472 
473     JSONObject jobState = getJobState();
474     if (jobState != null) {
475       try {
476         if ((ApplicationState.valueOf(jobState.getString(JSONOBJ_STATE_KEY)) ==
477             ApplicationState.START_SUPERSTEP) &&
478             jobState.getLong(JSONOBJ_SUPERSTEP_KEY) ==
479             getSuperstep()) {
480           if (LOG.isInfoEnabled()) {
481             LOG.info("setup: Restarting from an automated " +
482                 "checkpointed superstep " +
483                 getSuperstep() + ", attempt " +
484                 getApplicationAttempt());
485           }
486           setRestartedSuperstep(getSuperstep());
487           return new FinishedSuperstepStats(0, false, 0, 0, true,
488               CheckpointStatus.NONE);
489         }
490       } catch (JSONException e) {
491         throw new RuntimeException(
492             "setup: Failed to get key-values from " +
493                 jobState.toString(), e);
494       }
495     }
496 
497     // Add the partitions that this worker owns
498     Collection<? extends PartitionOwner> masterSetPartitionOwners =
499         startSuperstep();
500     workerGraphPartitioner.updatePartitionOwners(
501         getWorkerInfo(), masterSetPartitionOwners);
502     getPartitionStore().initialize();
503 
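        // Conditional-compilation block: without Hadoop security the client
        // is set up directly, otherwise the authenticate flag is passed.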
504 /*if[HADOOP_NON_SECURE]
505     workerClient.setup();
506 else[HADOOP_NON_SECURE]*/
507     workerClient.setup(getConfiguration().authenticate());
508 /*end[HADOOP_NON_SECURE]*/
509 
510     // Initialize aggregator at worker side during setup.
511     // Do this just before vertex and edge loading.
512     globalCommHandler.prepareSuperstep(workerAggregatorRequestProcessor);
513 
514     VertexEdgeCount vertexEdgeCount;
515     long entriesLoaded;
516 
517     if (getConfiguration().hasMappingInputFormat()) {
518       getContext().progress();
519       try {
520         entriesLoaded = loadMapping();
521         // successfully loaded mapping
522         // now initialize graphPartitionerFactory with this data
523         getGraphPartitionerFactory().initialize(localData);
524       } catch (InterruptedException e) {
525         throw new IllegalStateException(
526             "setup: loadMapping failed with InterruptedException", e);
527       } catch (KeeperException e) {
528         throw new IllegalStateException(
529             "setup: loadMapping failed with KeeperException", e);
530       }
531       getContext().progress();
532       if (LOG.isInfoEnabled()) {
533         LOG.info("setup: Finally loaded a total of " +
534             entriesLoaded + " entries from inputSplits");
535       }
536 
537       // Print stats for data stored in localData once mapping is fully
538       // loaded on all the workers
539       localData.printStats();
540     }
541 
542     if (getConfiguration().hasVertexInputFormat()) {
543       getContext().progress();
544       try {
545         vertexEdgeCount = loadVertices();
546       } catch (InterruptedException e) {
547         throw new IllegalStateException(
548             "setup: loadVertices failed with InterruptedException", e);
549       } catch (KeeperException e) {
550         throw new IllegalStateException(
551             "setup: loadVertices failed with KeeperException", e);
552       }
553       getContext().progress();
554     } else {
555       vertexEdgeCount = new VertexEdgeCount();
556     }
557     WorkerProgress.get().finishLoadingVertices();
558 
559     if (getConfiguration().hasEdgeInputFormat()) {
560       getContext().progress();
561       try {
562         vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0, loadEdges());
563       } catch (InterruptedException e) {
564         throw new IllegalStateException(
565             "setup: loadEdges failed with InterruptedException", e);
566       } catch (KeeperException e) {
567         throw new IllegalStateException(
568             "setup: loadEdges failed with KeeperException", e);
569       }
570       getContext().progress();
571     }
572     WorkerProgress.get().finishLoadingEdges();
573 
574     if (LOG.isInfoEnabled()) {
575       LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
576     }
577 
578     markCurrentWorkerDoneReadingThenWaitForOthers();
579 
580     // Create remaining partitions owned by this worker.
581     for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
582       if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
583           !getPartitionStore().hasPartition(
584               partitionOwner.getPartitionId())) {
585         Partition<I, V, E> partition =
586             getConfiguration().createPartition(
587                 partitionOwner.getPartitionId(), getContext());
588         getPartitionStore().addPartition(partition);
589       }
590     }
591 
592     // remove mapping store if possible
593     localData.removeMappingStoreIfPossible();
594 
595     if (getConfiguration().hasEdgeInputFormat()) {
596       // Move edges from temporary storage to their source vertices.
597       getServerData().getEdgeStore().moveEdgesToVertices();
598     }
599 
600     // Generate the partition stats for the input superstep and process
601     // if necessary
602     List<PartitionStats> partitionStatsList =
603         new ArrayList<PartitionStats>();
604     PartitionStore<I, V, E> partitionStore = getPartitionStore();
605     for (Integer partitionId : partitionStore.getPartitionIds()) {
606       PartitionStats partitionStats =
607           new PartitionStats(partitionId,
608               partitionStore.getPartitionVertexCount(partitionId),
609               0,
610               partitionStore.getPartitionEdgeCount(partitionId),
611               0,
612               0,
613               workerInfo.getHostnameId());
614       partitionStatsList.add(partitionStats);
615     }
616     workerGraphPartitioner.finalizePartitionStats(
617         partitionStatsList, getPartitionStore());
618 
619     return finishSuperstep(partitionStatsList, null);
620   }
621 
622   /**
623    * Register the health of this worker for a given superstep
624    *
625    * @param superstep Superstep to register health on
626    */
627   private void registerHealth(long superstep) {
628     JSONArray hostnamePort = new JSONArray();
629     hostnamePort.put(getHostname());
630 
631     hostnamePort.put(workerInfo.getPort());
632 
633     String myHealthPath = null;
634     if (isHealthy()) {
635       myHealthPath = getWorkerInfoHealthyPath(getApplicationAttempt(),
636           getSuperstep());
637     } else {
638       myHealthPath = getWorkerInfoUnhealthyPath(getApplicationAttempt(),
639           getSuperstep());
640     }
641     myHealthPath = myHealthPath + "/" + workerInfo.getHostnameId();
642     try {
643       myHealthZnode = getZkExt().createExt(
644           myHealthPath,
645           WritableUtils.writeToByteArray(workerInfo),
646           Ids.OPEN_ACL_UNSAFE,
647           CreateMode.EPHEMERAL,
648           true);
649     } catch (KeeperException.NodeExistsException e) {
650       LOG.warn("registerHealth: myHealthPath already exists (likely " +
651           "from previous failure): " + myHealthPath +
652           ".  Waiting for change in attempts " +
653           "to re-join the application");
654       getApplicationAttemptChangedEvent().waitForTimeoutOrFail(
655           GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(
656               getConfiguration()));
657       if (LOG.isInfoEnabled()) {
658         LOG.info("registerHealth: Got application " +
659             "attempt changed event, killing self");
660       }
661       throw new IllegalStateException(
662           "registerHealth: Trying " +
663               "to get the new application attempt by killing self", e);
664     } catch (KeeperException e) {
665       throw new IllegalStateException("Creating " + myHealthPath +
666           " failed with KeeperException", e);
667     } catch (InterruptedException e) {
668       throw new IllegalStateException("Creating " + myHealthPath +
669           " failed with InterruptedException", e);
670     }
671     if (LOG.isInfoEnabled()) {
672       LOG.info("registerHealth: Created my health node for attempt=" +
673           getApplicationAttempt() + ", superstep=" +
674           getSuperstep() + " with " + myHealthZnode +
675           " and workerInfo= " + workerInfo);
676     }
677   }
678 
679   /**
680    * Do this to help notify the master more quickly that this worker has failed.
681    */
682   private void unregisterHealth() {
683     LOG.error("unregisterHealth: Got failure, unregistering health on " +
684         myHealthZnode + " on superstep " + getSuperstep());
685     try {
686       getZkExt().deleteExt(myHealthZnode, -1, false);
687     } catch (InterruptedException e) {
688       throw new IllegalStateException(
689           "unregisterHealth: InterruptedException - Couldn't delete " +
690               myHealthZnode, e);
691     } catch (KeeperException e) {
692       throw new IllegalStateException(
693           "unregisterHealth: KeeperException - Couldn't delete " +
694               myHealthZnode, e);
695     }
696   }
697 
698   @Override
699   public void failureCleanup() {
700     unregisterHealth();
701   }
702 
703   @Override
704   public Collection<? extends PartitionOwner> startSuperstep() {
705     // Algorithm:
706     // 1. Communication service will combine message from previous
707     //    superstep
708     // 2. Register my health for the next superstep.
709     // 3. Wait until the partition assignment is complete and get it
710     // 4. Get the aggregator values from the previous superstep
711     if (getSuperstep() != INPUT_SUPERSTEP) {
712       workerServer.prepareSuperstep();
713     }
714 
715     registerHealth(getSuperstep());
716 
717     AddressesAndPartitionsWritable addressesAndPartitions =
718         addressesAndPartitionsHolder.getElement(getContext());
719 
720     workerInfoList.clear();
721     workerInfoList = addressesAndPartitions.getWorkerInfos();
722     masterInfo = addressesAndPartitions.getMasterInfo();
723     workerServer.resetBytesReceivedPerSuperstep();
724 
725     if (LOG.isInfoEnabled()) {
726       LOG.info("startSuperstep: " + masterInfo);
727     }
728 
729     getContext().setStatus("startSuperstep: " +
730         getGraphTaskManager().getGraphFunctions().toString() +
731         " - Attempt=" + getApplicationAttempt() +
732         ", Superstep=" + getSuperstep());
733 
734     if (LOG.isDebugEnabled()) {
735       LOG.debug("startSuperstep: addressesAndPartitions " +
736           addressesAndPartitions.getWorkerInfos());
737       for (PartitionOwner partitionOwner : addressesAndPartitions
738           .getPartitionOwners()) {
739         LOG.debug(partitionOwner.getPartitionId() + " " +
740             partitionOwner.getWorkerInfo());
741       }
742     }
743 
744     return addressesAndPartitions.getPartitionOwners();
745   }
746 
747   @Override
748   public FinishedSuperstepStats finishSuperstep(
749       List<PartitionStats> partitionStatsList,
750       GiraphTimerContext superstepTimerContext) {
751     // This barrier blocks until success (or the master signals it to
752     // restart).
753     //
754     // Master will coordinate the barriers and aggregate "doneness" of all
755     // the vertices.  Each worker will:
756     // 1. Ensure that the requests are complete
757     // 2. Execute user postSuperstep() if necessary.
758     // 3. Save aggregator values that are in use.
759     // 4. Report the statistics (vertices, edges, messages, etc.)
760     //    of this worker
761     // 5. Let the master know it is finished.
762     // 6. Wait for the master's superstep info, and check if done
763     waitForRequestsToFinish();
764 
765     getGraphTaskManager().notifyFinishedCommunication();
766 
767     long workerSentMessages = 0;
768     long workerSentMessageBytes = 0;
769     long localVertices = 0;
770     for (PartitionStats partitionStats : partitionStatsList) {
771       workerSentMessages += partitionStats.getMessagesSentCount();
772       workerSentMessageBytes += partitionStats.getMessageBytesSentCount();
773       localVertices += partitionStats.getVertexCount();
774     }
775 
776     if (getSuperstep() != INPUT_SUPERSTEP) {
777       postSuperstepCallbacks();
778     }
779 
780     globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor);
781 
782     MessageStore<I, Writable> incomingMessageStore =
783         getServerData().getIncomingMessageStore();
784     if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
785       ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
786     }
787 
788     if (LOG.isInfoEnabled()) {
789       LOG.info("finishSuperstep: Superstep " + getSuperstep() +
790           ", messages = " + workerSentMessages + " " +
791           ", message bytes = " + workerSentMessageBytes + " , " +
792           MemoryUtils.getRuntimeMemoryStats());
793     }
794 
795     if (superstepTimerContext != null) {
796       superstepTimerContext.stop();
797     }
798     writeFinishedSuperstepInfoToZK(partitionStatsList,
799       workerSentMessages, workerSentMessageBytes);
800 
801     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
802         "finishSuperstep: (waiting for rest " +
803             "of workers) " +
804             getGraphTaskManager().getGraphFunctions().toString() +
805             " - Attempt=" + getApplicationAttempt() +
806             ", Superstep=" + getSuperstep());
807 
808     String superstepFinishedNode =
809         getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
810 
811     waitForOtherWorkers(superstepFinishedNode);
812 
813     GlobalStats globalStats = new GlobalStats();
814     SuperstepClasses superstepClasses = SuperstepClasses.createToRead(
815         getConfiguration());
816     WritableUtils.readFieldsFromZnode(
817         getZkExt(), superstepFinishedNode, false, null, globalStats,
818         superstepClasses);
819     if (LOG.isInfoEnabled()) {
820       LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
821           " with global stats " + globalStats + " and classes " +
822           superstepClasses);
823     }
824     getContext().setStatus("finishSuperstep: (all workers done) " +
825         getGraphTaskManager().getGraphFunctions().toString() +
826         " - Attempt=" + getApplicationAttempt() +
827         ", Superstep=" + getSuperstep());
828     incrCachedSuperstep();
829     getConfiguration().updateSuperstepClasses(superstepClasses);
830 
831     return new FinishedSuperstepStats(
832         localVertices,
833         globalStats.getHaltComputation(),
834         globalStats.getVertexCount(),
835         globalStats.getEdgeCount(),
836         false,
837         globalStats.getCheckpointStatus());
838   }
839 
840   /**
841    * Handle post-superstep callbacks
842    */
843   private void postSuperstepCallbacks() {
844     GiraphTimerContext timerContext = wcPostSuperstepTimer.time();
845     getWorkerContext().postSuperstep();
846     timerContext.stop();
847     getContext().progress();
848 
849     for (WorkerObserver obs : getWorkerObservers()) {
850       obs.postSuperstep(getSuperstep());
851       getContext().progress();
852     }
853   }
854 
855   /**
856    * Wait for all the requests to finish.
857    */
858   private void waitForRequestsToFinish() {
859     if (LOG.isInfoEnabled()) {
860       LOG.info("finishSuperstep: Waiting on all requests, superstep " +
861           getSuperstep() + " " +
862           MemoryUtils.getRuntimeMemoryStats());
863     }
864     GiraphTimerContext timerContext = waitRequestsTimer.time();
865     workerClient.waitAllRequests();
866     timerContext.stop();
867   }
868 
869   /**
870    * Wait for all the other Workers to finish the superstep.
871    *
872    * @param superstepFinishedNode ZooKeeper path to wait on.
873    */
874   private void waitForOtherWorkers(String superstepFinishedNode) {
875     try {
876       while (getZkExt().exists(superstepFinishedNode, true) == null) {
877         getSuperstepFinishedEvent().waitForTimeoutOrFail(
878             GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
879                 getConfiguration()));
880         getSuperstepFinishedEvent().reset();
881       }
882     } catch (KeeperException e) {
883       throw new IllegalStateException(
884           "finishSuperstep: Failed while waiting for master to " +
885               "signal completion of superstep " + getSuperstep(), e);
886     } catch (InterruptedException e) {
887       throw new IllegalStateException(
888           "finishSuperstep: Failed while waiting for master to " +
889               "signal completion of superstep " + getSuperstep(), e);
890     }
891   }
892 
893   /**
894    * Write finished superstep info to ZooKeeper.
895    *
896    * @param partitionStatsList List of partition stats from superstep.
897    * @param workerSentMessages Number of messages sent in superstep.
898    * @param workerSentMessageBytes Number of message bytes sent
899    *                               in superstep.
900    */
901   private void writeFinishedSuperstepInfoToZK(
902       List<PartitionStats> partitionStatsList, long workerSentMessages,
903       long workerSentMessageBytes) {
904     Collection<PartitionStats> finalizedPartitionStats =
905         workerGraphPartitioner.finalizePartitionStats(
906             partitionStatsList, getPartitionStore());
907     workerClient.sendWritableRequest(masterInfo.getTaskId(),
908         new PartitionStatsRequest(finalizedPartitionStats));
909     WorkerSuperstepMetrics metrics = new WorkerSuperstepMetrics();
910     metrics.readFromRegistry();
911     byte[] metricsBytes = WritableUtils.writeToByteArray(metrics);
912 
913     JSONObject workerFinishedInfoObj = new JSONObject();
914     try {
915       workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY, workerSentMessages);
916       workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGE_BYTES_KEY,
917         workerSentMessageBytes);
918       workerFinishedInfoObj.put(JSONOBJ_METRICS_KEY,
919           Base64.encodeBytes(metricsBytes));
920     } catch (JSONException e) {
921       throw new RuntimeException(e);
922     }
923 
924     String finishedWorkerPath =
925         getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()) +
926         "/" + workerInfo.getHostnameId();
927     try {
928       getZkExt().createExt(finishedWorkerPath,
929           workerFinishedInfoObj.toString().getBytes(Charset.defaultCharset()),
930           Ids.OPEN_ACL_UNSAFE,
931           CreateMode.PERSISTENT,
932           true);
933     } catch (KeeperException.NodeExistsException e) {
934       LOG.warn("finishSuperstep: finished worker path " +
935           finishedWorkerPath + " already exists!");
936     } catch (KeeperException e) {
937       throw new IllegalStateException("Creating " + finishedWorkerPath +
938           " failed with KeeperException", e);
939     } catch (InterruptedException e) {
940       throw new IllegalStateException("Creating " + finishedWorkerPath +
941           " failed with InterruptedException", e);
942     }
943   }
944 
945   /**
946    * Save the vertices using the user-defined VertexOutputFormat,
947    * iterating over the partitions owned by this worker.
948    *
949    * @param numLocalVertices Number of local vertices
950    * @throws InterruptedException
951    */
952   private void saveVertices(long numLocalVertices) throws IOException,
953       InterruptedException {
954     ImmutableClassesGiraphConfiguration<I, V, E>  conf = getConfiguration();
955 
956     if (conf.getVertexOutputFormatClass() == null) {
957       LOG.warn("saveVertices: " +
958           GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS +
959           " not specified -- there will be no saved output");
960       return;
961     }
962     if (conf.doOutputDuringComputation()) {
963       if (LOG.isInfoEnabled()) {
964         LOG.info("saveVertices: The option for doing output during " +
965             "computation is selected, so the output will not be saved " +
966             "at the end of the application");
967       }
968       return;
969     }
970 
971     final int numPartitions = getPartitionStore().getNumPartitions();
972     int numThreads = Math.min(getConfiguration().getNumOutputThreads(),
973         numPartitions);
974     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
975         "saveVertices: Starting to save " + numLocalVertices + " vertices " +
976             "using " + numThreads + " threads");
977     final VertexOutputFormat<I, V, E> vertexOutputFormat =
978         getConfiguration().createWrappedVertexOutputFormat();
979 
980     getPartitionStore().startIteration();
981 
982     long verticesToStore = 0;
983     PartitionStore<I, V, E> partitionStore = getPartitionStore();
984     for (int partitionId : partitionStore.getPartitionIds()) {
985       verticesToStore += partitionStore.getPartitionVertexCount(partitionId);
986     }
987     WorkerProgress.get().startStoring(
988         verticesToStore, getPartitionStore().getNumPartitions());
989 
990     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
991       @Override
992       public Callable<Void> newCallable(int callableId) {
993         return new Callable<Void>() {
994           /** How often to update WorkerProgress */
995           private static final long VERTICES_TO_UPDATE_PROGRESS = 100000;
996 
997           @Override
998           public Void call() throws Exception {
999             VertexWriter<I, V, E> vertexWriter =
1000                 vertexOutputFormat.createVertexWriter(getContext());
1001             vertexWriter.setConf(getConfiguration());
1002             vertexWriter.initialize(getContext());
1003             long nextPrintVertices = 0;
1004             long nextUpdateProgressVertices = VERTICES_TO_UPDATE_PROGRESS;
1005             long nextPrintMsecs = System.currentTimeMillis() + 15000;
1006             int partitionIndex = 0;
1007             int numPartitions = getPartitionStore().getNumPartitions();
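                 // Drain partitions from the shared store; getNextPartition()
                 // returns null once every partition has been handed out.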
1008             while (true) {
1009               Partition<I, V, E> partition =
1010                   getPartitionStore().getNextPartition();
1011               if (partition == null) {
1012                 break;
1013               }
1014 
1015               long verticesWritten = 0;
1016               for (Vertex<I, V, E> vertex : partition) {
1017                 vertexWriter.writeVertex(vertex);
1018                 ++verticesWritten;
1019 
1020                 // Update status at most every 250k vertices or 15 seconds
1021                 if (verticesWritten > nextPrintVertices &&
1022                     System.currentTimeMillis() > nextPrintMsecs) {
1023                   LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
1024                       "saveVertices: Saved " + verticesWritten + " out of " +
1025                           partition.getVertexCount() + " partition vertices, " +
1026                           "on partition " + partitionIndex +
1027                           " out of " + numPartitions);
1028                   nextPrintMsecs = System.currentTimeMillis() + 15000;
1029                   nextPrintVertices = verticesWritten + 250000;
1030                 }
1031 
1032                 if (verticesWritten >= nextUpdateProgressVertices) {
1033                   WorkerProgress.get().addVerticesStored(
1034                       VERTICES_TO_UPDATE_PROGRESS);
1035                   nextUpdateProgressVertices += VERTICES_TO_UPDATE_PROGRESS;
1036                 }
1037               }
1038               getPartitionStore().putPartition(partition);
1039               ++partitionIndex;
1040               WorkerProgress.get().addVerticesStored(
1041                   verticesWritten % VERTICES_TO_UPDATE_PROGRESS);
1042               WorkerProgress.get().incrementPartitionsStored();
1043             }
1044             vertexWriter.close(getContext()); // the temp results are saved now
1045             return null;
1046           }
1047         };
1048       }
1049     };
1050     ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
1051         "save-vertices-%d", getContext());
1052 
1053     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
1054       "saveVertices: Done saving vertices.");
1055     // YARN: must commit the "task" output ourselves; Hadoop isn't there.
1056     if (getConfiguration().isPureYarnJob() &&
1057       getConfiguration().getVertexOutputFormatClass() != null) {
1058       try {
1059         OutputCommitter outputCommitter =
1060           vertexOutputFormat.getOutputCommitter(getContext());
1061         if (outputCommitter.needsTaskCommit(getContext())) {
1062           LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
1063             "OutputCommitter: committing task output.");
1064           // transfer from temp dirs to "task commit" dirs to prep for
1065           // the master's OutputCommitter#commitJob(context) call to finish.
1066           outputCommitter.commitTask(getContext());
1067         }
1068       } catch (InterruptedException ie) {
1069         LOG.error("Interrupted while attempting to obtain " +
1070           "OutputCommitter.", ie);
1071       } catch (IOException ioe) {
1072         LOG.error("Worker task's attempt to commit output has " +
1073           "FAILED.", ioe);
1074       }
1075     }
1076   }
1077 
1078   /**
1079    * Save the edges using the user-defined EdgeOutputFormat,
1080    * iterating over the partitions owned by this worker.
1081    *
1082    * @throws InterruptedException
1083    */
1084   private void saveEdges() throws IOException, InterruptedException {
1085     final ImmutableClassesGiraphConfiguration<I, V, E>  conf =
1086       getConfiguration();
1087 
1088     if (conf.getEdgeOutputFormatClass() == null) {
1089       LOG.warn("saveEdges: " +
1090                GiraphConstants.EDGE_OUTPUT_FORMAT_CLASS +
1091                " not specified. Make sure the EdgeOutputFormat is not required.");
1092       return;
1093     }
1094 
1095     final int numPartitions = getPartitionStore().getNumPartitions();
1096     int numThreads = Math.min(conf.getNumOutputThreads(),
1097         numPartitions);
1098     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
1099         "saveEdges: Starting to save the edges using " +
1100         numThreads + " threads");
1101     final EdgeOutputFormat<I, V, E> edgeOutputFormat =
1102         conf.createWrappedEdgeOutputFormat();
1103 
1104     getPartitionStore().startIteration();
1105 
1106     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
1107       @Override
1108       public Callable<Void> newCallable(int callableId) {
1109         return new Callable<Void>() {
1110           @Override
1111           public Void call() throws Exception {
1112             EdgeWriter<I, V, E>  edgeWriter =
1113                 edgeOutputFormat.createEdgeWriter(getContext());
1114             edgeWriter.setConf(conf);
1115             edgeWriter.initialize(getContext());
1116 
1117             long nextPrintVertices = 0;
1118             long nextPrintMsecs = System.currentTimeMillis() + 15000;
1119             int partitionIndex = 0;
1120             int numPartitions = getPartitionStore().getNumPartitions();
1121             while (true) {
1122               Partition<I, V, E> partition =
1123                   getPartitionStore().getNextPartition();
1124               if (partition == null) {
1125                 break;
1126               }
1127 
1128               long vertices = 0;
1129               long edges = 0;
1130               long partitionEdgeCount = partition.getEdgeCount();
1131               for (Vertex<I, V, E> vertex : partition) {
1132                 for (Edge<I, E> edge : vertex.getEdges()) {
1133                   edgeWriter.writeEdge(vertex.getId(), vertex.getValue(), edge);
1134                   ++edges;
1135                 }
1136                 ++vertices;
1137 
1138                 // Update status at most every 250k vertices or 15 seconds
1139                 if (vertices > nextPrintVertices &&
1140                     System.currentTimeMillis() > nextPrintMsecs) {
1141                   LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
1142                       "saveEdges: Saved " + edges +
1143                       " edges out of " + partitionEdgeCount +
1144                       " partition edges, on partition " + partitionIndex +
1145                       " out of " + numPartitions);
1146                   nextPrintMsecs = System.currentTimeMillis() + 15000;
1147                   nextPrintVertices = vertices + 250000;
1148                 }
1149               }
1150               getPartitionStore().putPartition(partition);
1151               ++partitionIndex;
1152             }
1153             edgeWriter.close(getContext()); // the temp results are saved now
1154             return null;
1155           }
1156         };
1157       }
1158     };
1159     ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
1160         "save-edges-%d", getContext());
1161 
1162     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
1163       "saveEdges: Done saving edges.");
1164     // YARN: must commit the "task" output ourselves; Hadoop isn't there.
1165     if (conf.isPureYarnJob() &&
1166       conf.getEdgeOutputFormatClass() != null) {
1167       try {
1168         OutputCommitter outputCommitter =
1169           edgeOutputFormat.getOutputCommitter(getContext());
1170         if (outputCommitter.needsTaskCommit(getContext())) {
1171           LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
1172             "OutputCommitter: committing task output.");
1173           // transfer from temp dirs to "task commit" dirs to prep for
1174           // the master's OutputCommitter#commitJob(context) call to finish.
1175           outputCommitter.commitTask(getContext());
1176         }
1177       } catch (InterruptedException ie) {
1178         LOG.error("Interrupted while attempting to obtain " +
1179           "OutputCommitter.", ie);
1180       } catch (IOException ioe) {
1181         LOG.error("Worker task's attempt to commit output has " +
1182           "FAILED.", ioe);
1183       }
1184     }
1185   }
1186 
1187   @Override
1188   public void cleanup(FinishedSuperstepStats finishedSuperstepStats)
1189     throws IOException, InterruptedException {
1190     workerClient.closeConnections();
1191     setCachedSuperstep(getSuperstep() - 1);
1192     if (finishedSuperstepStats.getCheckpointStatus() !=
1193         CheckpointStatus.CHECKPOINT_AND_HALT) {
1194       saveVertices(finishedSuperstepStats.getLocalVertexCount());
1195       saveEdges();
1196     }
1197     WorkerProgress.get().finishStoring();
1198     if (workerProgressWriter != null) {
1199       workerProgressWriter.stop();
1200     }
1201     getPartitionStore().shutdown();
1202     // All worker processes should denote they are done by adding special
1203     // znode.  Once the number of znodes equals the number of tasks
1204     // (workers and master), the master will clean up the ZooKeeper
1205     // znodes associated with this job.
1206     String workerCleanedUpPath = cleanedUpPath  + "/" +
1207         getTaskId() + WORKER_SUFFIX;
1208     try {
1209       String finalFinishedPath =
1210           getZkExt().createExt(workerCleanedUpPath,
1211               null,
1212               Ids.OPEN_ACL_UNSAFE,
1213               CreateMode.PERSISTENT,
1214               true);
1215       if (LOG.isInfoEnabled()) {
1216         LOG.info("cleanup: Notifying master its okay to cleanup with " +
1217             finalFinishedPath);
1218       }
1219     } catch (KeeperException.NodeExistsException e) {
1220       if (LOG.isInfoEnabled()) {
1221         LOG.info("cleanup: Couldn't create finished node '" +
1222             workerCleanedUpPath + "'");
1223       }
1224     } catch (KeeperException e) {
1225       // Cleaning up, it's okay to fail after cleanup is successful
1226       LOG.error("cleanup: Got KeeperException on notification " +
1227           "to master about cleanup", e);
1228     } catch (InterruptedException e) {
1229       // Cleaning up, it's okay to fail after cleanup is successful
1230       LOG.error("cleanup: Got InterruptedException on notification " +
1231           "to master about cleanup", e);
1232     }
1233     try {
1234       getZkExt().close();
1235     } catch (InterruptedException e) {
1236       // cleanup phase -- just log the error
1237       LOG.error("cleanup: ZooKeeper failed to close", e);
1238     }
1239 
1240     if (getConfiguration().metricsEnabled()) {
1241       GiraphMetrics.get().dumpToStream(System.err);
1242     }
1243 
1244     // Preferably would shut down the service only after
1245     // all clients have disconnected (or the exceptions on the
1246     // client side ignored).
1247     workerServer.close();
1248   }
1249 
1250   @Override
1251   public void storeCheckpoint() throws IOException {
1252     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
1253         "storeCheckpoint: Starting checkpoint " +
1254             getGraphTaskManager().getGraphFunctions().toString() +
1255             " - Attempt=" + getApplicationAttempt() +
1256             ", Superstep=" + getSuperstep());
1257 
1258     // Algorithm:
1259     // For each partition, dump vertices and messages
1260     Path metadataFilePath = createCheckpointFilePathSafe(
1261         CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX);
1262     Path validFilePath = createCheckpointFilePathSafe(
1263         CheckpointingUtils.CHECKPOINT_VALID_POSTFIX);
1264     Path checkpointFilePath = createCheckpointFilePathSafe(
1265         CheckpointingUtils.CHECKPOINT_DATA_POSTFIX);
1266 
1267 
1268     // Metadata is small: it records the number of partitions this worker
1269     // owns and their ids
1270     FSDataOutputStream metadataOutputStream =
1271         getFs().create(metadataFilePath);
1272     metadataOutputStream.writeInt(getPartitionStore().getNumPartitions());
1273 
1274     for (Integer partitionId : getPartitionStore().getPartitionIds()) {
1275       metadataOutputStream.writeInt(partitionId);
1276     }
1277     metadataOutputStream.close();
1278 
1279     storeCheckpointVertices();
1280 
1281     FSDataOutputStream checkpointOutputStream =
1282         getFs().create(checkpointFilePath);
1283     workerContext.write(checkpointOutputStream);
1284     getContext().progress();
1285 
1286     // TODO: checkpointing messages along with vertices to avoid multiple loads
1287     //       of a partition when out-of-core is enabled.
1288     for (Integer partitionId : getPartitionStore().getPartitionIds()) {
1289       // write messages
1290       checkpointOutputStream.writeInt(partitionId);
1291       getServerData().getCurrentMessageStore()
1292           .writePartition(checkpointOutputStream, partitionId);
1293       getContext().progress();
1294 
1295     }
1296 
1297     List<Writable> w2wMessages =
1298         getServerData().getCurrentWorkerToWorkerMessages();
1299     WritableUtils.writeList(w2wMessages, checkpointOutputStream);
1300 
1301     checkpointOutputStream.close();
1302 
1303     getFs().createNewFile(validFilePath);
1304 
1305     // Notify master that checkpoint is stored
1306     String workerWroteCheckpoint =
1307         getWorkerWroteCheckpointPath(getApplicationAttempt(),
1308             getSuperstep()) + "/" + workerInfo.getHostnameId();
1309     try {
1310       getZkExt().createExt(workerWroteCheckpoint,
1311           new byte[0],
1312           Ids.OPEN_ACL_UNSAFE,
1313           CreateMode.PERSISTENT,
1314           true);
1315     } catch (KeeperException.NodeExistsException e) {
1316       LOG.warn("storeCheckpoint: wrote checkpoint worker path " +
1317           workerWroteCheckpoint + " already exists!");
1318     } catch (KeeperException e) {
1319       throw new IllegalStateException("Creating " + workerWroteCheckpoint +
1320           " failed with KeeperException", e);
1321     } catch (InterruptedException e) {
1322       throw new IllegalStateException("Creating " +
1323           workerWroteCheckpoint +
1324           " failed with InterruptedException", e);
1325     }
1326   }
1327 
1328   /**
1329    * Create checkpoint file safely. If file already exists remove it first.
1330    * @param name file extension
1331    * @return full file path to newly created file
1332    * @throws IOException
1333    */
1334   private Path createCheckpointFilePathSafe(String name) throws IOException {
1335     Path validFilePath = new Path(getCheckpointBasePath(getSuperstep()) + '.' +
1336         getWorkerId(workerInfo) + name);
1337     // Remove these files if they already exist (they shouldn't, unless
1338     // this worker failed previously)
1339     if (getFs().delete(validFilePath, false)) {
1340       LOG.warn("storeCheckpoint: Removed " + name + " file " +
1341           validFilePath);
1342     }
1343     return validFilePath;
1344   }
1345 
1346   /**
1347    * Returns path to saved checkpoint.
1348    * Doesn't check whether the file actually exists.
1349    * @param superstep saved superstep
1350    * @param name file name postfix
1351    * @return full file path to checkpoint file
1352    */
1353   private Path getSavedCheckpoint(long superstep, String name) {
1354     return new Path(getSavedCheckpointBasePath(superstep) + '.' +
1355         getWorkerId(workerInfo) + name);
1356   }
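       // Note: storeCheckpoint() and loadCheckpoint() rely on both path
       // helpers above agreeing on the naming scheme
       // <base path for superstep> + '.' + <worker id> + <postfix>, so
       // loadCheckpoint() can locate the files written for the same worker
       // id by storeCheckpoint().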
1357 
1358   /**
1359    * Save partitions. To speed up this operation,
1360    * it runs in multiple threads.
1361    */
1362   private void storeCheckpointVertices() {
1363     final int numPartitions = getPartitionStore().getNumPartitions();
1364     int numThreads = Math.min(
1365         GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()),
1366         numPartitions);
1367 
1368     getPartitionStore().startIteration();
1369 
1370     final CompressionCodec codec =
1371         new CompressionCodecFactory(getConfiguration())
1372             .getCodec(new Path(
1373                 GiraphConstants.CHECKPOINT_COMPRESSION_CODEC
1374                     .get(getConfiguration())));
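         // getCodec() may return null if no codec matches the configured
         // value; in that case the vertex data is written uncompressed (see
         // the stream wrapping below).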
1375 
1376     long t0 = System.currentTimeMillis();
1377 
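         // Each checkpoint thread repeatedly takes the next partition from
         // the store (getNextPartition() returns null once the iteration
         // started above is exhausted), writes it to its own file and puts
         // it back, so the partitions are spread across the threads without
         // an explicit work queue.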
1378     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
1379       @Override
1380       public Callable<Void> newCallable(int callableId) {
1381         return new Callable<Void>() {
1382 
1383           @Override
1384           public Void call() throws Exception {
1385             while (true) {
1386               Partition<I, V, E> partition =
1387                   getPartitionStore().getNextPartition();
1388               if (partition == null) {
1389                 break;
1390               }
1391               Path path =
1392                   createCheckpointFilePathSafe("_" + partition.getId() +
1393                       CheckpointingUtils.CHECKPOINT_VERTICES_POSTFIX);
1394 
1395               FSDataOutputStream uncompressedStream =
1396                   getFs().create(path);
1397 
1398 
1399               DataOutputStream stream = codec == null ? uncompressedStream :
1400                   new DataOutputStream(
1401                       codec.createOutputStream(uncompressedStream));
1402 
1403 
1404               partition.write(stream);
1405 
1406               getPartitionStore().putPartition(partition);
1407 
1408               stream.close();
1409               uncompressedStream.close();
1410             }
1411             return null;
1412           }
1413 
1414 
1415         };
1416       }
1417     };
1418 
1419     ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
1420         "checkpoint-vertices-%d", getContext());
1421 
1422     LOG.info("Saved checkpoint in " + (System.currentTimeMillis() - t0) +
1423         " ms, using " + numThreads + " threads");
1424   }
1425 
1426   /**
1427    * Load saved partitions in multiple threads.
1428    * @param superstep superstep to load
1429    * @param partitions list of partitions to load
1430    */
1431   private void loadCheckpointVertices(final long superstep,
1432                                       List<Integer> partitions) {
1433     int numThreads = Math.min(
1434         GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()),
1435         partitions.size());
1436 
1437     final Queue<Integer> partitionIdQueue =
1438         new ConcurrentLinkedQueue<>(partitions);
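         // Loader threads poll partition ids off this queue until it is
         // drained, so the partitions are spread across the threads without
         // further coordination.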
1439 
1440     final CompressionCodec codec =
1441         new CompressionCodecFactory(getConfiguration())
1442             .getCodec(new Path(
1443                 GiraphConstants.CHECKPOINT_COMPRESSION_CODEC
1444                     .get(getConfiguration())));
1445 
1446     long t0 = System.currentTimeMillis();
1447 
1448     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
1449       @Override
1450       public Callable<Void> newCallable(int callableId) {
1451         return new Callable<Void>() {
1452 
1453           @Override
1454           public Void call() throws Exception {
1455             while (!partitionIdQueue.isEmpty()) {
1456               Integer partitionId = partitionIdQueue.poll();
1457               if (partitionId == null) {
1458                 break;
1459               }
1460               Path path =
1461                   getSavedCheckpoint(superstep, "_" + partitionId +
1462                       CheckpointingUtils.CHECKPOINT_VERTICES_POSTFIX);
1463 
1464               FSDataInputStream compressedStream =
1465                   getFs().open(path);
1466 
1467               DataInputStream stream = codec == null ? compressedStream :
1468                   new DataInputStream(
1469                       codec.createInputStream(compressedStream));
1470 
1471               Partition<I, V, E> partition =
1472                   getConfiguration().createPartition(partitionId, getContext());
1473 
1474               partition.readFields(stream);
1475 
1476               getPartitionStore().addPartition(partition);
1477 
1478               stream.close();
1479             }
1480             return null;
1481           }
1482 
1483         };
1484       }
1485     };
1486 
1487     ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
1488         "load-vertices-%d", getContext());
1489 
1490     LOG.info("Loaded checkpoint in " + (System.currentTimeMillis() - t0) +
1491         " ms, using " + numThreads + " threads");
1492   }
1493 
1494   @Override
1495   public VertexEdgeCount loadCheckpoint(long superstep) {
1496     Path metadataFilePath = getSavedCheckpoint(
1497         superstep, CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX);
1498 
1499     Path checkpointFilePath = getSavedCheckpoint(
1500         superstep, CheckpointingUtils.CHECKPOINT_DATA_POSTFIX);
1501     // Algorithm:
1502     // Examine all the partition owners and load the ones
1503     // that match my hostname and id from the master designated checkpoint
1504     // prefixes.
1505     try {
1506       DataInputStream metadataStream =
1507           getFs().open(metadataFilePath);
1508 
1509       int partitions = metadataStream.readInt();
1510       List<Integer> partitionIds = new ArrayList<>(partitions);
1511       for (int i = 0; i < partitions; i++) {
1512         int partitionId = metadataStream.readInt();
1513         partitionIds.add(partitionId);
1514       }
1515 
1516       loadCheckpointVertices(superstep, partitionIds);
1517 
1518       getContext().progress();
1519 
1520       metadataStream.close();
1521 
1522       DataInputStream checkpointStream =
1523           getFs().open(checkpointFilePath);
1524       workerContext.readFields(checkpointStream);
1525 
1526       // Load global stats and superstep classes
1527       GlobalStats globalStats = new GlobalStats();
1528       SuperstepClasses superstepClasses = SuperstepClasses.createToRead(
1529           getConfiguration());
1530       String finalizedCheckpointPath = getSavedCheckpointBasePath(superstep) +
1531           CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX;
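           // The finalized file lives under the shared base path (no worker
           // id in the name) and holds job-wide state: the global stats and
           // the superstep classes to resume with.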
1532       DataInputStream finalizedStream =
1533           getFs().open(new Path(finalizedCheckpointPath));
1534       globalStats.readFields(finalizedStream);
1535       superstepClasses.readFields(finalizedStream);
1536       getConfiguration().updateSuperstepClasses(superstepClasses);
1537       getServerData().resetMessageStores();
1538 
1539       // TODO: checkpoint messages along with vertices to avoid multiple
1540       //       loads of a partition when out-of-core is enabled.
1541       for (int i = 0; i < partitions; i++) {
1542         int partitionId = checkpointStream.readInt();
1543         getServerData().getCurrentMessageStore()
1544             .readFieldsForPartition(checkpointStream, partitionId);
1545       }
1546 
1547       List<Writable> w2wMessages = (List<Writable>) WritableUtils.readList(
1548           checkpointStream);
1549       getServerData().getCurrentWorkerToWorkerMessages().addAll(w2wMessages);
1550 
1551       checkpointStream.close();
1552 
1553       if (LOG.isInfoEnabled()) {
1554         LOG.info("loadCheckpoint: Loaded " +
1555             workerGraphPartitioner.getPartitionOwners().size() +
1556             " partition owners total.");
1557       }
1558 
1559       // Communication service needs to setup the connections prior to
1560       // processing vertices
1561 /*if[HADOOP_NON_SECURE]
1562       workerClient.setup();
1563 else[HADOOP_NON_SECURE]*/
1564       workerClient.setup(getConfiguration().authenticate());
1565 /*end[HADOOP_NON_SECURE]*/
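           // The returned counts come from the finalized global stats rather
           // than from re-counting the partitions loaded above.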
1566       return new VertexEdgeCount(globalStats.getVertexCount(),
1567           globalStats.getEdgeCount(), 0);
1568 
1569     } catch (IOException e) {
1570       throw new RuntimeException(
1571           "loadCheckpoint: Failed for superstep=" + superstep, e);
1572     }
1573   }
1574 
1575   /**
1576    * Send the worker partitions to their destination workers
1577    *
1578    * @param workerPartitionMap Map of worker info to the partitions stored
1579    *        on this worker to be sent
1580    */
1581   private void sendWorkerPartitions(
1582       Map<WorkerInfo, List<Integer>> workerPartitionMap) {
1583     List<Entry<WorkerInfo, List<Integer>>> randomEntryList =
1584         new ArrayList<Entry<WorkerInfo, List<Integer>>>(
1585             workerPartitionMap.entrySet());
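         // Shuffle the destination order (presumably so that all workers do
         // not start sending to the same destination at the same time).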
1586     Collections.shuffle(randomEntryList);
1587     WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
1588         new NettyWorkerClientRequestProcessor<I, V, E>(getContext(),
1589             getConfiguration(), this,
1590             false /* useOneMessageToManyIdsEncoding */);
1591     for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
1592       randomEntryList) {
1593       for (Integer partitionId : workerPartitionList.getValue()) {
1594         Partition<I, V, E> partition =
1595             getPartitionStore().removePartition(partitionId);
1596         if (partition == null) {
1597           throw new IllegalStateException(
1598               "sendWorkerPartitions: Couldn't find partition " +
1599                   partitionId + " to send to " +
1600                   workerPartitionList.getKey());
1601         }
1602         if (LOG.isInfoEnabled()) {
1603           LOG.info("sendWorkerPartitions: Sending worker " +
1604               workerPartitionList.getKey() + " partition " +
1605               partitionId);
1606         }
1607         workerClientRequestProcessor.sendPartitionRequest(
1608             workerPartitionList.getKey(),
1609             partition);
1610       }
1611     }
1612 
1613     try {
1614       workerClientRequestProcessor.flush();
1615       workerClient.waitAllRequests();
1616     } catch (IOException e) {
1617       throw new IllegalStateException("sendWorkerPartitions: Flush failed", e);
1618     }
1619     String myPartitionExchangeDonePath =
1620         getPartitionExchangeWorkerPath(
1621             getApplicationAttempt(), getSuperstep(), getWorkerInfo());
1622     try {
1623       getZkExt().createExt(myPartitionExchangeDonePath,
1624           null,
1625           Ids.OPEN_ACL_UNSAFE,
1626           CreateMode.PERSISTENT,
1627           true);
1628     } catch (KeeperException e) {
1629       throw new IllegalStateException(
1630           "sendWorkerPartitions: KeeperException to create " +
1631               myPartitionExchangeDonePath, e);
1632     } catch (InterruptedException e) {
1633       throw new IllegalStateException(
1634           "sendWorkerPartitions: InterruptedException to create " +
1635               myPartitionExchangeDonePath, e);
1636     }
1637     if (LOG.isInfoEnabled()) {
1638       LOG.info("sendWorkerPartitions: Done sending all my partitions.");
1639     }
1640   }
1641 
1642   @Override
1643   public final void exchangeVertexPartitions(
1644       Collection<? extends PartitionOwner> masterSetPartitionOwners) {
1645     // 1. Fix the addresses of the partition ids if they have changed.
1646     // 2. Send all the partitions to their destination workers in a random
1647     //    fashion.
1648     // 3. Notify completion with a ZooKeeper stamp
1649     // 4. Wait for all my dependencies to be done (if any)
1650     // 5. Add the partitions to myself.
1651     PartitionExchange partitionExchange =
1652         workerGraphPartitioner.updatePartitionOwners(
1653             getWorkerInfo(), masterSetPartitionOwners);
1654     workerClient.openConnections();
1655 
1656     Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
1657         partitionExchange.getSendWorkerPartitionMap();
1658     if (!getPartitionStore().isEmpty()) {
1659       sendWorkerPartitions(sendWorkerPartitionMap);
1660     }
1661 
1662     Set<WorkerInfo> myDependencyWorkerSet =
1663         partitionExchange.getMyDependencyWorkerSet();
1664     Set<String> workerIdSet = new HashSet<String>();
1665     for (WorkerInfo tmpWorkerInfo : myDependencyWorkerSet) {
1666       if (!workerIdSet.add(tmpWorkerInfo.getHostnameId())) {
1667         throw new IllegalStateException(
1668             "exchangeVertexPartitions: Duplicate entry " + tmpWorkerInfo);
1669       }
1670     }
1671     if (myDependencyWorkerSet.isEmpty() && getPartitionStore().isEmpty()) {
1672       if (LOG.isInfoEnabled()) {
1673         LOG.info("exchangeVertexPartitions: Nothing to exchange, " +
1674             "exiting early");
1675       }
1676       return;
1677     }
1678 
1679     String vertexExchangePath =
1680         getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
1681     List<String> workerDoneList;
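         // Wait until every worker this worker depends on has created its
         // exchange-done znode under vertexExchangePath; the children-changed
         // event wakes this loop up, and each wait is bounded by
         // WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.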
1682     try {
1683       while (true) {
1684         workerDoneList = getZkExt().getChildrenExt(
1685             vertexExchangePath, true, false, false);
1686         workerIdSet.removeAll(workerDoneList);
1687         if (workerIdSet.isEmpty()) {
1688           break;
1689         }
1690         if (LOG.isInfoEnabled()) {
1691           LOG.info("exchangeVertexPartitions: Waiting for workers " +
1692               workerIdSet);
1693         }
1694         getPartitionExchangeChildrenChangedEvent().waitForTimeoutOrFail(
1695             GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
1696                 getConfiguration()));
1697         getPartitionExchangeChildrenChangedEvent().reset();
1698       }
1699     } catch (KeeperException | InterruptedException e) {
1700       throw new RuntimeException(
1701           "exchangeVertexPartitions: Got runtime exception", e);
1702     }
1703 
1704     if (LOG.isInfoEnabled()) {
1705       LOG.info("exchangeVertexPartitions: Done with exchange.");
1706     }
1707   }
1708 
1709   /**
1710    * Get the event that signals a change in the state of a partition exchange.
1711    *
1712    * @return Event to check.
1713    */
1714   public final BspEvent getPartitionExchangeChildrenChangedEvent() {
1715     return partitionExchangeChildrenChanged;
1716   }
1717 
1718   @Override
1719   protected boolean processEvent(WatchedEvent event) {
1720     boolean foundEvent = false;
1721     if (event.getPath().startsWith(masterJobStatePath) &&
1722         (event.getType() == EventType.NodeChildrenChanged)) {
1723       if (LOG.isInfoEnabled()) {
1724         LOG.info("processEvent: Job state changed, checking " +
1725             "to see if it needs to restart");
1726       }
1727       JSONObject jsonObj = getJobState();
1728       // In YARN, we have to manually commit our own output in 2 stages, which
1729       // we do not have to do in Hadoop-based Giraph. So jsonObj can be null.
1730       if (getConfiguration().isPureYarnJob() && null == jsonObj) {
1731         LOG.error("BspServiceWorker#getJobState() came back NULL.");
1732         return false; // the event has been processed.
1733       }
1734       try {
1735         if ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
1736             ApplicationState.START_SUPERSTEP) &&
1737             jsonObj.getLong(JSONOBJ_APPLICATION_ATTEMPT_KEY) !=
1738             getApplicationAttempt()) {
1739           LOG.fatal("processEvent: Worker will restart " +
1740               "from command - " + jsonObj.toString());
1741           System.exit(-1);
1742         }
1743       } catch (JSONException e) {
1744         throw new RuntimeException(
1745             "processEvent: Couldn't properly get job state from " +
1746                 jsonObj.toString(), e);
1747       }
1748       foundEvent = true;
1749     } else if (event.getPath().contains(PARTITION_EXCHANGE_DIR) &&
1750         event.getType() == EventType.NodeChildrenChanged) {
1751       if (LOG.isInfoEnabled()) {
1752         LOG.info("processEvent : partitionExchangeChildrenChanged " +
1753             "(at least one worker is done sending partitions)");
1754       }
1755       partitionExchangeChildrenChanged.signal();
1756       foundEvent = true;
1757     } else if (event.getPath().contains(MEMORY_OBSERVER_DIR) &&
1758         event.getType() == EventType.NodeChildrenChanged) {
1759       memoryObserver.callGc();
1760       foundEvent = true;
1761     }
1762 
1763     return foundEvent;
1764   }
1765 
1766   @Override
1767   public WorkerInfo getWorkerInfo() {
1768     return workerInfo;
1769   }
1770 
1771   @Override
1772   public PartitionStore<I, V, E> getPartitionStore() {
1773     return getServerData().getPartitionStore();
1774   }
1775 
1776   @Override
1777   public PartitionOwner getVertexPartitionOwner(I vertexId) {
1778     return workerGraphPartitioner.getPartitionOwner(vertexId);
1779   }
1780 
1781   @Override
1782   public Iterable<? extends PartitionOwner> getPartitionOwners() {
1783     return workerGraphPartitioner.getPartitionOwners();
1784   }
1785 
1786   @Override
1787   public int getPartitionId(I vertexId) {
1788     PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
1789     return partitionOwner.getPartitionId();
1790   }
1791 
1792   @Override
1793   public boolean hasPartition(Integer partitionId) {
1794     return getPartitionStore().hasPartition(partitionId);
1795   }
1796 
1797   @Override
1798   public Iterable<Integer> getPartitionIds() {
1799     return getPartitionStore().getPartitionIds();
1800   }
1801 
1802   @Override
1803   public long getPartitionVertexCount(Integer partitionId) {
1804     return getPartitionStore().getPartitionVertexCount(partitionId);
1805   }
1806 
1807   @Override
1808   public void startIteration() {
1809     getPartitionStore().startIteration();
1810   }
1811 
1812   @Override
1813   public Partition getNextPartition() {
1814     return getPartitionStore().getNextPartition();
1815   }
1816 
1817   @Override
1818   public void putPartition(Partition partition) {
1819     getPartitionStore().putPartition(partition);
1820   }
1821 
1822   @Override
1823   public ServerData<I, V, E> getServerData() {
1824     return workerServer.getServerData();
1825   }
1826 
1827 
1828   @Override
1829   public WorkerAggregatorHandler getAggregatorHandler() {
1830     return globalCommHandler;
1831   }
1832 
1833   @Override
1834   public void prepareSuperstep() {
1835     if (getSuperstep() != INPUT_SUPERSTEP) {
1836       globalCommHandler.prepareSuperstep(workerAggregatorRequestProcessor);
1837     }
1838   }
1839 
1840   @Override
1841   public SuperstepOutput<I, V, E> getSuperstepOutput() {
1842     return superstepOutput;
1843   }
1844 
1845   @Override
1846   public GlobalStats getGlobalStats() {
1847     GlobalStats globalStats = new GlobalStats();
1848     if (getSuperstep() > Math.max(INPUT_SUPERSTEP, getRestartedSuperstep())) {
1849       String superstepFinishedNode =
1850           getSuperstepFinishedPath(getApplicationAttempt(),
1851               getSuperstep() - 1);
1852       WritableUtils.readFieldsFromZnode(
1853           getZkExt(), superstepFinishedNode, false, null,
1854           globalStats);
1855     }
1856     return globalStats;
1857   }
1858 
1859   @Override
1860   public WorkerInputSplitsHandler getInputSplitsHandler() {
1861     return inputSplitsHandler;
1862   }
1863 
1864   @Override
1865   public void addressesAndPartitionsReceived(
1866       AddressesAndPartitionsWritable addressesAndPartitions) {
1867     addressesAndPartitionsHolder.offer(addressesAndPartitions);
1868   }
1869 }