1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.partition;
2021import org.apache.giraph.worker.WorkerInfo;
22import org.apache.hadoop.conf.Configuration;
23import org.apache.log4j.Logger;
2425import com.google.common.base.Objects;
2627import java.util.ArrayList;
28import java.util.Collection;
29import java.util.Collections;
30import java.util.Comparator;
31import java.util.HashMap;
32import java.util.HashSet;
33import java.util.List;
34import java.util.Map;
35import java.util.PriorityQueue;
36import java.util.Set;
3738/**39 * Helper class for balancing partitions across a set of workers.40 */41publicclassPartitionBalancer {
42/**Partition balancing algorithm */43publicstaticfinal String PARTITION_BALANCE_ALGORITHM =
44"hash.partitionBalanceAlgorithm";
45/** No rebalancing during the supersteps */46publicstaticfinal String STATIC_BALANCE_ALGORITHM =
47"static";
48/** Rebalance across supersteps by edges */49publicstaticfinal String EGDE_BALANCE_ALGORITHM =
50"edges";
51/** Rebalance across supersteps by vertices */52publicstaticfinal String VERTICES_BALANCE_ALGORITHM =
53"vertices";
54/** Class logger */55privatestatic Logger LOG = Logger.getLogger(PartitionBalancer.class);
5657/**58 * What value to balance partitions with? Edges, vertices?59 */60private enum BalanceValue {
61/** Not chosen */62 UNSET,
63/** Balance with edges */64 EDGES,
65/** Balance with vertices */66 VERTICES
67 }
6869/**70 * Do not construct this class.71 */72privatePartitionBalancer() { }
7374/**75 * Get the value used to balance.76 *77 * @param partitionStat Stats of this partition.78 * @param balanceValue Type of the value to balance.79 * @return Balance value.80 */81privatestaticlong getBalanceValue(PartitionStats partitionStat,
82BalanceValue balanceValue) {
83switch (balanceValue) {
84case EDGES:
85return partitionStat.getEdgeCount();
86case VERTICES:
87return partitionStat.getVertexCount();
88default:
89thrownew IllegalArgumentException(
90"getBalanceValue: Illegal balance value " + balanceValue);
91 }
92 }
9394/**95 * Used to sort the partition owners from lowest value to highest value96 */97privatestaticclassPartitionOwnerComparatorimplements98 Comparator<PartitionOwner> {
99/** Map of owner to stats */100privatefinal Map<PartitionOwner, PartitionStats> ownerStatMap;
101/** Value type to compare on */102privatefinalBalanceValue balanceValue;
103104105/**106 * Only constructor.107 *108 * @param ownerStatMap Map of owners to stats.109 * @param balanceValue Value to balance with.110 */111publicPartitionOwnerComparator(
112 Map<PartitionOwner, PartitionStats> ownerStatMap,
113BalanceValue balanceValue) {
114this.ownerStatMap = ownerStatMap;
115this.balanceValue = balanceValue;
116 }
117118 @Override
119publicint compare(PartitionOwner owner1, PartitionOwner owner2) {
120return (int)
121 (getBalanceValue(ownerStatMap.get(owner1), balanceValue) -
122 getBalanceValue(ownerStatMap.get(owner2), balanceValue));
123 }
124 }
125126/**127 * Structure to keep track of how much value a {@link WorkerInfo} has128 * been assigned.129 */130privatestaticclassWorkerInfoAssignmentsimplements131 Comparable<WorkerInfoAssignments> {
132/** Worker info associated */133privatefinalWorkerInfo workerInfo;
134/** Balance value */135privatefinalBalanceValue balanceValue;
136/** Map of owner to stats */137privatefinal Map<PartitionOwner, PartitionStats> ownerStatsMap;
138/** Current value of this object */139privatelong value = 0;
140141/**142 * Constructor with final values.143 *144 * @param workerInfo Worker info for assignment.145 * @param balanceValue Value used to balance.146 * @param ownerStatsMap Map of owner to stats.147 */148publicWorkerInfoAssignments(
149WorkerInfo workerInfo,
150BalanceValue balanceValue,
151 Map<PartitionOwner, PartitionStats> ownerStatsMap) {
152this.workerInfo = workerInfo;
153this.balanceValue = balanceValue;
154this.ownerStatsMap = ownerStatsMap;
155 }
156157/**158 * Get the total value of all partitions assigned to this worker.159 *160 * @return Total value of all partition assignments.161 */162publiclong getValue() {
163return value;
164 }
165166/**167 * Assign a {@link PartitionOwner} to this {@link WorkerInfo}.168 *169 * @param partitionOwner PartitionOwner to assign.170 */171publicvoid assignPartitionOwner(
172PartitionOwner partitionOwner) {
173 value += getBalanceValue(ownerStatsMap.get(partitionOwner),
174 balanceValue);
175if (!partitionOwner.getWorkerInfo().equals(workerInfo)) {
176 partitionOwner.setPreviousWorkerInfo(
177 partitionOwner.getWorkerInfo());
178 partitionOwner.setWorkerInfo(workerInfo);
179 } else {
180 partitionOwner.setPreviousWorkerInfo(null);
181 }
182 }
183184 @Override
185publicint compareTo(WorkerInfoAssignments other) {
186return (int)
187 (getValue() - ((WorkerInfoAssignments) other).getValue());
188 }
189190 @Override
191publicboolean equals(Object obj) {
192return obj instanceof WorkerInfoAssignments &&
193 compareTo((WorkerInfoAssignments) obj) == 0;
194 }
195196 @Override
197publicint hashCode() {
198return Objects.hashCode(value);
199 }
200 }
201202/**203 * Balance the partitions with an algorithm based on a value.204 *205 * @param conf Configuration to find the algorithm206 * @param partitionOwners All the owners of all partitions207 * @param allPartitionStats All the partition stats208 * @param availableWorkerInfos All the available workers209 * @return Balanced partition owners210 */211publicstatic Collection<PartitionOwner> balancePartitionsAcrossWorkers(
212 Configuration conf,
213 Collection<PartitionOwner> partitionOwners,
214 Collection<PartitionStats> allPartitionStats,
215 Collection<WorkerInfo> availableWorkerInfos) {
216217 String balanceAlgorithm =
218 conf.get(PARTITION_BALANCE_ALGORITHM, STATIC_BALANCE_ALGORITHM);
219if (LOG.isInfoEnabled()) {
220 LOG.info("balancePartitionsAcrossWorkers: Using algorithm " +
221 balanceAlgorithm);
222 }
223BalanceValue balanceValue = BalanceValue.UNSET;
224if (balanceAlgorithm.equals(STATIC_BALANCE_ALGORITHM)) {
225return partitionOwners;
226 } elseif (balanceAlgorithm.equals(EGDE_BALANCE_ALGORITHM)) {
227 balanceValue = BalanceValue.EDGES;
228 } elseif (balanceAlgorithm.equals(VERTICES_BALANCE_ALGORITHM)) {
229 balanceValue = BalanceValue.VERTICES;
230 } else {
231thrownew IllegalArgumentException(
232"balancePartitionsAcrossWorkers: Illegal balance " +
233"algorithm - " + balanceAlgorithm);
234 }
235236// Join the partition stats and partition owners by partition id237 Map<Integer, PartitionStats> idStatMap =
238new HashMap<Integer, PartitionStats>();
239for (PartitionStats partitionStats : allPartitionStats) {
240if (idStatMap.put(partitionStats.getPartitionId(), partitionStats) !=
241null) {
242thrownew IllegalStateException(
243"balancePartitionsAcrossWorkers: Duplicate partition id " +
244"for " + partitionStats);
245 }
246 }
247 Map<PartitionOwner, PartitionStats> ownerStatsMap =
248new HashMap<PartitionOwner, PartitionStats>();
249for (PartitionOwner partitionOwner : partitionOwners) {
250PartitionStats partitionStats =
251 idStatMap.get(partitionOwner.getPartitionId());
252if (partitionStats == null) {
253thrownew IllegalStateException(
254"balancePartitionsAcrossWorkers: Missing partition " +
255"stats for " + partitionOwner);
256 }
257if (ownerStatsMap.put(partitionOwner, partitionStats) != null) {
258thrownew IllegalStateException(
259"balancePartitionsAcrossWorkers: Duplicate partition " +
260"owner " + partitionOwner);
261 }
262 }
263if (ownerStatsMap.size() != partitionOwners.size()) {
264thrownew IllegalStateException(
265"balancePartitionsAcrossWorkers: ownerStats count = " +
266 ownerStatsMap.size() + ", partitionOwners count = " +
267 partitionOwners.size() + " and should match.");
268 }
269270 List<WorkerInfoAssignments> workerInfoAssignmentsList =
271new ArrayList<WorkerInfoAssignments>(availableWorkerInfos.size());
272for (WorkerInfo workerInfo : availableWorkerInfos) {
273 workerInfoAssignmentsList.add(
274newWorkerInfoAssignments(
275 workerInfo, balanceValue, ownerStatsMap));
276 }
277278// A simple heuristic for balancing the partitions across the workers279// using a value (edges, vertices). An improvement would be to280// take into account the already existing partition worker assignments.281// 1. Sort the partitions by size282// 2. Place the workers in a min heap sorted by their total balance283// value.284// 3. From largest partition to the smallest, take the partition285// worker at the top of the heap, add the partition to it, and286// then put it back in the heap287 List<PartitionOwner> partitionOwnerList =
288new ArrayList<PartitionOwner>(partitionOwners);
289 Collections.sort(partitionOwnerList,
290 Collections.reverseOrder(
291newPartitionOwnerComparator(ownerStatsMap, balanceValue)));
292 PriorityQueue<WorkerInfoAssignments> minQueue =
293new PriorityQueue<WorkerInfoAssignments>(workerInfoAssignmentsList);
294for (PartitionOwner partitionOwner : partitionOwnerList) {
295WorkerInfoAssignments chosenWorker = minQueue.remove();
296 chosenWorker.assignPartitionOwner(partitionOwner);
297 minQueue.add(chosenWorker);
298 }
299300return partitionOwnerList;
301 }
302303/**304 * Helper function to update partition owners and determine which305 * partitions need to be sent from a specific worker.306 *307 * @param partitionOwnerList Local {@link PartitionOwner} list for the308 * given worker309 * @param myWorkerInfo Worker info310 * @param masterSetPartitionOwners Master set partition owners, received311 * prior to beginning the superstep312 * @return Information for the partition exchange.313 */314publicstaticPartitionExchange updatePartitionOwners(
315 List<PartitionOwner> partitionOwnerList,
316WorkerInfo myWorkerInfo,
317 Collection<? extends PartitionOwner> masterSetPartitionOwners) {
318 partitionOwnerList.clear();
319 partitionOwnerList.addAll(masterSetPartitionOwners);
320321 Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
322 Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =
323new HashMap<WorkerInfo, List<Integer>>();
324for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
325if (partitionOwner.getPreviousWorkerInfo() == null) {
326continue;
327 } elseif (partitionOwner.getWorkerInfo().equals(
328 myWorkerInfo) &&
329 partitionOwner.getPreviousWorkerInfo().equals(
330 myWorkerInfo)) {
331thrownew IllegalStateException(
332"updatePartitionOwners: Impossible to have the same " +
333"previous and current worker info " + partitionOwner +
334" as me " + myWorkerInfo);
335 } elseif (partitionOwner.getWorkerInfo().equals(myWorkerInfo)) {
336 dependentWorkerSet.add(partitionOwner.getPreviousWorkerInfo());
337 } elseif (partitionOwner.getPreviousWorkerInfo().equals(
338 myWorkerInfo)) {
339if (workerPartitionOwnerMap.containsKey(
340 partitionOwner.getWorkerInfo())) {
341 workerPartitionOwnerMap.get(
342 partitionOwner.getWorkerInfo()).add(
343 partitionOwner.getPartitionId());
344 } else {
345 List<Integer> tmpPartitionOwnerList = new ArrayList<Integer>();
346 tmpPartitionOwnerList.add(partitionOwner.getPartitionId());
347 workerPartitionOwnerMap.put(partitionOwner.getWorkerInfo(),
348 tmpPartitionOwnerList);
349 }
350 }
351 }
352353returnnewPartitionExchange(dependentWorkerSet,
354 workerPartitionOwnerMap);
355 }
356 }
357