This project has retired. For details please refer to its Attic page.
TestMutateGraph xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph;
20  
21  import org.apache.giraph.conf.GiraphConfiguration;
22  import org.apache.giraph.conf.GiraphConstants;
23  import org.apache.giraph.examples.GeneratedVertexReader;
24  import org.apache.giraph.examples.SimpleMutateGraphComputation;
25  import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexInputFormat;
26  import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexOutputFormat;
27  import org.apache.giraph.graph.DefaultVertexResolver;
28  import org.apache.giraph.graph.Vertex;
29  import org.apache.giraph.graph.VertexChanges;
30  import org.apache.giraph.job.GiraphJob;
31  import org.apache.hadoop.io.Writable;
32  import org.apache.hadoop.io.WritableComparable;
33  import org.junit.Test;
34  
35  import java.io.IOException;
36  
37  import static org.junit.Assert.assertTrue;
38  
39  /**
40   * Unit test for graph mutation
41   */
42  public class TestMutateGraph extends BspCase {
43    public TestMutateGraph() {
44        super(TestMutateGraph.class.getName());
45    }
46    /**
47     * Custom vertex resolver
48     */
49    public static class TestVertexResolver<I extends WritableComparable, V
50        extends Writable, E extends Writable>
51        extends DefaultVertexResolver {
52      @Override
53      public Vertex resolve(WritableComparable vertexId, Vertex vertex,
54          VertexChanges vertexChanges, boolean hasMessages) {
55        Vertex originalVertex = vertex;
56        // 1. If the vertex exists, first prune the edges
57        removeEdges(vertex, vertexChanges);
58  
59        // 2. If vertex removal desired, remove the vertex.
60        vertex = removeVertexIfDesired(vertex, vertexChanges);
61  
62        // If vertex removal happens do not add it back even if it has messages.
63        if (originalVertex != null && vertex == null) {
64          hasMessages = false;
65        }
66  
67        // 3. If creation of vertex desired, pick first vertex
68        // 4. If vertex doesn't exist, but got messages or added edges, create
69        vertex = addVertexIfDesired(vertexId, vertex, vertexChanges, hasMessages);
70  
71        // 5. If edge addition, add the edges
72        addEdges(vertex, vertexChanges);
73  
74        return vertex;
75      }
76    }
77  
78    /**
79     * Run a job that tests the various graph mutations that can occur
80     *
81     * @throws IOException
82     * @throws ClassNotFoundException
83     * @throws InterruptedException
84     */
85    @Test
86    public void testMutateGraph()
87            throws IOException, InterruptedException, ClassNotFoundException {
88      GiraphConfiguration conf = new GiraphConfiguration();
89      conf.setComputationClass(SimpleMutateGraphComputation.class);
90      conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
91      conf.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
92      conf.setWorkerContextClass(
93          SimpleMutateGraphComputation.SimpleMutateGraphVertexWorkerContext.class);
94      GiraphConstants.USER_PARTITION_COUNT.set(conf, 32);
95      conf.setNumComputeThreads(8);
96      GiraphConstants.VERTEX_RESOLVER_CLASS.set(conf, TestVertexResolver.class);
97      GiraphJob job = prepareJob(getCallingMethodName(), conf,
98          getTempPath(getCallingMethodName()));
99      // Overwrite the number of vertices set in BspCase
100     GeneratedVertexReader.READER_VERTICES.set(conf, 400);
101     assertTrue(job.run(true));
102   }
103 }