This project has retired. For details please refer to its Attic page.
GraphPartitionerFactory xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.partition;
20  
21  import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
22  import org.apache.giraph.worker.LocalData;
23  import org.apache.hadoop.io.Writable;
24  import org.apache.hadoop.io.WritableComparable;
25  
26  /**
27   * Defines the partitioning framework for this application.
28   *
29   * Abstracts and implements all GraphPartitionerFactoryInterface logic
30   * on top of two functions which define partitioning scheme:
31   * - which partition vertex should be in, and
32   * - which partition should belong to which worker
33   *
34   * @param <I> Vertex id value
35   * @param <V> Vertex value
36   * @param <E> Edge value
37   */
38  @SuppressWarnings("rawtypes")
39  public abstract class GraphPartitionerFactory<I extends WritableComparable,
40      V extends Writable, E extends Writable>
41      extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
42      implements GraphPartitionerFactoryInterface<I, V, E>  {
43    @Override
44    public void initialize(LocalData<I, V, E, ? extends Writable> localData) {
45    }
46  
47    @Override
48    public final MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
49      return new MasterGraphPartitionerImpl<I, V, E>(getConf()) {
50        @Override
51        protected int getWorkerIndex(int partition, int partitionCount,
52            int workerCount) {
53          return GraphPartitionerFactory.this.getWorker(
54              partition, partitionCount, workerCount);
55        }
56      };
57    }
58  
59    @Override
60    public final WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
61      return new WorkerGraphPartitionerImpl<I, V, E>() {
62        @Override
63        protected int getPartitionIndex(I id, int partitionCount,
64          int workerCount) {
65          return GraphPartitionerFactory.this.getPartition(id,
66              partitionCount, workerCount);
67        }
68      };
69    }
70  
71    /**
72     * Calculates in which partition current vertex belongs to,
73     * from interval [0, partitionCount).
74     *
75     * @param id Vertex id
76     * @param partitionCount Number of partitions
77     * @param workerCount Number of workers
78     * @return partition
79     */
80    public abstract int getPartition(I id, int partitionCount,
81      int workerCount);
82  
83    /**
84     * Calculates worker that should be responsible for passed partition.
85     *
86     * @param partition Current partition
87     * @param partitionCount Number of partitions
88     * @param workerCount Number of workers
89     * @return index of worker responsible for current partition
90     */
91    public abstract int getWorker(
92        int partition, int partitionCount, int workerCount);
93  
94    /**
95     * Utility function for calculating in which partition value
96     * from interval [0, max) should belong to.
97     *
98     * @param value Value for which partition is requested
99     * @param max Maximum possible value
100    * @param partitions Number of partitions, equally sized.
101    * @return Index of partition where value belongs to.
102    */
103   public static int getPartitionInRange(int value, int max, int partitions) {
104     double keyRange = ((double) max) / partitions;
105     int part = (int) ((value % max) / keyRange);
106     return Math.max(0, Math.min(partitions - 1, part));
107   }
108 
109   /**
110    * Utility function for calculating in which partition value
111    * from interval [0, max) should belong to.
112    *
113    * @param value Value for which partition is requested
114    * @param max Maximum possible value
115    * @param partitions Number of partitions, equally sized.
116    * @return Index of partition where value belongs to.
117    */
118   public static int getPartitionInRange(long value, long max, int partitions) {
119     double keyRange = ((double) max) / partitions;
120     int part = (int) ((value % max) / keyRange);
121     return Math.max(0, Math.min(partitions - 1, part));
122   }
123 }