This project has retired. For details please refer to its
Attic page.
AsyncMessageStoreWrapperTest xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.messages.queue;
20
21 import static org.junit.Assert.assertArrayEquals;
22 import static org.junit.Assert.assertNotNull;
23
24 import java.io.DataInput;
25 import java.io.DataOutput;
26 import java.io.IOException;
27 import java.util.Arrays;
28
29 import org.apache.giraph.comm.messages.MessageStore;
30 import org.apache.giraph.factories.TestMessageValueFactory;
31 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
32 import org.apache.giraph.utils.VertexIdMessages;
33 import org.apache.hadoop.io.IntWritable;
34 import org.apache.hadoop.io.LongWritable;
35 import org.junit.Test;
36
37
38
39
40 public class AsyncMessageStoreWrapperTest {
41
42
43 @Test
44 public void testAsyncQueue() {
45 TestMessageStore store = new TestMessageStore();
46
47 AsyncMessageStoreWrapper<LongWritable, IntWritable> queue =
48 new AsyncMessageStoreWrapper<>(store,
49 Arrays.asList(0, 1, 2, 3, 4), 2);
50
51 for (int i = 0; i < 1000; i++) {
52 queue.addPartitionMessages(i % 5, new ByteArrayVertexIdMessages<LongWritable, IntWritable>(new TestMessageValueFactory<>(IntWritable.class)));
53 }
54
55 queue.waitToComplete();
56
57 assertArrayEquals(new int[] {200, 200, 200, 200, 200}, store.counters);
58
59 queue.clearAll();
60 }
61
62
63 static class TestMessageStore implements MessageStore<LongWritable, IntWritable> {
64
65 private int counters[] = new int[5];
66
67 @Override
68 public void addPartitionMessages(int partition, VertexIdMessages messages) {
69 assertNotNull(messages);
70 counters[partition]++;
71 }
72
73 @Override
74 public void addMessage(LongWritable vertexId, IntWritable message) throws IOException {
75 }
76
77 @Override
78 public boolean isPointerListEncoding() {
79 return false;
80 }
81
82 @Override
83 public Iterable<IntWritable> getVertexMessages(LongWritable vertexId) {
84 return null;
85 }
86
87 @Override
88 public void clearVertexMessages(LongWritable vertexId) {
89
90 }
91
92 @Override
93 public void clearAll() {
94
95 }
96
97 @Override
98 public boolean hasMessagesForVertex(LongWritable vertexId) {
99 return false;
100 }
101
102 @Override
103 public boolean hasMessagesForPartition(int partitionId) {
104 return false;
105 }
106
107 @Override
108 public void finalizeStore() {
109
110 }
111
112 @Override
113 public Iterable<LongWritable> getPartitionDestinationVertices(int partitionId) {
114 return null;
115 }
116
117 @Override
118 public void clearPartition(int partitionId) {
119
120 }
121
122 @Override
123 public void writePartition(DataOutput out, int partitionId) throws IOException {
124
125 }
126
127 @Override
128 public void readFieldsForPartition(DataInput in, int partitionId) throws IOException {
129
130 }
131
132 }
133 }