This project has retired. For details please refer to its Attic page.
TestMessageStores xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm;
20  
21  import static org.junit.Assert.assertTrue;
22  
23  import java.io.BufferedInputStream;
24  import java.io.BufferedOutputStream;
25  import java.io.DataInputStream;
26  import java.io.DataOutputStream;
27  import java.io.File;
28  import java.io.FileInputStream;
29  import java.io.FileOutputStream;
30  import java.io.IOException;
31  import java.util.Collection;
32  import java.util.Map;
33  import java.util.Map.Entry;
34  import java.util.Random;
35  import java.util.SortedMap;
36  import java.util.TreeMap;
37  import java.util.TreeSet;
38  
39  import org.apache.giraph.bsp.CentralizedServiceWorker;
40  import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
41  import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
42  import org.apache.giraph.comm.messages.MessageStore;
43  import org.apache.giraph.comm.messages.MessageStoreFactory;
44  import org.apache.giraph.conf.DefaultMessageClasses;
45  import org.apache.giraph.conf.GiraphConfiguration;
46  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
47  import org.apache.giraph.factories.DefaultMessageValueFactory;
48  import org.apache.giraph.factories.TestMessageValueFactory;
49  import org.apache.giraph.utils.ByteArrayVertexIdMessages;
50  import org.apache.giraph.utils.CollectionUtils;
51  import org.apache.giraph.utils.IntNoOpComputation;
52  import org.apache.giraph.utils.MockUtils;
53  import org.apache.hadoop.conf.Configuration;
54  import org.apache.hadoop.io.IntWritable;
55  import org.apache.hadoop.io.Writable;
56  import org.apache.hadoop.io.WritableComparable;
57  import org.junit.After;
58  import org.junit.Before;
59  import org.junit.Test;
60  
61  import com.google.common.collect.Iterables;
62  import com.google.common.collect.Lists;
63  import com.google.common.collect.Sets;
64  import com.google.common.io.Files;
65  
66  /** Test for different types of message stores */
67  public class TestMessageStores {
68    private static File directory;
69    private static ImmutableClassesGiraphConfiguration<IntWritable,
70        IntWritable, IntWritable> config;
71    private static TestData testData;
72    private static
73    CentralizedServiceWorker<IntWritable, IntWritable, IntWritable> service;
74    /**
75     * Pseudo-random number generator with the same seed to help with
76     * debugging)
77     */
78    private static final Random RANDOM = new Random(101);
79  
80    @Before
81    public void prepare() {
82      Configuration.addDefaultResource("giraph-site.xml");
83      GiraphConfiguration initConfig = new GiraphConfiguration();
84      initConfig.setComputationClass(IntNoOpComputation.class);
85      config = new ImmutableClassesGiraphConfiguration<IntWritable,
86          IntWritable, IntWritable>(initConfig);
87  
88      testData = new TestData();
89      testData.maxId = 1000000;
90      testData.maxMessage = 1000000;
91      testData.maxNumberOfMessages = 100;
92      testData.numVertices = 50;
93      testData.numTimes = 10;
94      testData.numOfPartitions = 5;
95      testData.maxMessagesInMemory = 20;
96  
97      service =
98          MockUtils.mockServiceGetVertexPartitionOwner(testData.numOfPartitions);
99    }
100 
101   @After
102   public void cleanUp() {
103   }
104 
105   private static class TestData {
106     int numTimes;
107     int numVertices;
108     int maxNumberOfMessages;
109     int maxId;
110     int maxMessage;
111     int numOfPartitions;
112     int maxMessagesInMemory;
113   }
114 
115   private SortedMap<IntWritable, Collection<IntWritable>> createRandomMessages(
116       TestData testData) {
117     SortedMap<IntWritable, Collection<IntWritable>> allMessages =
118         new TreeMap<IntWritable, Collection<IntWritable>>();
119     for (int v = 0; v < testData.numVertices; v++) {
120       int messageNum =
121           (int) (RANDOM.nextFloat() * testData.maxNumberOfMessages);
122       Collection<IntWritable> vertexMessages = Lists.newArrayList();
123       for (int m = 0; m < messageNum; m++) {
124         vertexMessages.add(
125             new IntWritable((int) (RANDOM.nextFloat() * testData.maxMessage)));
126       }
127       IntWritable vertexId =
128           new IntWritable((int) (RANDOM.nextFloat() * testData.maxId));
129       allMessages.put(vertexId, vertexMessages);
130     }
131     return allMessages;
132   }
133 
134   private static void addMessages(
135       MessageStore<IntWritable, IntWritable> messageStore,
136       CentralizedServiceWorker<IntWritable, ?, ?> service,
137       ImmutableClassesGiraphConfiguration<IntWritable, ?, ?> config,
138       Map<IntWritable, Collection<IntWritable>> inputMap) {
139     for (Map.Entry<IntWritable, Collection<IntWritable>> entry :
140         inputMap.entrySet()) {
141       int partitionId =
142           service.getVertexPartitionOwner(entry.getKey()).getPartitionId();
143       ByteArrayVertexIdMessages<IntWritable, IntWritable>
144           byteArrayVertexIdMessages =
145           new ByteArrayVertexIdMessages<IntWritable, IntWritable>(
146               new TestMessageValueFactory(IntWritable.class));
147       byteArrayVertexIdMessages.setConf(config);
148       byteArrayVertexIdMessages.initialize();
149       for (IntWritable message : entry.getValue()) {
150         byteArrayVertexIdMessages.add(entry.getKey(), message);
151       }
152       messageStore.addPartitionMessages(partitionId, byteArrayVertexIdMessages);
153     }
154   }
155 
156   private void putNTimes(
157       MessageStore<IntWritable, IntWritable> messageStore,
158       Map<IntWritable, Collection<IntWritable>> messages,
159       TestData testData) {
160     for (int n = 0; n < testData.numTimes; n++) {
161       SortedMap<IntWritable, Collection<IntWritable>> batch =
162           createRandomMessages(testData);
163       addMessages(messageStore, service, config, batch);
164       for (Entry<IntWritable, Collection<IntWritable>> entry :
165           batch.entrySet()) {
166         if (messages.containsKey(entry.getKey())) {
167           messages.get(entry.getKey()).addAll(entry.getValue());
168         } else {
169           messages.put(entry.getKey(), entry.getValue());
170         }
171       }
172     }
173   }
174 
175   private <I extends WritableComparable, M extends Writable> boolean
176   equalMessages(
177       MessageStore<I, M> messageStore,
178       Map<I, Collection<M>> expectedMessages,
179       TestData testData) {
180     for (int partitionId = 0; partitionId < testData.numOfPartitions;
181          partitionId++) {
182       TreeSet<I> vertexIds = Sets.newTreeSet();
183       Iterables.addAll(vertexIds,
184           messageStore.getPartitionDestinationVertices(partitionId));
185       for (I vertexId : vertexIds) {
186         Iterable<M> expected = expectedMessages.get(vertexId);
187         if (expected == null) {
188           return false;
189         }
190         Iterable<M> actual = messageStore.getVertexMessages(vertexId);
191         if (!CollectionUtils.isEqual(expected, actual)) {
192           System.err.println("equalMessages: For vertexId " + vertexId +
193               " expected " + expected + ", but got " + actual);
194           return false;
195         }
196       }
197     }
198     return true;
199   }
200 
201   private <S extends MessageStore<IntWritable, IntWritable>> S doCheckpoint(
202       MessageStoreFactory<IntWritable, IntWritable, S> messageStoreFactory,
203       S messageStore, TestData testData) throws IOException {
204     File file = new File(directory, "messageStoreTest");
205     if (file.exists()) {
206       file.delete();
207     }
208     file.createNewFile();
209     DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
210         (new FileOutputStream(file))));
211     for (int partitionId = 0; partitionId < testData.numOfPartitions;
212          partitionId++) {
213       messageStore.writePartition(out, partitionId);
214     }
215     out.close();
216 
217     messageStore = (S) messageStoreFactory.newStore(
218         new DefaultMessageClasses(
219             IntWritable.class,
220             DefaultMessageValueFactory.class,
221             null,
222             MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION));
223 
224     DataInputStream in = new DataInputStream(new BufferedInputStream(
225         (new FileInputStream(file))));
226     for (int partitionId = 0; partitionId < testData.numOfPartitions;
227          partitionId++) {
228       messageStore.readFieldsForPartition(in, partitionId);
229     }
230     in.close();
231     file.delete();
232 
233     return messageStore;
234   }
235 
236   private <S extends MessageStore<IntWritable, IntWritable>> void
237   testMessageStore(
238       MessageStoreFactory<IntWritable, IntWritable, S> messageStoreFactory,
239       TestData testData) throws IOException {
240     SortedMap<IntWritable, Collection<IntWritable>> messages =
241         new TreeMap<IntWritable, Collection<IntWritable>>();
242     S messageStore = (S) messageStoreFactory.newStore(
243         new DefaultMessageClasses(
244             IntWritable.class,
245             DefaultMessageValueFactory.class,
246             null,
247             MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION));
248     putNTimes(messageStore, messages, testData);
249     assertTrue(equalMessages(messageStore, messages, testData));
250     messageStore.clearAll();
251     messageStore = doCheckpoint(messageStoreFactory, messageStore, testData);
252     assertTrue(equalMessages(messageStore, messages, testData));
253     messageStore.clearAll();
254   }
255 
256   @Test
257   public void testByteArrayMessagesPerVertexStore() {
258     try {
259       testMessageStore(
260           ByteArrayMessagesPerVertexStore.<IntWritable, IntWritable>newFactory(
261               service, config),
262           testData);
263     } catch (IOException e) {
264       e.printStackTrace();
265     }
266   }
267 }