This project has retired. For details please refer to its Attic page.
TestGraphPartitioner xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph;
20  
21  import static org.apache.giraph.examples.GeneratedVertexReader.READER_VERTICES;
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertTrue;
24  
25  import java.io.IOException;
26  
27  import org.apache.giraph.conf.GiraphConfiguration;
28  import org.apache.giraph.conf.GiraphConstants;
29  import org.apache.giraph.examples.SimpleCheckpoint;
30  import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
31  import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat;
32  import org.apache.giraph.job.GiraphJob;
33  import org.apache.giraph.partition.HashRangePartitionerFactory;
34  import org.apache.giraph.partition.PartitionBalancer;
35  import org.apache.giraph.partition.SimpleLongRangePartitionerFactory;
36  import org.apache.hadoop.fs.FileStatus;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.junit.Test;
40  
41  /**
42   * Unit test for manual checkpoint restarting
43   */
44  public class TestGraphPartitioner extends BspCase {
45    public TestGraphPartitioner() {
46      super(TestGraphPartitioner.class.getName());
47    }
48  
49    private void verifyOutput(FileSystem fs, Path outputPath)
50        throws IOException {
51      // TODO: this is fragile (breaks with legit serialization changes)
52      final int correctLen = 120;
53      if (runningInDistributedMode()) {
54        FileStatus[] fileStatusArr = fs.listStatus(outputPath);
55        int totalLen = 0;
56        for (FileStatus fileStatus : fileStatusArr) {
57          if (fileStatus.getPath().toString().contains("/part-m-")) {
58            totalLen += fileStatus.getLen();
59          }
60        }
61        assertEquals(correctLen, totalLen);
62      }
63    }
64  
65    /**
66     * Run a sample BSP job locally and test various partitioners and
67     * partition algorithms.
68     *
69     * @throws IOException
70     * @throws ClassNotFoundException
71     * @throws InterruptedException
72     */
73    @Test
74    public void testPartitioners()
75        throws IOException, InterruptedException, ClassNotFoundException {
76      Path outputPath = getTempPath("testVertexBalancer");
77      GiraphConfiguration conf = new GiraphConfiguration();
78      conf.setComputationClass(
79          SimpleCheckpoint.SimpleCheckpointComputation.class);
80      conf.setWorkerContextClass(
81          SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
82      conf.setMasterComputeClass(
83          SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
84      conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
85      conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
86      GiraphJob job = prepareJob("testVertexBalancer", conf, outputPath);
87  
88      job.getConfiguration().set(
89          PartitionBalancer.PARTITION_BALANCE_ALGORITHM,
90          PartitionBalancer.VERTICES_BALANCE_ALGORITHM);
91  
92      assertTrue(job.run(true));
93      FileSystem hdfs = FileSystem.get(job.getConfiguration());
94  
95      conf = new GiraphConfiguration();
96      conf.setComputationClass(
97          SimpleCheckpoint.SimpleCheckpointComputation.class);
98      conf.setWorkerContextClass(
99          SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
100     conf.setMasterComputeClass(
101         SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
102     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
103     conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
104     outputPath = getTempPath("testHashPartitioner");
105     job = prepareJob("testHashPartitioner", conf, outputPath);
106     assertTrue(job.run(true));
107     verifyOutput(hdfs, outputPath);
108 
109     job = new GiraphJob("testHashRangePartitioner");
110     setupConfiguration(job);
111     job.getConfiguration().setComputationClass(
112         SimpleCheckpoint.SimpleCheckpointComputation.class);
113     job.getConfiguration().setWorkerContextClass(
114         SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
115     job.getConfiguration().setMasterComputeClass(
116         SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
117     job.getConfiguration().setVertexInputFormatClass(
118         SimpleSuperstepVertexInputFormat.class);
119     job.getConfiguration().setVertexOutputFormatClass(
120         SimpleSuperstepVertexOutputFormat.class);
121     job.getConfiguration().setGraphPartitionerFactoryClass(
122         HashRangePartitionerFactory.class);
123     outputPath = getTempPath("testHashRangePartitioner");
124     removeAndSetOutput(job, outputPath);
125     assertTrue(job.run(true));
126     verifyOutput(hdfs, outputPath);
127 
128     job = new GiraphJob("testSimpleRangePartitioner");
129     setupConfiguration(job);
130     job.getConfiguration().setComputationClass(
131         SimpleCheckpoint.SimpleCheckpointComputation.class);
132     job.getConfiguration().setWorkerContextClass(
133         SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
134     job.getConfiguration().setMasterComputeClass(
135         SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
136     job.getConfiguration().setVertexInputFormatClass(
137         SimpleSuperstepVertexInputFormat.class);
138     job.getConfiguration().setVertexOutputFormatClass(
139         SimpleSuperstepVertexOutputFormat.class);
140 
141     job.getConfiguration().setGraphPartitionerFactoryClass(
142         SimpleLongRangePartitionerFactory.class);
143     long readerVertices =
144         READER_VERTICES.getWithDefault(job.getConfiguration(), -1L);
145     job.getConfiguration().setLong(
146         GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, readerVertices);
147 
148     outputPath = getTempPath("testSimpleRangePartitioner");
149     removeAndSetOutput(job, outputPath);
150     assertTrue(job.run(true));
151     verifyOutput(hdfs, outputPath);
152   }
153 }