This project has been retired. For details, please refer to its
Attic page.
BlockExecutionTest xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework;
19
20 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
21 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
22 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
23 import org.apache.giraph.block_app.framework.api.CreateReducersApi;
24 import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
25 import org.apache.giraph.block_app.framework.block.Block;
26 import org.apache.giraph.block_app.framework.block.RepeatUntilBlock;
27 import org.apache.giraph.block_app.framework.block.SequenceBlock;
28 import org.apache.giraph.block_app.framework.piece.Piece;
29 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
30 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
31 import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
32 import org.apache.giraph.block_app.library.Pieces;
33 import org.apache.giraph.conf.GiraphConfiguration;
34 import org.apache.giraph.conf.GiraphConstants;
35 import org.apache.giraph.function.Consumer;
36 import org.apache.giraph.function.ObjectTransfer;
37 import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
38 import org.apache.giraph.graph.Vertex;
39 import org.apache.giraph.reducers.impl.SumReduce;
40 import org.apache.giraph.types.NoMessage;
41 import org.apache.giraph.utils.TestGraph;
42 import org.apache.hadoop.io.BooleanWritable;
43 import org.apache.hadoop.io.LongWritable;
44 import org.apache.hadoop.io.NullWritable;
45 import org.apache.hadoop.io.Writable;
46 import org.apache.hadoop.io.WritableComparable;
47 import org.junit.Assert;
48 import org.junit.Test;
49 import com.google.common.collect.Iterables;
50
51
52
53
54
55
56
57
58 public class BlockExecutionTest {
59
60 private static GiraphConfiguration createConf() {
61 GiraphConfiguration conf = new GiraphConfiguration();
62 GiraphConstants.VERTEX_ID_CLASS.set(conf, LongWritable.class);
63 GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class);
64 GiraphConstants.EDGE_VALUE_CLASS.set(conf, NullWritable.class);
65 return conf;
66 }
67
68 private static TestGraph<LongWritable, LongWritable, NullWritable> createTestGraph() {
69 TestGraph<LongWritable, LongWritable, NullWritable> graph =
70 new TestGraph<LongWritable, LongWritable, NullWritable>(createConf());
71 graph.addVertex(new LongWritable(1), new LongWritable());
72 graph.addVertex(new LongWritable(2), new LongWritable());
73 graph.addVertex(new LongWritable(3), new LongWritable());
74 graph.addVertex(new LongWritable(4), new LongWritable());
75
76 graph.addEdge(new LongWritable(1), new LongWritable(2), NullWritable.get());
77 graph.addEdge(new LongWritable(2), new LongWritable(1), NullWritable.get());
78 graph.addEdge(new LongWritable(2), new LongWritable(3), NullWritable.get());
79 graph.addEdge(new LongWritable(3), new LongWritable(2), NullWritable.get());
80 return graph;
81 }
82
83 @Test
84 public void testMessageSending() {
85 TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph();
86
87 LocalBlockRunner.runBlock(graph, new Piece<WritableComparable, LongWritable, Writable, BooleanWritable, Object>() {
88 @Override
89 public VertexSender<WritableComparable, LongWritable, Writable> getVertexSender(
90 final BlockWorkerSendApi<WritableComparable, LongWritable, Writable, BooleanWritable> workerApi,
91 Object executionStage) {
92 return new InnerVertexSender() {
93 @Override
94 public void vertexSend(Vertex<WritableComparable, LongWritable, Writable> vertex) {
95 workerApi.sendMessageToAllEdges(vertex, new BooleanWritable());
96 }
97 };
98 }
99
100 @Override
101 public VertexReceiver<WritableComparable, LongWritable, Writable, BooleanWritable>
102 getVertexReceiver(BlockWorkerReceiveApi<WritableComparable> workerApi,
103 Object executionStage) {
104 return new InnerVertexReceiver() {
105 @Override
106 public void vertexReceive(Vertex<WritableComparable, LongWritable, Writable> vertex,
107 Iterable<BooleanWritable> messages) {
108 vertex.getValue().set(Iterables.size(messages));
109 }
110 };
111 }
112
113 @Override
114 protected Class<BooleanWritable> getMessageClass() {
115 return BooleanWritable.class;
116 }
117 }, new Object());
118
119 Assert.assertEquals(1, graph.getVertex(new LongWritable(1)).getValue().get());
120 Assert.assertEquals(2, graph.getVertex(new LongWritable(2)).getValue().get());
121 Assert.assertEquals(1, graph.getVertex(new LongWritable(3)).getValue().get());
122 Assert.assertEquals(0, graph.getVertex(new LongWritable(4)).getValue().get());
123 }
124
125 @Test
126 public void testReducing() {
127 TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph();
128
129 final LongWritable value = new LongWritable();
130
131 LocalBlockRunner.runBlock(graph, new Piece<WritableComparable, Writable, Writable, NoMessage, Object>() {
132 private ReducerHandle<LongWritable, LongWritable> numVertices;
133
134 @Override
135 public void registerReducers(CreateReducersApi reduceApi, Object executionStage) {
136 numVertices = reduceApi.createLocalReducer(SumReduce.LONG);
137 }
138
139 @Override
140 public VertexSender<WritableComparable, Writable, Writable> getVertexSender(
141 BlockWorkerSendApi<WritableComparable, Writable, Writable, NoMessage> workerApi,
142 Object executionStage) {
143
144 return new InnerVertexSender() {
145 @Override
146 public void vertexSend(Vertex<WritableComparable, Writable, Writable> vertex) {
147 numVertices.reduce(new LongWritable(1));
148 }
149 };
150 }
151
152 @Override
153 public void masterCompute(BlockMasterApi masterApi, Object executionStage) {
154 value.set(numVertices.getReducedValue(masterApi).get());
155 }
156 }, new Object());
157
158 Assert.assertEquals(4, value.get());
159 }
160
161 public void testVertexRemoval() {
162 TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph();
163 LocalBlockRunner.runBlock(graph, new Piece<LongWritable, Writable, Writable, NoMessage, Object>() {
164 @Override
165 public VertexSender<LongWritable, Writable, Writable> getVertexSender(
166 final BlockWorkerSendApi<LongWritable, Writable, Writable, NoMessage> workerApi,
167 Object executionStage) {
168 return new InnerVertexSender() {
169 @Override
170 public void vertexSend(Vertex<LongWritable, Writable, Writable> vertex) {
171 long id = vertex.getId().get();
172 if (id == 1 || id == 3) {
173 workerApi.removeVertexRequest(vertex.getId());
174 }
175 }
176 };
177 }
178 }, new Object());
179
180 Assert.assertNull(graph.getVertex(new LongWritable(1)));
181 Assert.assertNotNull(graph.getVertex(new LongWritable(2)));
182 Assert.assertNull(graph.getVertex(new LongWritable(3)));
183 Assert.assertNotNull(graph.getVertex(new LongWritable(4)));
184 }
185
186 @Test
187 public void testRepeatUntilBlockFinishCurrentLoop() throws Exception {
188 final ObjectTransfer<Boolean> toQuit = new ObjectTransfer<>();
189 final IntRef counter = new IntRef(5);
190 Block counterPiece = Pieces.masterCompute("Count", new Consumer<BlockMasterApi>() {
191 @Override
192 public void apply(BlockMasterApi input) {
193 counter.value--;
194 if (counter.value == 0) {
195 toQuit.apply(true);
196 }
197 }
198 });
199 Block innerBlock = new SequenceBlock(counterPiece, counterPiece, counterPiece, counterPiece);
200 Block repeatBlock = RepeatUntilBlock.unlimited(
201 innerBlock,
202 toQuit
203 );
204
205 LocalBlockRunner.runBlock(createTestGraph(), repeatBlock, new Object());
206
207 Assert.assertEquals(-3, counter.value);
208 }
209
210 }