This project has retired. For details please refer to its Attic page.
PartitionBalancer xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.partition;
20  
21  import org.apache.giraph.worker.WorkerInfo;
22  import org.apache.hadoop.conf.Configuration;
23  import org.apache.log4j.Logger;
24  
25  import com.google.common.base.Objects;
26  
27  import java.util.ArrayList;
28  import java.util.Collection;
29  import java.util.Collections;
30  import java.util.Comparator;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.PriorityQueue;
36  import java.util.Set;
37  
38  /**
39   * Helper class for balancing partitions across a set of workers.
40   */
41  public class PartitionBalancer {
42    /** Partition balancing algorithm */
43    public static final String PARTITION_BALANCE_ALGORITHM =
44      "hash.partitionBalanceAlgorithm";
45    /** No rebalancing during the supersteps */
46    public static final String STATIC_BALANCE_ALGORITHM =
47      "static";
48    /** Rebalance across supersteps by edges */
49    public static final String EGDE_BALANCE_ALGORITHM =
50      "edges";
51    /** Rebalance across supersteps by vertices */
52    public static final String VERTICES_BALANCE_ALGORITHM =
53      "vertices";
54    /** Class logger */
55    private static Logger LOG = Logger.getLogger(PartitionBalancer.class);
56  
57    /**
58     * What value to balance partitions with?  Edges, vertices?
59     */
60    private enum BalanceValue {
61      /** Not chosen */
62      UNSET,
63      /** Balance with edges */
64      EDGES,
65      /** Balance with vertices */
66      VERTICES
67    }
68  
69    /**
70     * Do not construct this class.
71     */
72    private PartitionBalancer() { }
73  
74    /**
75     * Get the value used to balance.
76     *
77     * @param partitionStat Stats of this partition.
78     * @param balanceValue Type of the value to balance.
79     * @return Balance value.
80     */
81    private static long getBalanceValue(PartitionStats partitionStat,
82        BalanceValue balanceValue) {
83      switch (balanceValue) {
84      case EDGES:
85        return partitionStat.getEdgeCount();
86      case VERTICES:
87        return partitionStat.getVertexCount();
88      default:
89        throw new IllegalArgumentException(
90            "getBalanceValue: Illegal balance value " + balanceValue);
91      }
92    }
93  
94    /**
95     * Used to sort the partition owners from lowest value to highest value
96     */
97    private static class PartitionOwnerComparator implements
98        Comparator<PartitionOwner> {
99      /** Map of owner to stats */
100     private final Map<PartitionOwner, PartitionStats> ownerStatMap;
101     /** Value type to compare on */
102     private final BalanceValue balanceValue;
103 
104 
105     /**
106      * Only constructor.
107      *
108      * @param ownerStatMap Map of owners to stats.
109      * @param balanceValue Value to balance with.
110      */
111     public PartitionOwnerComparator(
112         Map<PartitionOwner, PartitionStats> ownerStatMap,
113         BalanceValue balanceValue) {
114       this.ownerStatMap = ownerStatMap;
115       this.balanceValue = balanceValue;
116     }
117 
118     @Override
119     public int compare(PartitionOwner owner1, PartitionOwner owner2) {
120       return (int)
121           (getBalanceValue(ownerStatMap.get(owner1), balanceValue) -
122               getBalanceValue(ownerStatMap.get(owner2), balanceValue));
123     }
124   }
125 
126   /**
127    * Structure to keep track of how much value a {@link WorkerInfo} has
128    * been assigned.
129    */
130   private static class WorkerInfoAssignments implements
131       Comparable<WorkerInfoAssignments> {
132     /** Worker info associated */
133     private final WorkerInfo workerInfo;
134     /** Balance value */
135     private final BalanceValue balanceValue;
136     /** Map of owner to stats */
137     private final Map<PartitionOwner, PartitionStats> ownerStatsMap;
138     /** Current value of this object */
139     private long value = 0;
140 
141     /**
142      * Constructor with final values.
143      *
144      * @param workerInfo Worker info for assignment.
145      * @param balanceValue Value used to balance.
146      * @param ownerStatsMap Map of owner to stats.
147      */
148     public WorkerInfoAssignments(
149         WorkerInfo workerInfo,
150         BalanceValue balanceValue,
151         Map<PartitionOwner, PartitionStats> ownerStatsMap) {
152       this.workerInfo = workerInfo;
153       this.balanceValue = balanceValue;
154       this.ownerStatsMap = ownerStatsMap;
155     }
156 
157     /**
158      * Get the total value of all partitions assigned to this worker.
159      *
160      * @return Total value of all partition assignments.
161      */
162     public long getValue() {
163       return value;
164     }
165 
166     /**
167      * Assign a {@link PartitionOwner} to this {@link WorkerInfo}.
168      *
169      * @param partitionOwner PartitionOwner to assign.
170      */
171     public void assignPartitionOwner(
172         PartitionOwner partitionOwner) {
173       value += getBalanceValue(ownerStatsMap.get(partitionOwner),
174           balanceValue);
175       if (!partitionOwner.getWorkerInfo().equals(workerInfo)) {
176         partitionOwner.setPreviousWorkerInfo(
177             partitionOwner.getWorkerInfo());
178         partitionOwner.setWorkerInfo(workerInfo);
179       } else {
180         partitionOwner.setPreviousWorkerInfo(null);
181       }
182     }
183 
184     @Override
185     public int compareTo(WorkerInfoAssignments other) {
186       return (int)
187           (getValue() - ((WorkerInfoAssignments) other).getValue());
188     }
189 
190     @Override
191     public boolean equals(Object obj) {
192       return obj instanceof WorkerInfoAssignments &&
193           compareTo((WorkerInfoAssignments) obj) == 0;
194     }
195 
196     @Override
197     public int hashCode() {
198       return Objects.hashCode(value);
199     }
200   }
201 
202   /**
203    * Balance the partitions with an algorithm based on a value.
204    *
205    * @param conf Configuration to find the algorithm
206    * @param partitionOwners All the owners of all partitions
207    * @param allPartitionStats All the partition stats
208    * @param availableWorkerInfos All the available workers
209    * @return Balanced partition owners
210    */
211   public static Collection<PartitionOwner> balancePartitionsAcrossWorkers(
212       Configuration conf,
213       Collection<PartitionOwner> partitionOwners,
214       Collection<PartitionStats> allPartitionStats,
215       Collection<WorkerInfo> availableWorkerInfos) {
216 
217     String balanceAlgorithm =
218         conf.get(PARTITION_BALANCE_ALGORITHM, STATIC_BALANCE_ALGORITHM);
219     if (LOG.isInfoEnabled()) {
220       LOG.info("balancePartitionsAcrossWorkers: Using algorithm " +
221           balanceAlgorithm);
222     }
223     BalanceValue balanceValue = BalanceValue.UNSET;
224     if (balanceAlgorithm.equals(STATIC_BALANCE_ALGORITHM)) {
225       return partitionOwners;
226     } else if (balanceAlgorithm.equals(EGDE_BALANCE_ALGORITHM)) {
227       balanceValue = BalanceValue.EDGES;
228     } else if (balanceAlgorithm.equals(VERTICES_BALANCE_ALGORITHM)) {
229       balanceValue = BalanceValue.VERTICES;
230     } else {
231       throw new IllegalArgumentException(
232           "balancePartitionsAcrossWorkers: Illegal balance " +
233               "algorithm - " + balanceAlgorithm);
234     }
235 
236     // Join the partition stats and partition owners by partition id
237     Map<Integer, PartitionStats> idStatMap =
238         new HashMap<Integer, PartitionStats>();
239     for (PartitionStats partitionStats : allPartitionStats) {
240       if (idStatMap.put(partitionStats.getPartitionId(), partitionStats) !=
241           null) {
242         throw new IllegalStateException(
243             "balancePartitionsAcrossWorkers: Duplicate partition id " +
244                 "for " + partitionStats);
245       }
246     }
247     Map<PartitionOwner, PartitionStats> ownerStatsMap =
248         new HashMap<PartitionOwner, PartitionStats>();
249     for (PartitionOwner partitionOwner : partitionOwners) {
250       PartitionStats partitionStats =
251           idStatMap.get(partitionOwner.getPartitionId());
252       if (partitionStats == null) {
253         throw new IllegalStateException(
254             "balancePartitionsAcrossWorkers: Missing partition " +
255                 "stats for " + partitionOwner);
256       }
257       if (ownerStatsMap.put(partitionOwner, partitionStats) != null) {
258         throw new IllegalStateException(
259             "balancePartitionsAcrossWorkers: Duplicate partition " +
260                 "owner " + partitionOwner);
261       }
262     }
263     if (ownerStatsMap.size() != partitionOwners.size()) {
264       throw new IllegalStateException(
265           "balancePartitionsAcrossWorkers: ownerStats count = " +
266               ownerStatsMap.size() + ", partitionOwners count = " +
267               partitionOwners.size() + " and should match.");
268     }
269 
270     List<WorkerInfoAssignments> workerInfoAssignmentsList =
271         new ArrayList<WorkerInfoAssignments>(availableWorkerInfos.size());
272     for (WorkerInfo workerInfo : availableWorkerInfos) {
273       workerInfoAssignmentsList.add(
274           new WorkerInfoAssignments(
275               workerInfo, balanceValue, ownerStatsMap));
276     }
277 
278     // A simple heuristic for balancing the partitions across the workers
279     // using a value (edges, vertices).  An improvement would be to
280     // take into account the already existing partition worker assignments.
281     // 1.  Sort the partitions by size
282     // 2.  Place the workers in a min heap sorted by their total balance
283     //     value.
284     // 3.  From largest partition to the smallest, take the partition
285     //     worker at the top of the heap, add the partition to it, and
286     //     then put it back in the heap
287     List<PartitionOwner> partitionOwnerList =
288         new ArrayList<PartitionOwner>(partitionOwners);
289     Collections.sort(partitionOwnerList,
290         Collections.reverseOrder(
291             new PartitionOwnerComparator(ownerStatsMap, balanceValue)));
292     PriorityQueue<WorkerInfoAssignments> minQueue =
293         new PriorityQueue<WorkerInfoAssignments>(workerInfoAssignmentsList);
294     for (PartitionOwner partitionOwner : partitionOwnerList) {
295       WorkerInfoAssignments chosenWorker = minQueue.remove();
296       chosenWorker.assignPartitionOwner(partitionOwner);
297       minQueue.add(chosenWorker);
298     }
299 
300     return partitionOwnerList;
301   }
302 
303   /**
304    * Helper function to update partition owners and determine which
305    * partitions need to be sent from a specific worker.
306    *
307    * @param partitionOwnerList Local {@link PartitionOwner} list for the
308    *                           given worker
309    * @param myWorkerInfo Worker info
310    * @param masterSetPartitionOwners Master set partition owners, received
311    *        prior to beginning the superstep
312    * @return Information for the partition exchange.
313    */
314   public static PartitionExchange updatePartitionOwners(
315       List<PartitionOwner> partitionOwnerList,
316       WorkerInfo myWorkerInfo,
317       Collection<? extends PartitionOwner> masterSetPartitionOwners) {
318     partitionOwnerList.clear();
319     partitionOwnerList.addAll(masterSetPartitionOwners);
320 
321     Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
322     Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =
323         new HashMap<WorkerInfo, List<Integer>>();
324     for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
325       if (partitionOwner.getPreviousWorkerInfo() == null) {
326         continue;
327       } else if (partitionOwner.getWorkerInfo().equals(
328           myWorkerInfo) &&
329           partitionOwner.getPreviousWorkerInfo().equals(
330               myWorkerInfo)) {
331         throw new IllegalStateException(
332             "updatePartitionOwners: Impossible to have the same " +
333                 "previous and current worker info " + partitionOwner +
334                 " as me " + myWorkerInfo);
335       } else if (partitionOwner.getWorkerInfo().equals(myWorkerInfo)) {
336         dependentWorkerSet.add(partitionOwner.getPreviousWorkerInfo());
337       } else if (partitionOwner.getPreviousWorkerInfo().equals(
338           myWorkerInfo)) {
339         if (workerPartitionOwnerMap.containsKey(
340             partitionOwner.getWorkerInfo())) {
341           workerPartitionOwnerMap.get(
342               partitionOwner.getWorkerInfo()).add(
343               partitionOwner.getPartitionId());
344         } else {
345           List<Integer> tmpPartitionOwnerList = new ArrayList<Integer>();
346           tmpPartitionOwnerList.add(partitionOwner.getPartitionId());
347           workerPartitionOwnerMap.put(partitionOwner.getWorkerInfo(),
348               tmpPartitionOwnerList);
349         }
350       }
351     }
352 
353     return new PartitionExchange(dependentWorkerSet,
354         workerPartitionOwnerMap);
355   }
356 }
357