This project has retired. For details please refer to its Attic page.
CombinedWorkerProgress xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.job;
20  
21  import com.google.common.collect.Iterables;
22  import org.apache.giraph.conf.FloatConfOption;
23  import org.apache.giraph.conf.GiraphConstants;
24  import org.apache.giraph.master.MasterProgress;
25  import org.apache.giraph.worker.WorkerProgress;
26  import org.apache.giraph.worker.WorkerProgressStats;
27  import org.apache.hadoop.conf.Configuration;
28  
29  import javax.annotation.concurrent.NotThreadSafe;
30  import java.text.DecimalFormat;
31  
32  /**
33   * Class which combines multiple workers' progresses to get overall
34   * application progress
35   */
36  @NotThreadSafe
37  public class CombinedWorkerProgress extends WorkerProgressStats {
38    /** Decimal format which rounds numbers to two decimal places */
39    public static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("#.##");
40    /**
41     * If free memory fraction on some worker goes below this value,
42     * warning will be printed
43     */
44    public static final FloatConfOption NORMAL_FREE_MEMORY_FRACTION =
45        new FloatConfOption("giraph.normalFreeMemoryFraction", 0.1f,
46            "If free memory fraction on some worker goes below this value, " +
47                "warning will be printed");
48    /**
49     * If free memory fraction on some worker goes below this value,
50     * warning will be printed
51     */
52    private double normalFreeMemoryFraction;
53    /** Total number of supersteps */
54    private final int superstepCount;
55    /**
56     * How many workers have reported that they are in highest reported
57     * superstep
58     */
59    private int workersInSuperstep = 0;
60    /**
61     * How many workers reported that they finished application
62     */
63    private int workersDone = 0;
64    /** Minimum amount of free memory on a worker */
65    private double minFreeMemoryMB = Double.MAX_VALUE;
66    /** Name of the worker with min free memory */
67    private int workerWithMinFreeMemory;
68    /** Minimum fraction of free memory on a worker */
69    private double minFreeMemoryFraction = Double.MAX_VALUE;
70    /**
71     * Minimum percentage of graph in memory in any worker so far in the
72     * computation
73     */
74    private int minGraphPercentageInMemory = 100;
75    /** Id of the worker with min percentage of graph in memory */
76    private int workerWithMinGraphPercentageInMemory = -1;
77    /** Master progress */
78    private MasterProgress masterProgress;
79  
80    /**
81     * Constructor
82     *
83     * @param workerProgresses Worker progresses to combine
84     * @param masterProgress Master progress
85     * @param conf Configuration
86     */
87    public CombinedWorkerProgress(Iterable<WorkerProgress> workerProgresses,
88        MasterProgress masterProgress, Configuration conf) {
89      this.masterProgress = masterProgress;
90      normalFreeMemoryFraction = NORMAL_FREE_MEMORY_FRACTION.get(conf);
91      superstepCount = GiraphConstants.SUPERSTEP_COUNT.get(conf);
92      for (WorkerProgress workerProgress : workerProgresses) {
93        if (workerProgress.getCurrentSuperstep() > currentSuperstep) {
94          verticesToCompute = 0;
95          verticesComputed = 0;
96          partitionsToCompute = 0;
97          partitionsComputed = 0;
98          currentSuperstep = workerProgress.getCurrentSuperstep();
99          workersInSuperstep = 0;
100       }
101 
102       if (workerProgress.getCurrentSuperstep() == currentSuperstep) {
103         workersInSuperstep++;
104         if (isInputSuperstep()) {
105           verticesLoaded += workerProgress.getVerticesLoaded();
106           vertexInputSplitsLoaded +=
107               workerProgress.getVertexInputSplitsLoaded();
108           edgesLoaded += workerProgress.getEdgesLoaded();
109           edgeInputSplitsLoaded += workerProgress.getEdgeInputSplitsLoaded();
110         } else if (isComputeSuperstep()) {
111           verticesToCompute += workerProgress.getVerticesToCompute();
112           verticesComputed += workerProgress.getVerticesComputed();
113           partitionsToCompute += workerProgress.getPartitionsToCompute();
114           partitionsComputed += workerProgress.getPartitionsComputed();
115         } else if (isOutputSuperstep()) {
116           verticesToStore += workerProgress.getVerticesToStore();
117           verticesStored += workerProgress.getVerticesStored();
118           partitionsToStore += workerProgress.getPartitionsToStore();
119           partitionsStored += workerProgress.getPartitionsStored();
120         }
121       }
122 
123       if (workerProgress.isStoringDone()) {
124         workersDone++;
125       }
126 
127       if (workerProgress.getFreeMemoryMB() < minFreeMemoryMB) {
128         minFreeMemoryMB = workerProgress.getFreeMemoryMB();
129         workerWithMinFreeMemory = workerProgress.getTaskId();
130       }
131       minFreeMemoryFraction = Math.min(minFreeMemoryFraction,
132           workerProgress.getFreeMemoryFraction());
133       freeMemoryMB += workerProgress.getFreeMemoryMB();
134       int percentage = workerProgress.getLowestGraphPercentageInMemory();
135       if (percentage < minGraphPercentageInMemory) {
136         minGraphPercentageInMemory = percentage;
137         workerWithMinGraphPercentageInMemory = workerProgress.getTaskId();
138       }
139     }
140     if (!Iterables.isEmpty(workerProgresses)) {
141       freeMemoryMB /= Iterables.size(workerProgresses);
142     }
143   }
144 
145   /**
146    * Get Current superstep
147    * @return Current superstep
148    */
149   public long getCurrentSuperstep() {
150     return currentSuperstep;
151   }
152 
153   /**
154    * Get workers in superstep
155    * @return Workers in superstep.
156    */
157   public long getWorkersInSuperstep() {
158     return workersInSuperstep;
159   }
160 
161   /**
162    * Get vertices computed
163    * @return Vertices computed
164    */
165   public long getVerticesComputed() {
166     return verticesComputed;
167   }
168 
169   /**
170    * Get vertices to compute
171    * @return Vertices to compute
172    */
173   public long getVerticesToCompute() {
174     return verticesToCompute;
175   }
176 
177   /**
178    * Is the application done
179    *
180    * @param expectedWorkersDone Number of workers which should be done in
181    *                            order for application to be done
182    * @return True if application is done
183    */
184   public boolean isDone(int expectedWorkersDone) {
185     return workersDone == expectedWorkersDone;
186   }
187 
188   /**
189    * Get string describing total job progress
190    *
191    * @return String describing total job progress
192    */
193   protected String getProgressString() {
194     StringBuilder sb = new StringBuilder();
195     if (isInputSuperstep()) {
196       sb.append("Loading data: ");
197       if (!masterProgress.vertexInputSplitsSet() ||
198           masterProgress.getVertexInputSplitCount() > 0) {
199         sb.append(verticesLoaded).append(" vertices loaded, ");
200         sb.append(vertexInputSplitsLoaded).append(
201             " vertex input splits loaded");
202         if (masterProgress.getVertexInputSplitCount() > 0) {
203           sb.append(" (out of ").append(
204               masterProgress.getVertexInputSplitCount()).append(")");
205         }
206         sb.append("; ");
207       }
208       if (!masterProgress.edgeInputSplitsSet() ||
209           masterProgress.getEdgeInputSplitsCount() > 0) {
210         sb.append(edgesLoaded).append(" edges loaded, ");
211         sb.append(edgeInputSplitsLoaded).append(" edge input splits loaded");
212         if (masterProgress.getEdgeInputSplitsCount() > 0) {
213           sb.append(" (out of ").append(
214               masterProgress.getEdgeInputSplitsCount()).append(")");
215         }
216       }
217     } else if (isComputeSuperstep()) {
218       sb.append("Compute superstep ").append(currentSuperstep);
219       if (superstepCount > 0) {
220         // Supersteps are 0..superstepCount-1 so subtract 1 here
221         sb.append(" (out of ").append(superstepCount - 1).append(")");
222       }
223       sb.append(": ").append(verticesComputed).append(" out of ").append(
224           verticesToCompute).append(" vertices computed; ");
225       sb.append(partitionsComputed).append(" out of ").append(
226           partitionsToCompute).append(" partitions computed");
227     } else if (isOutputSuperstep()) {
228       sb.append("Storing data: ");
229       sb.append(verticesStored).append(" out of ").append(
230           verticesToStore).append(" vertices stored; ");
231       sb.append(partitionsStored).append(" out of ").append(
232           partitionsToStore).append(" partitions stored");
233     }
234     return sb.toString();
235   }
236 
237   @Override
238   public String toString() {
239     StringBuilder sb = new StringBuilder();
240     sb.append("Data from ").append(workersInSuperstep).append(" workers - ");
241     sb.append(getProgressString());
242     sb.append("; min free memory on worker ").append(
243         workerWithMinFreeMemory).append(" - ").append(
244         DECIMAL_FORMAT.format(minFreeMemoryMB)).append("MB, average ").append(
245         DECIMAL_FORMAT.format(freeMemoryMB)).append("MB");
246     if (minFreeMemoryFraction < normalFreeMemoryFraction) {
247       sb.append(", ******* YOUR JOB IS RUNNING LOW ON MEMORY *******");
248     }
249     if (minGraphPercentageInMemory < 100) {
250       sb.append(" Spilling ")
251           .append(100 - minGraphPercentageInMemory)
252           .append("% of data to external storage on worker ")
253           .append(workerWithMinGraphPercentageInMemory);
254     }
255     return sb.toString();
256   }
257 
258   /**
259    * Check if this instance made progress from another instance
260    *
261    * @param lastProgress Instance to compare with
262    * @return True iff progress was made
263    */
264   public boolean madeProgressFrom(CombinedWorkerProgress lastProgress) {
265     // If progress strings are different there was progress made
266     if (!getProgressString().equals(lastProgress.getProgressString())) {
267       return true;
268     }
269     // If more workers were done there was progress made
270     return workersDone != lastProgress.workersDone;
271   }
272 }