This project has retired. For details please refer to its
Attic page.
TestManualCheckpoint xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph;
20
21 import org.apache.giraph.conf.GiraphConfiguration;
22 import org.apache.giraph.conf.GiraphConstants;
23 import org.apache.giraph.examples.SimpleCheckpoint;
24 import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
25 import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat;
26 import org.apache.giraph.job.GiraphJob;
27 import org.apache.hadoop.fs.FileStatus;
28 import org.apache.hadoop.fs.Path;
29 import org.junit.Test;
30
31 import java.io.IOException;
32
33 import static org.junit.Assert.assertEquals;
34 import static org.junit.Assert.assertTrue;
35
36
37
38
39 public class TestManualCheckpoint extends BspCase {
40
41 public TestManualCheckpoint() {
42 super(TestManualCheckpoint.class.getName());
43 }
44
45
46
47
48
49
50
51 @Test
52 public void testBspCheckpoint()
53 throws IOException, InterruptedException, ClassNotFoundException {
54 Path checkpointsDir = getTempPath("checkPointsForTesting");
55 Path outputPath = getTempPath(getCallingMethodName());
56 GiraphConfiguration conf = new GiraphConfiguration();
57 conf.setComputationClass(
58 SimpleCheckpoint.SimpleCheckpointComputation.class);
59 conf.setWorkerContextClass(
60 SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
61 conf.setMasterComputeClass(
62 SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
63 conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
64 conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
65 GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
66
67 GiraphConfiguration configuration = job.getConfiguration();
68 GiraphConstants.CHECKPOINT_DIRECTORY.set(configuration, checkpointsDir.toString());
69 GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(configuration, false);
70 configuration.setCheckpointFrequency(2);
71
72 assertTrue(job.run(true));
73
74 long idSum = 0;
75 if (!runningInDistributedMode()) {
76 FileStatus fileStatus = getSinglePartFileStatus(job.getConfiguration(),
77 outputPath);
78 idSum = SimpleCheckpoint.SimpleCheckpointVertexWorkerContext
79 .getFinalSum();
80 System.out.println("testBspCheckpoint: idSum = " + idSum +
81 " fileLen = " + fileStatus.getLen());
82 }
83
84
85 System.out.println("testBspCheckpoint: Restarting from superstep 2" +
86 " with checkpoint path = " + checkpointsDir);
87 outputPath = getTempPath(getCallingMethodName() + "Restarted");
88 conf = new GiraphConfiguration();
89 conf.setComputationClass(
90 SimpleCheckpoint.SimpleCheckpointComputation.class);
91 conf.setWorkerContextClass(
92 SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
93 conf.setMasterComputeClass(
94 SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
95 conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
96 conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
97 GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
98 conf, outputPath);
99 configuration.setMasterComputeClass(
100 SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
101 GiraphConstants.CHECKPOINT_DIRECTORY.set(restartedJob.getConfiguration(),
102 checkpointsDir.toString());
103
104 assertTrue(restartedJob.run(true));
105 if (!runningInDistributedMode()) {
106 long idSumRestarted =
107 SimpleCheckpoint.SimpleCheckpointVertexWorkerContext
108 .getFinalSum();
109 System.out.println("testBspCheckpoint: idSumRestarted = " +
110 idSumRestarted);
111 assertEquals(idSum, idSumRestarted);
112 }
113 }
114 }