This project has retired. For details please refer to its
Attic page.
CombinedWorkerProgress xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.job;
20
21 import com.google.common.collect.Iterables;
22 import org.apache.giraph.conf.FloatConfOption;
23 import org.apache.giraph.conf.GiraphConstants;
24 import org.apache.giraph.master.MasterProgress;
25 import org.apache.giraph.worker.WorkerProgress;
26 import org.apache.giraph.worker.WorkerProgressStats;
27 import org.apache.hadoop.conf.Configuration;
28
29 import javax.annotation.concurrent.NotThreadSafe;
30 import java.text.DecimalFormat;
31
32
33
34
35
36 @NotThreadSafe
37 public class CombinedWorkerProgress extends WorkerProgressStats {
38
39 public static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("#.##");
40
41
42
43
44 public static final FloatConfOption NORMAL_FREE_MEMORY_FRACTION =
45 new FloatConfOption("giraph.normalFreeMemoryFraction", 0.1f,
46 "If free memory fraction on some worker goes below this value, " +
47 "warning will be printed");
48
49
50
51
52 private double normalFreeMemoryFraction;
53
54 private final int superstepCount;
55
56
57
58
59 private int workersInSuperstep = 0;
60
61
62
63 private int workersDone = 0;
64
65 private double minFreeMemoryMB = Double.MAX_VALUE;
66
67 private int workerWithMinFreeMemory;
68
69 private double minFreeMemoryFraction = Double.MAX_VALUE;
70
71
72
73
74 private int minGraphPercentageInMemory = 100;
75
76 private int workerWithMinGraphPercentageInMemory = -1;
77
78 private MasterProgress masterProgress;
79
80
81
82
83
84
85
86
87 public CombinedWorkerProgress(Iterable<WorkerProgress> workerProgresses,
88 MasterProgress masterProgress, Configuration conf) {
89 this.masterProgress = masterProgress;
90 normalFreeMemoryFraction = NORMAL_FREE_MEMORY_FRACTION.get(conf);
91 superstepCount = GiraphConstants.SUPERSTEP_COUNT.get(conf);
92 for (WorkerProgress workerProgress : workerProgresses) {
93 if (workerProgress.getCurrentSuperstep() > currentSuperstep) {
94 verticesToCompute = 0;
95 verticesComputed = 0;
96 partitionsToCompute = 0;
97 partitionsComputed = 0;
98 currentSuperstep = workerProgress.getCurrentSuperstep();
99 workersInSuperstep = 0;
100 }
101
102 if (workerProgress.getCurrentSuperstep() == currentSuperstep) {
103 workersInSuperstep++;
104 if (isInputSuperstep()) {
105 verticesLoaded += workerProgress.getVerticesLoaded();
106 vertexInputSplitsLoaded +=
107 workerProgress.getVertexInputSplitsLoaded();
108 edgesLoaded += workerProgress.getEdgesLoaded();
109 edgeInputSplitsLoaded += workerProgress.getEdgeInputSplitsLoaded();
110 } else if (isComputeSuperstep()) {
111 verticesToCompute += workerProgress.getVerticesToCompute();
112 verticesComputed += workerProgress.getVerticesComputed();
113 partitionsToCompute += workerProgress.getPartitionsToCompute();
114 partitionsComputed += workerProgress.getPartitionsComputed();
115 } else if (isOutputSuperstep()) {
116 verticesToStore += workerProgress.getVerticesToStore();
117 verticesStored += workerProgress.getVerticesStored();
118 partitionsToStore += workerProgress.getPartitionsToStore();
119 partitionsStored += workerProgress.getPartitionsStored();
120 }
121 }
122
123 if (workerProgress.isStoringDone()) {
124 workersDone++;
125 }
126
127 if (workerProgress.getFreeMemoryMB() < minFreeMemoryMB) {
128 minFreeMemoryMB = workerProgress.getFreeMemoryMB();
129 workerWithMinFreeMemory = workerProgress.getTaskId();
130 }
131 minFreeMemoryFraction = Math.min(minFreeMemoryFraction,
132 workerProgress.getFreeMemoryFraction());
133 freeMemoryMB += workerProgress.getFreeMemoryMB();
134 int percentage = workerProgress.getLowestGraphPercentageInMemory();
135 if (percentage < minGraphPercentageInMemory) {
136 minGraphPercentageInMemory = percentage;
137 workerWithMinGraphPercentageInMemory = workerProgress.getTaskId();
138 }
139 }
140 if (!Iterables.isEmpty(workerProgresses)) {
141 freeMemoryMB /= Iterables.size(workerProgresses);
142 }
143 }
144
145
146
147
148
149 public long getCurrentSuperstep() {
150 return currentSuperstep;
151 }
152
153
154
155
156
157 public long getWorkersInSuperstep() {
158 return workersInSuperstep;
159 }
160
161
162
163
164
165 public long getVerticesComputed() {
166 return verticesComputed;
167 }
168
169
170
171
172
173 public long getVerticesToCompute() {
174 return verticesToCompute;
175 }
176
177
178
179
180
181
182
183
184 public boolean isDone(int expectedWorkersDone) {
185 return workersDone == expectedWorkersDone;
186 }
187
188
189
190
191
192
193 protected String getProgressString() {
194 StringBuilder sb = new StringBuilder();
195 if (isInputSuperstep()) {
196 sb.append("Loading data: ");
197 if (!masterProgress.vertexInputSplitsSet() ||
198 masterProgress.getVertexInputSplitCount() > 0) {
199 sb.append(verticesLoaded).append(" vertices loaded, ");
200 sb.append(vertexInputSplitsLoaded).append(
201 " vertex input splits loaded");
202 if (masterProgress.getVertexInputSplitCount() > 0) {
203 sb.append(" (out of ").append(
204 masterProgress.getVertexInputSplitCount()).append(")");
205 }
206 sb.append("; ");
207 }
208 if (!masterProgress.edgeInputSplitsSet() ||
209 masterProgress.getEdgeInputSplitsCount() > 0) {
210 sb.append(edgesLoaded).append(" edges loaded, ");
211 sb.append(edgeInputSplitsLoaded).append(" edge input splits loaded");
212 if (masterProgress.getEdgeInputSplitsCount() > 0) {
213 sb.append(" (out of ").append(
214 masterProgress.getEdgeInputSplitsCount()).append(")");
215 }
216 }
217 } else if (isComputeSuperstep()) {
218 sb.append("Compute superstep ").append(currentSuperstep);
219 if (superstepCount > 0) {
220
221 sb.append(" (out of ").append(superstepCount - 1).append(")");
222 }
223 sb.append(": ").append(verticesComputed).append(" out of ").append(
224 verticesToCompute).append(" vertices computed; ");
225 sb.append(partitionsComputed).append(" out of ").append(
226 partitionsToCompute).append(" partitions computed");
227 } else if (isOutputSuperstep()) {
228 sb.append("Storing data: ");
229 sb.append(verticesStored).append(" out of ").append(
230 verticesToStore).append(" vertices stored; ");
231 sb.append(partitionsStored).append(" out of ").append(
232 partitionsToStore).append(" partitions stored");
233 }
234 return sb.toString();
235 }
236
237 @Override
238 public String toString() {
239 StringBuilder sb = new StringBuilder();
240 sb.append("Data from ").append(workersInSuperstep).append(" workers - ");
241 sb.append(getProgressString());
242 sb.append("; min free memory on worker ").append(
243 workerWithMinFreeMemory).append(" - ").append(
244 DECIMAL_FORMAT.format(minFreeMemoryMB)).append("MB, average ").append(
245 DECIMAL_FORMAT.format(freeMemoryMB)).append("MB");
246 if (minFreeMemoryFraction < normalFreeMemoryFraction) {
247 sb.append(", ******* YOUR JOB IS RUNNING LOW ON MEMORY *******");
248 }
249 if (minGraphPercentageInMemory < 100) {
250 sb.append(" Spilling ")
251 .append(100 - minGraphPercentageInMemory)
252 .append("% of data to external storage on worker ")
253 .append(workerWithMinGraphPercentageInMemory);
254 }
255 return sb.toString();
256 }
257
258
259
260
261
262
263
264 public boolean madeProgressFrom(CombinedWorkerProgress lastProgress) {
265
266 if (!getProgressString().equals(lastProgress.getProgressString())) {
267 return true;
268 }
269
270 return workersDone != lastProgress.workersDone;
271 }
272 }