1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph;
2021import org.apache.giraph.conf.GiraphConfiguration;
22import org.apache.giraph.conf.GiraphConstants;
23import org.apache.giraph.examples.SimpleCheckpoint;
24import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
25import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat;
26import org.apache.giraph.job.GiraphJob;
27import org.apache.hadoop.fs.Path;
28import org.junit.Test;
2930import java.io.IOException;
3132importstatic org.junit.Assert.assertTrue;
3334/**35 * Unit test for automated checkpoint restarting36 */37publicclassTestAutoCheckpointextendsBspCase {
3839publicTestAutoCheckpoint() {
40super(TestAutoCheckpoint.class.getName());
41 }
4243/**44 * Run a job that requires checkpointing and will have a worker crash45 * and still recover from a previous checkpoint.46 *47 * @throws IOException48 * @throws ClassNotFoundException49 * @throws InterruptedException50 */51 @Test
52publicvoid testSingleFault()
53throws IOException, InterruptedException, ClassNotFoundException {
54if (!runningInDistributedMode()) {
55 System.out.println(
56"testSingleFault: Ignore this test in local mode.");
57return;
58 }
59 Path outputPath = getTempPath(getCallingMethodName());
60 GiraphConfiguration conf = new GiraphConfiguration();
61 conf.setComputationClass(
62 SimpleCheckpoint.SimpleCheckpointComputation.class);
63 conf.setWorkerContextClass(
64 SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
65 conf.setMasterComputeClass(
66 SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
67 conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
68 conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
69 conf.setBoolean(SimpleCheckpoint.ENABLE_FAULT, true);
70 conf.setInt("mapred.map.max.attempts", 4);
71// Trigger failure faster72 conf.setInt("mapred.task.timeout", 10000);
73 conf.setMaxMasterSuperstepWaitMsecs(10000);
74 conf.setEventWaitMsecs(1000);
75 conf.setCheckpointFrequency(2);
76 GiraphConstants.CHECKPOINT_DIRECTORY.set(conf,
77 getTempPath("_singleFaultCheckpoints").toString());
78 GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(conf, false);
79 GiraphConstants.ZOOKEEPER_SESSION_TIMEOUT.set(conf, 10000);
80 GiraphConstants.ZOOKEEPER_MIN_SESSION_TIMEOUT.set(conf, 10000);
81 GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
82 assertTrue(job.run(true));
83 }
84 }