View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework;
19  
20  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
21  import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
22  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
23  import org.apache.giraph.block_app.framework.api.CreateReducersApi;
24  import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
25  import org.apache.giraph.block_app.framework.piece.Piece;
26  import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
27  import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
28  import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
29  import org.apache.giraph.conf.GiraphConfiguration;
30  import org.apache.giraph.conf.GiraphConstants;
31  import org.apache.giraph.graph.Vertex;
32  import org.apache.giraph.reducers.impl.SumReduce;
33  import org.apache.giraph.types.NoMessage;
34  import org.apache.giraph.utils.TestGraph;
35  import org.apache.hadoop.io.BooleanWritable;
36  import org.apache.hadoop.io.LongWritable;
37  import org.apache.hadoop.io.NullWritable;
38  import org.apache.hadoop.io.Writable;
39  import org.apache.hadoop.io.WritableComparable;
40  import org.junit.Assert;
41  import org.junit.Test;
42  import org.python.google.common.collect.Iterables;
43  
44  /**
45   * Test of barebones of Blocks Framework.
46   *
47   * Do not look as an example of unit test, or to learn about the Framework,
48   * there are utilities to do things simpler, that we are not trying to test
49   * here.
50   */
51  public class BlockExecutionTest {
52  
53    private static GiraphConfiguration createConf() {
54      GiraphConfiguration conf = new GiraphConfiguration();
55      GiraphConstants.VERTEX_ID_CLASS.set(conf, LongWritable.class);
56      GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class);
57      GiraphConstants.EDGE_VALUE_CLASS.set(conf, NullWritable.class);
58      return conf;
59    }
60  
61    private static TestGraph<LongWritable, LongWritable, NullWritable> createTestGraph() {
62      TestGraph<LongWritable, LongWritable, NullWritable> graph =
63          new TestGraph<LongWritable, LongWritable, NullWritable>(createConf());
64      graph.addVertex(new LongWritable(1), new LongWritable());
65      graph.addVertex(new LongWritable(2), new LongWritable());
66      graph.addVertex(new LongWritable(3), new LongWritable());
67      graph.addVertex(new LongWritable(4), new LongWritable());
68  
69      graph.addEdge(new LongWritable(1), new LongWritable(2), NullWritable.get());
70      graph.addEdge(new LongWritable(2), new LongWritable(1), NullWritable.get());
71      graph.addEdge(new LongWritable(2), new LongWritable(3), NullWritable.get());
72      graph.addEdge(new LongWritable(3), new LongWritable(2), NullWritable.get());
73      return graph;
74    }
75  
76    @Test
77    public void testMessageSending() {
78      TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph();
79  
80      LocalBlockRunner.runBlock(graph, new Piece<WritableComparable, LongWritable, Writable, BooleanWritable, Object>() {
81        @Override
82        public VertexSender<WritableComparable, LongWritable, Writable> getVertexSender(
83            final BlockWorkerSendApi<WritableComparable, LongWritable, Writable, BooleanWritable> workerApi,
84            Object executionStage) {
85          return new InnerVertexSender() {
86            @Override
87            public void vertexSend(Vertex<WritableComparable, LongWritable, Writable> vertex) {
88              workerApi.sendMessageToAllEdges(vertex, new BooleanWritable());
89            }
90          };
91        }
92  
93        @Override
94        public VertexReceiver<WritableComparable, LongWritable, Writable, BooleanWritable>
95            getVertexReceiver(BlockWorkerReceiveApi<WritableComparable> workerApi,
96                Object executionStage) {
97          return new InnerVertexReceiver() {
98            @Override
99            public void vertexReceive(Vertex<WritableComparable, LongWritable, Writable> vertex,
100               Iterable<BooleanWritable> messages) {
101             vertex.getValue().set(Iterables.size(messages));
102           }
103         };
104       }
105 
106       @Override
107       protected Class<BooleanWritable> getMessageClass() {
108         return BooleanWritable.class;
109       }
110     }, new Object());
111 
112     Assert.assertEquals(1, graph.getVertex(new LongWritable(1)).getValue().get());
113     Assert.assertEquals(2, graph.getVertex(new LongWritable(2)).getValue().get());
114     Assert.assertEquals(1, graph.getVertex(new LongWritable(3)).getValue().get());
115     Assert.assertEquals(0, graph.getVertex(new LongWritable(4)).getValue().get());
116   }
117 
118   @Test
119   public void testReducing() {
120     TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph();
121 
122     final LongWritable value = new LongWritable();
123 
124     LocalBlockRunner.runBlock(graph, new Piece<WritableComparable, Writable, Writable, NoMessage, Object>() {
125       private ReducerHandle<LongWritable, LongWritable> numVertices;
126 
127       @Override
128       public void registerReducers(CreateReducersApi reduceApi, Object executionStage) {
129         numVertices = reduceApi.createLocalReducer(SumReduce.LONG);
130       }
131 
132       @Override
133       public VertexSender<WritableComparable, Writable, Writable> getVertexSender(
134           BlockWorkerSendApi<WritableComparable, Writable, Writable, NoMessage> workerApi,
135           Object executionStage) {
136 
137         return new InnerVertexSender() {
138           @Override
139           public void vertexSend(Vertex<WritableComparable, Writable, Writable> vertex) {
140             numVertices.reduce(new LongWritable(1));
141           }
142         };
143       }
144 
145       @Override
146       public void masterCompute(BlockMasterApi masterApi, Object executionStage) {
147         value.set(numVertices.getReducedValue(masterApi).get());
148       }
149     }, new Object());
150 
151     Assert.assertEquals(4, value.get());
152   }
153 
154   public void testVertexRemoval() {
155     TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph();
156     LocalBlockRunner.runBlock(graph, new Piece<LongWritable, Writable, Writable, NoMessage, Object>() {
157       @Override
158       public VertexSender<LongWritable, Writable, Writable> getVertexSender(
159           final BlockWorkerSendApi<LongWritable, Writable, Writable, NoMessage> workerApi,
160           Object executionStage) {
161         return new InnerVertexSender() {
162           @Override
163           public void vertexSend(Vertex<LongWritable, Writable, Writable> vertex) {
164             long id = vertex.getId().get();
165             if (id == 1 || id == 3) {
166               workerApi.removeVertexRequest(vertex.getId());
167             }
168           }
169         };
170       }
171     }, new Object());
172 
173     Assert.assertNull(graph.getVertex(new LongWritable(1)));
174     Assert.assertNotNull(graph.getVertex(new LongWritable(2)));
175     Assert.assertNull(graph.getVertex(new LongWritable(3)));
176     Assert.assertNotNull(graph.getVertex(new LongWritable(4)));
177   }
178 }