View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.partition;
20  
21  import com.google.common.collect.Iterables;
22  import com.google.common.io.Files;
23  import org.apache.commons.io.FileUtils;
24  import org.apache.giraph.bsp.BspService;
25  import org.apache.giraph.bsp.CentralizedServiceWorker;
26  import org.apache.giraph.comm.ServerData;
27  import org.apache.giraph.comm.netty.NettyClient;
28  import org.apache.giraph.conf.GiraphConfiguration;
29  import org.apache.giraph.conf.GiraphConstants;
30  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
31  import org.apache.giraph.edge.EdgeFactory;
32  import org.apache.giraph.graph.BasicComputation;
33  import org.apache.giraph.graph.GraphTaskManager;
34  import org.apache.giraph.graph.Vertex;
35  import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
36  import org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat;
37  import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat;
38  import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
39  import org.apache.giraph.utils.InternalVertexRunner;
40  import org.apache.giraph.utils.NoOpComputation;
41  import org.apache.giraph.utils.UnsafeByteArrayInputStream;
42  import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
43  import org.apache.hadoop.io.DoubleWritable;
44  import org.apache.hadoop.io.FloatWritable;
45  import org.apache.hadoop.io.IntWritable;
46  import org.apache.hadoop.io.LongWritable;
47  import org.apache.hadoop.io.NullWritable;
48  import org.apache.hadoop.mapreduce.Mapper;
49  import org.junit.Before;
50  import org.junit.Test;
51  import org.mockito.Mockito;
52  
53  import java.io.File;
54  import java.io.IOException;
55  import java.util.ArrayList;
56  import java.util.List;
57  import java.util.Random;
58  import java.util.concurrent.ExecutorCompletionService;
59  import java.util.concurrent.ExecutorService;
60  import java.util.concurrent.Executors;
61  import java.util.concurrent.atomic.AtomicInteger;
62  
63  import static org.junit.Assert.assertEquals;
64  import static org.junit.Assert.assertFalse;
65  import static org.junit.Assert.assertTrue;
66  
67  /**
68   * Test case for partition stores.
69   */
70  @SuppressWarnings("unchecked")
71  public class TestPartitionStores {
72    private ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
73        NullWritable> conf;
74    private Mapper<?, ?, ?, ?>.Context context;
75  
76    /* these static variables are used for the multithreaded tests */
77    private static final int NUM_OF_VERTEXES_PER_PARTITION = 20;
78    private static final int NUM_OF_EDGES_PER_VERTEX = 5;
79    private static final int NUM_OF_THREADS = 8;
80    private static final int NUM_OF_PARTITIONS = 30;
81    private static final int NUM_PARTITIONS_IN_MEMORY = 12;
82  
83    public static class MyComputation extends NoOpComputation<IntWritable,
84        IntWritable, NullWritable, IntWritable> { }
85  
86    private Partition<IntWritable, IntWritable, NullWritable> createPartition(
87        ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
88            NullWritable> conf,
89        Integer id,
90        Vertex<IntWritable, IntWritable, NullWritable>... vertices) {
91      Partition<IntWritable, IntWritable, NullWritable> partition =
92          conf.createPartition(id, context);
93      for (Vertex<IntWritable, IntWritable, NullWritable> v : vertices) {
94        partition.putVertex(v);
95      }
96      return partition;
97    }
98  
99    @Before
100   public void setUp() {
101     GiraphConfiguration configuration = new GiraphConfiguration();
102     configuration.setComputationClass(MyComputation.class);
103     conf = new ImmutableClassesGiraphConfiguration<>(configuration);
104     context = Mockito.mock(Mapper.Context.class);
105   }
106 
107   @Test
108   public void testSimplePartitionStore() {
109     PartitionStore<IntWritable, IntWritable, NullWritable>
110     partitionStore = new SimplePartitionStore<>(conf, context);
111     testReadWrite(partitionStore, conf);
112     partitionStore.shutdown();
113   }
114 
115   @Test
116   public void testUnsafePartitionSerializationClass() throws IOException {
117     conf.setPartitionClass(ByteArrayPartition.class);
118     Vertex<IntWritable, IntWritable, NullWritable> v1 =
119         conf.createVertex();
120     v1.initialize(new IntWritable(1), new IntWritable(1));
121     Vertex<IntWritable, IntWritable, NullWritable> v2 = conf.createVertex();
122     v2.initialize(new IntWritable(2), new IntWritable(2));
123     Vertex<IntWritable, IntWritable, NullWritable> v3 = conf.createVertex();
124     v3.initialize(new IntWritable(3), new IntWritable(3));
125     Vertex<IntWritable, IntWritable, NullWritable> v4 = conf.createVertex();
126     v4.initialize(new IntWritable(4), new IntWritable(4));
127     Vertex<IntWritable, IntWritable, NullWritable> v5 = conf.createVertex();
128     v5.initialize(new IntWritable(5), new IntWritable(5));
129     Vertex<IntWritable, IntWritable, NullWritable> v6 = conf.createVertex();
130     v6.initialize(new IntWritable(6), new IntWritable(6));
131     Vertex<IntWritable, IntWritable, NullWritable> v7 = conf.createVertex();
132     v7.initialize(new IntWritable(7), new IntWritable(7));
133 
134     Partition<IntWritable, IntWritable, NullWritable> partition =
135         createPartition(conf, 3, v1, v2, v3, v4, v5, v6, v7);
136     assertEquals(3, partition.getId());
137     assertEquals(0, partition.getEdgeCount());
138     assertEquals(7, partition.getVertexCount());
139     UnsafeByteArrayOutputStream outputStream = new
140         UnsafeByteArrayOutputStream();
141     partition.write(outputStream);
142     UnsafeByteArrayInputStream inputStream = new UnsafeByteArrayInputStream(
143         outputStream.getByteArray(), 0, outputStream.getPos());
144     Partition<IntWritable, IntWritable, NullWritable> deserializatedPartition =
145         conf.createPartition(-1, context);
146     deserializatedPartition.readFields(inputStream);
147 
148     assertEquals(3, deserializatedPartition.getId());
149     assertEquals(0, deserializatedPartition.getEdgeCount());
150     assertEquals(7, deserializatedPartition.getVertexCount());
151   }
152 
153   @Test
154   public void testDiskBackedPartitionStoreWithByteArrayPartition()
155     throws IOException {
156     File directory = Files.createTempDir();
157     GiraphConstants.PARTITIONS_DIRECTORY.set(
158         conf, new File(directory, "giraph_partitions").toString());
159     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
160     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
161     conf.setPartitionClass(ByteArrayPartition.class);
162 
163     CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
164       serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
165     Mockito.when(serviceWorker.getSuperstep()).thenReturn(
166         BspService.INPUT_SUPERSTEP);
167     GraphTaskManager<IntWritable, IntWritable, NullWritable>
168         graphTaskManager = Mockito.mock(GraphTaskManager.class);
169     Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
170     ServerData<IntWritable, IntWritable, NullWritable>
171         serverData = new ServerData<>(serviceWorker, conf, context);
172     Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
173 
174     DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
175         partitionStore =
176         (DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>)
177             serverData.getPartitionStore();
178     partitionStore.initialize();
179     testReadWrite(partitionStore, conf);
180     partitionStore.shutdown();
181     FileUtils.deleteDirectory(directory);
182   }
183 
184   @Test
185   public void testDiskBackedPartitionStore() throws IOException {
186     File directory = Files.createTempDir();
187     GiraphConstants.PARTITIONS_DIRECTORY.set(
188         conf, new File(directory, "giraph_partitions").toString());
189     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
190     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
191 
192     CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
193     serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
194     Mockito.when(serviceWorker.getSuperstep()).thenReturn(
195         BspService.INPUT_SUPERSTEP);
196     GraphTaskManager<IntWritable, IntWritable, NullWritable>
197         graphTaskManager = Mockito.mock(GraphTaskManager.class);
198     Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
199     ServerData<IntWritable, IntWritable, NullWritable>
200         serverData = new ServerData<>(serviceWorker, conf, context);
201     Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
202 
203     DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
204         partitionStore =
205         (DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>)
206             serverData.getPartitionStore();
207     partitionStore.initialize();
208     testReadWrite(partitionStore, conf);
209     partitionStore.shutdown();
210     FileUtils.deleteDirectory(directory);
211   }
212 
213   @Test
214   public void testDiskBackedPartitionStoreComputation() throws Exception {
215     Iterable<String> results;
216     String[] graph =
217         {
218             "[1,0,[]]", "[2,0,[]]", "[3,0,[]]", "[4,0,[]]", "[5,0,[]]",
219             "[6,0,[]]", "[7,0,[]]", "[8,0,[]]", "[9,0,[]]", "[10,0,[]]"
220         };
221     String[] expected =
222         {
223             "1\t0", "2\t0", "3\t0", "4\t0", "5\t0",
224             "6\t0", "7\t0", "8\t0", "9\t0", "10\t0"
225         };
226 
227     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
228     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
229     GiraphConstants.USER_PARTITION_COUNT.set(conf, 10);
230 
231     File directory = Files.createTempDir();
232     GiraphConstants.PARTITIONS_DIRECTORY.set(conf,
233         new File(directory, "giraph_partitions").toString());
234 
235     conf.setComputationClass(EmptyComputation.class);
236     conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class);
237     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
238 
239     results = InternalVertexRunner.run(conf, graph);
240     checkResults(results, expected);
241     FileUtils.deleteDirectory(directory);
242   }
243 
244   @Test
245   public void testDiskBackedPartitionStoreWithByteArrayComputation()
246     throws Exception {
247     Iterable<String> results;
248     String[] graph =
249     {
250       "[1,0,[]]", "[2,0,[]]", "[3,0,[]]", "[4,0,[]]", "[5,0,[]]",
251       "[6,0,[]]", "[7,0,[]]", "[8,0,[]]", "[9,0,[]]", "[10,0,[]]"
252     };
253     String[] expected =
254     {
255       "1\t0", "2\t0", "3\t0", "4\t0", "5\t0",
256       "6\t0", "7\t0", "8\t0", "9\t0", "10\t0"
257     };
258 
259     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
260     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
261     GiraphConstants.USER_PARTITION_COUNT.set(conf, 10);
262 
263     File directory = Files.createTempDir();
264     GiraphConstants.PARTITIONS_DIRECTORY.set(conf,
265       new File(directory, "giraph_partitions").toString());
266 
267     conf.setPartitionClass(ByteArrayPartition.class);
268     conf.setComputationClass(EmptyComputation.class);
269     conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class);
270     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
271 
272     results = InternalVertexRunner.run(conf, graph);
273     checkResults(results, expected);
274     FileUtils.deleteDirectory(directory);
275   }
276 
277   @Test
278   public void testDiskBackedPartitionStoreMT() throws Exception {
279     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, NUM_PARTITIONS_IN_MEMORY);
280     GiraphConstants.STATIC_GRAPH.set(conf, false);
281     testMultiThreaded();
282   }
283 
284   @Test
285   public void testDiskBackedPartitionStoreMTStatic() throws Exception {
286     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, NUM_PARTITIONS_IN_MEMORY);
287     GiraphConstants.STATIC_GRAPH.set(conf, true);
288     testMultiThreaded();
289   }
290 
291   @Test
292   public void testDiskBackedPartitionStoreAdaptiveOOC() throws Exception {
293     GiraphConstants.STATIC_GRAPH.set(conf, true);
294     NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.set(conf, true);
295     testMultiThreaded();
296   }
297 
298   private void testMultiThreaded() throws Exception {
299     final AtomicInteger vertexCounter = new AtomicInteger(0);
300     ExecutorService pool = Executors.newFixedThreadPool(NUM_OF_THREADS);
301     ExecutorCompletionService<Boolean> executor =
302       new ExecutorCompletionService<>(pool);
303 
304     File directory = Files.createTempDir();
305     GiraphConstants.PARTITIONS_DIRECTORY.set(
306         conf, new File(directory, "giraph_partitions").toString());
307     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
308 
309     CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
310     serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
311 
312     Mockito.when(serviceWorker.getSuperstep()).thenReturn(
313         BspService.INPUT_SUPERSTEP);
314     GraphTaskManager<IntWritable, IntWritable, NullWritable>
315         graphTaskManager = Mockito.mock(GraphTaskManager.class);
316     Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
317     ServerData<IntWritable, IntWritable, NullWritable>
318         serverData = new ServerData<>(serviceWorker, conf, context);
319     Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
320 
321     DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
322         store =
323         (DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>)
324             serverData.getPartitionStore();
325     store.initialize();
326 
327     // Create a new Graph in memory using multiple threads
328     for (int i = 0; i < NUM_OF_THREADS; ++i) {
329       List<Integer> partitionIds = new ArrayList<>();
330       for (int id = i; id < NUM_OF_PARTITIONS; id += NUM_OF_THREADS) {
331         partitionIds.add(id);
332       }
333       Worker worker =
334         new Worker(vertexCounter, store, partitionIds, conf);
335       executor.submit(worker, true);
336     }
337     for (int i = 0; i < NUM_OF_THREADS; ++i)
338       executor.take();
339     pool.shutdownNow();
340 
341     // Check the number of vertices
342     int totalVertexes = 0;
343     int totalEdges = 0;
344     Partition<IntWritable, IntWritable, NullWritable> partition;
345     for (int i = 0; i < NUM_OF_PARTITIONS; ++i) {
346       totalVertexes += store.getPartitionVertexCount(i);
347       totalEdges += store.getPartitionEdgeCount(i);
348     }
349 
350     assert vertexCounter.get() == NUM_OF_PARTITIONS * NUM_OF_VERTEXES_PER_PARTITION;
351     assert totalVertexes == NUM_OF_PARTITIONS * NUM_OF_VERTEXES_PER_PARTITION;
352     assert totalEdges == totalVertexes * NUM_OF_EDGES_PER_VERTEX;
353 
354     // Check the content of the vertices
355     int expected = 0;
356     for (int i = 0; i < NUM_OF_VERTEXES_PER_PARTITION * NUM_OF_PARTITIONS; ++i) {
357       expected += i;
358     }
359     int totalValues = 0;
360     store.startIteration();
361     for (int i = 0; i < NUM_OF_PARTITIONS; ++i) {
362       partition = store.getNextPartition();
363       assert partition != null;
364 
365       for (Vertex<IntWritable, IntWritable, NullWritable> v : partition) {
366         totalValues += v.getId().get();
367       }
368       store.putPartition(partition);
369     }
370     assert totalValues == expected;
371 
372     store.shutdown();
373   }
374 
375   private Partition<IntWritable, IntWritable, NullWritable>
376   getPartition(PartitionStore<IntWritable, IntWritable,
377       NullWritable> partitionStore, int partitionId) {
378     Partition p;
379     Partition result = null;
380     while ((p = partitionStore.getNextPartition()) != null) {
381       if (p.getId() == partitionId) {
382         result = p;
383       }
384       partitionStore.putPartition(p);
385     }
386     return result;
387   }
388 
389   /**
390    * Test reading/writing to/from a partition store
391    *
392    * @param partitionStore Partition store to test
393    * @param conf Configuration to use
394    */
395   public void testReadWrite(
396       PartitionStore<IntWritable, IntWritable,
397           NullWritable> partitionStore,
398       ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
399           NullWritable> conf) {
400     Vertex<IntWritable, IntWritable, NullWritable> v1 = conf.createVertex();
401     v1.initialize(new IntWritable(1), new IntWritable(1));
402     Vertex<IntWritable, IntWritable, NullWritable> v2 = conf.createVertex();
403     v2.initialize(new IntWritable(2), new IntWritable(2));
404     Vertex<IntWritable, IntWritable, NullWritable> v3 = conf.createVertex();
405     v3.initialize(new IntWritable(3), new IntWritable(3));
406     Vertex<IntWritable, IntWritable, NullWritable> v4 = conf.createVertex();
407     v4.initialize(new IntWritable(4), new IntWritable(4));
408     Vertex<IntWritable, IntWritable, NullWritable> v5 = conf.createVertex();
409     v5.initialize(new IntWritable(5), new IntWritable(5));
410     Vertex<IntWritable, IntWritable, NullWritable> v6 = conf.createVertex();
411     v6.initialize(new IntWritable(7), new IntWritable(7));
412     Vertex<IntWritable, IntWritable, NullWritable> v7 = conf.createVertex();
413     v7.initialize(new IntWritable(7), new IntWritable(7));
414     v7.addEdge(EdgeFactory.create(new IntWritable(1)));
415     v7.addEdge(EdgeFactory.create(new IntWritable(2)));
416 
417     partitionStore.addPartition(createPartition(conf, 1, v1, v2, v6));
418     partitionStore.addPartition(createPartition(conf, 2, v3, v4));
419     partitionStore.addPartition(createPartition(conf, 3, v5));
420     partitionStore.addPartition(createPartition(conf, 4, v7));
421 
422     partitionStore.startIteration();
423     getPartition(partitionStore, 1);
424     partitionStore.startIteration();
425     getPartition(partitionStore, 2);
426     partitionStore.startIteration();
427     partitionStore.removePartition(3);
428     getPartition(partitionStore, 4);
429 
430     assertEquals(3, partitionStore.getNumPartitions());
431     assertEquals(3, Iterables.size(partitionStore.getPartitionIds()));
432     int partitionsNumber = 0;
433 
434     partitionStore.startIteration();
435     Partition<IntWritable, IntWritable, NullWritable> p;
436     while ((p = partitionStore.getNextPartition()) != null) {
437       partitionStore.putPartition(p);
438       partitionsNumber++;
439     }
440     assertEquals(3, partitionsNumber);
441     assertTrue(partitionStore.hasPartition(1));
442     assertTrue(partitionStore.hasPartition(2));
443     assertFalse(partitionStore.hasPartition(3));
444     assertTrue(partitionStore.hasPartition(4));
445     assertEquals(3, partitionStore.getPartitionVertexCount(1));
446     assertEquals(2, partitionStore.getPartitionVertexCount(2));
447     assertEquals(1, partitionStore.getPartitionVertexCount(4));
448     assertEquals(2, partitionStore.getPartitionEdgeCount(4));
449   }
450 
451   /**
452    * Internal checker to verify the correctness of the tests.
453    * @param results   the actual results obtained
454    * @param expected  expected results
455    */
456   private void checkResults(Iterable<String> results, String[] expected) {
457 
458     for (String str : results) {
459       boolean found = false;
460 
461       for (String expectedStr : expected) {
462         if (expectedStr.equals(str)) {
463           found = true;
464         }
465       }
466 
467       assert found;
468     }
469   }
470 
471   /**
472    * Test compute method that sends each edge a notification of its parents.
473    * The test set only has a 1-1 parent-to-child ratio for this unit test.
474    */
475   public static class EmptyComputation
476     extends BasicComputation<LongWritable, DoubleWritable, FloatWritable,
477       LongWritable> {
478 
479     @Override
480     public void compute(
481       Vertex<LongWritable, DoubleWritable,FloatWritable> vertex,
482       Iterable<LongWritable> messages) throws IOException {
483 
484       vertex.voteToHalt();
485     }
486   }
487 
488   @Test
489   public void testEdgeCombineWithSimplePartition() throws IOException {
490     testEdgeCombine(SimplePartition.class);
491   }
492 
493   @Test
494   public void testEdgeCombineWithByteArrayPartition() throws IOException {
495     testEdgeCombine(ByteArrayPartition.class);
496   }
497 
498   private void testEdgeCombine(Class<? extends Partition> partitionClass)
499       throws IOException {
500     Vertex<IntWritable, IntWritable, NullWritable> v1 = conf.createVertex();
501     v1.initialize(new IntWritable(1), new IntWritable(1));
502     Vertex<IntWritable, IntWritable, NullWritable> v2 = conf.createVertex();
503     v2.initialize(new IntWritable(2), new IntWritable(2));
504     Vertex<IntWritable, IntWritable, NullWritable> v3 = conf.createVertex();
505     v3.initialize(new IntWritable(3), new IntWritable(3));
506     Vertex<IntWritable, IntWritable, NullWritable> v1e2 = conf.createVertex();
507     v1e2.initialize(new IntWritable(1), new IntWritable(1));
508     v1e2.addEdge(EdgeFactory.create(new IntWritable(2)));
509     Vertex<IntWritable, IntWritable, NullWritable> v1e3 = conf.createVertex();
510     v1e3.initialize(new IntWritable(1), new IntWritable(1));
511     v1e3.addEdge(EdgeFactory.create(new IntWritable(3)));
512 
513     GiraphConfiguration newconf = new GiraphConfiguration(conf);
514     newconf.setPartitionClass(partitionClass);
515     Partition<IntWritable, IntWritable, NullWritable> partition =
516         (new ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
517             NullWritable>(newconf)).createPartition(1, context);
518     assertEquals(partitionClass, partition.getClass());
519     partition.putVertex(v1);
520     partition.putVertex(v2);
521     partition.putVertex(v3);
522     assertEquals(3, partition.getVertexCount());
523     assertEquals(0, partition.getEdgeCount());
524     partition.putOrCombine(v1e2);
525     assertEquals(3, partition.getVertexCount());
526     assertEquals(1, partition.getEdgeCount());
527     partition.putOrCombine(v1e3);
528     assertEquals(3, partition.getVertexCount());
529     assertEquals(2, partition.getEdgeCount());
530     v1 = partition.getVertex(new IntWritable(1));
531     assertEquals(new IntWritable(1), v1.getId());
532     assertEquals(new IntWritable(1), v1.getValue());
533     assertEquals(2, v1.getNumEdges());
534   }
535 
536   private class Worker implements Runnable {
537 
538     private final AtomicInteger vertexCounter;
539     private final PartitionStore<IntWritable, IntWritable, NullWritable>
540       partitionStore;
541     private final List<Integer> partitionIds;
542     private final ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
543             NullWritable> conf;
544 
545     public Worker(AtomicInteger vertexCounter,
546         PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore,
547         List<Integer> partitionIds,
548         ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
549           NullWritable> conf) {
550 
551       this.vertexCounter = vertexCounter;
552       this.partitionStore = partitionStore;
553       this.partitionIds = partitionIds;
554       this.conf = conf;
555     }
556 
557     public void run() {
558       for (int partitionId : partitionIds) {
559         Partition<IntWritable, IntWritable, NullWritable> partition =
560             conf.createPartition(partitionId, context);
561         for (int i = 0; i < NUM_OF_VERTEXES_PER_PARTITION; ++i) {
562           int id = vertexCounter.getAndIncrement();
563           Vertex<IntWritable, IntWritable, NullWritable> v = conf.createVertex();
564           v.initialize(new IntWritable(id), new IntWritable(id));
565 
566           Random rand = new Random(id);
567           for (int j = 0; j < NUM_OF_EDGES_PER_VERTEX; ++j) {
568             int dest = rand.nextInt(id + 1);
569             v.addEdge(EdgeFactory.create(new IntWritable(dest)));
570           }
571 
572           partition.putVertex(v);
573         }
574         partitionStore.addPartition(partition);
575       }
576     }
577   }
578 
579   @Test
580   public void testOutOfCoreMessages() throws Exception {
581     Iterable<String> results;
582     String[] graph =
583         { "1 0 2 3", "2 0 3 5", "3 0 1 2 4", "4 0 3", "5 0 6 7 1 2",
584             "6 0 10 8 7", "7 0 1 3", "8 0 1 10 9 4 6", "9 0 8 1 5 7",
585             "10 0 9" };
586 
587     String[] expected =
588         {
589             "1\t32", "2\t9", "3\t14", "4\t11", "5\t11",
590             "6\t13", "7\t20", "8\t15", "9\t18", "10\t14"
591         };
592 
593     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
594     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
595     GiraphConstants.USER_PARTITION_COUNT.set(conf, 10);
596 
597     File directory = Files.createTempDir();
598     GiraphConstants.PARTITIONS_DIRECTORY.set(conf,
599         new File(directory, "giraph_partitions").toString());
600 
601     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
602     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
603     conf.setComputationClass(TestOutOfCoreMessagesComputation.class);
604     conf.setVertexInputFormatClass(IntIntNullTextVertexInputFormat.class);
605     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
606 
607     results = InternalVertexRunner.run(conf, graph);
608     checkResults(results, expected);
609     FileUtils.deleteDirectory(directory);
610   }
611 
612   public static class TestOutOfCoreMessagesComputation extends
613       BasicComputation<IntWritable, IntWritable, NullWritable, IntWritable> {
614 
615     @Override
616     public void compute(
617         Vertex<IntWritable, IntWritable, NullWritable> vertex,
618         Iterable<IntWritable> messages) throws IOException {
619       if (getSuperstep() == 0) {
620         // Send id to all neighbors
621         sendMessageToAllEdges(vertex, vertex.getId());
622       } else {
623         // Add received messages and halt
624         int sum = 0;
625         for (IntWritable message : messages) {
626           sum += message.get();
627         }
628         vertex.setValue(new IntWritable(sum));
629         vertex.voteToHalt();
630       }
631     }
632   }
633 }