/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.worker;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

import net.iharder.Base64;

import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.WorkerServer;
import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
import org.apache.giraph.comm.netty.NettyWorkerAggregatorRequestProcessor;
import org.apache.giraph.comm.netty.NettyWorkerClient;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.comm.netty.NettyWorkerServer;
import org.apache.giraph.comm.requests.PartitionStatsRequest;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.AddressesAndPartitionsWritable;
import org.apache.giraph.graph.FinishedSuperstepStats;
import org.apache.giraph.graph.GlobalStats;
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.EdgeOutputFormat;
import org.apache.giraph.io.EdgeWriter;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.VertexWriter;
import org.apache.giraph.io.superstep_output.SuperstepOutput;
import org.apache.giraph.mapping.translate.TranslateEdge;
import org.apache.giraph.master.MasterInfo;
import org.apache.giraph.master.SuperstepClasses;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.GiraphTimer;
import org.apache.giraph.metrics.GiraphTimerContext;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.metrics.WorkerSuperstepMetrics;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionExchange;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.partition.PartitionStore;
import org.apache.giraph.partition.WorkerGraphPartitioner;
import org.apache.giraph.utils.BlockingElementsSet;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.CheckpointingUtils;
import org.apache.giraph.utils.JMapHistoDumper;
import org.apache.giraph.utils.LoggerUtils;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ReactiveJMapHistoDumper;
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

import com.google.common.collect.Lists;

/**
 * ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
 *
 * @param <I> Vertex id
 * @param <V> Vertex data
 * @param <E> Edge data
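 *
 * <p>Illustrative lifecycle sketch only (the real driver is
 * {@link GraphTaskManager}; the driver-side calls shown here are an
 * assumption from memory, not a contract):
 * <pre>{@code
 * FinishedSuperstepStats stats = worker.setup();  // INPUT_SUPERSTEP
 * while (!stats.allVerticesHalted()) {
 *   worker.startSuperstep();
 *   // ... compute owned partitions, send messages ...
 *   stats = worker.finishSuperstep(partitionStatsList, null);
 * }
 * worker.cleanup(stats);
 * }</pre>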
 */
@SuppressWarnings("rawtypes")
public class BspServiceWorker<I extends WritableComparable,
    V extends Writable, E extends Writable>
    extends BspService<I, V, E>
    implements CentralizedServiceWorker<I, V, E>,
    ResetSuperstepMetricsObserver {
  /** Name of timer for time spent waiting on other workers */
  public static final String TIMER_WAIT_REQUESTS = "wait-requests-us";
  /** Class logger */
  private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
  /** My process health znode */
  private String myHealthZnode;
  /** Worker info */
  private final WorkerInfo workerInfo;
  /** Worker graph partitioner */
  private final WorkerGraphPartitioner<I, V, E> workerGraphPartitioner;
  /** Local Data for each worker */
  private final LocalData<I, V, E, ? extends Writable> localData;
  /** Used to translate Edges during vertex input phase based on localData */
  private final TranslateEdge<I, E> translateEdge;
  /** IPC Client */
  private final WorkerClient<I, V, E> workerClient;
  /** IPC Server */
  private final WorkerServer<I, V, E> workerServer;
  /** Request processor for aggregator requests */
  private final WorkerAggregatorRequestProcessor
  workerAggregatorRequestProcessor;
  /** Master info */
  private MasterInfo masterInfo = new MasterInfo();
  /** List of workers */
  private List<WorkerInfo> workerInfoList = Lists.newArrayList();
  /** Have the partition exchange children (workers) changed? */
  private final BspEvent partitionExchangeChildrenChanged;

  /** Addresses and partitions transfer */
  private BlockingElementsSet<AddressesAndPartitionsWritable>
      addressesAndPartitionsHolder = new BlockingElementsSet<>();

  /** Worker Context */
  private final WorkerContext workerContext;

  /** Handler for aggregators */
  private final WorkerAggregatorHandler globalCommHandler;

  /** Superstep output */
  private final SuperstepOutput<I, V, E> superstepOutput;

  /** Array of observers to call back to */
  private final WorkerObserver[] observers;
  /** Writer for worker progress */
  private final WorkerProgressWriter workerProgressWriter;

  // Per-Superstep Metrics
  /** Timer for WorkerContext#postSuperstep */
  private GiraphTimer wcPostSuperstepTimer;
  /** Time spent waiting on requests to finish */
  private GiraphTimer waitRequestsTimer;

  /** InputSplit handlers used in INPUT_SUPERSTEP */
  private WorkerInputSplitsHandler inputSplitsHandler;

  /** Memory observer */
  private final MemoryObserver memoryObserver;

  /**
   * Constructor for setting up the worker.
   *
   * @param context Mapper context
   * @param graphTaskManager GraphTaskManager for this compute node
   * @throws IOException
   * @throws InterruptedException
   */
  public BspServiceWorker(
    Mapper<?, ?, ?, ?>.Context context,
    GraphTaskManager<I, V, E> graphTaskManager)
    throws IOException, InterruptedException {
    super(context, graphTaskManager);
    ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
    localData = new LocalData<>(conf);
    translateEdge = getConfiguration().edgeTranslationInstance();
    if (translateEdge != null) {
      translateEdge.initialize(this);
    }
    partitionExchangeChildrenChanged = new PredicateLock(context);
    registerBspEvent(partitionExchangeChildrenChanged);
    workerGraphPartitioner =
        getGraphPartitionerFactory().createWorkerGraphPartitioner();
    workerInfo = new WorkerInfo();
    workerServer = new NettyWorkerServer<I, V, E>(conf, this, context,
        graphTaskManager.createUncaughtExceptionHandler());
    workerInfo.setInetSocketAddress(workerServer.getMyAddress(),
        workerServer.getLocalHostOrIp());
    workerInfo.setTaskId(getTaskPartition());
    workerClient = new NettyWorkerClient<I, V, E>(context, conf, this,
        graphTaskManager.createUncaughtExceptionHandler());
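    // Share the client's flow control with the server (and with the
    // out-of-core engine below, if one is enabled) so backpressure is
    // applied consistently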
    workerServer.setFlowControl(workerClient.getFlowControl());
    OutOfCoreEngine oocEngine = workerServer.getServerData().getOocEngine();
    if (oocEngine != null) {
      oocEngine.setFlowControl(workerClient.getFlowControl());
    }

    workerAggregatorRequestProcessor =
        new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this);

    globalCommHandler = new WorkerAggregatorHandler(this, conf, context);

    workerContext = conf.createWorkerContext();
    workerContext.setWorkerGlobalCommUsage(globalCommHandler);

    superstepOutput = conf.createSuperstepOutput(context);

    if (conf.isJMapHistogramDumpEnabled()) {
      conf.addWorkerObserverClass(JMapHistoDumper.class);
    }
    if (conf.isReactiveJmapHistogramDumpEnabled()) {
      conf.addWorkerObserverClass(ReactiveJMapHistoDumper.class);
    }
    observers = conf.createWorkerObservers();

    WorkerProgress.get().setTaskId(getTaskPartition());
    workerProgressWriter = conf.trackJobProgressOnClient() ?
        new WorkerProgressWriter(graphTaskManager.getJobProgressTracker()) :
        null;

    GiraphMetrics.get().addSuperstepResetObserver(this);

    inputSplitsHandler = new WorkerInputSplitsHandler(
        workerInfo, masterInfo.getTaskId(), workerClient);

    memoryObserver = new MemoryObserver(getZkExt(), memoryObserverPath, conf);
  }

  @Override
  public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
    waitRequestsTimer = new GiraphTimer(superstepMetrics,
        TIMER_WAIT_REQUESTS, TimeUnit.MICROSECONDS);
    wcPostSuperstepTimer = new GiraphTimer(superstepMetrics,
        "worker-context-post-superstep", TimeUnit.MICROSECONDS);
  }

  @Override
  public WorkerContext getWorkerContext() {
    return workerContext;
  }

  @Override
  public WorkerObserver[] getWorkerObservers() {
    return observers;
  }

  @Override
  public WorkerClient<I, V, E> getWorkerClient() {
    return workerClient;
  }

  public LocalData<I, V, E, ? extends Writable> getLocalData() {
    return localData;
  }

  public TranslateEdge<I, E> getTranslateEdge() {
    return translateEdge;
  }
  /**
   * Intended to check the health of the node (e.g., can it ssh, does
   * dmesg show problems, etc.). For now, it does nothing and always
   * reports healthy.
   * TODO: Make this check configurable by the user (i.e. search dmesg for
   * problems).
   *
   * @return True if healthy (always in this case).
   */
  public boolean isHealthy() {
    return true;
  }

  /**
   * Load the vertices/edges from input splits. Do this until all the
   * InputSplits have been processed.
   * All workers will try to do as many InputSplits as they can.  The master
   * will monitor progress and stop this once all the InputSplits have been
   * loaded and check-pointed.  Keep track of the last input split path to
   * ensure the input split cache is flushed prior to marking the last input
   * split complete.
   *
   * Use one or more threads to do the loading.
   *
   * @param inputSplitsCallableFactory Factory for {@link InputSplitsCallable}s
   * @return Statistics of the vertices and edges loaded
   * @throws InterruptedException
   * @throws KeeperException
   */
  private VertexEdgeCount loadInputSplits(
      CallableFactory<VertexEdgeCount> inputSplitsCallableFactory)
    throws KeeperException, InterruptedException {
    VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
    int numThreads = getConfiguration().getNumInputSplitsThreads();
    if (LOG.isInfoEnabled()) {
324       LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
325           "originally " + getConfiguration().getNumInputSplitsThreads() +
326           " threads(s)");
    }

    List<VertexEdgeCount> results =
        ProgressableUtils.getResultsWithNCallables(inputSplitsCallableFactory,
            numThreads, "load-%d", getContext());
    for (VertexEdgeCount result : results) {
      vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(result);
    }

    workerClient.waitAllRequests();
    return vertexEdgeCount;
  }

  /**
   * Load the mapping entries from the user-defined
   * {@link org.apache.giraph.io.MappingReader}
   *
   * @return Count of mapping entries loaded
   */
  private long loadMapping() throws KeeperException,
    InterruptedException {
    MappingInputSplitsCallableFactory<I, V, E, ? extends Writable>
        inputSplitsCallableFactory =
        new MappingInputSplitsCallableFactory<>(
            getConfiguration().createWrappedMappingInputFormat(),
            getContext(),
            getConfiguration(),
            this,
            inputSplitsHandler);

    long mappingsLoaded =
        loadInputSplits(inputSplitsCallableFactory).getMappingCount();

    // after all threads finish loading - call postFilling
    localData.getMappingStore().postFilling();
    return mappingsLoaded;
  }

  /**
   * Load the vertices from the user-defined
   * {@link org.apache.giraph.io.VertexReader}
   *
   * @return Count of vertices and edges loaded
   */
  private VertexEdgeCount loadVertices() throws KeeperException,
      InterruptedException {
    VertexInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
        new VertexInputSplitsCallableFactory<I, V, E>(
            getConfiguration().createWrappedVertexInputFormat(),
            getContext(),
            getConfiguration(),
            this,
            inputSplitsHandler);

    return loadInputSplits(inputSplitsCallableFactory);
  }

  /**
   * Load the edges from the user-defined
   * {@link org.apache.giraph.io.EdgeReader}.
   *
   * @return Number of edges loaded
   */
  private long loadEdges() throws KeeperException, InterruptedException {
    EdgeInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
        new EdgeInputSplitsCallableFactory<I, V, E>(
            getConfiguration().createWrappedEdgeInputFormat(),
            getContext(),
            getConfiguration(),
            this,
            inputSplitsHandler);

    return loadInputSplits(inputSplitsCallableFactory).getEdgeCount();
  }

  @Override
  public MasterInfo getMasterInfo() {
    return masterInfo;
  }

  @Override
  public List<WorkerInfo> getWorkerInfoList() {
    return workerInfoList;
  }

  /**
   * Mark current worker as done and then wait for all workers
   * to finish processing input splits.
   */
  private void markCurrentWorkerDoneReadingThenWaitForOthers() {
    String workerInputSplitsDonePath =
        inputSplitsWorkerDonePath + "/" + getWorkerInfo().getHostnameId();
    try {
      getZkExt().createExt(workerInputSplitsDonePath,
          null,
          Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT,
          true);
    } catch (KeeperException e) {
      throw new IllegalStateException(
          "markCurrentWorkerDoneReadingThenWaitForOthers: " +
              "KeeperException creating worker done splits", e);
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "markCurrentWorkerDoneReadingThenWaitForOthers: " +
              "InterruptedException creating worker done splits", e);
    }
    while (true) {
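      // Check-then-wait loop: exists() with watch=true re-registers the
      // watch on every pass, so a notification between checks isn't lost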
      Stat inputSplitsDoneStat;
      try {
        inputSplitsDoneStat =
            getZkExt().exists(inputSplitsAllDonePath, true);
      } catch (KeeperException e) {
        throw new IllegalStateException(
            "markCurrentWorkerDoneReadingThenWaitForOthers: " +
                "KeeperException waiting on worker done splits", e);
      } catch (InterruptedException e) {
        throw new IllegalStateException(
            "markCurrentWorkerDoneReadingThenWaitForOthers: " +
                "InterruptedException waiting on worker done splits", e);
      }
      if (inputSplitsDoneStat != null) {
        break;
      }
      getInputSplitsAllDoneEvent().waitForever();
      getInputSplitsAllDoneEvent().reset();
    }
  }

  @Override
  public FinishedSuperstepStats setup() {
    // Unless doing a restart, prepare for computation:
    // 1. Start superstep INPUT_SUPERSTEP (no computation)
    // 2. Wait until the INPUT_SPLIT_ALL_READY_PATH node has been created
    // 3. Process input splits until there are no more.
    // 4. Wait until the INPUT_SPLIT_ALL_DONE_PATH node has been created
    // 5. Process any mutations deriving from add edge requests
    // 6. Wait for superstep INPUT_SUPERSTEP to complete.
    if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
      setCachedSuperstep(getRestartedSuperstep());
      return new FinishedSuperstepStats(0, false, 0, 0, true,
          CheckpointStatus.NONE);
    }

    JSONObject jobState = getJobState();
    if (jobState != null) {
      try {
        if ((ApplicationState.valueOf(jobState.getString(JSONOBJ_STATE_KEY)) ==
            ApplicationState.START_SUPERSTEP) &&
            jobState.getLong(JSONOBJ_SUPERSTEP_KEY) ==
            getSuperstep()) {
          if (LOG.isInfoEnabled()) {
            LOG.info("setup: Restarting from an automated " +
                "checkpointed superstep " +
                getSuperstep() + ", attempt " +
                getApplicationAttempt());
          }
          setRestartedSuperstep(getSuperstep());
          return new FinishedSuperstepStats(0, false, 0, 0, true,
              CheckpointStatus.NONE);
        }
      } catch (JSONException e) {
        throw new RuntimeException(
            "setup: Failed to get key-values from " +
                jobState.toString(), e);
      }
    }

    // Add the partitions that this worker owns
    Collection<? extends PartitionOwner> masterSetPartitionOwners =
        startSuperstep();
    workerGraphPartitioner.updatePartitionOwners(
        getWorkerInfo(), masterSetPartitionOwners);
    getPartitionStore().initialize();

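    // The commented if/else markers below are munge preprocessor
    // directives, resolved at build time to select the secure or
    // non-secure Hadoop code path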
/*if[HADOOP_NON_SECURE]
    workerClient.setup();
else[HADOOP_NON_SECURE]*/
    workerClient.setup(getConfiguration().authenticate());
/*end[HADOOP_NON_SECURE]*/

    // Initialize aggregator at worker side during setup.
    // Do this just before vertex and edge loading.
    globalCommHandler.prepareSuperstep(workerAggregatorRequestProcessor);

    VertexEdgeCount vertexEdgeCount;
    long entriesLoaded;

    if (getConfiguration().hasMappingInputFormat()) {
      getContext().progress();
      try {
        entriesLoaded = loadMapping();
        // successfully loaded mapping
        // now initialize graphPartitionerFactory with this data
        getGraphPartitionerFactory().initialize(localData);
      } catch (InterruptedException e) {
        throw new IllegalStateException(
            "setup: loadMapping failed with InterruptedException", e);
      } catch (KeeperException e) {
        throw new IllegalStateException(
            "setup: loadMapping failed with KeeperException", e);
      }
      getContext().progress();
      if (LOG.isInfoEnabled()) {
        LOG.info("setup: Finally loaded a total of " +
            entriesLoaded + " entries from inputSplits");
      }

      // Print stats for data stored in localData once mapping is fully
      // loaded on all the workers
      localData.printStats();
    }

    if (getConfiguration().hasVertexInputFormat()) {
      getContext().progress();
      try {
        vertexEdgeCount = loadVertices();
      } catch (InterruptedException e) {
        throw new IllegalStateException(
            "setup: loadVertices failed with InterruptedException", e);
      } catch (KeeperException e) {
        throw new IllegalStateException(
            "setup: loadVertices failed with KeeperException", e);
      }
      getContext().progress();
    } else {
      vertexEdgeCount = new VertexEdgeCount();
    }
    WorkerProgress.get().finishLoadingVertices();

    if (getConfiguration().hasEdgeInputFormat()) {
      getContext().progress();
      try {
        vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0, loadEdges());
      } catch (InterruptedException e) {
        throw new IllegalStateException(
            "setup: loadEdges failed with InterruptedException", e);
      } catch (KeeperException e) {
        throw new IllegalStateException(
            "setup: loadEdges failed with KeeperException", e);
      }
      getContext().progress();
    }
    WorkerProgress.get().finishLoadingEdges();

    if (LOG.isInfoEnabled()) {
      LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
    }

    markCurrentWorkerDoneReadingThenWaitForOthers();

    // Create remaining partitions owned by this worker.
    for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
      if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
          !getPartitionStore().hasPartition(
              partitionOwner.getPartitionId())) {
        Partition<I, V, E> partition =
            getConfiguration().createPartition(
                partitionOwner.getPartitionId(), getContext());
        getPartitionStore().addPartition(partition);
      }
    }

    // remove mapping store if possible
    localData.removeMappingStoreIfPossible();

    if (getConfiguration().hasEdgeInputFormat()) {
      // Move edges from temporary storage to their source vertices.
      getServerData().getEdgeStore().moveEdgesToVertices();
    }

    // Generate the partition stats for the input superstep and process
    // if necessary
    List<PartitionStats> partitionStatsList =
        new ArrayList<PartitionStats>();
    PartitionStore<I, V, E> partitionStore = getPartitionStore();
    for (Integer partitionId : partitionStore.getPartitionIds()) {
      PartitionStats partitionStats =
          new PartitionStats(partitionId,
              partitionStore.getPartitionVertexCount(partitionId),
              0,
              partitionStore.getPartitionEdgeCount(partitionId),
              0, 0);
      partitionStatsList.add(partitionStats);
    }
    workerGraphPartitioner.finalizePartitionStats(
        partitionStatsList, getPartitionStore());

    return finishSuperstep(partitionStatsList, null);
  }

  /**
   * Register the health of this worker for a given superstep
   *
   * @param superstep Superstep to register health on
   */
  private void registerHealth(long superstep) {
    JSONArray hostnamePort = new JSONArray();
    hostnamePort.put(getHostname());

    hostnamePort.put(workerInfo.getPort());

    String myHealthPath = null;
    if (isHealthy()) {
      myHealthPath = getWorkerInfoHealthyPath(getApplicationAttempt(),
          getSuperstep());
    } else {
      myHealthPath = getWorkerInfoUnhealthyPath(getApplicationAttempt(),
          getSuperstep());
    }
    myHealthPath = myHealthPath + "/" + workerInfo.getHostnameId();
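    // The health znode is EPHEMERAL: it disappears automatically if this
    // worker's ZooKeeper session dies, which is how failures become visible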
    try {
      myHealthZnode = getZkExt().createExt(
          myHealthPath,
          WritableUtils.writeToByteArray(workerInfo),
          Ids.OPEN_ACL_UNSAFE,
          CreateMode.EPHEMERAL,
          true);
    } catch (KeeperException.NodeExistsException e) {
      LOG.warn("registerHealth: myHealthPath already exists (likely " +
          "from previous failure): " + myHealthPath +
          ".  Waiting for change in attempts " +
          "to re-join the application");
      getApplicationAttemptChangedEvent().waitForever();
      if (LOG.isInfoEnabled()) {
        LOG.info("registerHealth: Got application " +
            "attempt changed event, killing self");
      }
      throw new IllegalStateException(
          "registerHealth: Trying " +
              "to get the new application attempt by killing self", e);
    } catch (KeeperException e) {
      throw new IllegalStateException("Creating " + myHealthPath +
          " failed with KeeperException", e);
    } catch (InterruptedException e) {
      throw new IllegalStateException("Creating " + myHealthPath +
          " failed with InterruptedException", e);
    }
    if (LOG.isInfoEnabled()) {
      LOG.info("registerHealth: Created my health node for attempt=" +
          getApplicationAttempt() + ", superstep=" +
          getSuperstep() + " with " + myHealthZnode +
          " and workerInfo= " + workerInfo);
    }
  }

  /**
   * Unregister this worker's health znode so the master notices this
   * worker's failure more quickly.
   */
  private void unregisterHealth() {
    LOG.error("unregisterHealth: Got failure, unregistering health on " +
        myHealthZnode + " on superstep " + getSuperstep());
    try {
      getZkExt().deleteExt(myHealthZnode, -1, false);
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "unregisterHealth: InterruptedException - Couldn't delete " +
              myHealthZnode, e);
    } catch (KeeperException e) {
      throw new IllegalStateException(
          "unregisterHealth: KeeperException - Couldn't delete " +
              myHealthZnode, e);
    }
  }

  @Override
  public void failureCleanup() {
    unregisterHealth();
  }

  @Override
  public Collection<? extends PartitionOwner> startSuperstep() {
    // Algorithm:
    // 1. Communication service will combine message from previous
    //    superstep
    // 2. Register my health for the next superstep.
    // 3. Wait until the partition assignment is complete and get it
    // 4. Get the aggregator values from the previous superstep
    if (getSuperstep() != INPUT_SUPERSTEP) {
      workerServer.prepareSuperstep();
    }

    registerHealth(getSuperstep());

    AddressesAndPartitionsWritable addressesAndPartitions =
        addressesAndPartitionsHolder.getElement(getContext());

    workerInfoList.clear();
    workerInfoList = addressesAndPartitions.getWorkerInfos();
    masterInfo = addressesAndPartitions.getMasterInfo();

    if (LOG.isInfoEnabled()) {
      LOG.info("startSuperstep: " + masterInfo);
    }

    getContext().setStatus("startSuperstep: " +
        getGraphTaskManager().getGraphFunctions().toString() +
        " - Attempt=" + getApplicationAttempt() +
        ", Superstep=" + getSuperstep());

    if (LOG.isDebugEnabled()) {
      LOG.debug("startSuperstep: addressesAndPartitions " +
          addressesAndPartitions.getWorkerInfos());
      for (PartitionOwner partitionOwner : addressesAndPartitions
          .getPartitionOwners()) {
        LOG.debug(partitionOwner.getPartitionId() + " " +
            partitionOwner.getWorkerInfo());
      }
    }

    return addressesAndPartitions.getPartitionOwners();
  }

  @Override
  public FinishedSuperstepStats finishSuperstep(
      List<PartitionStats> partitionStatsList,
      GiraphTimerContext superstepTimerContext) {
    // This barrier blocks until success (or the master signals it to
    // restart).
    //
    // Master will coordinate the barriers and aggregate "doneness" of all
    // the vertices.  Each worker will:
    // 1. Ensure that the requests are complete
    // 2. Execute user postSuperstep() if necessary.
    // 3. Save aggregator values that are in use.
    // 4. Report the statistics (vertices, edges, messages, etc.)
    //    of this worker
    // 5. Let the master know it is finished.
    // 6. Wait for the master's superstep info, and check if done
    waitForRequestsToFinish();

    getGraphTaskManager().notifyFinishedCommunication();

    long workerSentMessages = 0;
    long workerSentMessageBytes = 0;
    long localVertices = 0;
    for (PartitionStats partitionStats : partitionStatsList) {
      workerSentMessages += partitionStats.getMessagesSentCount();
      workerSentMessageBytes += partitionStats.getMessageBytesSentCount();
      localVertices += partitionStats.getVertexCount();
    }

    if (getSuperstep() != INPUT_SUPERSTEP) {
      postSuperstepCallbacks();
    }

    globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor);

    MessageStore<I, Writable> incomingMessageStore =
        getServerData().getIncomingMessageStore();
    if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
      ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
    }

    if (LOG.isInfoEnabled()) {
      LOG.info("finishSuperstep: Superstep " + getSuperstep() +
          ", messages = " + workerSentMessages +
          ", message bytes = " + workerSentMessageBytes + ", " +
          MemoryUtils.getRuntimeMemoryStats());
    }

    if (superstepTimerContext != null) {
      superstepTimerContext.stop();
    }
    writeFinishedSuperstepInfoToZK(partitionStatsList,
      workerSentMessages, workerSentMessageBytes);

    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
        "finishSuperstep: (waiting for rest " +
            "of workers) " +
            getGraphTaskManager().getGraphFunctions().toString() +
            " - Attempt=" + getApplicationAttempt() +
            ", Superstep=" + getSuperstep());

    String superstepFinishedNode =
        getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());

    waitForOtherWorkers(superstepFinishedNode);

    GlobalStats globalStats = new GlobalStats();
    SuperstepClasses superstepClasses = SuperstepClasses.createToRead(
        getConfiguration());
    WritableUtils.readFieldsFromZnode(
        getZkExt(), superstepFinishedNode, false, null, globalStats,
        superstepClasses);
    if (LOG.isInfoEnabled()) {
      LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
          " with global stats " + globalStats + " and classes " +
          superstepClasses);
    }
    getContext().setStatus("finishSuperstep: (all workers done) " +
        getGraphTaskManager().getGraphFunctions().toString() +
        " - Attempt=" + getApplicationAttempt() +
        ", Superstep=" + getSuperstep());
    incrCachedSuperstep();
    getConfiguration().updateSuperstepClasses(superstepClasses);

    return new FinishedSuperstepStats(
        localVertices,
        globalStats.getHaltComputation(),
        globalStats.getVertexCount(),
        globalStats.getEdgeCount(),
        false,
        globalStats.getCheckpointStatus());
  }

  /**
   * Handle post-superstep callbacks
   */
  private void postSuperstepCallbacks() {
    GiraphTimerContext timerContext = wcPostSuperstepTimer.time();
    getWorkerContext().postSuperstep();
    timerContext.stop();
    getContext().progress();

    for (WorkerObserver obs : getWorkerObservers()) {
      obs.postSuperstep(getSuperstep());
      getContext().progress();
    }
  }

  /**
   * Wait for all the requests to finish.
   */
  private void waitForRequestsToFinish() {
    if (LOG.isInfoEnabled()) {
      LOG.info("finishSuperstep: Waiting on all requests, superstep " +
          getSuperstep() + " " +
          MemoryUtils.getRuntimeMemoryStats());
    }
    GiraphTimerContext timerContext = waitRequestsTimer.time();
    workerClient.waitAllRequests();
    timerContext.stop();
  }

  /**
   * Wait for all the other Workers to finish the superstep.
   *
   * @param superstepFinishedNode ZooKeeper path to wait on.
   */
  private void waitForOtherWorkers(String superstepFinishedNode) {
    try {
      while (getZkExt().exists(superstepFinishedNode, true) == null) {
        getSuperstepFinishedEvent().waitForever();
        getSuperstepFinishedEvent().reset();
      }
    } catch (KeeperException e) {
      throw new IllegalStateException(
          "finishSuperstep: Failed while waiting for master to " +
              "signal completion of superstep " + getSuperstep(), e);
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "finishSuperstep: Failed while waiting for master to " +
              "signal completion of superstep " + getSuperstep(), e);
    }
  }

  /**
   * Write finished superstep info to ZooKeeper.
   *
   * @param partitionStatsList List of partition stats from superstep.
   * @param workerSentMessages Number of messages sent in superstep.
   * @param workerSentMessageBytes Number of message bytes sent
   *                               in superstep.
   */
  private void writeFinishedSuperstepInfoToZK(
      List<PartitionStats> partitionStatsList, long workerSentMessages,
      long workerSentMessageBytes) {
    Collection<PartitionStats> finalizedPartitionStats =
        workerGraphPartitioner.finalizePartitionStats(
            partitionStatsList, getPartitionStore());
    workerClient.sendWritableRequest(masterInfo.getTaskId(),
        new PartitionStatsRequest(finalizedPartitionStats));
    WorkerSuperstepMetrics metrics = new WorkerSuperstepMetrics();
    metrics.readFromRegistry();
    byte[] metricsBytes = WritableUtils.writeToByteArray(metrics);

    JSONObject workerFinishedInfoObj = new JSONObject();
    try {
      workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY, workerSentMessages);
      workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGE_BYTES_KEY,
        workerSentMessageBytes);
      workerFinishedInfoObj.put(JSONOBJ_METRICS_KEY,
          Base64.encodeBytes(metricsBytes));
    } catch (JSONException e) {
      throw new RuntimeException(e);
    }

    String finishedWorkerPath =
        getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()) +
        "/" + getHostnamePartitionId();
    try {
      getZkExt().createExt(finishedWorkerPath,
          workerFinishedInfoObj.toString().getBytes(Charset.defaultCharset()),
          Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT,
          true);
    } catch (KeeperException.NodeExistsException e) {
      LOG.warn("finishSuperstep: finished worker path " +
          finishedWorkerPath + " already exists!");
    } catch (KeeperException e) {
      throw new IllegalStateException("Creating " + finishedWorkerPath +
          " failed with KeeperException", e);
    } catch (InterruptedException e) {
      throw new IllegalStateException("Creating " + finishedWorkerPath +
          " failed with InterruptedException", e);
    }
  }

  /**
   * Save the vertices from this worker's partitions using the
   * user-defined VertexOutputFormat.
   *
   * @param numLocalVertices Number of local vertices
   * @throws InterruptedException
   */
  private void saveVertices(long numLocalVertices) throws IOException,
      InterruptedException {
    ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();

    if (conf.getVertexOutputFormatClass() == null) {
      LOG.warn("saveVertices: " +
          GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS +
          " not specified -- there will be no saved output");
      return;
    }
    if (conf.doOutputDuringComputation()) {
      if (LOG.isInfoEnabled()) {
955         LOG.info("saveVertices: The option for doing output during " +
956             "computation is selected, so there will be no saving of the " +
957             "output in the end of application");
      }
      return;
    }

    final int numPartitions = getPartitionStore().getNumPartitions();
    int numThreads = Math.min(getConfiguration().getNumOutputThreads(),
        numPartitions);
    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
        "saveVertices: Starting to save " + numLocalVertices + " vertices " +
            "using " + numThreads + " threads");
    final VertexOutputFormat<I, V, E> vertexOutputFormat =
        getConfiguration().createWrappedVertexOutputFormat();

    getPartitionStore().startIteration();

    long verticesToStore = 0;
    PartitionStore<I, V, E> partitionStore = getPartitionStore();
    for (int partitionId : partitionStore.getPartitionIds()) {
      verticesToStore += partitionStore.getPartitionVertexCount(partitionId);
    }
    WorkerProgress.get().startStoring(
        verticesToStore, getPartitionStore().getNumPartitions());

    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
      @Override
      public Callable<Void> newCallable(int callableId) {
        return new Callable<Void>() {
          /** How often to update WorkerProgress */
          private static final long VERTICES_TO_UPDATE_PROGRESS = 100000;

          @Override
          public Void call() throws Exception {
            VertexWriter<I, V, E> vertexWriter =
                vertexOutputFormat.createVertexWriter(getContext());
            vertexWriter.setConf(getConfiguration());
            vertexWriter.initialize(getContext());
            long nextPrintVertices = 0;
            long nextUpdateProgressVertices = VERTICES_TO_UPDATE_PROGRESS;
            long nextPrintMsecs = System.currentTimeMillis() + 15000;
            int partitionIndex = 0;
            int numPartitions = getPartitionStore().getNumPartitions();
            while (true) {
              Partition<I, V, E> partition =
                  getPartitionStore().getNextPartition();
              if (partition == null) {
                break;
              }

              long verticesWritten = 0;
              for (Vertex<I, V, E> vertex : partition) {
                vertexWriter.writeVertex(vertex);
                ++verticesWritten;

                // Update status at most every 250k vertices or 15 seconds
                if (verticesWritten > nextPrintVertices &&
                    System.currentTimeMillis() > nextPrintMsecs) {
                  LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
                      "saveVertices: Saved " + verticesWritten + " out of " +
                          partition.getVertexCount() + " partition vertices, " +
                          "on partition " + partitionIndex +
                          " out of " + numPartitions);
                  nextPrintMsecs = System.currentTimeMillis() + 15000;
                  nextPrintVertices = verticesWritten + 250000;
                }

                if (verticesWritten >= nextUpdateProgressVertices) {
                  WorkerProgress.get().addVerticesStored(
                      VERTICES_TO_UPDATE_PROGRESS);
                  nextUpdateProgressVertices += VERTICES_TO_UPDATE_PROGRESS;
                }
              }
              getPartitionStore().putPartition(partition);
              ++partitionIndex;
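              // Report the remainder not yet covered by the periodic
              // VERTICES_TO_UPDATE_PROGRESS-sized updates above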
              WorkerProgress.get().addVerticesStored(
                  verticesWritten % VERTICES_TO_UPDATE_PROGRESS);
              WorkerProgress.get().incrementPartitionsStored();
            }
            vertexWriter.close(getContext()); // the temp results are saved now
            return null;
          }
        };
      }
    };
    ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
        "save-vertices-%d", getContext());

    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
      "saveVertices: Done saving vertices.");
    // YARN: must commit the "task" output here ourselves, since Hadoop's
    // MapReduce layer isn't there to do it.
    if (getConfiguration().isPureYarnJob() &&
      getConfiguration().getVertexOutputFormatClass() != null) {
      try {
        OutputCommitter outputCommitter =
          vertexOutputFormat.getOutputCommitter(getContext());
        if (outputCommitter.needsTaskCommit(getContext())) {
          LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
            "OutputCommitter: committing task output.");
          // transfer from temp dirs to "task commit" dirs to prep for
          // the master's OutputCommitter#commitJob(context) call to finish.
          outputCommitter.commitTask(getContext());
        }
      } catch (InterruptedException ie) {
        LOG.error("Interrupted while attempting to obtain " +
          "OutputCommitter.", ie);
      } catch (IOException ioe) {
        LOG.error("Task's attempt to commit output has " +
          "FAILED.", ioe);
      }
    }
  }

  /**
   * Save the edges from this worker's partitions using the
   * user-defined EdgeOutputFormat.
   *
   * @throws InterruptedException
   */
  private void saveEdges() throws IOException, InterruptedException {
    final ImmutableClassesGiraphConfiguration<I, V, E> conf =
      getConfiguration();

    if (conf.getEdgeOutputFormatClass() == null) {
1080       LOG.warn("saveEdges: " +
1081                GiraphConstants.EDGE_OUTPUT_FORMAT_CLASS +
1082                "Make sure that the EdgeOutputFormat is not required.");
      return;
    }

    final int numPartitions = getPartitionStore().getNumPartitions();
    int numThreads = Math.min(conf.getNumOutputThreads(),
        numPartitions);
    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
        "saveEdges: Starting to save the edges using " +
        numThreads + " threads");
    final EdgeOutputFormat<I, V, E> edgeOutputFormat =
        conf.createWrappedEdgeOutputFormat();

    getPartitionStore().startIteration();

    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
      @Override
      public Callable<Void> newCallable(int callableId) {
        return new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            EdgeWriter<I, V, E> edgeWriter =
                edgeOutputFormat.createEdgeWriter(getContext());
            edgeWriter.setConf(conf);
            edgeWriter.initialize(getContext());

            long nextPrintVertices = 0;
            long nextPrintMsecs = System.currentTimeMillis() + 15000;
            int partitionIndex = 0;
            int numPartitions = getPartitionStore().getNumPartitions();
            while (true) {
              Partition<I, V, E> partition =
                  getPartitionStore().getNextPartition();
              if (partition == null) {
                break;
              }

              long vertices = 0;
              long edges = 0;
              long partitionEdgeCount = partition.getEdgeCount();
              for (Vertex<I, V, E> vertex : partition) {
                for (Edge<I, E> edge : vertex.getEdges()) {
                  edgeWriter.writeEdge(vertex.getId(), vertex.getValue(), edge);
                  ++edges;
                }
                ++vertices;

                // Update status at most every 250k vertices or 15 seconds
                if (vertices > nextPrintVertices &&
                    System.currentTimeMillis() > nextPrintMsecs) {
                  LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
                      "saveEdges: Saved " + edges +
                      " edges out of " + partitionEdgeCount +
                      " partition edges, on partition " + partitionIndex +
                      " out of " + numPartitions);
                  nextPrintMsecs = System.currentTimeMillis() + 15000;
                  nextPrintVertices = vertices + 250000;
                }
              }
              getPartitionStore().putPartition(partition);
              ++partitionIndex;
            }
            edgeWriter.close(getContext()); // the temp results are saved now
            return null;
          }
        };
      }
    };
    ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
        "save-edges-%d", getContext());

    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
      "saveEdges: Done saving edges.");
    // YARN: must commit the "task" output here ourselves, since Hadoop's
    // MapReduce layer isn't there to do it.
    if (conf.isPureYarnJob() &&
      conf.getEdgeOutputFormatClass() != null) {
      try {
        OutputCommitter outputCommitter =
          edgeOutputFormat.getOutputCommitter(getContext());
        if (outputCommitter.needsTaskCommit(getContext())) {
          LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
            "OutputCommitter: committing task output.");
          // transfer from temp dirs to "task commit" dirs to prep for
          // the master's OutputCommitter#commitJob(context) call to finish.
          outputCommitter.commitTask(getContext());
        }
      } catch (InterruptedException ie) {
        LOG.error("Interrupted while attempting to obtain " +
          "OutputCommitter.", ie);
      } catch (IOException ioe) {
        LOG.error("Task's attempt to commit output has " +
          "FAILED.", ioe);
      }
    }
  }

  @Override
  public void cleanup(FinishedSuperstepStats finishedSuperstepStats)
    throws IOException, InterruptedException {
    workerClient.closeConnections();
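    // finishSuperstep() already advanced the cached superstep via
    // incrCachedSuperstep(), so step back to the superstep that
    // actually completed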
    setCachedSuperstep(getSuperstep() - 1);
    if (finishedSuperstepStats.getCheckpointStatus() !=
        CheckpointStatus.CHECKPOINT_AND_HALT) {
      saveVertices(finishedSuperstepStats.getLocalVertexCount());
      saveEdges();
    }
    WorkerProgress.get().finishStoring();
    if (workerProgressWriter != null) {
      workerProgressWriter.stop();
    }
    getPartitionStore().shutdown();
    // All worker processes should denote they are done by adding special
    // znode.  Once the number of znodes equals the number of partitions
    // for workers and masters, the master will clean up the ZooKeeper
    // znodes associated with this job.
    String workerCleanedUpPath = cleanedUpPath + "/" +
        getTaskPartition() + WORKER_SUFFIX;
    try {
      String finalFinishedPath =
          getZkExt().createExt(workerCleanedUpPath,
              null,
              Ids.OPEN_ACL_UNSAFE,
              CreateMode.PERSISTENT,
              true);
      if (LOG.isInfoEnabled()) {
1207         LOG.info("cleanup: Notifying master its okay to cleanup with " +
1208             finalFinishedPath);
1209       }
1210     } catch (KeeperException.NodeExistsException e) {
1211       if (LOG.isInfoEnabled()) {
1212         LOG.info("cleanup: Couldn't create finished node '" +
1213             workerCleanedUpPath);
      }
    } catch (KeeperException e) {
      // Cleaning up, it's okay to fail after cleanup is successful
      LOG.error("cleanup: Got KeeperException on notification " +
          "to master about cleanup", e);
    } catch (InterruptedException e) {
      // Cleaning up, it's okay to fail after cleanup is successful
      LOG.error("cleanup: Got InterruptedException on notification " +
          "to master about cleanup", e);
    }
    try {
      getZkExt().close();
    } catch (InterruptedException e) {
      // Cleanup phase -- just log the error
      LOG.error("cleanup: ZooKeeper failed to close", e);
    }

    if (getConfiguration().metricsEnabled()) {
      GiraphMetrics.get().dumpToStream(System.err);
    }

    // Preferably would shut down the service only after
    // all clients have disconnected (or the exceptions on the
    // client side ignored).
    workerServer.close();
  }

  @Override
  public void storeCheckpoint() throws IOException {
    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
        "storeCheckpoint: Starting checkpoint " +
            getGraphTaskManager().getGraphFunctions().toString() +
            " - Attempt=" + getApplicationAttempt() +
            ", Superstep=" + getSuperstep());

    // Algorithm:
    // For each partition, dump vertices and messages
    Path metadataFilePath = createCheckpointFilePathSafe(
        CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX);
    Path validFilePath = createCheckpointFilePathSafe(
        CheckpointingUtils.CHECKPOINT_VALID_POSTFIX);
    Path checkpointFilePath = createCheckpointFilePathSafe(
        CheckpointingUtils.CHECKPOINT_DATA_POSTFIX);

    // Metadata is buffered and written at the end since it's small and
    // needs to know how many partitions this worker owns
    FSDataOutputStream metadataOutputStream =
        getFs().create(metadataFilePath);
    metadataOutputStream.writeInt(getPartitionStore().getNumPartitions());

    for (Integer partitionId : getPartitionStore().getPartitionIds()) {
      metadataOutputStream.writeInt(partitionId);
    }
    metadataOutputStream.close();

    storeCheckpointVertices();

    FSDataOutputStream checkpointOutputStream =
        getFs().create(checkpointFilePath);
    workerContext.write(checkpointOutputStream);
    getContext().progress();

    // TODO: checkpointing messages along with vertices to avoid multiple loads
    //       of a partition when out-of-core is enabled.
    for (Integer partitionId : getPartitionStore().getPartitionIds()) {
      // write messages
      checkpointOutputStream.writeInt(partitionId);
      getServerData().getCurrentMessageStore()
          .writePartition(checkpointOutputStream, partitionId);
      getContext().progress();
    }

    List<Writable> w2wMessages =
        getServerData().getCurrentWorkerToWorkerMessages();
    WritableUtils.writeList(w2wMessages, checkpointOutputStream);

    checkpointOutputStream.close();

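    // An empty "valid" marker file signals that this worker's checkpoint
    // data was completely written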
    getFs().createNewFile(validFilePath);

    // Notify master that checkpoint is stored
    String workerWroteCheckpoint =
        getWorkerWroteCheckpointPath(getApplicationAttempt(),
            getSuperstep()) + "/" + getHostnamePartitionId();
    try {
      getZkExt().createExt(workerWroteCheckpoint,
          new byte[0],
          Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT,
          true);
    } catch (KeeperException.NodeExistsException e) {
      LOG.warn("storeCheckpoint: wrote checkpoint worker path " +
          workerWroteCheckpoint + " already exists!");
    } catch (KeeperException e) {
      throw new IllegalStateException("Creating " + workerWroteCheckpoint +
          " failed with KeeperException", e);
    } catch (InterruptedException e) {
      throw new IllegalStateException("Creating " +
          workerWroteCheckpoint +
          " failed with InterruptedException", e);
    }
  }

  /**
   * Create a checkpoint file safely: if the file already exists, remove
   * it first.
   * @param name file extension
   * @return full file path to newly created file
   * @throws IOException
   */
  private Path createCheckpointFilePathSafe(String name) throws IOException {
    Path validFilePath = new Path(getCheckpointBasePath(getSuperstep()) + '.' +
        getWorkerId(workerInfo) + name);
    // Remove the file if it already exists (it shouldn't, unless this
    // worker previously failed)
    if (getFs().delete(validFilePath, false)) {
      LOG.warn("storeCheckpoint: Removed " + name + " file " +
          validFilePath);
    }
    return validFilePath;
  }

  /**
   * Returns path to saved checkpoint.
   * Doesn't check if file actually exists.
   * @param superstep saved superstep.
   * @param name extension name
   * @return full file path to checkpoint file
   */
  private Path getSavedCheckpoint(long superstep, String name) {
    return new Path(getSavedCheckpointBasePath(superstep) + '.' +
        getWorkerId(workerInfo) + name);
  }

1349   /**
1350    * Save partitions. To speed this up, the operation
1351    * runs in multiple threads.
1352    */
1353   private void storeCheckpointVertices() {
1354     final int numPartitions = getPartitionStore().getNumPartitions();
1355     int numThreads = Math.min(
1356         GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()),
1357         numPartitions);
1358 
1359     getPartitionStore().startIteration();
1360 
1361     final CompressionCodec codec =
1362         new CompressionCodecFactory(getConfiguration())
1363             .getCodec(new Path(
1364                 GiraphConstants.CHECKPOINT_COMPRESSION_CODEC
1365                     .get(getConfiguration())));
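     // CompressionCodecFactory.getCodec() returns null when the configured
     // codec path matches no known codec; in that case the partition data
     // is written uncompressed (see the stream selection below).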
1366 
1367     long t0 = System.currentTimeMillis();
1368 
1369     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
1370       @Override
1371       public Callable<Void> newCallable(int callableId) {
1372         return new Callable<Void>() {
1373 
1374           @Override
1375           public Void call() throws Exception {
1376             while (true) {
1377               Partition<I, V, E> partition =
1378                   getPartitionStore().getNextPartition();
1379               if (partition == null) {
1380                 break;
1381               }
1382               Path path =
1383                   createCheckpointFilePathSafe("_" + partition.getId() +
1384                       CheckpointingUtils.CHECKPOINT_VERTICES_POSTFIX);
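              // Each partition goes to its own checkpoint file, named with
              // the partition id and the vertices postfix, so partitions
              // can be written and restored independently.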
1385 
1386               FSDataOutputStream uncompressedStream =
1387                   getFs().create(path);
1388 
1389 
1390               DataOutputStream stream = codec == null ? uncompressedStream :
1391                   new DataOutputStream(
1392                       codec.createOutputStream(uncompressedStream));
1393 
1394 
1395               partition.write(stream);
1396 
1397               getPartitionStore().putPartition(partition);
1398 
1399               stream.close();
1400               uncompressedStream.close();
1401             }
1402             return null;
1403           }
1404 
1405 
1406         };
1407       }
1408     };
1409 
1410     ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
1411         "checkpoint-vertices-%d", getContext());
1412 
1413     LOG.info("Saved checkpoint in " + (System.currentTimeMillis() - t0) +
1414         " ms, using " + numThreads + " threads");
1415   }
1416 
1417   /**
1418    * Load saved partitions in multiple threads.
1419    * @param superstep superstep to load
1420    * @param partitions list of partitions to load
1421    */
1422   private void loadCheckpointVertices(final long superstep,
1423                                       List<Integer> partitions) {
1424     int numThreads = Math.min(
1425         GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()),
1426         partitions.size());
1427 
1428     final Queue<Integer> partitionIdQueue =
1429         new ConcurrentLinkedQueue<>(partitions);
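     // A concurrent queue lets loader threads claim partition ids without
     // extra locking; poll() returns null once the queue is drained.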
1430 
1431     final CompressionCodec codec =
1432         new CompressionCodecFactory(getConfiguration())
1433             .getCodec(new Path(
1434                 GiraphConstants.CHECKPOINT_COMPRESSION_CODEC
1435                     .get(getConfiguration())));
1436 
1437     long t0 = System.currentTimeMillis();
1438 
1439     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
1440       @Override
1441       public Callable<Void> newCallable(int callableId) {
1442         return new Callable<Void>() {
1443 
1444           @Override
1445           public Void call() throws Exception {
1446             while (!partitionIdQueue.isEmpty()) {
1447               Integer partitionId = partitionIdQueue.poll();
1448               if (partitionId == null) {
1449                 break;
1450               }
1451               Path path =
1452                   getSavedCheckpoint(superstep, "_" + partitionId +
1453                       CheckpointingUtils.CHECKPOINT_VERTICES_POSTFIX);
1454 
1455               FSDataInputStream compressedStream =
1456                   getFs().open(path);
1457 
1458               DataInputStream stream = codec == null ? compressedStream :
1459                   new DataInputStream(
1460                       codec.createInputStream(compressedStream));
1461 
1462               Partition<I, V, E> partition =
1463                   getConfiguration().createPartition(partitionId, getContext());
1464 
1465               partition.readFields(stream);
1466 
1467               getPartitionStore().addPartition(partition);
1468 
1469               stream.close();
1470             }
1471             return null;
1472           }
1473 
1474         };
1475       }
1476     };
1477 
1478     ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
1479         "load-vertices-%d", getContext());
1480 
1481     LOG.info("Loaded checkpoint in " + (System.currentTimeMillis() - t0) +
1482         " ms, using " + numThreads + " threads");
1483   }
1484 
1485   @Override
1486   public VertexEdgeCount loadCheckpoint(long superstep) {
1487     Path metadataFilePath = getSavedCheckpoint(
1488         superstep, CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX);
1489 
1490     Path checkpointFilePath = getSavedCheckpoint(
1491         superstep, CheckpointingUtils.CHECKPOINT_DATA_POSTFIX);
1492     // Algorithm:
1493     // Examine all the partition owners and load the ones
1494     // that match my hostname and id from the master-designated checkpoint
1495     // prefixes.
1496     try {
1497       DataInputStream metadataStream =
1498           getFs().open(metadataFilePath);
1499 
1500       int partitions = metadataStream.readInt();
1501       List<Integer> partitionIds = new ArrayList<>(partitions);
1502       for (int i = 0; i < partitions; i++) {
1503         int partitionId = metadataStream.readInt();
1504         partitionIds.add(partitionId);
1505       }
1506 
1507       loadCheckpointVertices(superstep, partitionIds);
1508 
1509       getContext().progress();
1510 
1511       metadataStream.close();
1512 
1513       DataInputStream checkpointStream =
1514           getFs().open(checkpointFilePath);
1515       workerContext.readFields(checkpointStream);
1516 
1517       // Load global stats and superstep classes
1518       GlobalStats globalStats = new GlobalStats();
1519       SuperstepClasses superstepClasses = SuperstepClasses.createToRead(
1520           getConfiguration());
1521       String finalizedCheckpointPath = getSavedCheckpointBasePath(superstep) +
1522           CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX;
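       // The finalized checkpoint file is shared by all workers: it holds
       // the global stats and superstep classes recorded when the master
       // finalized this checkpoint.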
1523       DataInputStream finalizedStream =
1524           getFs().open(new Path(finalizedCheckpointPath));
1525       globalStats.readFields(finalizedStream);
1526       superstepClasses.readFields(finalizedStream);
1527       getConfiguration().updateSuperstepClasses(superstepClasses);
1528       getServerData().resetMessageStores();
1529 
1530       // TODO: checkpoint messages along with vertices to avoid multiple
1531       //       loads of a partition when out-of-core is enabled.
1532       for (int i = 0; i < partitions; i++) {
1533         int partitionId = checkpointStream.readInt();
1534         getServerData().getCurrentMessageStore()
1535             .readFieldsForPartition(checkpointStream, partitionId);
1536       }
1537 
1538       List<Writable> w2wMessages = (List<Writable>) WritableUtils.readList(
1539           checkpointStream);
1540       getServerData().getCurrentWorkerToWorkerMessages().addAll(w2wMessages);
1541 
1542       checkpointStream.close();
1543 
1544       if (LOG.isInfoEnabled()) {
1545         LOG.info("loadCheckpoint: Loaded " +
1546             workerGraphPartitioner.getPartitionOwners().size() +
1547             " partition owners total.");
1548       }
1549 
1550       // Communication service needs to setup the connections prior to
1551       // processing vertices
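       // The comment markers below are build-time (munge) preprocessor
       // directives; only one of the two setup() calls survives the build,
       // depending on whether Giraph was built for secure Hadoop.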
1552 /*if[HADOOP_NON_SECURE]
1553       workerClient.setup();
1554 else[HADOOP_NON_SECURE]*/
1555       workerClient.setup(getConfiguration().authenticate());
1556 /*end[HADOOP_NON_SECURE]*/
1557       return new VertexEdgeCount(globalStats.getVertexCount(),
1558           globalStats.getEdgeCount(), 0);
1559 
1560     } catch (IOException e) {
1561       throw new RuntimeException(
1562           "loadCheckpoint: Failed for superstep=" + superstep, e);
1563     }
1564   }
1565 
1566   /**
1567    * Send the worker partitions to their destination workers
1568    *
1569    * @param workerPartitionMap Map of worker info to the partitions stored
1570    *        on this worker to be sent
1571    */
1572   private void sendWorkerPartitions(
1573       Map<WorkerInfo, List<Integer>> workerPartitionMap) {
1574     List<Entry<WorkerInfo, List<Integer>>> randomEntryList =
1575         new ArrayList<Entry<WorkerInfo, List<Integer>>>(
1576             workerPartitionMap.entrySet());
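     // Shuffling the destination order spreads outgoing transfers so that
     // workers don't all start sending to the same destination first.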
1577     Collections.shuffle(randomEntryList);
1578     WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
1579         new NettyWorkerClientRequestProcessor<I, V, E>(getContext(),
1580             getConfiguration(), this,
1581             false /* useOneMessageToManyIdsEncoding */);
1582     for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
1583       randomEntryList) {
1584       for (Integer partitionId : workerPartitionList.getValue()) {
1585         Partition<I, V, E> partition =
1586             getPartitionStore().removePartition(partitionId);
1587         if (partition == null) {
1588           throw new IllegalStateException(
1589               "sendWorkerPartitions: Couldn't find partition " +
1590                   partitionId + " to send to " +
1591                   workerPartitionList.getKey());
1592         }
1593         if (LOG.isInfoEnabled()) {
1594           LOG.info("sendWorkerPartitions: Sending worker " +
1595               workerPartitionList.getKey() + " partition " +
1596               partitionId);
1597         }
1598         workerClientRequestProcessor.sendPartitionRequest(
1599             workerPartitionList.getKey(),
1600             partition);
1601       }
1602     }
1603 
1604     try {
1605       workerClientRequestProcessor.flush();
1606       workerClient.waitAllRequests();
1607     } catch (IOException e) {
1608       throw new IllegalStateException("sendWorkerPartitions: Flush failed", e);
1609     }
1610     String myPartitionExchangeDonePath =
1611         getPartitionExchangeWorkerPath(
1612             getApplicationAttempt(), getSuperstep(), getWorkerInfo());
1613     try {
1614       getZkExt().createExt(myPartitionExchangeDonePath,
1615           null,
1616           Ids.OPEN_ACL_UNSAFE,
1617           CreateMode.PERSISTENT,
1618           true);
1619     } catch (KeeperException e) {
1620       throw new IllegalStateException(
1621           "sendWorkerPartitions: KeeperException to create " +
1622               myPartitionExchangeDonePath, e);
1623     } catch (InterruptedException e) {
1624       throw new IllegalStateException(
1625           "sendWorkerPartitions: InterruptedException to create " +
1626               myPartitionExchangeDonePath, e);
1627     }
1628     if (LOG.isInfoEnabled()) {
1629       LOG.info("sendWorkerPartitions: Done sending all my partitions.");
1630     }
1631   }
1632 
1633   @Override
1634   public final void exchangeVertexPartitions(
1635       Collection<? extends PartitionOwner> masterSetPartitionOwners) {
1636     // 1. Fix the addresses of the partition ids if they have changed.
1637     // 2. Send all the partitions to their destination workers in a random
1638     //    fashion.
1639     // 3. Notify completion with a ZooKeeper stamp
1640     // 4. Wait for all my dependencies to be done (if any)
1641     // 5. Add the partitions to myself.
1642     PartitionExchange partitionExchange =
1643         workerGraphPartitioner.updatePartitionOwners(
1644             getWorkerInfo(), masterSetPartitionOwners);
1645     workerClient.openConnections();
1646 
1647     Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
1648         partitionExchange.getSendWorkerPartitionMap();
1649     if (!getPartitionStore().isEmpty()) {
1650       sendWorkerPartitions(sendWorkerPartitionMap);
1651     }
1652 
1653     Set<WorkerInfo> myDependencyWorkerSet =
1654         partitionExchange.getMyDependencyWorkerSet();
1655     Set<String> workerIdSet = new HashSet<String>();
1656     for (WorkerInfo tmpWorkerInfo : myDependencyWorkerSet) {
1657       if (!workerIdSet.add(tmpWorkerInfo.getHostnameId())) {
1658         throw new IllegalStateException(
1659             "exchangeVertexPartitions: Duplicate entry " + tmpWorkerInfo);
1660       }
1661     }
1662     if (myDependencyWorkerSet.isEmpty() && getPartitionStore().isEmpty()) {
1663       if (LOG.isInfoEnabled()) {
1664         LOG.info("exchangeVertexPartitions: Nothing to exchange, " +
1665             "exiting early");
1666       }
1667       return;
1668     }
1669 
1670     String vertexExchangePath =
1671         getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
1672     List<String> workerDoneList;
1673     try {
1674       while (true) {
1675         workerDoneList = getZkExt().getChildrenExt(
1676             vertexExchangePath, true, false, false);
1677         workerIdSet.removeAll(workerDoneList);
1678         if (workerIdSet.isEmpty()) {
1679           break;
1680         }
1681         if (LOG.isInfoEnabled()) {
1682           LOG.info("exchangeVertexPartitions: Waiting for workers " +
1683               workerIdSet);
1684         }
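        // Block until another worker registers completion in ZooKeeper,
        // then clear the event and re-check the remaining worker set.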
1685         getPartitionExchangeChildrenChangedEvent().waitForever();
1686         getPartitionExchangeChildrenChangedEvent().reset();
1687       }
1688     } catch (KeeperException | InterruptedException e) {
1689       throw new RuntimeException(
1690           "exchangeVertexPartitions: Got runtime exception", e);
1691     }
1692 
1693     if (LOG.isInfoEnabled()) {
1694       LOG.info("exchangeVertexPartitions: Done with exchange.");
1695     }
1696   }
1697 
1698   /**
1699    * Get event when the state of a partition exchange has changed.
1700    *
1701    * @return Event to check.
1702    */
1703   public final BspEvent getPartitionExchangeChildrenChangedEvent() {
1704     return partitionExchangeChildrenChanged;
1705   }
1706 
1707   @Override
1708   protected boolean processEvent(WatchedEvent event) {
1709     boolean foundEvent = false;
1710     if (event.getPath().startsWith(masterJobStatePath) &&
1711         (event.getType() == EventType.NodeChildrenChanged)) {
1712       if (LOG.isInfoEnabled()) {
1713         LOG.info("processEvent: Job state changed, checking " +
1714             "to see if it needs to restart");
1715       }
1716       JSONObject jsonObj = getJobState();
1717       // In YARN we have to manually commit our own output in 2 stages, which
1718       // we do not have to do in Hadoop-based Giraph, so jsonObj can be null.
1719       if (getConfiguration().isPureYarnJob() && null == jsonObj) {
1720         LOG.error("BspServiceWorker#getJobState() came back NULL.");
1721         return false; // the event has been processed.
1722       }
1723       try {
1724         if ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
1725             ApplicationState.START_SUPERSTEP) &&
1726             jsonObj.getLong(JSONOBJ_APPLICATION_ATTEMPT_KEY) !=
1727             getApplicationAttempt()) {
1728           LOG.fatal("processEvent: Worker will restart " +
1729               "from command - " + jsonObj.toString());
1730           System.exit(-1);
1731         }
1732       } catch (JSONException e) {
1733         throw new RuntimeException(
1734             "processEvent: Couldn't properly get job state from " +
1735                 jsonObj.toString(), e);
1736       }
1737       foundEvent = true;
1738     } else if (event.getPath().contains(PARTITION_EXCHANGE_DIR) &&
1739         event.getType() == EventType.NodeChildrenChanged) {
1740       if (LOG.isInfoEnabled()) {
1741         LOG.info("processEvent: partitionExchangeChildrenChanged " +
1742             "(at least one worker is done sending partitions)");
1743       }
1744       partitionExchangeChildrenChanged.signal();
1745       foundEvent = true;
1746     } else if (event.getPath().contains(MEMORY_OBSERVER_DIR) &&
1747         event.getType() == EventType.NodeChildrenChanged) {
1748       memoryObserver.callGc();
1749       foundEvent = true;
1750     }
1751 
1752     return foundEvent;
1753   }
1754 
1755   @Override
1756   public WorkerInfo getWorkerInfo() {
1757     return workerInfo;
1758   }
1759 
1760   @Override
1761   public PartitionStore<I, V, E> getPartitionStore() {
1762     return getServerData().getPartitionStore();
1763   }
1764 
1765   @Override
1766   public PartitionOwner getVertexPartitionOwner(I vertexId) {
1767     return workerGraphPartitioner.getPartitionOwner(vertexId);
1768   }
1769 
1770   @Override
1771   public Iterable<? extends PartitionOwner> getPartitionOwners() {
1772     return workerGraphPartitioner.getPartitionOwners();
1773   }
1774 
1775   @Override
1776   public int getPartitionId(I vertexId) {
1777     PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
1778     return partitionOwner.getPartitionId();
1779   }
1780 
1781   @Override
1782   public boolean hasPartition(Integer partitionId) {
1783     return getPartitionStore().hasPartition(partitionId);
1784   }
1785 
1786   @Override
1787   public ServerData<I, V, E> getServerData() {
1788     return workerServer.getServerData();
1789   }
1790 
1791 
1792   @Override
1793   public WorkerAggregatorHandler getAggregatorHandler() {
1794     return globalCommHandler;
1795   }
1796 
1797   @Override
1798   public void prepareSuperstep() {
1799     if (getSuperstep() != INPUT_SUPERSTEP) {
1800       globalCommHandler.prepareSuperstep(workerAggregatorRequestProcessor);
1801     }
1802   }
1803 
1804   @Override
1805   public SuperstepOutput<I, V, E> getSuperstepOutput() {
1806     return superstepOutput;
1807   }
1808 
1809   @Override
1810   public GlobalStats getGlobalStats() {
1811     GlobalStats globalStats = new GlobalStats();
1812     if (getSuperstep() > Math.max(INPUT_SUPERSTEP, getRestartedSuperstep())) {
1813       String superstepFinishedNode =
1814           getSuperstepFinishedPath(getApplicationAttempt(),
1815               getSuperstep() - 1);
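      // The aggregated stats for the previous superstep are published
      // under the superstep-finished znode; read them back from there.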
1816       WritableUtils.readFieldsFromZnode(
1817           getZkExt(), superstepFinishedNode, false, null,
1818           globalStats);
1819     }
1820     return globalStats;
1821   }
1822 
1823   @Override
1824   public WorkerInputSplitsHandler getInputSplitsHandler() {
1825     return inputSplitsHandler;
1826   }
1827 
1828   @Override
1829   public void addressesAndPartitionsReceived(
1830       AddressesAndPartitionsWritable addressesAndPartitions) {
1831     addressesAndPartitionsHolder.offer(addressesAndPartitions);
1832   }
1833 }