This project has retired. For details please refer to its
Attic page.
TestCheckpointing xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph;
20
21 import org.apache.giraph.aggregators.LongSumAggregator;
22 import org.apache.giraph.bsp.BspService;
23 import org.apache.giraph.conf.GiraphConfiguration;
24 import org.apache.giraph.conf.GiraphConstants;
25 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26 import org.apache.giraph.edge.Edge;
27 import org.apache.giraph.edge.EdgeFactory;
28 import org.apache.giraph.examples.SimpleSuperstepComputation;
29 import org.apache.giraph.graph.BasicComputation;
30 import org.apache.giraph.graph.Vertex;
31 import org.apache.giraph.job.GiraphJob;
32 import org.apache.giraph.master.DefaultMasterCompute;
33 import org.apache.giraph.worker.DefaultWorkerContext;
34 import org.apache.giraph.zk.ZooKeeperExt;
35 import org.apache.giraph.zk.ZooKeeperManager;
36 import org.apache.hadoop.fs.FileStatus;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.io.FloatWritable;
39 import org.apache.hadoop.io.IntWritable;
40 import org.apache.hadoop.io.LongWritable;
41 import org.apache.hadoop.io.Writable;
42 import org.apache.log4j.Logger;
43 import org.apache.zookeeper.CreateMode;
44 import org.apache.zookeeper.KeeperException;
45 import org.apache.zookeeper.ZooDefs;
46 import org.junit.Assert;
47 import org.junit.Test;
48
49 import java.io.DataInput;
50 import java.io.DataOutput;
51 import java.io.IOException;
52 import java.util.List;
53
54 import static org.junit.Assert.assertEquals;
55 import static org.junit.Assert.assertTrue;
56 import static org.junit.Assert.fail;
57
58
59
60
61
62 public class TestCheckpointing extends BspCase {
63
64
65 private static final Logger LOG =
66 Logger.getLogger(TestCheckpointing.class);
67
68 public static final String TEST_JOB_ID = "test_job";
69
70 private static SuperstepCallback SUPERSTEP_CALLBACK;
71
72
73
74
/**
 * Creates the test case, handing this class's fully-qualified name to the
 * BspCase constructor.
 */
public TestCheckpointing() {
  super(TestCheckpointing.class.getName());
}
78
79 @Test
80 public void testBspCheckpoint() throws InterruptedException, IOException, ClassNotFoundException {
81 testBspCheckpoint(false);
82 }
83
84 @Test
85 public void testAsyncMessageStoreCheckpoint() throws InterruptedException, IOException, ClassNotFoundException {
86 testBspCheckpoint(true);
87 }
88
89 public void testBspCheckpoint(boolean useAsyncMessageStore)
90 throws IOException, InterruptedException, ClassNotFoundException {
91 Path checkpointsDir = getTempPath("checkpointing");
92 GiraphConfiguration conf = new GiraphConfiguration();
93 if (useAsyncMessageStore) {
94 GiraphConstants.ASYNC_MESSAGE_STORE_THREADS_COUNT.set(conf, 2);
95 }
96
97 SUPERSTEP_CALLBACK = null;
98
99 GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(conf, false);
100 conf.setCheckpointFrequency(2);
101
102 long idSum = runOriginalJob(checkpointsDir, conf);
103 assertEquals(10, idSum);
104
105 SUPERSTEP_CALLBACK = new SuperstepCallback() {
106 @Override
107 public void superstep(long superstep,
108 ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> conf) {
109 if (superstep < 2) {
110 Assert.fail("Restarted JOB should not be executed on superstep " + superstep);
111 }
112 }
113 };
114
115 runRestartedJob(checkpointsDir, conf, idSum, 2);
116
117
118 }
119
120 private void runRestartedJob(Path checkpointsDir, GiraphConfiguration conf, long idSum, long restartFrom) throws IOException, InterruptedException, ClassNotFoundException {
121 Path outputPath;
122 LOG.info("testBspCheckpoint: Restarting from the latest superstep " +
123 "with checkpoint path = " + checkpointsDir);
124 outputPath = getTempPath("checkpointing_restarted");
125
126 GiraphConstants.RESTART_JOB_ID.set(conf, TEST_JOB_ID);
127 conf.set("mapred.job.id", "restarted_test_job");
128 if (restartFrom >= 0) {
129 conf.set(GiraphConstants.RESTART_SUPERSTEP, Long.toString(restartFrom));
130 }
131
132 GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
133 conf, outputPath);
134
135 GiraphConstants.CHECKPOINT_DIRECTORY.set(restartedJob.getConfiguration(),
136 checkpointsDir.toString());
137
138 assertTrue(restartedJob.run(true));
139
140
141 if (!runningInDistributedMode()) {
142 long idSumRestarted =
143 CheckpointVertexWorkerContext
144 .getFinalSum();
145 LOG.info("testBspCheckpoint: idSumRestarted = " +
146 idSumRestarted);
147 assertEquals(idSum, idSumRestarted);
148 }
149 }
150
151 private long runOriginalJob(Path checkpointsDir, GiraphConfiguration conf) throws IOException, InterruptedException, ClassNotFoundException {
152 Path outputPath = getTempPath("checkpointing_original");
153 conf.setComputationClass(
154 CheckpointComputation.class);
155 conf.setWorkerContextClass(
156 CheckpointVertexWorkerContext.class);
157 conf.setMasterComputeClass(
158 CheckpointVertexMasterCompute.class);
159 conf.setVertexInputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat.class);
160 conf.setVertexOutputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat.class);
161 conf.set("mapred.job.id", TEST_JOB_ID);
162 GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
163
164 GiraphConfiguration configuration = job.getConfiguration();
165 GiraphConstants.CHECKPOINT_DIRECTORY.set(configuration, checkpointsDir.toString());
166
167 assertTrue(job.run(true));
168
169 long idSum = 0;
170 if (!runningInDistributedMode()) {
171 FileStatus fileStatus = getSinglePartFileStatus(job.getConfiguration(),
172 outputPath);
173 idSum = CheckpointVertexWorkerContext
174 .getFinalSum();
175 LOG.info("testBspCheckpoint: idSum = " + idSum +
176 " fileLen = " + fileStatus.getLen());
177 }
178 return idSum;
179 }
180
181
182
183
184
185 public static class CheckpointComputation extends
186 BasicComputation<LongWritable, IntWritable, FloatWritable,
187 FloatWritable> {
188 @Override
189 public void compute(
190 Vertex<LongWritable, IntWritable, FloatWritable> vertex,
191 Iterable<FloatWritable> messages) throws IOException {
192 CheckpointVertexWorkerContext workerContext = getWorkerContext();
193 assertEquals(getSuperstep() + 1, workerContext.testValue);
194
195 if (getSuperstep() > 4) {
196 vertex.voteToHalt();
197 return;
198 }
199
200 aggregate(LongSumAggregator.class.getName(),
201 new LongWritable(vertex.getId().get()));
202
203 float msgValue = 0.0f;
204 for (FloatWritable message : messages) {
205 float curMsgValue = message.get();
206 msgValue += curMsgValue;
207 }
208
209 int vertexValue = vertex.getValue().get();
210 vertex.setValue(new IntWritable(vertexValue + (int) msgValue));
211 for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
212 FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() +
213 (float) vertexValue);
214 Edge<LongWritable, FloatWritable> newEdge =
215 EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue);
216 vertex.addEdge(newEdge);
217 sendMessage(edge.getTargetVertexId(), newEdgeValue);
218
219 }
220 }
221 }
222
223 @Test
224 public void testManualCheckpointAtTheBeginning()
225 throws InterruptedException, IOException, ClassNotFoundException {
226 testManualCheckpoint(0);
227 }
228
229 @Test
230 public void testManualCheckpoint()
231 throws InterruptedException, IOException, ClassNotFoundException {
232 testManualCheckpoint(2);
233 }
234
235
236 private void testManualCheckpoint(final int checkpointSuperstep)
237 throws IOException, InterruptedException, ClassNotFoundException {
238 Path checkpointsDir = getTempPath("checkpointing");
239 GiraphConfiguration conf = new GiraphConfiguration();
240
241 SUPERSTEP_CALLBACK = new SuperstepCallback() {
242
243 @Override
244 public void superstep(long superstep, ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> conf) {
245 if (superstep == checkpointSuperstep) {
246 try {
247 ZooKeeperExt zooKeeperExt = new ZooKeeperExt(conf.getZookeeperList(),
248 conf.getZooKeeperSessionTimeout(),
249 conf.getZookeeperOpsMaxAttempts(),
250 conf.getZookeeperOpsRetryWaitMsecs(),
251 TestCheckpointing.this);
252 String basePath = ZooKeeperManager.getBasePath(conf) + BspService.BASE_DIR + "/" + conf.get("mapred.job.id");
253 zooKeeperExt.createExt(
254 basePath + BspService.FORCE_CHECKPOINT_USER_FLAG,
255 null,
256 ZooDefs.Ids.OPEN_ACL_UNSAFE,
257 CreateMode.PERSISTENT,
258 true);
259 } catch (IOException | InterruptedException | KeeperException e) {
260 throw new RuntimeException(e);
261 }
262 } else if (superstep > checkpointSuperstep) {
263 Assert.fail("Job should be stopped by now " + superstep);
264 }
265 }
266 };
267
268 try {
269 runOriginalJob(checkpointsDir, conf);
270 fail("Original job should fail after checkpointing");
271 } catch (Exception e) {
272 LOG.info("Original job failed, that's OK " + e);
273 }
274
275 SUPERSTEP_CALLBACK = new SuperstepCallback() {
276 @Override
277 public void superstep(long superstep,
278 ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> conf) {
279 if (superstep < checkpointSuperstep) {
280 Assert.fail("Restarted JOB should not be executed on superstep " + superstep);
281 }
282 }
283 };
284
285 runRestartedJob(checkpointsDir, conf, 10, -1);
286 }
287
288
289
290
291 public static class CheckpointVertexWorkerContext
292 extends DefaultWorkerContext {
293
294 private static long FINAL_SUM;
295
296 private int testValue;
297
298 public static long getFinalSum() {
299 return FINAL_SUM;
300 }
301
302 @Override
303 public void postSuperstep() {
304 super.postSuperstep();
305 sendMessageToMyself(new LongWritable(getSuperstep()));
306 }
307
308
309
310
311
312
313 private void sendMessageToMyself(Writable message) {
314 sendMessageToWorker(message, getMyWorkerIndex());
315 }
316
317 @Override
318 public void postApplication() {
319 setFinalSum(this.<LongWritable>getAggregatedValue(
320 LongSumAggregator.class.getName()).get());
321 LOG.info("FINAL_SUM=" + FINAL_SUM);
322 }
323
324
325
326
327
328
329 private static void setFinalSum(long value) {
330 FINAL_SUM = value;
331 }
332
333 @Override
334 public void preSuperstep() {
335 assertEquals(getSuperstep(), testValue++);
336 if (getSuperstep() > 0) {
337 List<Writable> messages = getAndClearMessagesFromOtherWorkers();
338 assertEquals(1, messages.size());
339 assertEquals(getSuperstep() - 1, ((LongWritable)(messages.get(0))).get());
340 }
341 }
342
343 @Override
344 public void readFields(DataInput dataInput) throws IOException {
345 super.readFields(dataInput);
346 testValue = dataInput.readInt();
347 }
348
349 @Override
350 public void write(DataOutput dataOutput) throws IOException {
351 super.write(dataOutput);
352 dataOutput.writeInt(testValue);
353 }
354 }
355
356
357
358
359 public static class CheckpointVertexMasterCompute extends
360 DefaultMasterCompute {
361
362 private int testValue = 0;
363
364 @Override
365 public void compute() {
366 long superstep = getSuperstep();
367 if (SUPERSTEP_CALLBACK != null) {
368 SUPERSTEP_CALLBACK.superstep(getSuperstep(), getConf());
369 }
370 assertEquals(superstep, testValue++);
371 }
372
373 @Override
374 public void initialize() throws InstantiationException,
375 IllegalAccessException {
376 registerAggregator(LongSumAggregator.class.getName(),
377 LongSumAggregator.class);
378 }
379
380 @Override
381 public void readFields(DataInput in) throws IOException {
382 super.readFields(in);
383 testValue = in.readInt();
384 }
385
386 @Override
387 public void write(DataOutput out) throws IOException {
388 super.write(out);
389 out.writeInt(testValue);
390 }
391 }
392
393 private static interface SuperstepCallback {
394
395 public void superstep(long superstep,
396 ImmutableClassesGiraphConfiguration<LongWritable,
397 IntWritable, FloatWritable> conf);
398
399 }
400
401 }