This project has retired. For details please refer to its Attic page.
TestManualCheckpoint xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph;
20  
21  import org.apache.giraph.conf.GiraphConfiguration;
22  import org.apache.giraph.conf.GiraphConstants;
23  import org.apache.giraph.examples.SimpleCheckpoint;
24  import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
25  import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat;
26  import org.apache.giraph.job.GiraphJob;
27  import org.apache.hadoop.fs.FileStatus;
28  import org.apache.hadoop.fs.Path;
29  import org.junit.Test;
30  
31  import java.io.IOException;
32  
33  import static org.junit.Assert.assertEquals;
34  import static org.junit.Assert.assertTrue;
35  
36  /**
37   * Unit test for manual checkpoint restarting
38   */
39  public class TestManualCheckpoint extends BspCase {
40  
41    public TestManualCheckpoint() {
42      super(TestManualCheckpoint.class.getName());
43    }
44  
45    /**
46     * Run a sample BSP job locally and test checkpointing.
47     * @throws IOException
48     * @throws ClassNotFoundException
49     * @throws InterruptedException
50     */
51    @Test
52    public void testBspCheckpoint()
53        throws IOException, InterruptedException, ClassNotFoundException {
54      Path checkpointsDir = getTempPath("checkPointsForTesting");
55      Path outputPath = getTempPath(getCallingMethodName());
56      GiraphConfiguration conf = new GiraphConfiguration();
57      conf.setComputationClass(
58          SimpleCheckpoint.SimpleCheckpointComputation.class);
59      conf.setWorkerContextClass(
60          SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
61      conf.setMasterComputeClass(
62          SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
63      conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
64      conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
65      GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
66  
67      GiraphConfiguration configuration = job.getConfiguration();
68      GiraphConstants.CHECKPOINT_DIRECTORY.set(configuration, checkpointsDir.toString());
69      GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(configuration, false);
70      configuration.setCheckpointFrequency(2);
71  
72      assertTrue(job.run(true));
73  
74      long idSum = 0;
75      if (!runningInDistributedMode()) {
76        FileStatus fileStatus = getSinglePartFileStatus(job.getConfiguration(),
77            outputPath);
78        idSum = SimpleCheckpoint.SimpleCheckpointVertexWorkerContext
79            .getFinalSum();
80        System.out.println("testBspCheckpoint: idSum = " + idSum +
81            " fileLen = " + fileStatus.getLen());
82      }
83  
84      // Restart the test from superstep 2
85      System.out.println("testBspCheckpoint: Restarting from superstep 2" +
86          " with checkpoint path = " + checkpointsDir);
87      outputPath = getTempPath(getCallingMethodName() + "Restarted");
88      conf = new GiraphConfiguration();
89      conf.setComputationClass(
90          SimpleCheckpoint.SimpleCheckpointComputation.class);
91      conf.setWorkerContextClass(
92          SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
93      conf.setMasterComputeClass(
94          SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
95      conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
96      conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
97      GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
98          conf, outputPath);
99      configuration.setMasterComputeClass(
100         SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
101     GiraphConstants.CHECKPOINT_DIRECTORY.set(restartedJob.getConfiguration(),
102         checkpointsDir.toString());
103 
104     assertTrue(restartedJob.run(true));
105     if (!runningInDistributedMode()) {
106       long idSumRestarted =
107           SimpleCheckpoint.SimpleCheckpointVertexWorkerContext
108               .getFinalSum();
109       System.out.println("testBspCheckpoint: idSumRestarted = " +
110           idSumRestarted);
111       assertEquals(idSum, idSumRestarted);
112     }
113   }
114 }