This project has retired. For details please refer to its
Attic page.
PartitionUtils xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.partition;
20
21 import org.apache.giraph.conf.GiraphConstants;
22 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23 import org.apache.giraph.graph.VertexEdgeCount;
24 import org.apache.giraph.worker.WorkerInfo;
25 import org.apache.log4j.Logger;
26
27 import com.google.common.collect.Lists;
28 import com.google.common.collect.Maps;
29
30 import java.io.Serializable;
31 import java.util.Collection;
32 import java.util.Collections;
33 import java.util.Comparator;
34 import java.util.HashMap;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Map.Entry;
38
39 import static org.apache.giraph.conf.GiraphConstants
40 .MIN_PARTITIONS_PER_COMPUTE_THREAD;
41 import static org.apache.giraph.conf.GiraphConstants.NUM_COMPUTE_THREADS;
42 import static org.apache.giraph.conf.GiraphConstants.USER_PARTITION_COUNT;
43
44
45
46
47 public class PartitionUtils {
48
49 private static Logger LOG = Logger.getLogger(PartitionUtils.class);
50
51
52
53
54 private PartitionUtils() { }
55
56
57
58
59 private static class EdgeCountComparator implements
60 Comparator<Entry<WorkerInfo, VertexEdgeCount>>, Serializable {
61
62 private static final long serialVersionUID = 1L;
63
64 @Override
65 public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
66 Entry<WorkerInfo, VertexEdgeCount> worker2) {
67 return Long.compare(worker1.getValue().getEdgeCount(),
68 worker2.getValue().getEdgeCount());
69 }
70 }
71
72
73
74
75
76 private static class VertexCountComparator implements
77 Comparator<Entry<WorkerInfo, VertexEdgeCount>>, Serializable {
78
79 private static final long serialVersionUID = 1L;
80
81 @Override
82 public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
83 Entry<WorkerInfo, VertexEdgeCount> worker2) {
84 return Long.compare(worker1.getValue().getVertexCount(),
85 worker2.getValue().getVertexCount());
86 }
87 }
88
89
90
91
92
93
94
95
96 public static void analyzePartitionStats(
97 Collection<PartitionOwner> partitionOwnerList,
98 List<PartitionStats> allPartitionStats) {
99 Map<Integer, PartitionOwner> idOwnerMap =
100 new HashMap<Integer, PartitionOwner>();
101 for (PartitionOwner partitionOwner : partitionOwnerList) {
102 if (idOwnerMap.put(partitionOwner.getPartitionId(),
103 partitionOwner) != null) {
104 throw new IllegalStateException(
105 "analyzePartitionStats: Duplicate partition " +
106 partitionOwner);
107 }
108 }
109
110 Map<WorkerInfo, VertexEdgeCount> workerStatsMap = Maps.newHashMap();
111 VertexEdgeCount totalVertexEdgeCount = new VertexEdgeCount();
112 for (PartitionStats partitionStats : allPartitionStats) {
113 WorkerInfo workerInfo =
114 idOwnerMap.get(partitionStats.getPartitionId()).getWorkerInfo();
115 VertexEdgeCount vertexEdgeCount =
116 workerStatsMap.get(workerInfo);
117 if (vertexEdgeCount == null) {
118 workerStatsMap.put(
119 workerInfo,
120 new VertexEdgeCount(partitionStats.getVertexCount(),
121 partitionStats.getEdgeCount(), 0));
122 } else {
123 workerStatsMap.put(
124 workerInfo,
125 vertexEdgeCount.incrVertexEdgeCount(
126 partitionStats.getVertexCount(),
127 partitionStats.getEdgeCount()));
128 }
129 totalVertexEdgeCount =
130 totalVertexEdgeCount.incrVertexEdgeCount(
131 partitionStats.getVertexCount(),
132 partitionStats.getEdgeCount());
133 }
134
135 List<Entry<WorkerInfo, VertexEdgeCount>> workerEntryList =
136 Lists.newArrayList(workerStatsMap.entrySet());
137
138 if (LOG.isInfoEnabled()) {
139 Collections.sort(workerEntryList, new VertexCountComparator());
140 LOG.info("analyzePartitionStats: Vertices - Mean: " +
141 (totalVertexEdgeCount.getVertexCount() /
142 workerStatsMap.size()) +
143 ", Min: " +
144 workerEntryList.get(0).getKey() + " - " +
145 workerEntryList.get(0).getValue().getVertexCount() +
146 ", Max: " +
147 workerEntryList.get(workerEntryList.size() - 1).getKey() +
148 " - " +
149 workerEntryList.get(workerEntryList.size() - 1).
150 getValue().getVertexCount());
151 Collections.sort(workerEntryList, new EdgeCountComparator());
152 LOG.info("analyzePartitionStats: Edges - Mean: " +
153 (totalVertexEdgeCount.getEdgeCount() /
154 workerStatsMap.size()) +
155 ", Min: " +
156 workerEntryList.get(0).getKey() + " - " +
157 workerEntryList.get(0).getValue().getEdgeCount() +
158 ", Max: " +
159 workerEntryList.get(workerEntryList.size() - 1).getKey() +
160 " - " +
161 workerEntryList.get(workerEntryList.size() - 1).
162 getValue().getEdgeCount());
163 }
164 }
165
166
167
168
169
170
171
172
173
174
175
176
177
178 public static int computePartitionCount(int availableWorkerCount,
179 ImmutableClassesGiraphConfiguration conf) {
180 if (availableWorkerCount == 0) {
181 throw new IllegalArgumentException(
182 "computePartitionCount: No available workers");
183 }
184
185 int userPartitionCount = USER_PARTITION_COUNT.get(conf);
186 int partitionCount;
187 if (userPartitionCount == USER_PARTITION_COUNT.getDefaultValue()) {
188 float multiplier = GiraphConstants.PARTITION_COUNT_MULTIPLIER.get(conf);
189 partitionCount = Math.max(
190 (int) (multiplier * availableWorkerCount * availableWorkerCount), 1);
191 int minPartitionsPerComputeThread =
192 MIN_PARTITIONS_PER_COMPUTE_THREAD.get(conf);
193 int totalComputeThreads =
194 NUM_COMPUTE_THREADS.get(conf) * availableWorkerCount;
195 partitionCount = Math.max(partitionCount,
196 minPartitionsPerComputeThread * totalComputeThreads);
197 } else {
198 partitionCount = userPartitionCount;
199 }
200 if (LOG.isInfoEnabled()) {
201 LOG.info("computePartitionCount: Creating " +
202 partitionCount + " partitions.");
203 }
204 return partitionCount;
205 }
206 }