View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework;
19  
20  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
21  import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
22  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
23  import org.apache.giraph.block_app.framework.api.CreateReducersApi;
24  import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
25  import org.apache.giraph.block_app.framework.block.Block;
26  import org.apache.giraph.block_app.framework.block.RepeatUntilBlock;
27  import org.apache.giraph.block_app.framework.block.SequenceBlock;
28  import org.apache.giraph.block_app.framework.piece.Piece;
29  import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
30  import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
31  import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
32  import org.apache.giraph.block_app.library.Pieces;
33  import org.apache.giraph.conf.GiraphConfiguration;
34  import org.apache.giraph.conf.GiraphConstants;
35  import org.apache.giraph.function.Consumer;
36  import org.apache.giraph.function.ObjectTransfer;
37  import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
38  import org.apache.giraph.graph.Vertex;
39  import org.apache.giraph.reducers.impl.SumReduce;
40  import org.apache.giraph.types.NoMessage;
41  import org.apache.giraph.utils.TestGraph;
42  import org.apache.hadoop.io.BooleanWritable;
43  import org.apache.hadoop.io.LongWritable;
44  import org.apache.hadoop.io.NullWritable;
45  import org.apache.hadoop.io.Writable;
46  import org.apache.hadoop.io.WritableComparable;
47  import org.junit.Assert;
48  import org.junit.Test;
49  import org.python.google.common.collect.Iterables;
50  
51  /**
52   * Test of barebones of Blocks Framework.
53   *
54   * Do not look as an example of unit test, or to learn about the Framework,
55   * there are utilities to do things simpler, that we are not trying to test
56   * here.
57   */
58  public class BlockExecutionTest {
59  
60    private static GiraphConfiguration createConf() {
61      GiraphConfiguration conf = new GiraphConfiguration();
62      GiraphConstants.VERTEX_ID_CLASS.set(conf, LongWritable.class);
63      GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class);
64      GiraphConstants.EDGE_VALUE_CLASS.set(conf, NullWritable.class);
65      return conf;
66    }
67  
68    private static TestGraph<LongWritable, LongWritable, NullWritable> createTestGraph() {
69      TestGraph<LongWritable, LongWritable, NullWritable> graph =
70          new TestGraph<LongWritable, LongWritable, NullWritable>(createConf());
71      graph.addVertex(new LongWritable(1), new LongWritable());
72      graph.addVertex(new LongWritable(2), new LongWritable());
73      graph.addVertex(new LongWritable(3), new LongWritable());
74      graph.addVertex(new LongWritable(4), new LongWritable());
75  
76      graph.addEdge(new LongWritable(1), new LongWritable(2), NullWritable.get());
77      graph.addEdge(new LongWritable(2), new LongWritable(1), NullWritable.get());
78      graph.addEdge(new LongWritable(2), new LongWritable(3), NullWritable.get());
79      graph.addEdge(new LongWritable(3), new LongWritable(2), NullWritable.get());
80      return graph;
81    }
82  
83    @Test
84    public void testMessageSending() {
85      TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph();
86  
87      LocalBlockRunner.runBlock(graph, new Piece<WritableComparable, LongWritable, Writable, BooleanWritable, Object>() {
88        @Override
89        public VertexSender<WritableComparable, LongWritable, Writable> getVertexSender(
90            final BlockWorkerSendApi<WritableComparable, LongWritable, Writable, BooleanWritable> workerApi,
91            Object executionStage) {
92          return new InnerVertexSender() {
93            @Override
94            public void vertexSend(Vertex<WritableComparable, LongWritable, Writable> vertex) {
95              workerApi.sendMessageToAllEdges(vertex, new BooleanWritable());
96            }
97          };
98        }
99  
100       @Override
101       public VertexReceiver<WritableComparable, LongWritable, Writable, BooleanWritable>
102           getVertexReceiver(BlockWorkerReceiveApi<WritableComparable> workerApi,
103               Object executionStage) {
104         return new InnerVertexReceiver() {
105           @Override
106           public void vertexReceive(Vertex<WritableComparable, LongWritable, Writable> vertex,
107               Iterable<BooleanWritable> messages) {
108             vertex.getValue().set(Iterables.size(messages));
109           }
110         };
111       }
112 
113       @Override
114       protected Class<BooleanWritable> getMessageClass() {
115         return BooleanWritable.class;
116       }
117     }, new Object());
118 
119     Assert.assertEquals(1, graph.getVertex(new LongWritable(1)).getValue().get());
120     Assert.assertEquals(2, graph.getVertex(new LongWritable(2)).getValue().get());
121     Assert.assertEquals(1, graph.getVertex(new LongWritable(3)).getValue().get());
122     Assert.assertEquals(0, graph.getVertex(new LongWritable(4)).getValue().get());
123   }
124 
125   @Test
126   public void testReducing() {
127     TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph();
128 
129     final LongWritable value = new LongWritable();
130 
131     LocalBlockRunner.runBlock(graph, new Piece<WritableComparable, Writable, Writable, NoMessage, Object>() {
132       private ReducerHandle<LongWritable, LongWritable> numVertices;
133 
134       @Override
135       public void registerReducers(CreateReducersApi reduceApi, Object executionStage) {
136         numVertices = reduceApi.createLocalReducer(SumReduce.LONG);
137       }
138 
139       @Override
140       public VertexSender<WritableComparable, Writable, Writable> getVertexSender(
141           BlockWorkerSendApi<WritableComparable, Writable, Writable, NoMessage> workerApi,
142           Object executionStage) {
143 
144         return new InnerVertexSender() {
145           @Override
146           public void vertexSend(Vertex<WritableComparable, Writable, Writable> vertex) {
147             numVertices.reduce(new LongWritable(1));
148           }
149         };
150       }
151 
152       @Override
153       public void masterCompute(BlockMasterApi masterApi, Object executionStage) {
154         value.set(numVertices.getReducedValue(masterApi).get());
155       }
156     }, new Object());
157 
158     Assert.assertEquals(4, value.get());
159   }
160 
161   public void testVertexRemoval() {
162     TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph();
163     LocalBlockRunner.runBlock(graph, new Piece<LongWritable, Writable, Writable, NoMessage, Object>() {
164       @Override
165       public VertexSender<LongWritable, Writable, Writable> getVertexSender(
166           final BlockWorkerSendApi<LongWritable, Writable, Writable, NoMessage> workerApi,
167           Object executionStage) {
168         return new InnerVertexSender() {
169           @Override
170           public void vertexSend(Vertex<LongWritable, Writable, Writable> vertex) {
171             long id = vertex.getId().get();
172             if (id == 1 || id == 3) {
173               workerApi.removeVertexRequest(vertex.getId());
174             }
175           }
176         };
177       }
178     }, new Object());
179 
180     Assert.assertNull(graph.getVertex(new LongWritable(1)));
181     Assert.assertNotNull(graph.getVertex(new LongWritable(2)));
182     Assert.assertNull(graph.getVertex(new LongWritable(3)));
183     Assert.assertNotNull(graph.getVertex(new LongWritable(4)));
184   }
185 
186   @Test
187   public void testRepeatUntilBlockFinishCurrentLoop() throws Exception {
188     final ObjectTransfer<Boolean> toQuit = new ObjectTransfer<>();
189     final IntRef counter = new IntRef(5);
190     Block counterPiece = Pieces.masterCompute("Count", new Consumer<BlockMasterApi>() {
191       @Override
192       public void apply(BlockMasterApi input) {
193         counter.value--;
194         if (counter.value == 0) {
195           toQuit.apply(true);
196         }
197       }
198     });
199     Block innerBlock = new SequenceBlock(counterPiece, counterPiece, counterPiece, counterPiece);
200     Block repeatBlock = RepeatUntilBlock.unlimited(
201       innerBlock,
202       toQuit
203     );
204 
205     LocalBlockRunner.runBlock(createTestGraph(), repeatBlock, new Object());
206 
207     Assert.assertEquals(-3, counter.value);
208   }
209 
210 }