This project has retired. For details please refer to its
Attic page.
SimpleRangePartitionFactoryTest xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.partition;
20
21 import java.net.InetSocketAddress;
22 import java.util.ArrayList;
23 import java.util.Collection;
24
25 import org.apache.giraph.conf.GiraphConstants;
26 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
27 import org.apache.giraph.partition.PartitionOwner;
28 import org.apache.giraph.partition.WorkerGraphPartitioner;
29 import org.apache.giraph.worker.WorkerInfo;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.io.LongWritable;
32 import org.apache.hadoop.io.Writable;
33 import org.junit.Test;
34
35 import static org.junit.Assert.assertTrue;
36 import static org.junit.Assert.assertEquals;
37
38
39 public class SimpleRangePartitionFactoryTest {
40
41 private void testRange(int numWorkers, int numPartitions, int keySpaceSize,
42 int allowedWorkerDiff, boolean emptyWorkers) {
43 Configuration conf = new Configuration();
44 conf.setLong(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, keySpaceSize);
45 GiraphConstants.USER_PARTITION_COUNT.set(conf, numPartitions);
46 SimpleLongRangePartitionerFactory<Writable, Writable> factory =
47 new SimpleLongRangePartitionerFactory<Writable, Writable>();
48 factory.setConf(new ImmutableClassesGiraphConfiguration(conf));
49
50 ArrayList<WorkerInfo> infos = new ArrayList<WorkerInfo>();
51 for (int i = 0; i < numWorkers; i++) {
52 WorkerInfo info = new WorkerInfo();
53 info.setInetSocketAddress(new InetSocketAddress(8080), "127.0.0.1");
54 info.setTaskId(i);
55 infos.add(info);
56 }
57
58 Collection<PartitionOwner> owners =
59 factory.createMasterGraphPartitioner().createInitialPartitionOwners(infos, -1);
60
61 int[] tasks = new int[owners.size()];
62 for (PartitionOwner owner : owners) {
63 WorkerInfo worker = owner.getWorkerInfo();
64 assertEquals(0, tasks[owner.getPartitionId()]);
65 tasks[owner.getPartitionId()] = worker.getTaskId() + 1;
66 }
67 checkMapping(tasks, allowedWorkerDiff, emptyWorkers);
68
69 WorkerGraphPartitioner<LongWritable, Writable, Writable> workerPartitioner =
70 factory.createWorkerGraphPartitioner();
71 workerPartitioner.updatePartitionOwners(null, owners);
72 LongWritable longWritable = new LongWritable();
73
74 int[] partitions = new int[keySpaceSize];
75 for (int i = 0; i < keySpaceSize; i++) {
76 longWritable.set(i);
77 PartitionOwner owner = workerPartitioner.getPartitionOwner(longWritable);
78 partitions[i] = owner.getPartitionId();
79 }
80 checkMapping(partitions, 1, emptyWorkers);
81 }
82
83 private void checkMapping(int[] mapping, int allowedDiff, boolean emptyWorkers) {
84 int prev = -1;
85
86 int max = 0;
87 int min = Integer.MAX_VALUE;
88 int cur = 0;
89 for (int value : mapping) {
90 if (value != prev) {
91 if (prev != -1) {
92 min = Math.min(cur, min);
93 max = Math.max(cur, max);
94 assertTrue(prev < value);
95 if (!emptyWorkers) {
96 assertEquals(prev + 1, value);
97 }
98 }
99 cur = 1;
100 } else {
101 cur++;
102 }
103 prev = value;
104 }
105 assertTrue(min + allowedDiff >= max);
106 }
107
108 @Test
109 public void testLongRangePartitionerFactory() {
110
111 testRange(10, 1000, 100000, 0, false);
112 testRange(1000, 50000, 100000, 0, false);
113
114
115 testRange(8949, (50000 / 8949) * 8949, 100023, 0, false);
116 testRange(1949, (50000 / 1949) * 1949, 211111, 0, false);
117
118
119 testRange(194942, 50000, 211111, 1, true);
120 }
121 }