This project has retired. For details please refer to its Attic page.
TestCheckpointing xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph;
20  
21  import org.apache.giraph.aggregators.LongSumAggregator;
22  import org.apache.giraph.bsp.BspService;
23  import org.apache.giraph.conf.GiraphConfiguration;
24  import org.apache.giraph.conf.GiraphConstants;
25  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26  import org.apache.giraph.edge.Edge;
27  import org.apache.giraph.edge.EdgeFactory;
28  import org.apache.giraph.examples.SimpleSuperstepComputation;
29  import org.apache.giraph.graph.BasicComputation;
30  import org.apache.giraph.graph.Vertex;
31  import org.apache.giraph.job.GiraphJob;
32  import org.apache.giraph.master.DefaultMasterCompute;
33  import org.apache.giraph.worker.DefaultWorkerContext;
34  import org.apache.giraph.zk.ZooKeeperExt;
35  import org.apache.giraph.zk.ZooKeeperManager;
36  import org.apache.hadoop.fs.FileStatus;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.io.FloatWritable;
39  import org.apache.hadoop.io.IntWritable;
40  import org.apache.hadoop.io.LongWritable;
41  import org.apache.hadoop.io.Writable;
42  import org.apache.log4j.Logger;
43  import org.apache.zookeeper.CreateMode;
44  import org.apache.zookeeper.KeeperException;
45  import org.apache.zookeeper.ZooDefs;
46  import org.junit.Assert;
47  import org.junit.Test;
48  
49  import java.io.DataInput;
50  import java.io.DataOutput;
51  import java.io.IOException;
52  import java.util.List;
53  
54  import static org.junit.Assert.assertEquals;
55  import static org.junit.Assert.assertTrue;
56  import static org.junit.Assert.fail;
57  
58  /**
59   * Tests that worker context and master computation
60   * are properly saved and loaded back at checkpoint.
61   */
62  public class TestCheckpointing extends BspCase {
63  
  /** Class logger */
  private static final Logger LOG =
      Logger.getLogger(TestCheckpointing.class);
  /** ID to be used with test job */
  public static final String TEST_JOB_ID = "test_job";

  /**
   * Optional hook that {@link CheckpointVertexMasterCompute#compute()}
   * invokes with the current superstep whenever it is non-null. Each test
   * installs its own callback (or null) before launching a job.
   * NOTE(review): as a static field this presumably only reaches the master
   * when the job runs inside the test JVM - confirm.
   */
  private static SuperstepCallback SUPERSTEP_CALLBACK;
71  
  /**
   * Creates the test case, passing this class's fully qualified name to the
   * {@link BspCase} constructor.
   */
  public TestCheckpointing() {
    super(TestCheckpointing.class.getName());
  }
78  
79    @Test
80    public void testBspCheckpoint() throws InterruptedException, IOException, ClassNotFoundException {
81      testBspCheckpoint(false);
82    }
83  
84    @Test
85    public void testAsyncMessageStoreCheckpoint() throws InterruptedException, IOException, ClassNotFoundException {
86      testBspCheckpoint(true);
87    }
88  
89    public void testBspCheckpoint(boolean useAsyncMessageStore)
90        throws IOException, InterruptedException, ClassNotFoundException {
91      Path checkpointsDir = getTempPath("checkpointing");
92      GiraphConfiguration conf = new GiraphConfiguration();
93      if (useAsyncMessageStore) {
94        GiraphConstants.ASYNC_MESSAGE_STORE_THREADS_COUNT.set(conf, 2);
95      }
96  
97      SUPERSTEP_CALLBACK = null;
98  
99      GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(conf, false);
100     conf.setCheckpointFrequency(2);
101 
102     long idSum = runOriginalJob(checkpointsDir, conf);
103     assertEquals(10, idSum);
104 
105     SUPERSTEP_CALLBACK = new SuperstepCallback() {
106       @Override
107       public void superstep(long superstep,
108                             ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> conf) {
109         if (superstep < 2) {
110           Assert.fail("Restarted JOB should not be executed on superstep " + superstep);
111         }
112       }
113     };
114 
115     runRestartedJob(checkpointsDir, conf, idSum, 2);
116 
117 
118   }
119 
120   private void runRestartedJob(Path checkpointsDir, GiraphConfiguration conf, long idSum, long restartFrom) throws IOException, InterruptedException, ClassNotFoundException {
121     Path outputPath;
122     LOG.info("testBspCheckpoint: Restarting from the latest superstep " +
123         "with checkpoint path = " + checkpointsDir);
124     outputPath = getTempPath("checkpointing_restarted");
125 
126     GiraphConstants.RESTART_JOB_ID.set(conf, TEST_JOB_ID);
127     conf.set("mapred.job.id", "restarted_test_job");
128     if (restartFrom >= 0) {
129       conf.set(GiraphConstants.RESTART_SUPERSTEP, Long.toString(restartFrom));
130     }
131 
132     GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
133         conf, outputPath);
134 
135     GiraphConstants.CHECKPOINT_DIRECTORY.set(restartedJob.getConfiguration(),
136         checkpointsDir.toString());
137 
138     assertTrue(restartedJob.run(true));
139 
140 
141     if (!runningInDistributedMode()) {
142       long idSumRestarted =
143           CheckpointVertexWorkerContext
144               .getFinalSum();
145       LOG.info("testBspCheckpoint: idSumRestarted = " +
146           idSumRestarted);
147       assertEquals(idSum, idSumRestarted);
148     }
149   }
150 
151   private long runOriginalJob(Path checkpointsDir,  GiraphConfiguration conf) throws IOException, InterruptedException, ClassNotFoundException {
152     Path outputPath = getTempPath("checkpointing_original");
153     conf.setComputationClass(
154         CheckpointComputation.class);
155     conf.setWorkerContextClass(
156         CheckpointVertexWorkerContext.class);
157     conf.setMasterComputeClass(
158         CheckpointVertexMasterCompute.class);
159     conf.setVertexInputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat.class);
160     conf.setVertexOutputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat.class);
161     conf.set("mapred.job.id", TEST_JOB_ID);
162     GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
163 
164     GiraphConfiguration configuration = job.getConfiguration();
165     GiraphConstants.CHECKPOINT_DIRECTORY.set(configuration, checkpointsDir.toString());
166 
167     assertTrue(job.run(true));
168 
169     long idSum = 0;
170     if (!runningInDistributedMode()) {
171       FileStatus fileStatus = getSinglePartFileStatus(job.getConfiguration(),
172           outputPath);
173       idSum = CheckpointVertexWorkerContext
174           .getFinalSum();
175       LOG.info("testBspCheckpoint: idSum = " + idSum +
176           " fileLen = " + fileStatus.getLen());
177     }
178     return idSum;
179   }
180 
181 
182   /**
183    * Actual computation.
184    */
185   public static class CheckpointComputation extends
186       BasicComputation<LongWritable, IntWritable, FloatWritable,
187           FloatWritable> {
188     @Override
189     public void compute(
190         Vertex<LongWritable, IntWritable, FloatWritable> vertex,
191         Iterable<FloatWritable> messages) throws IOException {
192       CheckpointVertexWorkerContext workerContext = getWorkerContext();
193       assertEquals(getSuperstep() + 1, workerContext.testValue);
194 
195       if (getSuperstep() > 4) {
196         vertex.voteToHalt();
197         return;
198       }
199 
200       aggregate(LongSumAggregator.class.getName(),
201           new LongWritable(vertex.getId().get()));
202 
203       float msgValue = 0.0f;
204       for (FloatWritable message : messages) {
205         float curMsgValue = message.get();
206         msgValue += curMsgValue;
207       }
208 
209       int vertexValue = vertex.getValue().get();
210       vertex.setValue(new IntWritable(vertexValue + (int) msgValue));
211       for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
212         FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() +
213             (float) vertexValue);
214         Edge<LongWritable, FloatWritable> newEdge =
215             EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue);
216         vertex.addEdge(newEdge);
217         sendMessage(edge.getTargetVertexId(), newEdgeValue);
218 
219       }
220     }
221   }
222 
223   @Test
224   public void testManualCheckpointAtTheBeginning()
225       throws InterruptedException, IOException, ClassNotFoundException {
226     testManualCheckpoint(0);
227   }
228 
229   @Test
230   public void testManualCheckpoint()
231       throws InterruptedException, IOException, ClassNotFoundException {
232     testManualCheckpoint(2);
233   }
234 
235 
236   private void testManualCheckpoint(final int checkpointSuperstep)
237       throws IOException, InterruptedException, ClassNotFoundException {
238     Path checkpointsDir = getTempPath("checkpointing");
239     GiraphConfiguration conf = new GiraphConfiguration();
240 
241     SUPERSTEP_CALLBACK = new SuperstepCallback() {
242 
243       @Override
244       public void superstep(long superstep, ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> conf) {
245         if (superstep == checkpointSuperstep) {
246           try {
247             ZooKeeperExt zooKeeperExt = new ZooKeeperExt(conf.getZookeeperList(),
248                 conf.getZooKeeperSessionTimeout(),
249                 conf.getZookeeperOpsMaxAttempts(),
250                 conf.getZookeeperOpsRetryWaitMsecs(),
251                 TestCheckpointing.this);
252             String basePath = ZooKeeperManager.getBasePath(conf) + BspService.BASE_DIR + "/" + conf.get("mapred.job.id");
253             zooKeeperExt.createExt(
254                 basePath + BspService.FORCE_CHECKPOINT_USER_FLAG,
255                 null,
256                 ZooDefs.Ids.OPEN_ACL_UNSAFE,
257                 CreateMode.PERSISTENT,
258                 true);
259           } catch (IOException | InterruptedException | KeeperException e) {
260             throw new RuntimeException(e);
261           }
262         } else if (superstep > checkpointSuperstep) {
263           Assert.fail("Job should be stopped by now " + superstep);
264         }
265       }
266     };
267 
268     try {
269       runOriginalJob(checkpointsDir, conf);
270       fail("Original job should fail after checkpointing");
271     } catch (Exception e) {
272       LOG.info("Original job failed, that's OK " + e);
273     }
274 
275     SUPERSTEP_CALLBACK = new SuperstepCallback() {
276       @Override
277       public void superstep(long superstep,
278                             ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> conf) {
279         if (superstep < checkpointSuperstep) {
280           Assert.fail("Restarted JOB should not be executed on superstep " + superstep);
281         }
282       }
283     };
284 
285     runRestartedJob(checkpointsDir, conf, 10, -1);
286   }
287 
288   /**
289    * Worker context associated.
290    */
291   public static class CheckpointVertexWorkerContext
292       extends DefaultWorkerContext {
293     /** User can access this after the application finishes if local */
294     private static long FINAL_SUM;
295 
296     private int testValue;
297 
298     public static long getFinalSum() {
299       return FINAL_SUM;
300     }
301 
302     @Override
303     public void postSuperstep() {
304       super.postSuperstep();
305       sendMessageToMyself(new LongWritable(getSuperstep()));
306     }
307 
308     /**
309      * Send message to all workers (except this worker)
310      *
311      * @param message Message to send
312      */
313     private void sendMessageToMyself(Writable message) {
314       sendMessageToWorker(message, getMyWorkerIndex());
315     }
316 
317     @Override
318     public void postApplication() {
319       setFinalSum(this.<LongWritable>getAggregatedValue(
320           LongSumAggregator.class.getName()).get());
321       LOG.info("FINAL_SUM=" + FINAL_SUM);
322     }
323 
324     /**
325      * Set the final sum
326      *
327      * @param value sum
328      */
329     private static void setFinalSum(long value) {
330       FINAL_SUM = value;
331     }
332 
333     @Override
334     public void preSuperstep() {
335       assertEquals(getSuperstep(), testValue++);
336       if (getSuperstep() > 0) {
337         List<Writable> messages = getAndClearMessagesFromOtherWorkers();
338         assertEquals(1, messages.size());
339         assertEquals(getSuperstep() - 1, ((LongWritable)(messages.get(0))).get());
340       }
341     }
342 
343     @Override
344     public void readFields(DataInput dataInput) throws IOException {
345       super.readFields(dataInput);
346       testValue = dataInput.readInt();
347     }
348 
349     @Override
350     public void write(DataOutput dataOutput) throws IOException {
351       super.write(dataOutput);
352       dataOutput.writeInt(testValue);
353     }
354   }
355 
356   /**
357    * Master compute
358    */
359   public static class CheckpointVertexMasterCompute extends
360       DefaultMasterCompute {
361 
362     private int testValue = 0;
363 
364     @Override
365     public void compute() {
366       long superstep = getSuperstep();
367       if (SUPERSTEP_CALLBACK != null) {
368         SUPERSTEP_CALLBACK.superstep(getSuperstep(), getConf());
369       }
370       assertEquals(superstep, testValue++);
371     }
372 
373     @Override
374     public void initialize() throws InstantiationException,
375         IllegalAccessException {
376       registerAggregator(LongSumAggregator.class.getName(),
377           LongSumAggregator.class);
378     }
379 
380     @Override
381     public void readFields(DataInput in) throws IOException {
382       super.readFields(in);
383       testValue = in.readInt();
384     }
385 
386     @Override
387     public void write(DataOutput out) throws IOException {
388       super.write(out);
389       out.writeInt(testValue);
390     }
391   }
392 
393   private static interface SuperstepCallback {
394 
395     public void superstep(long superstep,
396                           ImmutableClassesGiraphConfiguration<LongWritable,
397                               IntWritable, FloatWritable> conf);
398 
399   }
400 
401 }