This project has retired. For details please refer to its
Attic page.
TestMessageStores xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm;
20
21 import static org.junit.Assert.assertTrue;
22
23 import java.io.BufferedInputStream;
24 import java.io.BufferedOutputStream;
25 import java.io.DataInputStream;
26 import java.io.DataOutputStream;
27 import java.io.File;
28 import java.io.FileInputStream;
29 import java.io.FileOutputStream;
30 import java.io.IOException;
31 import java.util.Collection;
32 import java.util.Map;
33 import java.util.Map.Entry;
34 import java.util.Random;
35 import java.util.SortedMap;
36 import java.util.TreeMap;
37 import java.util.TreeSet;
38
39 import org.apache.giraph.bsp.CentralizedServiceWorker;
40 import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
41 import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
42 import org.apache.giraph.comm.messages.MessageStore;
43 import org.apache.giraph.comm.messages.MessageStoreFactory;
44 import org.apache.giraph.conf.DefaultMessageClasses;
45 import org.apache.giraph.conf.GiraphConfiguration;
46 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
47 import org.apache.giraph.factories.DefaultMessageValueFactory;
48 import org.apache.giraph.factories.TestMessageValueFactory;
49 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
50 import org.apache.giraph.utils.CollectionUtils;
51 import org.apache.giraph.utils.IntNoOpComputation;
52 import org.apache.giraph.utils.MockUtils;
53 import org.apache.hadoop.conf.Configuration;
54 import org.apache.hadoop.io.IntWritable;
55 import org.apache.hadoop.io.Writable;
56 import org.apache.hadoop.io.WritableComparable;
57 import org.junit.After;
58 import org.junit.Before;
59 import org.junit.Test;
60
61 import com.google.common.collect.Iterables;
62 import com.google.common.collect.Lists;
63 import com.google.common.collect.Sets;
64 import com.google.common.io.Files;
65
66
67 public class TestMessageStores {
68 private static File directory;
69 private static ImmutableClassesGiraphConfiguration<IntWritable,
70 IntWritable, IntWritable> config;
71 private static TestData testData;
72 private static
73 CentralizedServiceWorker<IntWritable, IntWritable, IntWritable> service;
74
75
76
77
78 private static final Random RANDOM = new Random(101);
79
80 @Before
81 public void prepare() {
82 Configuration.addDefaultResource("giraph-site.xml");
83 GiraphConfiguration initConfig = new GiraphConfiguration();
84 initConfig.setComputationClass(IntNoOpComputation.class);
85 config = new ImmutableClassesGiraphConfiguration<IntWritable,
86 IntWritable, IntWritable>(initConfig);
87
88 testData = new TestData();
89 testData.maxId = 1000000;
90 testData.maxMessage = 1000000;
91 testData.maxNumberOfMessages = 100;
92 testData.numVertices = 50;
93 testData.numTimes = 10;
94 testData.numOfPartitions = 5;
95 testData.maxMessagesInMemory = 20;
96
97 service =
98 MockUtils.mockServiceGetVertexPartitionOwner(testData.numOfPartitions);
99 }
100
101 @After
102 public void cleanUp() {
103 }
104
105 private static class TestData {
106 int numTimes;
107 int numVertices;
108 int maxNumberOfMessages;
109 int maxId;
110 int maxMessage;
111 int numOfPartitions;
112 int maxMessagesInMemory;
113 }
114
115 private SortedMap<IntWritable, Collection<IntWritable>> createRandomMessages(
116 TestData testData) {
117 SortedMap<IntWritable, Collection<IntWritable>> allMessages =
118 new TreeMap<IntWritable, Collection<IntWritable>>();
119 for (int v = 0; v < testData.numVertices; v++) {
120 int messageNum =
121 (int) (RANDOM.nextFloat() * testData.maxNumberOfMessages);
122 Collection<IntWritable> vertexMessages = Lists.newArrayList();
123 for (int m = 0; m < messageNum; m++) {
124 vertexMessages.add(
125 new IntWritable((int) (RANDOM.nextFloat() * testData.maxMessage)));
126 }
127 IntWritable vertexId =
128 new IntWritable((int) (RANDOM.nextFloat() * testData.maxId));
129 allMessages.put(vertexId, vertexMessages);
130 }
131 return allMessages;
132 }
133
134 private static void addMessages(
135 MessageStore<IntWritable, IntWritable> messageStore,
136 CentralizedServiceWorker<IntWritable, ?, ?> service,
137 ImmutableClassesGiraphConfiguration<IntWritable, ?, ?> config,
138 Map<IntWritable, Collection<IntWritable>> inputMap) {
139 for (Map.Entry<IntWritable, Collection<IntWritable>> entry :
140 inputMap.entrySet()) {
141 int partitionId =
142 service.getVertexPartitionOwner(entry.getKey()).getPartitionId();
143 ByteArrayVertexIdMessages<IntWritable, IntWritable>
144 byteArrayVertexIdMessages =
145 new ByteArrayVertexIdMessages<IntWritable, IntWritable>(
146 new TestMessageValueFactory(IntWritable.class));
147 byteArrayVertexIdMessages.setConf(config);
148 byteArrayVertexIdMessages.initialize();
149 for (IntWritable message : entry.getValue()) {
150 byteArrayVertexIdMessages.add(entry.getKey(), message);
151 }
152 messageStore.addPartitionMessages(partitionId, byteArrayVertexIdMessages);
153 }
154 }
155
156 private void putNTimes(
157 MessageStore<IntWritable, IntWritable> messageStore,
158 Map<IntWritable, Collection<IntWritable>> messages,
159 TestData testData) {
160 for (int n = 0; n < testData.numTimes; n++) {
161 SortedMap<IntWritable, Collection<IntWritable>> batch =
162 createRandomMessages(testData);
163 addMessages(messageStore, service, config, batch);
164 for (Entry<IntWritable, Collection<IntWritable>> entry :
165 batch.entrySet()) {
166 if (messages.containsKey(entry.getKey())) {
167 messages.get(entry.getKey()).addAll(entry.getValue());
168 } else {
169 messages.put(entry.getKey(), entry.getValue());
170 }
171 }
172 }
173 }
174
175 private <I extends WritableComparable, M extends Writable> boolean
176 equalMessages(
177 MessageStore<I, M> messageStore,
178 Map<I, Collection<M>> expectedMessages,
179 TestData testData) {
180 for (int partitionId = 0; partitionId < testData.numOfPartitions;
181 partitionId++) {
182 TreeSet<I> vertexIds = Sets.newTreeSet();
183 Iterables.addAll(vertexIds,
184 messageStore.getPartitionDestinationVertices(partitionId));
185 for (I vertexId : vertexIds) {
186 Iterable<M> expected = expectedMessages.get(vertexId);
187 if (expected == null) {
188 return false;
189 }
190 Iterable<M> actual = messageStore.getVertexMessages(vertexId);
191 if (!CollectionUtils.isEqual(expected, actual)) {
192 System.err.println("equalMessages: For vertexId " + vertexId +
193 " expected " + expected + ", but got " + actual);
194 return false;
195 }
196 }
197 }
198 return true;
199 }
200
201 private <S extends MessageStore<IntWritable, IntWritable>> S doCheckpoint(
202 MessageStoreFactory<IntWritable, IntWritable, S> messageStoreFactory,
203 S messageStore, TestData testData) throws IOException {
204 File file = new File(directory, "messageStoreTest");
205 if (file.exists()) {
206 file.delete();
207 }
208 file.createNewFile();
209 DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
210 (new FileOutputStream(file))));
211 for (int partitionId = 0; partitionId < testData.numOfPartitions;
212 partitionId++) {
213 messageStore.writePartition(out, partitionId);
214 }
215 out.close();
216
217 messageStore = (S) messageStoreFactory.newStore(
218 new DefaultMessageClasses(
219 IntWritable.class,
220 DefaultMessageValueFactory.class,
221 null,
222 MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION));
223
224 DataInputStream in = new DataInputStream(new BufferedInputStream(
225 (new FileInputStream(file))));
226 for (int partitionId = 0; partitionId < testData.numOfPartitions;
227 partitionId++) {
228 messageStore.readFieldsForPartition(in, partitionId);
229 }
230 in.close();
231 file.delete();
232
233 return messageStore;
234 }
235
236 private <S extends MessageStore<IntWritable, IntWritable>> void
237 testMessageStore(
238 MessageStoreFactory<IntWritable, IntWritable, S> messageStoreFactory,
239 TestData testData) throws IOException {
240 SortedMap<IntWritable, Collection<IntWritable>> messages =
241 new TreeMap<IntWritable, Collection<IntWritable>>();
242 S messageStore = (S) messageStoreFactory.newStore(
243 new DefaultMessageClasses(
244 IntWritable.class,
245 DefaultMessageValueFactory.class,
246 null,
247 MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION));
248 putNTimes(messageStore, messages, testData);
249 assertTrue(equalMessages(messageStore, messages, testData));
250 messageStore.clearAll();
251 messageStore = doCheckpoint(messageStoreFactory, messageStore, testData);
252 assertTrue(equalMessages(messageStore, messages, testData));
253 messageStore.clearAll();
254 }
255
256 @Test
257 public void testByteArrayMessagesPerVertexStore() {
258 try {
259 testMessageStore(
260 ByteArrayMessagesPerVertexStore.<IntWritable, IntWritable>newFactory(
261 service, config),
262 testData);
263 } catch (IOException e) {
264 e.printStackTrace();
265 }
266 }
267 }