This project has retired. For details please refer to its
Attic page.
BlockApiHandleTest xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework;
19
20 import org.apache.giraph.block_app.framework.api.BlockApiHandle;
21 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
22 import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
23 import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
24 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
25 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
26 import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
27 import org.apache.giraph.block_app.framework.block.Block;
28 import org.apache.giraph.block_app.framework.block.BlockWithApiHandle;
29 import org.apache.giraph.block_app.framework.block.PieceCount;
30 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
31 import org.apache.giraph.block_app.framework.piece.DefaultParentPiece;
32 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
33 import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
34 import org.apache.giraph.conf.GiraphConfiguration;
35 import org.apache.giraph.conf.GiraphConstants;
36 import org.apache.giraph.function.Consumer;
37 import org.apache.giraph.graph.Vertex;
38 import org.apache.giraph.utils.TestGraph;
39 import org.apache.hadoop.io.DoubleWritable;
40 import org.apache.hadoop.io.LongWritable;
41 import org.apache.hadoop.io.NullWritable;
42 import org.apache.hadoop.io.Writable;
43 import org.apache.hadoop.io.WritableComparable;
44 import org.junit.Test;
45
46 import java.util.Iterator;
47 import java.util.List;
48
49 import static org.junit.Assert.assertEquals;
50 import static org.junit.Assert.assertFalse;
51
52
53
54
55 public class BlockApiHandleTest {
56
57 private static GiraphConfiguration createConf() {
58 GiraphConfiguration conf = new GiraphConfiguration();
59 GiraphConstants.VERTEX_ID_CLASS.set(conf, LongWritable.class);
60 GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class);
61 GiraphConstants.EDGE_VALUE_CLASS.set(conf, NullWritable.class);
62 return conf;
63 }
64
65 private static TestGraph<LongWritable, LongWritable, NullWritable>
66 createTestGraph() {
67 TestGraph<LongWritable, LongWritable, NullWritable> graph =
68 new TestGraph<>(createConf());
69 graph.addVertex(new LongWritable(1), new LongWritable());
70 graph.addVertex(new LongWritable(2), new LongWritable());
71 graph.addVertex(new LongWritable(3), new LongWritable());
72 graph.addVertex(new LongWritable(4), new LongWritable());
73 graph.addEdge(new LongWritable(1), new LongWritable(2), NullWritable.get());
74 graph.addEdge(new LongWritable(2), new LongWritable(1), NullWritable.get());
75 graph.addEdge(new LongWritable(2), new LongWritable(3), NullWritable.get());
76 graph.addEdge(new LongWritable(3), new LongWritable(2), NullWritable.get());
77 return graph;
78 }
79
80 public static class DummyObjectWithApiHandle {
81
82 private BlockApiHandle handle;
83
84 public DummyObjectWithApiHandle(BlockApiHandle handle) {
85 this.handle = handle;
86 }
87
88 public void doSomethingAtWorker() {
89
90 assertFalse(handle.isMasterApiSet());
91 assertFalse(handle.isWorkerContextReceiveApiSet());
92 assertFalse(handle.isWorkerContextSendApiSet());
93 assertEquals(1, handle.getWorkerReceiveApi().getWorkerCount());
94 assertEquals(0, handle.getWorkerSendApi().getMyWorkerIndex());
95 }
96
97 public void doSomethingAtWorkerContext() {
98
99 assertFalse(handle.isMasterApiSet());
100 assertFalse(handle.isWorkerReceiveApiSet());
101 assertFalse(handle.isWorkerSendApiSet());
102 assertEquals(1, handle.getWorkerContextReceiveApi().getWorkerCount());
103 assertEquals(0, handle.getWorkerContextSendApi().getMyWorkerIndex());
104 }
105
106 public void doSomethingAtMaster() {
107
108 assertEquals(1, handle.getMasterApi().getWorkerCount());
109 assertFalse(handle.isWorkerReceiveApiSet());
110 assertFalse(handle.isWorkerSendApiSet());
111 assertFalse(handle.isWorkerContextReceiveApiSet());
112 assertFalse(handle.isWorkerContextSendApiSet());
113 }
114 }
115
116 public static class TestPiece extends DefaultParentPiece<WritableComparable,
117 LongWritable, Writable, NullWritable, Object, DoubleWritable, Object> {
118
119 private DummyObjectWithApiHandle object;
120
121 public TestPiece(DummyObjectWithApiHandle object) {
122 this.object = object;
123 }
124
125 @Override
126 public VertexSender<WritableComparable, LongWritable, Writable>
127 getVertexSender(final BlockWorkerSendApi<WritableComparable, LongWritable,
128 Writable, NullWritable> workerApi, Object executionStage) {
129 return new InnerVertexSender() {
130 @Override
131 public void vertexSend(
132 Vertex<WritableComparable, LongWritable, Writable> vertex) {
133 object.doSomethingAtWorker();
134 }
135 };
136 }
137
138 @Override
139 public VertexReceiver<WritableComparable, LongWritable, Writable,
140 NullWritable> getVertexReceiver(
141 BlockWorkerReceiveApi<WritableComparable> workerApi,
142 Object executionStage) {
143 return new InnerVertexReceiver() {
144 @Override
145 public void vertexReceive(
146 Vertex<WritableComparable, LongWritable, Writable> vertex,
147 Iterable<NullWritable> messages) {
148 object.doSomethingAtWorker();
149 }
150 };
151 }
152
153 public void workerContextSend(BlockWorkerContextSendApi<WritableComparable,
154 DoubleWritable> workerContextApi, Object executionStage,
155 Writable workerValue) {
156 object.doSomethingAtWorkerContext();
157 }
158
159
160
161
162
163
164
165 public void workerContextReceive(
166 BlockWorkerContextReceiveApi workerContextApi, Object executionStage,
167 Object workerValue, List<DoubleWritable> workerMessages) {
168 object.doSomethingAtWorkerContext();
169 }
170
171 @Override
172 public void masterCompute(BlockMasterApi masterApi, Object executionStage) {
173 object.doSomethingAtMaster();
174 }
175
176 @Override
177 protected Class<NullWritable> getMessageClass() {
178 return NullWritable.class;
179 }
180 }
181
182 @Test
183 public void testBlockApiHandle() {
184 TestGraph<LongWritable, LongWritable, NullWritable> graph =
185 createTestGraph();
186
187 final BlockApiHandle handle = new BlockApiHandle();
188 final DefaultParentPiece piece =
189 new TestPiece(new DummyObjectWithApiHandle(handle));
190
191 Block block = new BlockWithApiHandle() {
192 @Override
193 public Iterator<AbstractPiece> iterator() {
194 return piece.iterator();
195 }
196
197 @Override
198 public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
199 piece.forAllPossiblePieces(consumer);
200 }
201
202 @Override
203 public PieceCount getPieceCount() {
204 return piece.getPieceCount();
205 }
206
207 @Override
208 public BlockApiHandle getBlockApiHandle() {
209 return handle;
210 }
211 };
212
213 BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS.set(
214 graph.getConf(), Object.class);
215 LocalBlockRunner.runBlock(graph, block, new Object());
216 }
217 }