This project has retired. For details please refer to its Attic page.
SimpleRangePartitionFactoryTest xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.partition;
20  
21  import java.net.InetSocketAddress;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  
25  import org.apache.giraph.conf.GiraphConstants;
26  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
27  import org.apache.giraph.partition.PartitionOwner;
28  import org.apache.giraph.partition.WorkerGraphPartitioner;
29  import org.apache.giraph.worker.WorkerInfo;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.io.LongWritable;
32  import org.apache.hadoop.io.Writable;
33  import org.junit.Test;
34  
35  import static org.junit.Assert.assertTrue;
36  import static org.junit.Assert.assertEquals;
37  
38  /** Test {@link org.apache.giraph.partition.SimpleLongRangePartitionerFactory}. */
39  public class SimpleRangePartitionFactoryTest {
40  
41    private void testRange(int numWorkers, int numPartitions, int keySpaceSize,
42        int allowedWorkerDiff, boolean emptyWorkers) {
43      Configuration conf = new Configuration();
44      conf.setLong(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, keySpaceSize);
45      GiraphConstants.USER_PARTITION_COUNT.set(conf, numPartitions);
46      SimpleLongRangePartitionerFactory<Writable, Writable> factory =
47          new SimpleLongRangePartitionerFactory<Writable, Writable>();
48      factory.setConf(new ImmutableClassesGiraphConfiguration(conf));
49  
50      ArrayList<WorkerInfo> infos = new ArrayList<WorkerInfo>();
51      for (int i = 0; i < numWorkers; i++) {
52        WorkerInfo info = new WorkerInfo();
53        info.setInetSocketAddress(new InetSocketAddress(8080), "127.0.0.1");
54        info.setTaskId(i);
55        infos.add(info);
56      }
57  
58      Collection<PartitionOwner> owners =
59          factory.createMasterGraphPartitioner().createInitialPartitionOwners(infos, -1);
60  
61      int[] tasks = new int[owners.size()];
62      for (PartitionOwner owner : owners) {
63        WorkerInfo worker = owner.getWorkerInfo();
64        assertEquals(0, tasks[owner.getPartitionId()]);
65        tasks[owner.getPartitionId()] = worker.getTaskId() + 1;
66      }
67      checkMapping(tasks, allowedWorkerDiff, emptyWorkers);
68  
69      WorkerGraphPartitioner<LongWritable, Writable, Writable> workerPartitioner =
70          factory.createWorkerGraphPartitioner();
71      workerPartitioner.updatePartitionOwners(null, owners);
72      LongWritable longWritable = new LongWritable();
73  
74      int[] partitions = new int[keySpaceSize];
75      for (int i = 0; i < keySpaceSize; i++) {
76        longWritable.set(i);
77        PartitionOwner owner = workerPartitioner.getPartitionOwner(longWritable);
78        partitions[i] = owner.getPartitionId();
79      }
80      checkMapping(partitions, 1, emptyWorkers);
81    }
82  
83    private void checkMapping(int[] mapping, int allowedDiff, boolean emptyWorkers) {
84      int prev = -1;
85  
86      int max = 0;
87      int min = Integer.MAX_VALUE;
88      int cur = 0;
89      for (int value : mapping) {
90        if (value != prev) {
91          if (prev != -1) {
92            min = Math.min(cur, min);
93            max = Math.max(cur, max);
94            assertTrue(prev < value);
95            if (!emptyWorkers) {
96              assertEquals(prev + 1, value);
97            }
98          }
99          cur = 1;
100       } else {
101         cur++;
102       }
103       prev = value;
104     }
105     assertTrue(min + allowedDiff >= max);
106   }
107 
108   @Test
109   public void testLongRangePartitionerFactory() {
110     // perfect distribution
111     testRange(10, 1000, 100000, 0, false);
112     testRange(1000, 50000, 100000, 0, false);
113 
114     // perfect distribution even when max is hit, and max is not divisible by #workers
115     testRange(8949, (50000 / 8949) * 8949, 100023, 0, false);
116     testRange(1949, (50000 / 1949) * 1949, 211111, 0, false);
117 
118     // imperfect distribution - because there are more workers than max partitions.
119     testRange(194942, 50000, 211111, 1, true);
120   }
121 }