View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.partition;
20  
21  import org.apache.giraph.conf.GiraphConstants;
22  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23  import org.apache.giraph.graph.VertexEdgeCount;
24  import org.apache.giraph.worker.WorkerInfo;
25  import org.apache.log4j.Logger;
26  
27  import com.google.common.collect.Lists;
28  import com.google.common.collect.Maps;
29  
30  import java.io.Serializable;
31  import java.util.Collection;
32  import java.util.Collections;
33  import java.util.Comparator;
34  import java.util.HashMap;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.Map.Entry;
38  
39  import static org.apache.giraph.conf.GiraphConstants
40      .MIN_PARTITIONS_PER_COMPUTE_THREAD;
41  import static org.apache.giraph.conf.GiraphConstants.NUM_COMPUTE_THREADS;
42  import static org.apache.giraph.conf.GiraphConstants.USER_PARTITION_COUNT;
43  
44  /**
45   * Helper class for {@link Partition} related operations.
46   */
47  public class PartitionUtils {
48    /** Class logger */
49    private static Logger LOG = Logger.getLogger(PartitionUtils.class);
50  
51    /**
52     * Do not construct this object.
53     */
54    private PartitionUtils() { }
55  
56    /**
57     * Compare edge counts for Entry<WorkerInfo, VertexEdgeCount> objects.
58     */
59    private static class EdgeCountComparator implements
60        Comparator<Entry<WorkerInfo, VertexEdgeCount>>, Serializable {
61      /** Serialization version. */
62      private static final long serialVersionUID = 1L;
63  
64      @Override
65      public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
66          Entry<WorkerInfo, VertexEdgeCount> worker2) {
67        return Long.compare(worker1.getValue().getEdgeCount(),
68          worker2.getValue().getEdgeCount());
69      }
70    }
71  
72    /**
73     * Compare vertex counts between a {@link WorkerInfo} and
74     * {@link VertexEdgeCount}.
75     */
76    private static class VertexCountComparator implements
77        Comparator<Entry<WorkerInfo, VertexEdgeCount>>, Serializable {
78      /** Serialization version. */
79      private static final long serialVersionUID = 1L;
80  
81      @Override
82      public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
83          Entry<WorkerInfo, VertexEdgeCount> worker2) {
84        return Long.compare(worker1.getValue().getVertexCount(),
85          worker2.getValue().getVertexCount());
86      }
87    }
88  
89    /**
90     * Check for imbalances on a per worker basis, by calculating the
91     * mean, high and low workers by edges and vertices.
92     *
93     * @param partitionOwnerList List of partition owners.
94     * @param allPartitionStats All the partition stats.
95     */
96    public static void analyzePartitionStats(
97        Collection<PartitionOwner> partitionOwnerList,
98        List<PartitionStats> allPartitionStats) {
99      Map<Integer, PartitionOwner> idOwnerMap =
100         new HashMap<Integer, PartitionOwner>();
101     for (PartitionOwner partitionOwner : partitionOwnerList) {
102       if (idOwnerMap.put(partitionOwner.getPartitionId(),
103           partitionOwner) != null) {
104         throw new IllegalStateException(
105             "analyzePartitionStats: Duplicate partition " +
106                 partitionOwner);
107       }
108     }
109 
110     Map<WorkerInfo, VertexEdgeCount> workerStatsMap = Maps.newHashMap();
111     VertexEdgeCount totalVertexEdgeCount = new VertexEdgeCount();
112     for (PartitionStats partitionStats : allPartitionStats) {
113       WorkerInfo workerInfo =
114           idOwnerMap.get(partitionStats.getPartitionId()).getWorkerInfo();
115       VertexEdgeCount vertexEdgeCount =
116           workerStatsMap.get(workerInfo);
117       if (vertexEdgeCount == null) {
118         workerStatsMap.put(
119             workerInfo,
120             new VertexEdgeCount(partitionStats.getVertexCount(),
121                 partitionStats.getEdgeCount(), 0));
122       } else {
123         workerStatsMap.put(
124             workerInfo,
125             vertexEdgeCount.incrVertexEdgeCount(
126                 partitionStats.getVertexCount(),
127                 partitionStats.getEdgeCount()));
128       }
129       totalVertexEdgeCount =
130           totalVertexEdgeCount.incrVertexEdgeCount(
131               partitionStats.getVertexCount(),
132               partitionStats.getEdgeCount());
133     }
134 
135     List<Entry<WorkerInfo, VertexEdgeCount>> workerEntryList =
136         Lists.newArrayList(workerStatsMap.entrySet());
137 
138     if (LOG.isInfoEnabled()) {
139       StringBuilder sb = new StringBuilder();
140       for (Entry<WorkerInfo, VertexEdgeCount> worker : workerEntryList) {
141         sb.append(worker.getKey());
142         sb.append(":");
143         sb.append(worker.getValue());
144         sb.append(",");
145       }
146       LOG.info("analyzePartitionStats: [" + sb + "]");
147       Collections.sort(workerEntryList, new VertexCountComparator());
148       LOG.info("analyzePartitionStats: Vertices - Mean: " +
149           (totalVertexEdgeCount.getVertexCount() /
150               workerStatsMap.size()) +
151               ", Min: " +
152               workerEntryList.get(0).getKey() + " - " +
153               workerEntryList.get(0).getValue().getVertexCount() +
154               ", Max: " +
155               workerEntryList.get(workerEntryList.size() - 1).getKey() +
156               " - " +
157               workerEntryList.get(workerEntryList.size() - 1).
158               getValue().getVertexCount());
159       Collections.sort(workerEntryList, new EdgeCountComparator());
160       LOG.info("analyzePartitionStats: Edges - Mean: " +
161           (totalVertexEdgeCount.getEdgeCount() /
162               workerStatsMap.size()) +
163               ", Min: " +
164               workerEntryList.get(0).getKey() + " - " +
165               workerEntryList.get(0).getValue().getEdgeCount() +
166               ", Max: " +
167               workerEntryList.get(workerEntryList.size() - 1).getKey() +
168               " - " +
169               workerEntryList.get(workerEntryList.size() - 1).
170               getValue().getEdgeCount());
171     }
172   }
173 
174   /**
175    * Compute the number of partitions, based on the configuration.
176    *
177    * If USER_PARTITION_COUNT is set, it will follow that, otherwise it will
178    * choose the max of what MIN_PARTITIONS_PER_COMPUTE_THREAD and
179    * PARTITION_COUNT_MULTIPLIER settings would choose, capped by max
180    * partitions limited constrained by zookeeper.
181    *
182    * @param availableWorkerCount Number of available workers
183    * @param conf Configuration.
184    * @return Number of partitions for the job.
185    */
186   public static int computePartitionCount(int availableWorkerCount,
187       ImmutableClassesGiraphConfiguration conf) {
188     if (availableWorkerCount == 0) {
189       throw new IllegalArgumentException(
190           "computePartitionCount: No available workers");
191     }
192 
193     int userPartitionCount = USER_PARTITION_COUNT.get(conf);
194     int partitionCount;
195     if (userPartitionCount == USER_PARTITION_COUNT.getDefaultValue()) {
196       float multiplier = GiraphConstants.PARTITION_COUNT_MULTIPLIER.get(conf);
197       partitionCount = Math.max(
198           (int) (multiplier * availableWorkerCount * availableWorkerCount), 1);
199       int minPartitionsPerComputeThread =
200           MIN_PARTITIONS_PER_COMPUTE_THREAD.get(conf);
201       int totalComputeThreads =
202           NUM_COMPUTE_THREADS.get(conf) * availableWorkerCount;
203       partitionCount = Math.max(partitionCount,
204           minPartitionsPerComputeThread * totalComputeThreads);
205     } else {
206       partitionCount = userPartitionCount;
207     }
208     if (LOG.isInfoEnabled()) {
209       LOG.info("computePartitionCount: Creating " +
210           partitionCount + " partitions.");
211     }
212     return partitionCount;
213   }
214 }