This project has been retired. For details, please refer to its
Attic page.
TestWorkerMessages xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework;
19
20 import java.util.HashSet;
21 import java.util.List;
22
23 import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
24 import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
25 import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
26 import org.apache.giraph.block_app.framework.block.Block;
27 import org.apache.giraph.block_app.framework.block.SequenceBlock;
28 import org.apache.giraph.block_app.framework.piece.PieceWithWorkerContext;
29 import org.apache.giraph.block_app.test_setup.NumericTestGraph;
30 import org.apache.giraph.block_app.test_setup.TestGraphModifier;
31 import org.apache.giraph.block_app.test_setup.TestGraphUtils;
32 import org.apache.giraph.conf.BulkConfigurator;
33 import org.apache.giraph.conf.GiraphConfiguration;
34 import org.apache.giraph.types.NoMessage;
35 import org.apache.giraph.utils.TestGraph;
36 import org.apache.hadoop.io.LongWritable;
37 import org.apache.hadoop.io.NullWritable;
38 import org.apache.hadoop.io.Writable;
39 import org.apache.hadoop.io.WritableComparable;
40 import org.junit.Assert;
41 import org.junit.Test;
42
43
44
45
46 public class TestWorkerMessages {
47 @Test
48 public void testWorkerMessages() throws Exception {
49 GiraphConfiguration conf = new GiraphConfiguration();
50 BlockUtils.setAndInitBlockFactoryClass(conf, TestWorkerMessagesBlockFactory.class);
51 TestGraph testGraph = new TestGraph(conf);
52 testGraph.addEdge(new LongWritable(1), new LongWritable(2), NullWritable.get());
53 LocalBlockRunner.runApp(testGraph);
54 }
55
56 @Test
57 public void testWithTestSetup() throws Exception {
58 TestGraphUtils.runTest(
59 new TestGraphModifier<WritableComparable, Writable, Writable>() {
60 @Override
61 public void modifyGraph(NumericTestGraph<WritableComparable, Writable, Writable> graph) {
62 graph.addEdge(1, 2);
63 }
64 },
65 null,
66 new BulkConfigurator() {
67 @Override
68 public void configure(GiraphConfiguration conf) {
69 BlockUtils.setBlockFactoryClass(conf, TestWorkerMessagesBlockFactory.class);
70 }
71 });
72 }
73
74 public static class TestWorkerMessagesBlockFactory extends TestLongNullNullBlockFactory {
75 @Override
76 public Block createBlock(GiraphConfiguration conf) {
77 return new SequenceBlock(
78 new TestWorkerMessagesPiece(2, 4, 11),
79 new TestWorkerMessagesPiece(3, 5, 2, 100));
80 }
81 }
82
83 public static class TestWorkerMessagesPiece extends PieceWithWorkerContext<LongWritable,
84 Writable, Writable, NoMessage, Object, LongWritable, Object> {
85 private final HashSet<Long> values;
86
87 public TestWorkerMessagesPiece(long... values) {
88 this.values = new HashSet<>();
89 for (long value : values) {
90 this.values.add(value);
91 }
92 }
93
94 @Override
95 public void workerContextSend(
96 BlockWorkerContextSendApi<LongWritable, LongWritable> workerContextApi,
97 Object executionStage, Object workerValue) {
98 for (long value : values) {
99 workerContextApi.sendMessageToWorker(new LongWritable(value),
100 workerContextApi.getMyWorkerIndex());
101 }
102 }
103
104 @Override
105 public void workerContextReceive(BlockWorkerContextReceiveApi workerContextApi,
106 Object executionStage, Object workerValue, List<LongWritable> workerMessages) {
107 Assert.assertEquals(values.size(), workerMessages.size());
108 for (LongWritable workerMessage : workerMessages) {
109 Assert.assertTrue(values.remove(workerMessage.get()));
110 }
111 }
112 }
113 }