1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.partition;
2021import java.util.Collection;
22import java.util.HashSet;
23import java.util.List;
24import java.util.Set;
2526import org.apache.giraph.worker.WorkerInfo;
27import org.apache.hadoop.io.Writable;
28import org.apache.hadoop.io.WritableComparable;
2930import com.google.common.collect.Lists;
31import org.apache.log4j.Logger;
3233/**34 * Abstracts and implements all WorkerGraphPartitioner logic on top of a single35 * user function - getPartitionIndex.36 *37 * @param <I> Vertex id type38 * @param <V> Vertex value type39 * @param <E> Edge value type40 */41publicabstractclass WorkerGraphPartitionerImpl<I extends WritableComparable,
42 V extends Writable, E extends Writable>
43implements WorkerGraphPartitioner<I, V, E> {
44/** Logger instance */45privatestaticfinal Logger LOG = Logger.getLogger(
46 WorkerGraphPartitionerImpl.class);
47/** List of {@link PartitionOwner}s for this worker. */48private List<PartitionOwner> partitionOwnerList = Lists.newArrayList();
49/** List of available workers */50private Set<WorkerInfo> availableWorkers = new HashSet<>();
5152 @Override
53publicPartitionOwner createPartitionOwner() {
54returnnewBasicPartitionOwner();
55 }
5657 @Override
58publicPartitionOwner getPartitionOwner(I vertexId) {
59return partitionOwnerList.get(
60 getPartitionIndex(vertexId, partitionOwnerList.size(),
61 availableWorkers.size()));
62 }
6364 @Override
65public Collection<PartitionStats> finalizePartitionStats(
66 Collection<PartitionStats> workerPartitionStats,
67 PartitionStore<I, V, E> partitionStore) {
68// No modification necessary69return workerPartitionStats;
70 }
7172 @Override
73publicPartitionExchange updatePartitionOwners(WorkerInfo myWorkerInfo,
74 Collection<? extends PartitionOwner> masterSetPartitionOwners) {
75PartitionExchange exchange = PartitionBalancer.updatePartitionOwners(
76 partitionOwnerList, myWorkerInfo, masterSetPartitionOwners);
77 extractAvailableWorkers();
78return exchange;
79 }
8081 @Override
82public Collection<? extends PartitionOwner> getPartitionOwners() {
83return partitionOwnerList;
84 }
8586/**87 * Update availableWorkers88 */89publicvoid extractAvailableWorkers() {
90 availableWorkers.clear();
91for (PartitionOwner partitionOwner : partitionOwnerList) {
92 availableWorkers.add(partitionOwner.getWorkerInfo());
93 }
94 LOG.info("After updating partitionOwnerList " + availableWorkers.size() +
95" workers are available");
96 }
9798/**99 * Calculates in which partition current vertex belongs to,100 * from interval [0, partitionCount).101 *102 * @param id Vertex id103 * @param partitionCount Number of partitions104 * @param workerCount Number of active workers105 * @return partition106 */107protectedabstractint getPartitionIndex(I id, int partitionCount,
108int workerCount);
109 }