/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.aggregators;
2021importstatic org.junit.Assert.assertTrue;
2223import java.io.IOException;
2425import org.apache.giraph.BspCase;
26import org.apache.giraph.comm.aggregators.AggregatorUtils;
27import org.apache.giraph.conf.GiraphConfiguration;
28import org.apache.giraph.conf.GiraphConstants;
29import org.apache.giraph.examples.AggregatorsTestComputation;
30import org.apache.giraph.examples.SimpleCheckpoint;
31import org.apache.giraph.job.GiraphJob;
32import org.apache.hadoop.fs.Path;
33import org.junit.Test;
3435/** Tests if aggregators are handled on a proper way */36publicclassTestAggregatorsHandlingextendsBspCase {
3738publicTestAggregatorsHandling() {
39super(TestAggregatorsHandling.class.getName());
40 }
4142/** Tests if aggregators are handled on a proper way during supersteps */43 @Test
44publicvoid testAggregatorsHandling() throws IOException,
45 ClassNotFoundException, InterruptedException {
46 GiraphConfiguration conf = new GiraphConfiguration();
47 conf.setComputationClass(AggregatorsTestComputation.class);
48 conf.setVertexInputFormatClass(
49 AggregatorsTestComputation.SimpleVertexInputFormat.class);
50 conf.setEdgeInputFormatClass(
51 AggregatorsTestComputation.SimpleEdgeInputFormat.class);
52 GiraphJob job = prepareJob(getCallingMethodName(), conf);
53 job.getConfiguration().setMasterComputeClass(
54 AggregatorsTestComputation.AggregatorsTestMasterCompute.class);
55// test with aggregators split in a few requests56 job.getConfiguration().setInt(
57 AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST, 50);
58 assertTrue(job.run(true));
59 }
6061/**62 * Test if aggregators are are handled properly when restarting from a63 * checkpoint64 */65 @Test
66publicvoid testAggregatorsCheckpointing() throws ClassNotFoundException,
67 IOException, InterruptedException {
68 Path checkpointsDir = getTempPath("checkPointsForTesting");
69 Path outputPath = getTempPath(getCallingMethodName());
70 GiraphConfiguration conf = new GiraphConfiguration();
71 conf.setComputationClass(AggregatorsTestComputation.class);
72 conf.setMasterComputeClass(
73 AggregatorsTestComputation.AggregatorsTestMasterCompute.class);
74 conf.setVertexInputFormatClass(
75 AggregatorsTestComputation.SimpleVertexInputFormat.class);
76 conf.setEdgeInputFormatClass(
77 AggregatorsTestComputation.SimpleEdgeInputFormat.class);
78 GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
7980 GiraphConfiguration configuration = job.getConfiguration();
81 GiraphConstants.CHECKPOINT_DIRECTORY.set(configuration, checkpointsDir.toString());
82 GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(configuration, false);
83 configuration.setCheckpointFrequency(4);
8485 assertTrue(job.run(true));
8687// Restart the test from superstep 488 System.out.println("testAggregatorsCheckpointing: Restarting from " +
89"superstep 4 with checkpoint path = " + checkpointsDir);
90 outputPath = getTempPath(getCallingMethodName() + "Restarted");
91 conf = new GiraphConfiguration();
92 conf.setComputationClass(AggregatorsTestComputation.class);
93 conf.setMasterComputeClass(
94 AggregatorsTestComputation.AggregatorsTestMasterCompute.class);
95 conf.setVertexInputFormatClass(
96 AggregatorsTestComputation.SimpleVertexInputFormat.class);
97 conf.setEdgeInputFormatClass(
98 AggregatorsTestComputation.SimpleEdgeInputFormat.class);
99 GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
100 conf, outputPath);
101 job.getConfiguration().setMasterComputeClass(
102 SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
103 GiraphConfiguration restartedJobConf = restartedJob.getConfiguration();
104 GiraphConstants.CHECKPOINT_DIRECTORY.set(restartedJobConf,
105 checkpointsDir.toString());
106 restartedJobConf.setLong(GiraphConstants.RESTART_SUPERSTEP, 4);
107108 assertTrue(restartedJob.run(true));
109 }
110 }