1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.partition;
20
21 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
22 import org.apache.giraph.worker.LocalData;
23 import org.apache.hadoop.io.Writable;
24 import org.apache.hadoop.io.WritableComparable;
25
26 /**
27 * Defines the partitioning framework for this application.
28 *
29 * Abstracts and implements all GraphPartitionerFactoryInterface logic
30 * on top of two functions which define partitioning scheme:
31 * - which partition vertex should be in, and
32 * - which partition should belong to which worker
33 *
34 * @param <I> Vertex id value
35 * @param <V> Vertex value
36 * @param <E> Edge value
37 */
38 @SuppressWarnings("rawtypes")
39 public abstract class GraphPartitionerFactory<I extends WritableComparable,
40 V extends Writable, E extends Writable>
41 extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
42 implements GraphPartitionerFactoryInterface<I, V, E> {
43 @Override
44 public void initialize(LocalData<I, V, E, ? extends Writable> localData) {
45 }
46
47 @Override
48 public final MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
49 return new MasterGraphPartitionerImpl<I, V, E>(getConf()) {
50 @Override
51 protected int getWorkerIndex(int partition, int partitionCount,
52 int workerCount) {
53 return GraphPartitionerFactory.this.getWorker(
54 partition, partitionCount, workerCount);
55 }
56 };
57 }
58
59 @Override
60 public final WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
61 return new WorkerGraphPartitionerImpl<I, V, E>() {
62 @Override
63 protected int getPartitionIndex(I id, int partitionCount,
64 int workerCount) {
65 return GraphPartitionerFactory.this.getPartition(id,
66 partitionCount, workerCount);
67 }
68 };
69 }
70
71 /**
72 * Calculates in which partition current vertex belongs to,
73 * from interval [0, partitionCount).
74 *
75 * @param id Vertex id
76 * @param partitionCount Number of partitions
77 * @param workerCount Number of workers
78 * @return partition
79 */
80 public abstract int getPartition(I id, int partitionCount,
81 int workerCount);
82
83 /**
84 * Calculates worker that should be responsible for passed partition.
85 *
86 * @param partition Current partition
87 * @param partitionCount Number of partitions
88 * @param workerCount Number of workers
89 * @return index of worker responsible for current partition
90 */
91 public abstract int getWorker(
92 int partition, int partitionCount, int workerCount);
93
94 /**
95 * Utility function for calculating in which partition value
96 * from interval [0, max) should belong to.
97 *
98 * @param value Value for which partition is requested
99 * @param max Maximum possible value
100 * @param partitions Number of partitions, equally sized.
101 * @return Index of partition where value belongs to.
102 */
103 public static int getPartitionInRange(int value, int max, int partitions) {
104 double keyRange = ((double) max) / partitions;
105 int part = (int) ((value % max) / keyRange);
106 return Math.max(0, Math.min(partitions - 1, part));
107 }
108
109 /**
110 * Utility function for calculating in which partition value
111 * from interval [0, max) should belong to.
112 *
113 * @param value Value for which partition is requested
114 * @param max Maximum possible value
115 * @param partitions Number of partitions, equally sized.
116 * @return Index of partition where value belongs to.
117 */
118 public static int getPartitionInRange(long value, long max, int partitions) {
119 double keyRange = ((double) max) / partitions;
120 int part = (int) ((value % max) / keyRange);
121 return Math.max(0, Math.min(partitions - 1, part));
122 }
123 }