PartitionUtils.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.partition;

import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.log4j.Logger;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import static org.apache.giraph.conf.GiraphConstants
    .MIN_PARTITIONS_PER_COMPUTE_THREAD;
import static org.apache.giraph.conf.GiraphConstants.NUM_COMPUTE_THREADS;
import static org.apache.giraph.conf.GiraphConstants.USER_PARTITION_COUNT;

/**
 * Helper class for {@link Partition} related operations.
 */
public class PartitionUtils {
  /** Class logger */
  private static final Logger LOG = Logger.getLogger(PartitionUtils.class);

  /**
   * Do not construct this object.
   */
  private PartitionUtils() { }

  /**
   * Compare edge counts for Entry<WorkerInfo, VertexEdgeCount> objects.
   */
  private static class EdgeCountComparator implements
      Comparator<Entry<WorkerInfo, VertexEdgeCount>>, Serializable {
    /** Serialization version. */
    private static final long serialVersionUID = 1L;

    @Override
    public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
        Entry<WorkerInfo, VertexEdgeCount> worker2) {
      return Long.compare(worker1.getValue().getEdgeCount(),
        worker2.getValue().getEdgeCount());
    }
  }

  /**
   * Compare vertex counts for Entry<WorkerInfo, VertexEdgeCount> objects.
   */
  private static class VertexCountComparator implements
      Comparator<Entry<WorkerInfo, VertexEdgeCount>>, Serializable {
    /** Serialization version. */
    private static final long serialVersionUID = 1L;

    @Override
    public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
        Entry<WorkerInfo, VertexEdgeCount> worker2) {
      return Long.compare(worker1.getValue().getVertexCount(),
        worker2.getValue().getVertexCount());
    }
  }

  /**
   * Check for imbalances on a per-worker basis by calculating the mean,
   * minimum and maximum vertex and edge counts per worker.
   *
   * @param partitionOwnerList List of partition owners.
   * @param allPartitionStats All the partition stats.
   */
  public static void analyzePartitionStats(
      Collection<PartitionOwner> partitionOwnerList,
      List<PartitionStats> allPartitionStats) {
    // Map each partition id to its owner, failing on duplicate ids.
    Map<Integer, PartitionOwner> idOwnerMap =
        new HashMap<Integer, PartitionOwner>();
    for (PartitionOwner partitionOwner : partitionOwnerList) {
      if (idOwnerMap.put(partitionOwner.getPartitionId(),
          partitionOwner) != null) {
        throw new IllegalStateException(
            "analyzePartitionStats: Duplicate partition " +
                partitionOwner);
      }
    }

    // Accumulate per-worker and total vertex/edge counts.
    Map<WorkerInfo, VertexEdgeCount> workerStatsMap = Maps.newHashMap();
    VertexEdgeCount totalVertexEdgeCount = new VertexEdgeCount();
    for (PartitionStats partitionStats : allPartitionStats) {
      WorkerInfo workerInfo =
          idOwnerMap.get(partitionStats.getPartitionId()).getWorkerInfo();
      VertexEdgeCount vertexEdgeCount =
          workerStatsMap.get(workerInfo);
      if (vertexEdgeCount == null) {
        workerStatsMap.put(
            workerInfo,
            new VertexEdgeCount(partitionStats.getVertexCount(),
                partitionStats.getEdgeCount(), 0));
      } else {
        workerStatsMap.put(
            workerInfo,
            vertexEdgeCount.incrVertexEdgeCount(
                partitionStats.getVertexCount(),
                partitionStats.getEdgeCount()));
      }
      totalVertexEdgeCount =
          totalVertexEdgeCount.incrVertexEdgeCount(
              partitionStats.getVertexCount(),
              partitionStats.getEdgeCount());
    }

    List<Entry<WorkerInfo, VertexEdgeCount>> workerEntryList =
        Lists.newArrayList(workerStatsMap.entrySet());

    // Log mean/min/max vertex and edge counts per worker.
    if (LOG.isInfoEnabled()) {
      Collections.sort(workerEntryList, new VertexCountComparator());
      LOG.info("analyzePartitionStats: Vertices - Mean: " +
          (totalVertexEdgeCount.getVertexCount() /
              workerStatsMap.size()) +
              ", Min: " +
              workerEntryList.get(0).getKey() + " - " +
              workerEntryList.get(0).getValue().getVertexCount() +
              ", Max: " +
              workerEntryList.get(workerEntryList.size() - 1).getKey() +
              " - " +
              workerEntryList.get(workerEntryList.size() - 1).
              getValue().getVertexCount());
      Collections.sort(workerEntryList, new EdgeCountComparator());
      LOG.info("analyzePartitionStats: Edges - Mean: " +
          (totalVertexEdgeCount.getEdgeCount() /
              workerStatsMap.size()) +
              ", Min: " +
              workerEntryList.get(0).getKey() + " - " +
              workerEntryList.get(0).getValue().getEdgeCount() +
              ", Max: " +
              workerEntryList.get(workerEntryList.size() - 1).getKey() +
              " - " +
              workerEntryList.get(workerEntryList.size() - 1).
              getValue().getEdgeCount());
    }
  }

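  // Usage sketch (illustrative, not part of the original class): the master
  // would typically invoke the analysis after gathering a superstep's
  // per-partition stats, roughly as
  //
  //   PartitionUtils.analyzePartitionStats(partitionOwners, partitionStats);
  //
  // where the hypothetical partitionOwners collection holds exactly one
  // PartitionOwner per partition id (duplicates trigger the
  // IllegalStateException above) and partitionStats holds one PartitionStats
  // entry per partition. The method only logs the per-worker figures at INFO
  // level; it does not rebalance anything itself.
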
  /**
   * Compute the number of partitions based on the configuration.
   *
   * If USER_PARTITION_COUNT is set, that value is used directly. Otherwise
   * the partition count is the larger of what the PARTITION_COUNT_MULTIPLIER
   * and MIN_PARTITIONS_PER_COMPUTE_THREAD settings would choose.
   *
   * @param availableWorkerCount Number of available workers
   * @param conf Configuration.
   * @return Number of partitions for the job.
   */
  public static int computePartitionCount(int availableWorkerCount,
      ImmutableClassesGiraphConfiguration conf) {
    if (availableWorkerCount == 0) {
      throw new IllegalArgumentException(
          "computePartitionCount: No available workers");
    }

    int userPartitionCount = USER_PARTITION_COUNT.get(conf);
    int partitionCount;
    if (userPartitionCount == USER_PARTITION_COUNT.getDefaultValue()) {
      // No explicit user count: take the larger of the multiplier-based
      // estimate and the per-compute-thread minimum.
      float multiplier = GiraphConstants.PARTITION_COUNT_MULTIPLIER.get(conf);
      partitionCount = Math.max(
          (int) (multiplier * availableWorkerCount * availableWorkerCount), 1);
      int minPartitionsPerComputeThread =
          MIN_PARTITIONS_PER_COMPUTE_THREAD.get(conf);
      int totalComputeThreads =
          NUM_COMPUTE_THREADS.get(conf) * availableWorkerCount;
      partitionCount = Math.max(partitionCount,
          minPartitionsPerComputeThread * totalComputeThreads);
    } else {
      partitionCount = userPartitionCount;
    }
    if (LOG.isInfoEnabled()) {
      LOG.info("computePartitionCount: Creating " +
          partitionCount + " partitions.");
    }
    return partitionCount;
  }
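
  // Worked example (illustrative values, not the shipped defaults): with
  // availableWorkerCount = 4, PARTITION_COUNT_MULTIPLIER = 1.0,
  // NUM_COMPUTE_THREADS = 8, MIN_PARTITIONS_PER_COMPUTE_THREAD = 1 and
  // USER_PARTITION_COUNT left unset, the multiplier term gives
  // max((int) (1.0f * 4 * 4), 1) = 16 and the compute-thread term gives
  // 1 * (8 * 4) = 32, so computePartitionCount() returns max(16, 32) = 32.
  // Setting USER_PARTITION_COUNT to any non-default value bypasses both
  // terms and is returned as-is.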
}