This project has retired. For details please refer to its Attic page.
WorkerProgress xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.worker;
20  
21  import com.facebook.swift.codec.ThriftField;
22  import com.facebook.swift.codec.ThriftStruct;
23  import org.apache.giraph.utils.MemoryUtils;
24  
25  import javax.annotation.concurrent.ThreadSafe;
26  
27  /**
28   * Stores information about a worker's progress that is periodically written to
29   * ZooKeeper with {@link WorkerProgressWriter}.
30   */
31  @ThreadSafe
32  @ThriftStruct
33  public final class WorkerProgress extends WorkerProgressStats {
34    /** Singleton instance for everyone to use */
35    private static final WorkerProgress INSTANCE = new WorkerProgress();
36  
37    /**
38     * Public constructor for thrift to create us.
39     * Please use WorkerProgress.get() to get the static instance.
40     */
41    public WorkerProgress() {
42    }
43  
44    /**
45     * Get singleton instance of WorkerProgress.
46     *
47     * @return WorkerProgress singleton instance
48     */
49    public static WorkerProgress get() {
50      return INSTANCE;
51    }
52  
53    /**
54     * Add number of vertices loaded
55     *
56     * @param verticesLoaded How many vertices were loaded since the last
57     *                       time this function was called
58     */
59    public synchronized void addVerticesLoaded(long verticesLoaded) {
60      this.verticesLoaded += verticesLoaded;
61    }
62  
63    /**
64     * Increment number of vertex input splits which were loaded
65     */
66    public synchronized void incrementVertexInputSplitsLoaded() {
67      vertexInputSplitsLoaded++;
68    }
69  
70    /**
71     * Notify this class that worker finished loading vertices
72     */
73    public synchronized void finishLoadingVertices() {
74      loadingVerticesDone = true;
75    }
76  
77    /**
78     * Add number of edges loaded
79     *
80     * @param edgesLoaded How many edges were loaded since the last
81     *                    time this function was called
82     */
83    public synchronized void addEdgesLoaded(long edgesLoaded) {
84      this.edgesLoaded += edgesLoaded;
85    }
86  
87    /**
88     * Increment number of edge input splits which were loaded
89     */
90    public synchronized void incrementEdgeInputSplitsLoaded() {
91      edgeInputSplitsLoaded++;
92    }
93  
94    /**
95     * Notify this class that worker finished loading edges
96     */
97    public synchronized void finishLoadingEdges() {
98      loadingEdgesDone = true;
99    }
100 
101   /**
102    * Notify this class that next computation superstep is starting
103    *
104    * @param superstep           Superstep which is starting
105    * @param verticesToCompute   How many vertices are there to compute
106    * @param partitionsToCompute How many partitions are there to compute
107    */
108   public synchronized void startSuperstep(long superstep,
109       long verticesToCompute, int partitionsToCompute) {
110     this.currentSuperstep = superstep;
111     this.verticesToCompute = verticesToCompute;
112     this.partitionsToCompute = partitionsToCompute;
113     verticesComputed = 0;
114     partitionsComputed = 0;
115   }
116 
117   /**
118    * Add number of vertices computed
119    *
120    * @param verticesComputed How many vertices were computed since the last
121    *                         time this function was called
122    */
123   public synchronized void addVerticesComputed(long verticesComputed) {
124     this.verticesComputed += verticesComputed;
125   }
126 
127   /**
128    * Increment number of partitions which were computed
129    */
130   public synchronized void incrementPartitionsComputed() {
131     partitionsComputed++;
132   }
133 
134   /**
135    * Notify this class that worker is starting to store data
136    *
137    * @param verticesToStore   How many vertices should be stored
138    * @param partitionsToStore How many partitions should be stored
139    */
140   public synchronized void startStoring(long verticesToStore,
141       int partitionsToStore) {
142     computationDone = true;
143     verticesToCompute = 0;
144     verticesComputed = 0;
145     partitionsToCompute = 0;
146     partitionsComputed = 0;
147     currentSuperstep = Long.MAX_VALUE;
148     this.verticesToStore = verticesToStore;
149     this.partitionsToStore = partitionsToStore;
150   }
151 
152   /**
153    * Add number of vertices stored
154    *
155    * @param verticesStored How many vertices were stored since the last time
156    *                       this function was called
157    */
158   public synchronized void addVerticesStored(long verticesStored) {
159     this.verticesStored += verticesStored;
160   }
161 
162   /**
163    * Increment number of partitions which were stored
164    */
165   public synchronized void incrementPartitionsStored() {
166     partitionsStored++;
167   }
168 
169   /**
170    * Notify this class that storing data is done
171    */
172   public synchronized void finishStoring() {
173     storingDone = true;
174   }
175 
176   /**
177    * Update memory info
178    */
179   public synchronized void updateMemory() {
180     freeMemoryMB = MemoryUtils.freePlusUnallocatedMemoryMB();
181     freeMemoryFraction = MemoryUtils.freeMemoryFraction();
182   }
183 
184   /**
185    * Update lowest percentage of graph which stayed in memory so far in the
186    * execution
187    *
188    * @param fraction the fraction of graph in memory so far in this superstep
189    */
190   public synchronized void updateLowestGraphPercentageInMemory(int fraction) {
191     lowestGraphPercentageInMemory =
192         Math.min(lowestGraphPercentageInMemory, fraction);
193   }
194 
195   @ThriftField(1)
196   public synchronized long getCurrentSuperstep() {
197     return currentSuperstep;
198   }
199 
200   @ThriftField(2)
201   public synchronized long getVerticesLoaded() {
202     return verticesLoaded;
203   }
204 
205   @ThriftField(3)
206   public synchronized int getVertexInputSplitsLoaded() {
207     return vertexInputSplitsLoaded;
208   }
209 
210   @ThriftField(4)
211   public synchronized boolean isLoadingVerticesDone() {
212     return loadingVerticesDone;
213   }
214 
215   @ThriftField(5)
216   public synchronized long getEdgesLoaded() {
217     return edgesLoaded;
218   }
219 
220   @ThriftField(6)
221   public synchronized int getEdgeInputSplitsLoaded() {
222     return edgeInputSplitsLoaded;
223   }
224 
225   @ThriftField(7)
226   public synchronized boolean isLoadingEdgesDone() {
227     return loadingEdgesDone;
228   }
229 
230   @ThriftField(8)
231   public synchronized long getVerticesToCompute() {
232     return verticesToCompute;
233   }
234 
235   @ThriftField(9)
236   public synchronized long getVerticesComputed() {
237     return verticesComputed;
238   }
239 
240   @ThriftField(10)
241   public synchronized int getPartitionsToCompute() {
242     return partitionsToCompute;
243   }
244 
245   @ThriftField(11)
246   public synchronized int getPartitionsComputed() {
247     return partitionsComputed;
248   }
249 
250   @ThriftField(12)
251   public synchronized boolean isComputationDone() {
252     return computationDone;
253   }
254 
255   @ThriftField(13)
256   public synchronized long getVerticesToStore() {
257     return verticesToStore;
258   }
259 
260   @ThriftField(14)
261   public synchronized long getVerticesStored() {
262     return verticesStored;
263   }
264 
265   @ThriftField(15)
266   public synchronized int getPartitionsToStore() {
267     return partitionsToStore;
268   }
269 
270   @ThriftField(16)
271   public synchronized int getPartitionsStored() {
272     return partitionsStored;
273   }
274 
275   @ThriftField(17)
276   public synchronized boolean isStoringDone() {
277     return storingDone;
278   }
279 
280   @ThriftField(18)
281   public synchronized int getTaskId() {
282     return taskId;
283   }
284 
285   @ThriftField(19)
286   public synchronized double getFreeMemoryMB() {
287     return freeMemoryMB;
288   }
289 
290   @ThriftField(20)
291   public synchronized double getFreeMemoryFraction() {
292     return freeMemoryFraction;
293   }
294 
295   @ThriftField(21)
296   public synchronized int getLowestGraphPercentageInMemory() {
297     return lowestGraphPercentageInMemory;
298   }
299 
300   public synchronized boolean isInputSuperstep() {
301     return currentSuperstep == -1;
302   }
303 
304   public synchronized boolean isComputeSuperstep() {
305     return currentSuperstep >= 0 && currentSuperstep < Long.MAX_VALUE;
306   }
307 
308   public synchronized boolean isOutputSuperstep() {
309     return currentSuperstep == Long.MAX_VALUE;
310   }
311 
312   @ThriftField
313   public void setCurrentSuperstep(long currentSuperstep) {
314     this.currentSuperstep = currentSuperstep;
315   }
316 
317   @ThriftField
318   public void setVerticesLoaded(long verticesLoaded) {
319     this.verticesLoaded = verticesLoaded;
320   }
321 
322   @ThriftField
323   public void setVertexInputSplitsLoaded(int vertexInputSplitsLoaded) {
324     this.vertexInputSplitsLoaded = vertexInputSplitsLoaded;
325   }
326 
327   @ThriftField
328   public void setLoadingVerticesDone(boolean loadingVerticesDone) {
329     this.loadingVerticesDone = loadingVerticesDone;
330   }
331 
332   @ThriftField
333   public void setEdgesLoaded(long edgesLoaded) {
334     this.edgesLoaded = edgesLoaded;
335   }
336 
337   @ThriftField
338   public void setEdgeInputSplitsLoaded(int edgeInputSplitsLoaded) {
339     this.edgeInputSplitsLoaded = edgeInputSplitsLoaded;
340   }
341 
342   @ThriftField
343   public void setLoadingEdgesDone(boolean loadingEdgesDone) {
344     this.loadingEdgesDone = loadingEdgesDone;
345   }
346 
347   @ThriftField
348   public void setVerticesToCompute(long verticesToCompute) {
349     this.verticesToCompute = verticesToCompute;
350   }
351 
352   @ThriftField
353   public void setVerticesComputed(long verticesComputed) {
354     this.verticesComputed = verticesComputed;
355   }
356 
357   @ThriftField
358   public void setPartitionsToCompute(int partitionsToCompute) {
359     this.partitionsToCompute = partitionsToCompute;
360   }
361 
362   @ThriftField
363   public void setPartitionsComputed(int partitionsComputed) {
364     this.partitionsComputed = partitionsComputed;
365   }
366 
367   @ThriftField
368   public void setComputationDone(boolean computationDone) {
369     this.computationDone = computationDone;
370   }
371 
372   @ThriftField
373   public void setVerticesToStore(long verticesToStore) {
374     this.verticesToStore = verticesToStore;
375   }
376 
377   @ThriftField
378   public void setVerticesStored(long verticesStored) {
379     this.verticesStored = verticesStored;
380   }
381 
382   @ThriftField
383   public void setPartitionsToStore(int partitionsToStore) {
384     this.partitionsToStore = partitionsToStore;
385   }
386 
387   @ThriftField
388   public void setPartitionsStored(int partitionsStored) {
389     this.partitionsStored = partitionsStored;
390   }
391 
392   @ThriftField
393   public void setStoringDone(boolean storingDone) {
394     this.storingDone = storingDone;
395   }
396 
397   @ThriftField
398   public void setFreeMemoryMB(double freeMemoryMB) {
399     this.freeMemoryMB = freeMemoryMB;
400   }
401 
402   @ThriftField
403   public void setFreeMemoryFraction(double freeMemoryFraction) {
404     this.freeMemoryFraction = freeMemoryFraction;
405   }
406 
407   @ThriftField
408   public synchronized void setTaskId(int taskId) {
409     this.taskId = taskId;
410   }
411 
412   @ThriftField
413   public synchronized void setLowestGraphPercentageInMemory(
414       int lowestGraphPercentageInMemory) {
415     this.lowestGraphPercentageInMemory = lowestGraphPercentageInMemory;
416   }
417 }