This project has retired. For details please refer to its Attic page.
TestPartitionStores xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.partition;
20  
21  import com.google.common.collect.Iterables;
22  import com.google.common.io.Files;
23  import org.apache.commons.io.FileUtils;
24  import org.apache.giraph.bsp.BspService;
25  import org.apache.giraph.bsp.CentralizedServiceWorker;
26  import org.apache.giraph.comm.ServerData;
27  import org.apache.giraph.comm.WorkerServer;
28  import org.apache.giraph.comm.netty.NettyClient;
29  import org.apache.giraph.conf.GiraphConfiguration;
30  import org.apache.giraph.conf.GiraphConstants;
31  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
32  import org.apache.giraph.edge.EdgeFactory;
33  import org.apache.giraph.graph.BasicComputation;
34  import org.apache.giraph.graph.GraphTaskManager;
35  import org.apache.giraph.graph.Vertex;
36  import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
37  import org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat;
38  import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat;
39  import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
40  import org.apache.giraph.utils.InternalVertexRunner;
41  import org.apache.giraph.utils.NoOpComputation;
42  import org.apache.giraph.utils.UnsafeByteArrayInputStream;
43  import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
44  import org.apache.hadoop.io.DoubleWritable;
45  import org.apache.hadoop.io.FloatWritable;
46  import org.apache.hadoop.io.IntWritable;
47  import org.apache.hadoop.io.LongWritable;
48  import org.apache.hadoop.io.NullWritable;
49  import org.apache.hadoop.mapreduce.Mapper;
50  import org.junit.Before;
51  import org.junit.Test;
52  import org.mockito.Mockito;
53  
54  import java.io.File;
55  import java.io.IOException;
56  import java.util.ArrayList;
57  import java.util.List;
58  import java.util.Random;
59  import java.util.concurrent.ExecutorCompletionService;
60  import java.util.concurrent.ExecutorService;
61  import java.util.concurrent.Executors;
62  import java.util.concurrent.atomic.AtomicInteger;
63  
64  import static org.junit.Assert.assertEquals;
65  import static org.junit.Assert.assertFalse;
66  import static org.junit.Assert.assertTrue;
67  
68  /**
69   * Test case for partition stores.
70   */
71  @SuppressWarnings("unchecked")
72  public class TestPartitionStores {
73    private ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
74        NullWritable> conf;
75    private Mapper<?, ?, ?, ?>.Context context;
76  
77    /* these static variables are used for the multithreaded tests */
78    private static final int NUM_OF_VERTEXES_PER_PARTITION = 20;
79    private static final int NUM_OF_EDGES_PER_VERTEX = 5;
80    private static final int NUM_OF_THREADS = 8;
81    private static final int NUM_OF_PARTITIONS = 30;
82    private static final int NUM_PARTITIONS_IN_MEMORY = 12;
83  
84    public static class MyComputation extends NoOpComputation<IntWritable,
85        IntWritable, NullWritable, IntWritable> { }
86  
87    private Partition<IntWritable, IntWritable, NullWritable> createPartition(
88        ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
89            NullWritable> conf,
90        Integer id,
91        Vertex<IntWritable, IntWritable, NullWritable>... vertices) {
92      Partition<IntWritable, IntWritable, NullWritable> partition =
93          conf.createPartition(id, context);
94      for (Vertex<IntWritable, IntWritable, NullWritable> v : vertices) {
95        partition.putVertex(v);
96      }
97      return partition;
98    }
99  
100   @Before
101   public void setUp() {
102     GiraphConfiguration configuration = new GiraphConfiguration();
103     configuration.setComputationClass(MyComputation.class);
104     conf = new ImmutableClassesGiraphConfiguration<>(configuration);
105     context = Mockito.mock(Mapper.Context.class);
106   }
107 
108   @Test
109   public void testSimplePartitionStore() {
110     PartitionStore<IntWritable, IntWritable, NullWritable>
111     partitionStore = new SimplePartitionStore<>(conf, context);
112     testReadWrite(partitionStore, conf);
113     partitionStore.shutdown();
114   }
115 
116   @Test
117   public void testUnsafePartitionSerializationClass() throws IOException {
118     conf.setPartitionClass(ByteArrayPartition.class);
119     Vertex<IntWritable, IntWritable, NullWritable> v1 =
120         conf.createVertex();
121     v1.initialize(new IntWritable(1), new IntWritable(1));
122     Vertex<IntWritable, IntWritable, NullWritable> v2 = conf.createVertex();
123     v2.initialize(new IntWritable(2), new IntWritable(2));
124     Vertex<IntWritable, IntWritable, NullWritable> v3 = conf.createVertex();
125     v3.initialize(new IntWritable(3), new IntWritable(3));
126     Vertex<IntWritable, IntWritable, NullWritable> v4 = conf.createVertex();
127     v4.initialize(new IntWritable(4), new IntWritable(4));
128     Vertex<IntWritable, IntWritable, NullWritable> v5 = conf.createVertex();
129     v5.initialize(new IntWritable(5), new IntWritable(5));
130     Vertex<IntWritable, IntWritable, NullWritable> v6 = conf.createVertex();
131     v6.initialize(new IntWritable(6), new IntWritable(6));
132     Vertex<IntWritable, IntWritable, NullWritable> v7 = conf.createVertex();
133     v7.initialize(new IntWritable(7), new IntWritable(7));
134 
135     Partition<IntWritable, IntWritable, NullWritable> partition =
136         createPartition(conf, 3, v1, v2, v3, v4, v5, v6, v7);
137     assertEquals(3, partition.getId());
138     assertEquals(0, partition.getEdgeCount());
139     assertEquals(7, partition.getVertexCount());
140     UnsafeByteArrayOutputStream outputStream = new
141         UnsafeByteArrayOutputStream();
142     partition.write(outputStream);
143     UnsafeByteArrayInputStream inputStream = new UnsafeByteArrayInputStream(
144         outputStream.getByteArray(), 0, outputStream.getPos());
145     Partition<IntWritable, IntWritable, NullWritable> deserializatedPartition =
146         conf.createPartition(-1, context);
147     deserializatedPartition.readFields(inputStream);
148 
149     assertEquals(3, deserializatedPartition.getId());
150     assertEquals(0, deserializatedPartition.getEdgeCount());
151     assertEquals(7, deserializatedPartition.getVertexCount());
152   }
153 
154   @Test
155   public void testDiskBackedPartitionStoreWithByteArrayPartition()
156     throws IOException {
157     File directory = Files.createTempDir();
158     GiraphConstants.PARTITIONS_DIRECTORY.set(
159         conf, new File(directory, "giraph_partitions").toString());
160     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
161     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
162     conf.setPartitionClass(ByteArrayPartition.class);
163 
164     CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
165       serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
166     WorkerServer workerServer = Mockito.mock(WorkerServer.class);
167     Mockito.when(serviceWorker.getSuperstep()).thenReturn(
168         BspService.INPUT_SUPERSTEP);
169     GraphTaskManager<IntWritable, IntWritable, NullWritable>
170         graphTaskManager = Mockito.mock(GraphTaskManager.class);
171     Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
172     ServerData<IntWritable, IntWritable, NullWritable>
173         serverData = new ServerData<>(serviceWorker, workerServer, conf, context);
174     Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
175 
176     DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
177         partitionStore =
178         (DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>)
179             serverData.getPartitionStore();
180     partitionStore.initialize();
181     testReadWrite(partitionStore, conf);
182     partitionStore.shutdown();
183     FileUtils.deleteDirectory(directory);
184   }
185 
186   @Test
187   public void testDiskBackedPartitionStore() throws IOException {
188     File directory = Files.createTempDir();
189     GiraphConstants.PARTITIONS_DIRECTORY.set(
190         conf, new File(directory, "giraph_partitions").toString());
191     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
192     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
193 
194     CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
195     serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
196     WorkerServer workerServer = Mockito.mock(WorkerServer.class);
197     Mockito.when(serviceWorker.getSuperstep()).thenReturn(
198         BspService.INPUT_SUPERSTEP);
199     GraphTaskManager<IntWritable, IntWritable, NullWritable>
200         graphTaskManager = Mockito.mock(GraphTaskManager.class);
201     Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
202     ServerData<IntWritable, IntWritable, NullWritable>
203         serverData = new ServerData<>(serviceWorker, workerServer, conf, context);
204     Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
205 
206     DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
207         partitionStore =
208         (DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>)
209             serverData.getPartitionStore();
210     partitionStore.initialize();
211     testReadWrite(partitionStore, conf);
212     partitionStore.shutdown();
213     FileUtils.deleteDirectory(directory);
214   }
215 
216   @Test
217   public void testDiskBackedPartitionStoreComputation() throws Exception {
218     Iterable<String> results;
219     String[] graph =
220         {
221             "[1,0,[]]", "[2,0,[]]", "[3,0,[]]", "[4,0,[]]", "[5,0,[]]",
222             "[6,0,[]]", "[7,0,[]]", "[8,0,[]]", "[9,0,[]]", "[10,0,[]]"
223         };
224     String[] expected =
225         {
226             "1\t0", "2\t0", "3\t0", "4\t0", "5\t0",
227             "6\t0", "7\t0", "8\t0", "9\t0", "10\t0"
228         };
229 
230     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
231     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
232     GiraphConstants.USER_PARTITION_COUNT.set(conf, 10);
233 
234     File directory = Files.createTempDir();
235     GiraphConstants.PARTITIONS_DIRECTORY.set(conf,
236         new File(directory, "giraph_partitions").toString());
237 
238     conf.setComputationClass(EmptyComputation.class);
239     conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class);
240     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
241 
242     results = InternalVertexRunner.run(conf, graph);
243     checkResults(results, expected);
244     FileUtils.deleteDirectory(directory);
245   }
246 
247   @Test
248   public void testDiskBackedPartitionStoreWithByteArrayComputation()
249     throws Exception {
250     Iterable<String> results;
251     String[] graph =
252     {
253       "[1,0,[]]", "[2,0,[]]", "[3,0,[]]", "[4,0,[]]", "[5,0,[]]",
254       "[6,0,[]]", "[7,0,[]]", "[8,0,[]]", "[9,0,[]]", "[10,0,[]]"
255     };
256     String[] expected =
257     {
258       "1\t0", "2\t0", "3\t0", "4\t0", "5\t0",
259       "6\t0", "7\t0", "8\t0", "9\t0", "10\t0"
260     };
261 
262     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
263     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
264     GiraphConstants.USER_PARTITION_COUNT.set(conf, 10);
265 
266     File directory = Files.createTempDir();
267     GiraphConstants.PARTITIONS_DIRECTORY.set(conf,
268       new File(directory, "giraph_partitions").toString());
269 
270     conf.setPartitionClass(ByteArrayPartition.class);
271     conf.setComputationClass(EmptyComputation.class);
272     conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class);
273     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
274 
275     results = InternalVertexRunner.run(conf, graph);
276     checkResults(results, expected);
277     FileUtils.deleteDirectory(directory);
278   }
279 
280   @Test
281   public void testDiskBackedPartitionStoreMT() throws Exception {
282     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, NUM_PARTITIONS_IN_MEMORY);
283     GiraphConstants.STATIC_GRAPH.set(conf, false);
284     testMultiThreaded();
285   }
286 
287   @Test
288   public void testDiskBackedPartitionStoreMTStatic() throws Exception {
289     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, NUM_PARTITIONS_IN_MEMORY);
290     GiraphConstants.STATIC_GRAPH.set(conf, true);
291     testMultiThreaded();
292   }
293 
294   @Test
295   public void testDiskBackedPartitionStoreAdaptiveOOC() throws Exception {
296     GiraphConstants.STATIC_GRAPH.set(conf, true);
297     NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.set(conf, true);
298     testMultiThreaded();
299   }
300 
301   private void testMultiThreaded() throws Exception {
302     final AtomicInteger vertexCounter = new AtomicInteger(0);
303     ExecutorService pool = Executors.newFixedThreadPool(NUM_OF_THREADS);
304     ExecutorCompletionService<Boolean> executor =
305       new ExecutorCompletionService<>(pool);
306 
307     File directory = Files.createTempDir();
308     GiraphConstants.PARTITIONS_DIRECTORY.set(
309         conf, new File(directory, "giraph_partitions").toString());
310     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
311 
312     CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
313     serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
314     WorkerServer workerServer = Mockito.mock(WorkerServer.class);
315 
316     Mockito.when(serviceWorker.getSuperstep()).thenReturn(
317         BspService.INPUT_SUPERSTEP);
318     GraphTaskManager<IntWritable, IntWritable, NullWritable>
319         graphTaskManager = Mockito.mock(GraphTaskManager.class);
320     Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
321     ServerData<IntWritable, IntWritable, NullWritable>
322         serverData = new ServerData<>(serviceWorker, workerServer, conf, context);
323     Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
324 
325     DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
326         store =
327         (DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>)
328             serverData.getPartitionStore();
329     store.initialize();
330 
331     // Create a new Graph in memory using multiple threads
332     for (int i = 0; i < NUM_OF_THREADS; ++i) {
333       List<Integer> partitionIds = new ArrayList<>();
334       for (int id = i; id < NUM_OF_PARTITIONS; id += NUM_OF_THREADS) {
335         partitionIds.add(id);
336       }
337       Worker worker =
338         new Worker(vertexCounter, store, partitionIds, conf);
339       executor.submit(worker, true);
340     }
341     for (int i = 0; i < NUM_OF_THREADS; ++i)
342       executor.take();
343     pool.shutdownNow();
344 
345     // Check the number of vertices
346     int totalVertexes = 0;
347     int totalEdges = 0;
348     Partition<IntWritable, IntWritable, NullWritable> partition;
349     for (int i = 0; i < NUM_OF_PARTITIONS; ++i) {
350       totalVertexes += store.getPartitionVertexCount(i);
351       totalEdges += store.getPartitionEdgeCount(i);
352     }
353 
354     assert vertexCounter.get() == NUM_OF_PARTITIONS * NUM_OF_VERTEXES_PER_PARTITION;
355     assert totalVertexes == NUM_OF_PARTITIONS * NUM_OF_VERTEXES_PER_PARTITION;
356     assert totalEdges == totalVertexes * NUM_OF_EDGES_PER_VERTEX;
357 
358     // Check the content of the vertices
359     int expected = 0;
360     for (int i = 0; i < NUM_OF_VERTEXES_PER_PARTITION * NUM_OF_PARTITIONS; ++i) {
361       expected += i;
362     }
363     int totalValues = 0;
364     store.startIteration();
365     for (int i = 0; i < NUM_OF_PARTITIONS; ++i) {
366       partition = store.getNextPartition();
367       assert partition != null;
368 
369       for (Vertex<IntWritable, IntWritable, NullWritable> v : partition) {
370         totalValues += v.getId().get();
371       }
372       store.putPartition(partition);
373     }
374     assert totalValues == expected;
375 
376     store.shutdown();
377   }
378 
379   private Partition<IntWritable, IntWritable, NullWritable>
380   getPartition(PartitionStore<IntWritable, IntWritable,
381       NullWritable> partitionStore, int partitionId) {
382     Partition p;
383     Partition result = null;
384     while ((p = partitionStore.getNextPartition()) != null) {
385       if (p.getId() == partitionId) {
386         result = p;
387       }
388       partitionStore.putPartition(p);
389     }
390     return result;
391   }
392 
393   /**
394    * Test reading/writing to/from a partition store
395    *
396    * @param partitionStore Partition store to test
397    * @param conf Configuration to use
398    */
399   public void testReadWrite(
400       PartitionStore<IntWritable, IntWritable,
401           NullWritable> partitionStore,
402       ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
403           NullWritable> conf) {
404     Vertex<IntWritable, IntWritable, NullWritable> v1 = conf.createVertex();
405     v1.initialize(new IntWritable(1), new IntWritable(1));
406     Vertex<IntWritable, IntWritable, NullWritable> v2 = conf.createVertex();
407     v2.initialize(new IntWritable(2), new IntWritable(2));
408     Vertex<IntWritable, IntWritable, NullWritable> v3 = conf.createVertex();
409     v3.initialize(new IntWritable(3), new IntWritable(3));
410     Vertex<IntWritable, IntWritable, NullWritable> v4 = conf.createVertex();
411     v4.initialize(new IntWritable(4), new IntWritable(4));
412     Vertex<IntWritable, IntWritable, NullWritable> v5 = conf.createVertex();
413     v5.initialize(new IntWritable(5), new IntWritable(5));
414     Vertex<IntWritable, IntWritable, NullWritable> v6 = conf.createVertex();
415     v6.initialize(new IntWritable(7), new IntWritable(7));
416     Vertex<IntWritable, IntWritable, NullWritable> v7 = conf.createVertex();
417     v7.initialize(new IntWritable(7), new IntWritable(7));
418     v7.addEdge(EdgeFactory.create(new IntWritable(1)));
419     v7.addEdge(EdgeFactory.create(new IntWritable(2)));
420 
421     partitionStore.addPartition(createPartition(conf, 1, v1, v2, v6));
422     partitionStore.addPartition(createPartition(conf, 2, v3, v4));
423     partitionStore.addPartition(createPartition(conf, 3, v5));
424     partitionStore.addPartition(createPartition(conf, 4, v7));
425 
426     partitionStore.startIteration();
427     getPartition(partitionStore, 1);
428     partitionStore.startIteration();
429     getPartition(partitionStore, 2);
430     partitionStore.startIteration();
431     partitionStore.removePartition(3);
432     getPartition(partitionStore, 4);
433 
434     assertEquals(3, partitionStore.getNumPartitions());
435     assertEquals(3, Iterables.size(partitionStore.getPartitionIds()));
436     int partitionsNumber = 0;
437 
438     partitionStore.startIteration();
439     Partition<IntWritable, IntWritable, NullWritable> p;
440     while ((p = partitionStore.getNextPartition()) != null) {
441       partitionStore.putPartition(p);
442       partitionsNumber++;
443     }
444     assertEquals(3, partitionsNumber);
445     assertTrue(partitionStore.hasPartition(1));
446     assertTrue(partitionStore.hasPartition(2));
447     assertFalse(partitionStore.hasPartition(3));
448     assertTrue(partitionStore.hasPartition(4));
449     assertEquals(3, partitionStore.getPartitionVertexCount(1));
450     assertEquals(2, partitionStore.getPartitionVertexCount(2));
451     assertEquals(1, partitionStore.getPartitionVertexCount(4));
452     assertEquals(2, partitionStore.getPartitionEdgeCount(4));
453   }
454 
455   /**
456    * Internal checker to verify the correctness of the tests.
457    * @param results   the actual results obtained
458    * @param expected  expected results
459    */
460   private void checkResults(Iterable<String> results, String[] expected) {
461 
462     for (String str : results) {
463       boolean found = false;
464 
465       for (String expectedStr : expected) {
466         if (expectedStr.equals(str)) {
467           found = true;
468         }
469       }
470 
471       assert found;
472     }
473   }
474 
475   /**
476    * Test compute method that sends each edge a notification of its parents.
477    * The test set only has a 1-1 parent-to-child ratio for this unit test.
478    */
479   public static class EmptyComputation
480     extends BasicComputation<LongWritable, DoubleWritable, FloatWritable,
481       LongWritable> {
482 
483     @Override
484     public void compute(
485       Vertex<LongWritable, DoubleWritable,FloatWritable> vertex,
486       Iterable<LongWritable> messages) throws IOException {
487 
488       vertex.voteToHalt();
489     }
490   }
491 
492   @Test
493   public void testEdgeCombineWithSimplePartition() throws IOException {
494     testEdgeCombine(SimplePartition.class);
495   }
496 
497   @Test
498   public void testEdgeCombineWithByteArrayPartition() throws IOException {
499     testEdgeCombine(ByteArrayPartition.class);
500   }
501 
502   private void testEdgeCombine(Class<? extends Partition> partitionClass)
503       throws IOException {
504     Vertex<IntWritable, IntWritable, NullWritable> v1 = conf.createVertex();
505     v1.initialize(new IntWritable(1), new IntWritable(1));
506     Vertex<IntWritable, IntWritable, NullWritable> v2 = conf.createVertex();
507     v2.initialize(new IntWritable(2), new IntWritable(2));
508     Vertex<IntWritable, IntWritable, NullWritable> v3 = conf.createVertex();
509     v3.initialize(new IntWritable(3), new IntWritable(3));
510     Vertex<IntWritable, IntWritable, NullWritable> v1e2 = conf.createVertex();
511     v1e2.initialize(new IntWritable(1), new IntWritable(1));
512     v1e2.addEdge(EdgeFactory.create(new IntWritable(2)));
513     Vertex<IntWritable, IntWritable, NullWritable> v1e3 = conf.createVertex();
514     v1e3.initialize(new IntWritable(1), new IntWritable(1));
515     v1e3.addEdge(EdgeFactory.create(new IntWritable(3)));
516 
517     GiraphConfiguration newconf = new GiraphConfiguration(conf);
518     newconf.setPartitionClass(partitionClass);
519     Partition<IntWritable, IntWritable, NullWritable> partition =
520         (new ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
521             NullWritable>(newconf)).createPartition(1, context);
522     assertEquals(partitionClass, partition.getClass());
523     partition.putVertex(v1);
524     partition.putVertex(v2);
525     partition.putVertex(v3);
526     assertEquals(3, partition.getVertexCount());
527     assertEquals(0, partition.getEdgeCount());
528     partition.putOrCombine(v1e2);
529     assertEquals(3, partition.getVertexCount());
530     assertEquals(1, partition.getEdgeCount());
531     partition.putOrCombine(v1e3);
532     assertEquals(3, partition.getVertexCount());
533     assertEquals(2, partition.getEdgeCount());
534     v1 = partition.getVertex(new IntWritable(1));
535     assertEquals(new IntWritable(1), v1.getId());
536     assertEquals(new IntWritable(1), v1.getValue());
537     assertEquals(2, v1.getNumEdges());
538   }
539 
540   private class Worker implements Runnable {
541 
542     private final AtomicInteger vertexCounter;
543     private final PartitionStore<IntWritable, IntWritable, NullWritable>
544       partitionStore;
545     private final List<Integer> partitionIds;
546     private final ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
547             NullWritable> conf;
548 
549     public Worker(AtomicInteger vertexCounter,
550         PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore,
551         List<Integer> partitionIds,
552         ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
553           NullWritable> conf) {
554 
555       this.vertexCounter = vertexCounter;
556       this.partitionStore = partitionStore;
557       this.partitionIds = partitionIds;
558       this.conf = conf;
559     }
560 
561     public void run() {
562       for (int partitionId : partitionIds) {
563         Partition<IntWritable, IntWritable, NullWritable> partition =
564             conf.createPartition(partitionId, context);
565         for (int i = 0; i < NUM_OF_VERTEXES_PER_PARTITION; ++i) {
566           int id = vertexCounter.getAndIncrement();
567           Vertex<IntWritable, IntWritable, NullWritable> v = conf.createVertex();
568           v.initialize(new IntWritable(id), new IntWritable(id));
569 
570           Random rand = new Random(id);
571           for (int j = 0; j < NUM_OF_EDGES_PER_VERTEX; ++j) {
572             int dest = rand.nextInt(id + 1);
573             v.addEdge(EdgeFactory.create(new IntWritable(dest)));
574           }
575 
576           partition.putVertex(v);
577         }
578         partitionStore.addPartition(partition);
579       }
580     }
581   }
582 
583   @Test
584   public void testOutOfCoreMessages() throws Exception {
585     Iterable<String> results;
586     String[] graph =
587         { "1 0 2 3", "2 0 3 5", "3 0 1 2 4", "4 0 3", "5 0 6 7 1 2",
588             "6 0 10 8 7", "7 0 1 3", "8 0 1 10 9 4 6", "9 0 8 1 5 7",
589             "10 0 9" };
590 
591     String[] expected =
592         {
593             "1\t32", "2\t9", "3\t14", "4\t11", "5\t11",
594             "6\t13", "7\t20", "8\t15", "9\t18", "10\t14"
595         };
596 
597     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
598     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
599     GiraphConstants.USER_PARTITION_COUNT.set(conf, 10);
600 
601     File directory = Files.createTempDir();
602     GiraphConstants.PARTITIONS_DIRECTORY.set(conf,
603         new File(directory, "giraph_partitions").toString());
604 
605     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
606     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
607     conf.setComputationClass(TestOutOfCoreMessagesComputation.class);
608     conf.setVertexInputFormatClass(IntIntNullTextVertexInputFormat.class);
609     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
610 
611     results = InternalVertexRunner.run(conf, graph);
612     checkResults(results, expected);
613     FileUtils.deleteDirectory(directory);
614   }
615 
616   public static class TestOutOfCoreMessagesComputation extends
617       BasicComputation<IntWritable, IntWritable, NullWritable, IntWritable> {
618 
619     @Override
620     public void compute(
621         Vertex<IntWritable, IntWritable, NullWritable> vertex,
622         Iterable<IntWritable> messages) throws IOException {
623       if (getSuperstep() == 0) {
624         // Send id to all neighbors
625         sendMessageToAllEdges(vertex, vertex.getId());
626       } else {
627         // Add received messages and halt
628         int sum = 0;
629         for (IntWritable message : messages) {
630           sum += message.get();
631         }
632         vertex.setValue(new IntWritable(sum));
633         vertex.voteToHalt();
634       }
635     }
636   }
637 }