View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.job;
20  
21  import com.google.common.collect.Iterables;
22  import org.apache.giraph.conf.FloatConfOption;
23  import org.apache.giraph.worker.WorkerProgress;
24  import org.apache.giraph.worker.WorkerProgressStats;
25  import org.apache.hadoop.conf.Configuration;
26  
27  import javax.annotation.concurrent.NotThreadSafe;
28  import java.text.DecimalFormat;
29  
30  /**
31   * Class which combines multiple workers' progresses to get overall
32   * application progress
33   */
34  @NotThreadSafe
35  public class CombinedWorkerProgress extends WorkerProgressStats {
36    /** Decimal format which rounds numbers to two decimal places */
37    public static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("#.##");
38    /**
39     * If free memory fraction on some worker goes below this value,
40     * warning will be printed
41     */
42    public static final FloatConfOption NORMAL_FREE_MEMORY_FRACTION =
43        new FloatConfOption("giraph.normalFreeMemoryFraction", 0.1f,
44            "If free memory fraction on some worker goes below this value, " +
45                "warning will be printed");
46  
47    /**
48     * If free memory fraction on some worker goes below this value,
49     * warning will be printed
50     */
51    private double normalFreeMemoryFraction;
52    /**
53     * How many workers have reported that they are in highest reported
54     * superstep
55     */
56    private int workersInSuperstep = 0;
57    /**
58     * How many workers reported that they finished application
59     */
60    private int workersDone = 0;
61    /** Minimum amount of free memory on a worker */
62    private double minFreeMemoryMB = Double.MAX_VALUE;
63    /** Name of the worker with min free memory */
64    private int workerWithMinFreeMemory;
65    /** Minimum fraction of free memory on a worker */
66    private double minFreeMemoryFraction = Double.MAX_VALUE;
67    /**
68     * Minimum percentage of graph in memory in any worker so far in the
69     * computation
70     */
71    private int minGraphPercentageInMemory = 100;
72    /** Id of the worker with min percentage of graph in memory */
73    private int workerWithMinGraphPercentageInMemory = -1;
74  
75    /**
76     * Constructor
77     *
78     * @param workerProgresses Worker progresses to combine
79     * @param conf Configuration
80     */
81    public CombinedWorkerProgress(Iterable<WorkerProgress> workerProgresses,
82        Configuration conf) {
83      normalFreeMemoryFraction = NORMAL_FREE_MEMORY_FRACTION.get(conf);
84      for (WorkerProgress workerProgress : workerProgresses) {
85        if (workerProgress.getCurrentSuperstep() > currentSuperstep) {
86          verticesToCompute = 0;
87          verticesComputed = 0;
88          partitionsToCompute = 0;
89          partitionsComputed = 0;
90          currentSuperstep = workerProgress.getCurrentSuperstep();
91          workersInSuperstep = 0;
92        }
93  
94        if (workerProgress.getCurrentSuperstep() == currentSuperstep) {
95          workersInSuperstep++;
96          if (isInputSuperstep()) {
97            verticesLoaded += workerProgress.getVerticesLoaded();
98            vertexInputSplitsLoaded +=
99                workerProgress.getVertexInputSplitsLoaded();
100           edgesLoaded += workerProgress.getEdgesLoaded();
101           edgeInputSplitsLoaded += workerProgress.getEdgeInputSplitsLoaded();
102         } else if (isComputeSuperstep()) {
103           verticesToCompute += workerProgress.getVerticesToCompute();
104           verticesComputed += workerProgress.getVerticesComputed();
105           partitionsToCompute += workerProgress.getPartitionsToCompute();
106           partitionsComputed += workerProgress.getPartitionsComputed();
107         } else if (isOutputSuperstep()) {
108           verticesToStore += workerProgress.getVerticesToStore();
109           verticesStored += workerProgress.getVerticesStored();
110           partitionsToStore += workerProgress.getPartitionsToStore();
111           partitionsStored += workerProgress.getPartitionsStored();
112         }
113       }
114 
115       if (workerProgress.isStoringDone()) {
116         workersDone++;
117       }
118 
119       if (workerProgress.getFreeMemoryMB() < minFreeMemoryMB) {
120         minFreeMemoryMB = workerProgress.getFreeMemoryMB();
121         workerWithMinFreeMemory = workerProgress.getTaskId();
122       }
123       minFreeMemoryFraction = Math.min(minFreeMemoryFraction,
124           workerProgress.getFreeMemoryFraction());
125       freeMemoryMB += workerProgress.getFreeMemoryMB();
126       int percentage = workerProgress.getLowestGraphPercentageInMemory();
127       if (percentage < minGraphPercentageInMemory) {
128         minGraphPercentageInMemory = percentage;
129         workerWithMinGraphPercentageInMemory = workerProgress.getTaskId();
130       }
131     }
132     if (!Iterables.isEmpty(workerProgresses)) {
133       freeMemoryMB /= Iterables.size(workerProgresses);
134     }
135   }
136 
137   /**
138    * Is the application done
139    *
140    * @param expectedWorkersDone Number of workers which should be done in
141    *                            order for application to be done
142    * @return True if application is done
143    */
144   public boolean isDone(int expectedWorkersDone) {
145     return workersDone == expectedWorkersDone;
146   }
147 
148   @Override
149   public String toString() {
150     StringBuilder sb = new StringBuilder();
151     sb.append("Data from ").append(workersInSuperstep).append(" workers - ");
152     if (isInputSuperstep()) {
153       sb.append("Loading data: ");
154       sb.append(verticesLoaded).append(" vertices loaded, ");
155       sb.append(vertexInputSplitsLoaded).append(
156           " vertex input splits loaded; ");
157       sb.append(edgesLoaded).append(" edges loaded, ");
158       sb.append(edgeInputSplitsLoaded).append(" edge input splits loaded");
159     } else if (isComputeSuperstep()) {
160       sb.append("Compute superstep ").append(currentSuperstep).append(": ");
161       sb.append(verticesComputed).append(" out of ").append(
162           verticesToCompute).append(" vertices computed; ");
163       sb.append(partitionsComputed).append(" out of ").append(
164           partitionsToCompute).append(" partitions computed");
165     } else if (isOutputSuperstep()) {
166       sb.append("Storing data: ");
167       sb.append(verticesStored).append(" out of ").append(
168           verticesToStore).append(" vertices stored; ");
169       sb.append(partitionsStored).append(" out of ").append(
170           partitionsToStore).append(" partitions stored");
171     }
172     sb.append("; min free memory on worker ").append(
173         workerWithMinFreeMemory).append(" - ").append(
174         DECIMAL_FORMAT.format(minFreeMemoryMB)).append("MB, average ").append(
175         DECIMAL_FORMAT.format(freeMemoryMB)).append("MB");
176     if (minFreeMemoryFraction < normalFreeMemoryFraction) {
177       sb.append(", ******* YOUR JOB IS RUNNING LOW ON MEMORY *******");
178     }
179     if (minGraphPercentageInMemory < 100) {
180       sb.append(" Spilling ")
181           .append(100 - minGraphPercentageInMemory)
182           .append("% of data to external storage on worker ")
183           .append(workerWithMinGraphPercentageInMemory);
184     }
185     return sb.toString();
186   }
187 }