This project has retired. For details please refer to its
Attic page.
TestPartitionStores xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.partition;
20
21 import com.google.common.collect.Iterables;
22 import com.google.common.io.Files;
23 import org.apache.commons.io.FileUtils;
24 import org.apache.giraph.bsp.BspService;
25 import org.apache.giraph.bsp.CentralizedServiceWorker;
26 import org.apache.giraph.comm.ServerData;
27 import org.apache.giraph.comm.WorkerServer;
28 import org.apache.giraph.comm.netty.NettyClient;
29 import org.apache.giraph.conf.GiraphConfiguration;
30 import org.apache.giraph.conf.GiraphConstants;
31 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
32 import org.apache.giraph.edge.EdgeFactory;
33 import org.apache.giraph.graph.BasicComputation;
34 import org.apache.giraph.graph.GraphTaskManager;
35 import org.apache.giraph.graph.Vertex;
36 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
37 import org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat;
38 import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat;
39 import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
40 import org.apache.giraph.utils.InternalVertexRunner;
41 import org.apache.giraph.utils.NoOpComputation;
42 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
43 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
44 import org.apache.hadoop.io.DoubleWritable;
45 import org.apache.hadoop.io.FloatWritable;
46 import org.apache.hadoop.io.IntWritable;
47 import org.apache.hadoop.io.LongWritable;
48 import org.apache.hadoop.io.NullWritable;
49 import org.apache.hadoop.mapreduce.Mapper;
50 import org.junit.Before;
51 import org.junit.Test;
52 import org.mockito.Mockito;
53
54 import java.io.File;
55 import java.io.IOException;
56 import java.util.ArrayList;
57 import java.util.List;
58 import java.util.Random;
59 import java.util.concurrent.ExecutorCompletionService;
60 import java.util.concurrent.ExecutorService;
61 import java.util.concurrent.Executors;
62 import java.util.concurrent.atomic.AtomicInteger;
63
64 import static org.junit.Assert.assertEquals;
65 import static org.junit.Assert.assertFalse;
66 import static org.junit.Assert.assertTrue;
67
68
69
70
71 @SuppressWarnings("unchecked")
72 public class TestPartitionStores {
73 private ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
74 NullWritable> conf;
75 private Mapper<?, ?, ?, ?>.Context context;
76
77
78 private static final int NUM_OF_VERTEXES_PER_PARTITION = 20;
79 private static final int NUM_OF_EDGES_PER_VERTEX = 5;
80 private static final int NUM_OF_THREADS = 8;
81 private static final int NUM_OF_PARTITIONS = 30;
82 private static final int NUM_PARTITIONS_IN_MEMORY = 12;
83
84 public static class MyComputation extends NoOpComputation<IntWritable,
85 IntWritable, NullWritable, IntWritable> { }
86
87 private Partition<IntWritable, IntWritable, NullWritable> createPartition(
88 ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
89 NullWritable> conf,
90 Integer id,
91 Vertex<IntWritable, IntWritable, NullWritable>... vertices) {
92 Partition<IntWritable, IntWritable, NullWritable> partition =
93 conf.createPartition(id, context);
94 for (Vertex<IntWritable, IntWritable, NullWritable> v : vertices) {
95 partition.putVertex(v);
96 }
97 return partition;
98 }
99
100 @Before
101 public void setUp() {
102 GiraphConfiguration configuration = new GiraphConfiguration();
103 configuration.setComputationClass(MyComputation.class);
104 conf = new ImmutableClassesGiraphConfiguration<>(configuration);
105 context = Mockito.mock(Mapper.Context.class);
106 }
107
108 @Test
109 public void testSimplePartitionStore() {
110 PartitionStore<IntWritable, IntWritable, NullWritable>
111 partitionStore = new SimplePartitionStore<>(conf, context);
112 testReadWrite(partitionStore, conf);
113 partitionStore.shutdown();
114 }
115
116 @Test
117 public void testUnsafePartitionSerializationClass() throws IOException {
118 conf.setPartitionClass(ByteArrayPartition.class);
119 Vertex<IntWritable, IntWritable, NullWritable> v1 =
120 conf.createVertex();
121 v1.initialize(new IntWritable(1), new IntWritable(1));
122 Vertex<IntWritable, IntWritable, NullWritable> v2 = conf.createVertex();
123 v2.initialize(new IntWritable(2), new IntWritable(2));
124 Vertex<IntWritable, IntWritable, NullWritable> v3 = conf.createVertex();
125 v3.initialize(new IntWritable(3), new IntWritable(3));
126 Vertex<IntWritable, IntWritable, NullWritable> v4 = conf.createVertex();
127 v4.initialize(new IntWritable(4), new IntWritable(4));
128 Vertex<IntWritable, IntWritable, NullWritable> v5 = conf.createVertex();
129 v5.initialize(new IntWritable(5), new IntWritable(5));
130 Vertex<IntWritable, IntWritable, NullWritable> v6 = conf.createVertex();
131 v6.initialize(new IntWritable(6), new IntWritable(6));
132 Vertex<IntWritable, IntWritable, NullWritable> v7 = conf.createVertex();
133 v7.initialize(new IntWritable(7), new IntWritable(7));
134
135 Partition<IntWritable, IntWritable, NullWritable> partition =
136 createPartition(conf, 3, v1, v2, v3, v4, v5, v6, v7);
137 assertEquals(3, partition.getId());
138 assertEquals(0, partition.getEdgeCount());
139 assertEquals(7, partition.getVertexCount());
140 UnsafeByteArrayOutputStream outputStream = new
141 UnsafeByteArrayOutputStream();
142 partition.write(outputStream);
143 UnsafeByteArrayInputStream inputStream = new UnsafeByteArrayInputStream(
144 outputStream.getByteArray(), 0, outputStream.getPos());
145 Partition<IntWritable, IntWritable, NullWritable> deserializatedPartition =
146 conf.createPartition(-1, context);
147 deserializatedPartition.readFields(inputStream);
148
149 assertEquals(3, deserializatedPartition.getId());
150 assertEquals(0, deserializatedPartition.getEdgeCount());
151 assertEquals(7, deserializatedPartition.getVertexCount());
152 }
153
154 @Test
155 public void testDiskBackedPartitionStoreWithByteArrayPartition()
156 throws IOException {
157 File directory = Files.createTempDir();
158 GiraphConstants.PARTITIONS_DIRECTORY.set(
159 conf, new File(directory, "giraph_partitions").toString());
160 GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
161 GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
162 conf.setPartitionClass(ByteArrayPartition.class);
163
164 CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
165 serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
166 WorkerServer workerServer = Mockito.mock(WorkerServer.class);
167 Mockito.when(serviceWorker.getSuperstep()).thenReturn(
168 BspService.INPUT_SUPERSTEP);
169 GraphTaskManager<IntWritable, IntWritable, NullWritable>
170 graphTaskManager = Mockito.mock(GraphTaskManager.class);
171 Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
172 ServerData<IntWritable, IntWritable, NullWritable>
173 serverData = new ServerData<>(serviceWorker, workerServer, conf, context);
174 Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
175
176 DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
177 partitionStore =
178 (DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>)
179 serverData.getPartitionStore();
180 partitionStore.initialize();
181 testReadWrite(partitionStore, conf);
182 partitionStore.shutdown();
183 FileUtils.deleteDirectory(directory);
184 }
185
186 @Test
187 public void testDiskBackedPartitionStore() throws IOException {
188 File directory = Files.createTempDir();
189 GiraphConstants.PARTITIONS_DIRECTORY.set(
190 conf, new File(directory, "giraph_partitions").toString());
191 GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
192 GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
193
194 CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
195 serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
196 WorkerServer workerServer = Mockito.mock(WorkerServer.class);
197 Mockito.when(serviceWorker.getSuperstep()).thenReturn(
198 BspService.INPUT_SUPERSTEP);
199 GraphTaskManager<IntWritable, IntWritable, NullWritable>
200 graphTaskManager = Mockito.mock(GraphTaskManager.class);
201 Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
202 ServerData<IntWritable, IntWritable, NullWritable>
203 serverData = new ServerData<>(serviceWorker, workerServer, conf, context);
204 Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
205
206 DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
207 partitionStore =
208 (DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>)
209 serverData.getPartitionStore();
210 partitionStore.initialize();
211 testReadWrite(partitionStore, conf);
212 partitionStore.shutdown();
213 FileUtils.deleteDirectory(directory);
214 }
215
216 @Test
217 public void testDiskBackedPartitionStoreComputation() throws Exception {
218 Iterable<String> results;
219 String[] graph =
220 {
221 "[1,0,[]]", "[2,0,[]]", "[3,0,[]]", "[4,0,[]]", "[5,0,[]]",
222 "[6,0,[]]", "[7,0,[]]", "[8,0,[]]", "[9,0,[]]", "[10,0,[]]"
223 };
224 String[] expected =
225 {
226 "1\t0", "2\t0", "3\t0", "4\t0", "5\t0",
227 "6\t0", "7\t0", "8\t0", "9\t0", "10\t0"
228 };
229
230 GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
231 GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
232 GiraphConstants.USER_PARTITION_COUNT.set(conf, 10);
233
234 File directory = Files.createTempDir();
235 GiraphConstants.PARTITIONS_DIRECTORY.set(conf,
236 new File(directory, "giraph_partitions").toString());
237
238 conf.setComputationClass(EmptyComputation.class);
239 conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class);
240 conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
241
242 results = InternalVertexRunner.run(conf, graph);
243 checkResults(results, expected);
244 FileUtils.deleteDirectory(directory);
245 }
246
247 @Test
248 public void testDiskBackedPartitionStoreWithByteArrayComputation()
249 throws Exception {
250 Iterable<String> results;
251 String[] graph =
252 {
253 "[1,0,[]]", "[2,0,[]]", "[3,0,[]]", "[4,0,[]]", "[5,0,[]]",
254 "[6,0,[]]", "[7,0,[]]", "[8,0,[]]", "[9,0,[]]", "[10,0,[]]"
255 };
256 String[] expected =
257 {
258 "1\t0", "2\t0", "3\t0", "4\t0", "5\t0",
259 "6\t0", "7\t0", "8\t0", "9\t0", "10\t0"
260 };
261
262 GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
263 GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
264 GiraphConstants.USER_PARTITION_COUNT.set(conf, 10);
265
266 File directory = Files.createTempDir();
267 GiraphConstants.PARTITIONS_DIRECTORY.set(conf,
268 new File(directory, "giraph_partitions").toString());
269
270 conf.setPartitionClass(ByteArrayPartition.class);
271 conf.setComputationClass(EmptyComputation.class);
272 conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class);
273 conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
274
275 results = InternalVertexRunner.run(conf, graph);
276 checkResults(results, expected);
277 FileUtils.deleteDirectory(directory);
278 }
279
280 @Test
281 public void testDiskBackedPartitionStoreMT() throws Exception {
282 GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, NUM_PARTITIONS_IN_MEMORY);
283 GiraphConstants.STATIC_GRAPH.set(conf, false);
284 testMultiThreaded();
285 }
286
287 @Test
288 public void testDiskBackedPartitionStoreMTStatic() throws Exception {
289 GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, NUM_PARTITIONS_IN_MEMORY);
290 GiraphConstants.STATIC_GRAPH.set(conf, true);
291 testMultiThreaded();
292 }
293
294 @Test
295 public void testDiskBackedPartitionStoreAdaptiveOOC() throws Exception {
296 GiraphConstants.STATIC_GRAPH.set(conf, true);
297 NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.set(conf, true);
298 testMultiThreaded();
299 }
300
301 private void testMultiThreaded() throws Exception {
302 final AtomicInteger vertexCounter = new AtomicInteger(0);
303 ExecutorService pool = Executors.newFixedThreadPool(NUM_OF_THREADS);
304 ExecutorCompletionService<Boolean> executor =
305 new ExecutorCompletionService<>(pool);
306
307 File directory = Files.createTempDir();
308 GiraphConstants.PARTITIONS_DIRECTORY.set(
309 conf, new File(directory, "giraph_partitions").toString());
310 GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
311
312 CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
313 serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
314 WorkerServer workerServer = Mockito.mock(WorkerServer.class);
315
316 Mockito.when(serviceWorker.getSuperstep()).thenReturn(
317 BspService.INPUT_SUPERSTEP);
318 GraphTaskManager<IntWritable, IntWritable, NullWritable>
319 graphTaskManager = Mockito.mock(GraphTaskManager.class);
320 Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
321 ServerData<IntWritable, IntWritable, NullWritable>
322 serverData = new ServerData<>(serviceWorker, workerServer, conf, context);
323 Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
324
325 DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
326 store =
327 (DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>)
328 serverData.getPartitionStore();
329 store.initialize();
330
331
332 for (int i = 0; i < NUM_OF_THREADS; ++i) {
333 List<Integer> partitionIds = new ArrayList<>();
334 for (int id = i; id < NUM_OF_PARTITIONS; id += NUM_OF_THREADS) {
335 partitionIds.add(id);
336 }
337 Worker worker =
338 new Worker(vertexCounter, store, partitionIds, conf);
339 executor.submit(worker, true);
340 }
341 for (int i = 0; i < NUM_OF_THREADS; ++i)
342 executor.take();
343 pool.shutdownNow();
344
345
346 int totalVertexes = 0;
347 int totalEdges = 0;
348 Partition<IntWritable, IntWritable, NullWritable> partition;
349 for (int i = 0; i < NUM_OF_PARTITIONS; ++i) {
350 totalVertexes += store.getPartitionVertexCount(i);
351 totalEdges += store.getPartitionEdgeCount(i);
352 }
353
354 assert vertexCounter.get() == NUM_OF_PARTITIONS * NUM_OF_VERTEXES_PER_PARTITION;
355 assert totalVertexes == NUM_OF_PARTITIONS * NUM_OF_VERTEXES_PER_PARTITION;
356 assert totalEdges == totalVertexes * NUM_OF_EDGES_PER_VERTEX;
357
358
359 int expected = 0;
360 for (int i = 0; i < NUM_OF_VERTEXES_PER_PARTITION * NUM_OF_PARTITIONS; ++i) {
361 expected += i;
362 }
363 int totalValues = 0;
364 store.startIteration();
365 for (int i = 0; i < NUM_OF_PARTITIONS; ++i) {
366 partition = store.getNextPartition();
367 assert partition != null;
368
369 for (Vertex<IntWritable, IntWritable, NullWritable> v : partition) {
370 totalValues += v.getId().get();
371 }
372 store.putPartition(partition);
373 }
374 assert totalValues == expected;
375
376 store.shutdown();
377 }
378
379 private Partition<IntWritable, IntWritable, NullWritable>
380 getPartition(PartitionStore<IntWritable, IntWritable,
381 NullWritable> partitionStore, int partitionId) {
382 Partition p;
383 Partition result = null;
384 while ((p = partitionStore.getNextPartition()) != null) {
385 if (p.getId() == partitionId) {
386 result = p;
387 }
388 partitionStore.putPartition(p);
389 }
390 return result;
391 }
392
393
394
395
396
397
398
399 public void testReadWrite(
400 PartitionStore<IntWritable, IntWritable,
401 NullWritable> partitionStore,
402 ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
403 NullWritable> conf) {
404 Vertex<IntWritable, IntWritable, NullWritable> v1 = conf.createVertex();
405 v1.initialize(new IntWritable(1), new IntWritable(1));
406 Vertex<IntWritable, IntWritable, NullWritable> v2 = conf.createVertex();
407 v2.initialize(new IntWritable(2), new IntWritable(2));
408 Vertex<IntWritable, IntWritable, NullWritable> v3 = conf.createVertex();
409 v3.initialize(new IntWritable(3), new IntWritable(3));
410 Vertex<IntWritable, IntWritable, NullWritable> v4 = conf.createVertex();
411 v4.initialize(new IntWritable(4), new IntWritable(4));
412 Vertex<IntWritable, IntWritable, NullWritable> v5 = conf.createVertex();
413 v5.initialize(new IntWritable(5), new IntWritable(5));
414 Vertex<IntWritable, IntWritable, NullWritable> v6 = conf.createVertex();
415 v6.initialize(new IntWritable(7), new IntWritable(7));
416 Vertex<IntWritable, IntWritable, NullWritable> v7 = conf.createVertex();
417 v7.initialize(new IntWritable(7), new IntWritable(7));
418 v7.addEdge(EdgeFactory.create(new IntWritable(1)));
419 v7.addEdge(EdgeFactory.create(new IntWritable(2)));
420
421 partitionStore.addPartition(createPartition(conf, 1, v1, v2, v6));
422 partitionStore.addPartition(createPartition(conf, 2, v3, v4));
423 partitionStore.addPartition(createPartition(conf, 3, v5));
424 partitionStore.addPartition(createPartition(conf, 4, v7));
425
426 partitionStore.startIteration();
427 getPartition(partitionStore, 1);
428 partitionStore.startIteration();
429 getPartition(partitionStore, 2);
430 partitionStore.startIteration();
431 partitionStore.removePartition(3);
432 getPartition(partitionStore, 4);
433
434 assertEquals(3, partitionStore.getNumPartitions());
435 assertEquals(3, Iterables.size(partitionStore.getPartitionIds()));
436 int partitionsNumber = 0;
437
438 partitionStore.startIteration();
439 Partition<IntWritable, IntWritable, NullWritable> p;
440 while ((p = partitionStore.getNextPartition()) != null) {
441 partitionStore.putPartition(p);
442 partitionsNumber++;
443 }
444 assertEquals(3, partitionsNumber);
445 assertTrue(partitionStore.hasPartition(1));
446 assertTrue(partitionStore.hasPartition(2));
447 assertFalse(partitionStore.hasPartition(3));
448 assertTrue(partitionStore.hasPartition(4));
449 assertEquals(3, partitionStore.getPartitionVertexCount(1));
450 assertEquals(2, partitionStore.getPartitionVertexCount(2));
451 assertEquals(1, partitionStore.getPartitionVertexCount(4));
452 assertEquals(2, partitionStore.getPartitionEdgeCount(4));
453 }
454
455
456
457
458
459
460 private void checkResults(Iterable<String> results, String[] expected) {
461
462 for (String str : results) {
463 boolean found = false;
464
465 for (String expectedStr : expected) {
466 if (expectedStr.equals(str)) {
467 found = true;
468 }
469 }
470
471 assert found;
472 }
473 }
474
475
476
477
478
479 public static class EmptyComputation
480 extends BasicComputation<LongWritable, DoubleWritable, FloatWritable,
481 LongWritable> {
482
483 @Override
484 public void compute(
485 Vertex<LongWritable, DoubleWritable,FloatWritable> vertex,
486 Iterable<LongWritable> messages) throws IOException {
487
488 vertex.voteToHalt();
489 }
490 }
491
492 @Test
493 public void testEdgeCombineWithSimplePartition() throws IOException {
494 testEdgeCombine(SimplePartition.class);
495 }
496
497 @Test
498 public void testEdgeCombineWithByteArrayPartition() throws IOException {
499 testEdgeCombine(ByteArrayPartition.class);
500 }
501
502 private void testEdgeCombine(Class<? extends Partition> partitionClass)
503 throws IOException {
504 Vertex<IntWritable, IntWritable, NullWritable> v1 = conf.createVertex();
505 v1.initialize(new IntWritable(1), new IntWritable(1));
506 Vertex<IntWritable, IntWritable, NullWritable> v2 = conf.createVertex();
507 v2.initialize(new IntWritable(2), new IntWritable(2));
508 Vertex<IntWritable, IntWritable, NullWritable> v3 = conf.createVertex();
509 v3.initialize(new IntWritable(3), new IntWritable(3));
510 Vertex<IntWritable, IntWritable, NullWritable> v1e2 = conf.createVertex();
511 v1e2.initialize(new IntWritable(1), new IntWritable(1));
512 v1e2.addEdge(EdgeFactory.create(new IntWritable(2)));
513 Vertex<IntWritable, IntWritable, NullWritable> v1e3 = conf.createVertex();
514 v1e3.initialize(new IntWritable(1), new IntWritable(1));
515 v1e3.addEdge(EdgeFactory.create(new IntWritable(3)));
516
517 GiraphConfiguration newconf = new GiraphConfiguration(conf);
518 newconf.setPartitionClass(partitionClass);
519 Partition<IntWritable, IntWritable, NullWritable> partition =
520 (new ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
521 NullWritable>(newconf)).createPartition(1, context);
522 assertEquals(partitionClass, partition.getClass());
523 partition.putVertex(v1);
524 partition.putVertex(v2);
525 partition.putVertex(v3);
526 assertEquals(3, partition.getVertexCount());
527 assertEquals(0, partition.getEdgeCount());
528 partition.putOrCombine(v1e2);
529 assertEquals(3, partition.getVertexCount());
530 assertEquals(1, partition.getEdgeCount());
531 partition.putOrCombine(v1e3);
532 assertEquals(3, partition.getVertexCount());
533 assertEquals(2, partition.getEdgeCount());
534 v1 = partition.getVertex(new IntWritable(1));
535 assertEquals(new IntWritable(1), v1.getId());
536 assertEquals(new IntWritable(1), v1.getValue());
537 assertEquals(2, v1.getNumEdges());
538 }
539
540 private class Worker implements Runnable {
541
542 private final AtomicInteger vertexCounter;
543 private final PartitionStore<IntWritable, IntWritable, NullWritable>
544 partitionStore;
545 private final List<Integer> partitionIds;
546 private final ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
547 NullWritable> conf;
548
549 public Worker(AtomicInteger vertexCounter,
550 PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore,
551 List<Integer> partitionIds,
552 ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
553 NullWritable> conf) {
554
555 this.vertexCounter = vertexCounter;
556 this.partitionStore = partitionStore;
557 this.partitionIds = partitionIds;
558 this.conf = conf;
559 }
560
561 public void run() {
562 for (int partitionId : partitionIds) {
563 Partition<IntWritable, IntWritable, NullWritable> partition =
564 conf.createPartition(partitionId, context);
565 for (int i = 0; i < NUM_OF_VERTEXES_PER_PARTITION; ++i) {
566 int id = vertexCounter.getAndIncrement();
567 Vertex<IntWritable, IntWritable, NullWritable> v = conf.createVertex();
568 v.initialize(new IntWritable(id), new IntWritable(id));
569
570 Random rand = new Random(id);
571 for (int j = 0; j < NUM_OF_EDGES_PER_VERTEX; ++j) {
572 int dest = rand.nextInt(id + 1);
573 v.addEdge(EdgeFactory.create(new IntWritable(dest)));
574 }
575
576 partition.putVertex(v);
577 }
578 partitionStore.addPartition(partition);
579 }
580 }
581 }
582
583 @Test
584 public void testOutOfCoreMessages() throws Exception {
585 Iterable<String> results;
586 String[] graph =
587 { "1 0 2 3", "2 0 3 5", "3 0 1 2 4", "4 0 3", "5 0 6 7 1 2",
588 "6 0 10 8 7", "7 0 1 3", "8 0 1 10 9 4 6", "9 0 8 1 5 7",
589 "10 0 9" };
590
591 String[] expected =
592 {
593 "1\t32", "2\t9", "3\t14", "4\t11", "5\t11",
594 "6\t13", "7\t20", "8\t15", "9\t18", "10\t14"
595 };
596
597 GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
598 GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
599 GiraphConstants.USER_PARTITION_COUNT.set(conf, 10);
600
601 File directory = Files.createTempDir();
602 GiraphConstants.PARTITIONS_DIRECTORY.set(conf,
603 new File(directory, "giraph_partitions").toString());
604
605 GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
606 GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
607 conf.setComputationClass(TestOutOfCoreMessagesComputation.class);
608 conf.setVertexInputFormatClass(IntIntNullTextVertexInputFormat.class);
609 conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
610
611 results = InternalVertexRunner.run(conf, graph);
612 checkResults(results, expected);
613 FileUtils.deleteDirectory(directory);
614 }
615
616 public static class TestOutOfCoreMessagesComputation extends
617 BasicComputation<IntWritable, IntWritable, NullWritable, IntWritable> {
618
619 @Override
620 public void compute(
621 Vertex<IntWritable, IntWritable, NullWritable> vertex,
622 Iterable<IntWritable> messages) throws IOException {
623 if (getSuperstep() == 0) {
624
625 sendMessageToAllEdges(vertex, vertex.getId());
626 } else {
627
628 int sum = 0;
629 for (IntWritable message : messages) {
630 sum += message.get();
631 }
632 vertex.setValue(new IntWritable(sum));
633 vertex.voteToHalt();
634 }
635 }
636 }
637 }