This project has retired. For details please refer to its Attic page.
MasterGraphPartitionerImpl xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.partition;
20  
21  import java.util.ArrayList;
22  import java.util.Collection;
23  import java.util.List;
24  
25  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26  import org.apache.giraph.worker.WorkerInfo;
27  import org.apache.hadoop.io.Writable;
28  import org.apache.hadoop.io.WritableComparable;
29  
30  import com.google.common.collect.Lists;
31  
32  /**
33   * Abstracts and implements all MasterGraphPartitioner logic on top of a single
34   * user function - getWorkerIndex.
35   *
36   * @param <I> Vertex id type
37   * @param <V> Vertex value type
38   * @param <E> Edge value type
39   */
40  public abstract class MasterGraphPartitionerImpl<I extends WritableComparable,
41      V extends Writable, E extends Writable>
42      implements MasterGraphPartitioner<I, V, E> {
43    /** Provided configuration */
44    private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
45    /** Save the last generated partition owner list */
46    private List<PartitionOwner> partitionOwnerList;
47  
48    /**
49     * Constructor.
50     *
51     * @param conf
52     *          Configuration used.
53     */
54    public MasterGraphPartitionerImpl(
55        ImmutableClassesGiraphConfiguration<I, V, E> conf) {
56      this.conf = conf;
57    }
58  
59    @Override
60    public Collection<PartitionOwner> createInitialPartitionOwners(
61        Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
62      int partitionCount = PartitionUtils.computePartitionCount(
63          availableWorkerInfos.size(), conf);
64      ArrayList<WorkerInfo> workerList =
65          new ArrayList<WorkerInfo>(availableWorkerInfos);
66  
67      partitionOwnerList = new ArrayList<PartitionOwner>();
68      for (int i = 0; i < partitionCount; i++) {
69        partitionOwnerList.add(new BasicPartitionOwner(i, workerList.get(
70            getWorkerIndex(i, partitionCount, workerList.size()))));
71      }
72  
73      return partitionOwnerList;
74    }
75  
76    @Override
77    public void setPartitionOwners(Collection<PartitionOwner> partitionOwners) {
78      partitionOwnerList = Lists.newArrayList(partitionOwners);
79    }
80  
81    @Override
82    public Collection<PartitionOwner> generateChangedPartitionOwners(
83        Collection<PartitionStats> allPartitionStatsList,
84        Collection<WorkerInfo> availableWorkers,
85        int maxWorkers,
86        long superstep) {
87      return PartitionBalancer.balancePartitionsAcrossWorkers(conf,
88          partitionOwnerList, allPartitionStatsList, availableWorkers);
89    }
90  
91    @Override
92    public Collection<PartitionOwner> getCurrentPartitionOwners() {
93      return partitionOwnerList;
94    }
95  
96    @Override
97    public PartitionStats createPartitionStats() {
98      return new PartitionStats();
99    }
100 
101   /**
102    * Calculates worker that should be responsible for passed partition.
103    *
104    * @param partition Current partition
105    * @param partitionCount Number of partitions
106    * @param workerCount Number of workers
107    * @return index of worker responsible for current partition
108    */
109   protected abstract int getWorkerIndex(
110       int partition, int partitionCount, int workerCount);
111 }