1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.partition;
2021import java.util.ArrayList;
22import java.util.Collection;
23import java.util.List;
2425import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26import org.apache.giraph.worker.WorkerInfo;
27import org.apache.hadoop.io.Writable;
28import org.apache.hadoop.io.WritableComparable;
2930import com.google.common.collect.Lists;
3132/**33 * Abstracts and implements all MasterGraphPartitioner logic on top of a single34 * user function - getWorkerIndex.35 *36 * @param <I> Vertex id type37 * @param <V> Vertex value type38 * @param <E> Edge value type39 */40publicabstractclass MasterGraphPartitionerImpl<I extends WritableComparable,
41 V extends Writable, E extends Writable>
42implements MasterGraphPartitioner<I, V, E> {
43/** Provided configuration */44privatefinal ImmutableClassesGiraphConfiguration<I, V, E> conf;
45/** Save the last generated partition owner list */46private List<PartitionOwner> partitionOwnerList;
4748/**49 * Constructor.50 *51 * @param conf52 * Configuration used.53 */54publicMasterGraphPartitionerImpl(
55 ImmutableClassesGiraphConfiguration<I, V, E> conf) {
56this.conf = conf;
57 }
5859 @Override
60public Collection<PartitionOwner> createInitialPartitionOwners(
61 Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
62int partitionCount = PartitionUtils.computePartitionCount(
63 availableWorkerInfos.size(), conf);
64 ArrayList<WorkerInfo> workerList =
65new ArrayList<WorkerInfo>(availableWorkerInfos);
6667 partitionOwnerList = new ArrayList<PartitionOwner>();
68for (int i = 0; i < partitionCount; i++) {
69 partitionOwnerList.add(newBasicPartitionOwner(i, workerList.get(
70 getWorkerIndex(i, partitionCount, workerList.size()))));
71 }
7273return partitionOwnerList;
74 }
7576 @Override
77publicvoid setPartitionOwners(Collection<PartitionOwner> partitionOwners) {
78 partitionOwnerList = Lists.newArrayList(partitionOwners);
79 }
8081 @Override
82public Collection<PartitionOwner> generateChangedPartitionOwners(
83 Collection<PartitionStats> allPartitionStatsList,
84 Collection<WorkerInfo> availableWorkers,
85int maxWorkers,
86long superstep) {
87return PartitionBalancer.balancePartitionsAcrossWorkers(conf,
88 partitionOwnerList, allPartitionStatsList, availableWorkers);
89 }
9091 @Override
92public Collection<PartitionOwner> getCurrentPartitionOwners() {
93return partitionOwnerList;
94 }
9596 @Override
97publicPartitionStats createPartitionStats() {
98returnnewPartitionStats();
99 }
100101/**102 * Calculates worker that should be responsible for passed partition.103 *104 * @param partition Current partition105 * @param partitionCount Number of partitions106 * @param workerCount Number of workers107 * @return index of worker responsible for current partition108 */109protectedabstractint getWorkerIndex(
110int partition, int partitionCount, int workerCount);
111 }