This project has retired. For details please refer to its Attic page.
TestAggregatorsHandling xref
View Javadoc

1   /*
2   * Licensed to the Apache Software Foundation (ASF) under one
3   * or more contributor license agreements.  See the NOTICE file
4   * distributed with this work for additional information
5   * regarding copyright ownership.  The ASF licenses this file
6   * to you under the Apache License, Version 2.0 (the
7   * "License"); you may not use this file except in compliance
8   * with the License.  You may obtain a copy of the License at
9   *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18  
19  package org.apache.giraph.aggregators;
20  
21  import static org.junit.Assert.assertTrue;
22  
23  import java.io.IOException;
24  
25  import org.apache.giraph.BspCase;
26  import org.apache.giraph.comm.aggregators.AggregatorUtils;
27  import org.apache.giraph.conf.GiraphConfiguration;
28  import org.apache.giraph.conf.GiraphConstants;
29  import org.apache.giraph.examples.AggregatorsTestComputation;
30  import org.apache.giraph.examples.SimpleCheckpoint;
31  import org.apache.giraph.job.GiraphJob;
32  import org.apache.hadoop.fs.Path;
33  import org.junit.Test;
34  
35  /** Tests if aggregators are handled on a proper way */
36  public class TestAggregatorsHandling extends BspCase {
37  
38    public TestAggregatorsHandling() {
39      super(TestAggregatorsHandling.class.getName());
40    }
41  
42    /** Tests if aggregators are handled on a proper way during supersteps */
43    @Test
44    public void testAggregatorsHandling() throws IOException,
45        ClassNotFoundException, InterruptedException {
46      GiraphConfiguration conf = new GiraphConfiguration();
47      conf.setComputationClass(AggregatorsTestComputation.class);
48      conf.setVertexInputFormatClass(
49          AggregatorsTestComputation.SimpleVertexInputFormat.class);
50      conf.setEdgeInputFormatClass(
51          AggregatorsTestComputation.SimpleEdgeInputFormat.class);
52      GiraphJob job = prepareJob(getCallingMethodName(), conf);
53      job.getConfiguration().setMasterComputeClass(
54          AggregatorsTestComputation.AggregatorsTestMasterCompute.class);
55      // test with aggregators split in a few requests
56      job.getConfiguration().setInt(
57          AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST, 50);
58      assertTrue(job.run(true));
59    }
60  
61    /**
62     * Test if aggregators are are handled properly when restarting from a
63     * checkpoint
64     */
65    @Test
66    public void testAggregatorsCheckpointing() throws ClassNotFoundException,
67        IOException, InterruptedException {
68      Path checkpointsDir = getTempPath("checkPointsForTesting");
69      Path outputPath = getTempPath(getCallingMethodName());
70      GiraphConfiguration conf = new GiraphConfiguration();
71      conf.setComputationClass(AggregatorsTestComputation.class);
72      conf.setMasterComputeClass(
73          AggregatorsTestComputation.AggregatorsTestMasterCompute.class);
74      conf.setVertexInputFormatClass(
75          AggregatorsTestComputation.SimpleVertexInputFormat.class);
76      conf.setEdgeInputFormatClass(
77          AggregatorsTestComputation.SimpleEdgeInputFormat.class);
78      GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
79  
80      GiraphConfiguration configuration = job.getConfiguration();
81      GiraphConstants.CHECKPOINT_DIRECTORY.set(configuration, checkpointsDir.toString());
82      GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(configuration, false);
83      configuration.setCheckpointFrequency(4);
84  
85      assertTrue(job.run(true));
86  
87      // Restart the test from superstep 4
88      System.out.println("testAggregatorsCheckpointing: Restarting from " +
89          "superstep 4 with checkpoint path = " + checkpointsDir);
90      outputPath = getTempPath(getCallingMethodName() + "Restarted");
91      conf = new GiraphConfiguration();
92      conf.setComputationClass(AggregatorsTestComputation.class);
93      conf.setMasterComputeClass(
94          AggregatorsTestComputation.AggregatorsTestMasterCompute.class);
95      conf.setVertexInputFormatClass(
96          AggregatorsTestComputation.SimpleVertexInputFormat.class);
97      conf.setEdgeInputFormatClass(
98          AggregatorsTestComputation.SimpleEdgeInputFormat.class);
99      GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
100         conf, outputPath);
101     job.getConfiguration().setMasterComputeClass(
102         SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
103     GiraphConfiguration restartedJobConf = restartedJob.getConfiguration();
104     GiraphConstants.CHECKPOINT_DIRECTORY.set(restartedJobConf,
105         checkpointsDir.toString());
106     restartedJobConf.setLong(GiraphConstants.RESTART_SUPERSTEP, 4);
107 
108     assertTrue(restartedJob.run(true));
109   }
110 }