View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc;
20  
21  import com.google.common.collect.Maps;
22  import org.apache.giraph.conf.GiraphConstants;
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.conf.IntConfOption;
25  import org.apache.giraph.ooc.command.IOCommand.IOCommandType;
26  import org.apache.log4j.Logger;
27  
28  import java.util.Map;
29  import java.util.Queue;
30  import java.util.concurrent.ArrayBlockingQueue;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  /**
34   * Class to collect statistics regarding IO operations done in out-of-core
35   * mechanism.
36   */
37  public class OutOfCoreIOStatistics {
38    /**
39     * An estimate of disk bandwidth. This number is only used just at the start
40     * of the computation, and will be updated/replaced later on once a few disk
41     * operations happen.
42     */
43    public static final IntConfOption DISK_BANDWIDTH_ESTIMATE =
44        new IntConfOption("giraph.diskBandwidthEstimate", 125,
45            "An estimate of disk bandwidth (MB/s). This number is used just at " +
46                "the beginning of the computation, and it will be " +
47                "updated/replaced once a few disk operations happen.");
48    /**
49     * How many recent IO operations should we keep track of? Any report given out
50     * of this statistics collector is only based on most recent IO operations.
51     */
52    public static final IntConfOption IO_COMMAND_HISTORY_SIZE =
53        new IntConfOption("giraph.ioCommandHistorySize", 50,
54            "Number of most recent IO operations to consider for reporting the" +
55                "statistics.");
56  
57    /** Class logger */
58    private static final Logger LOG =
59        Logger.getLogger(OutOfCoreIOStatistics.class);
60    /** Estimate of disk bandwidth (bytes/second) */
61    private final AtomicLong diskBandwidthEstimate;
62    /** Cached value for IO_COMMAND_HISTORY_SIZE */
63    private final int maxHistorySize;
64    /**
65     * Coefficient/Weight of the most recent IO operation toward the disk
66     * bandwidth estimate. Basically if the disk bandwidth estimate if d, and the
67     * latest IO command happened at the rate of r, the new estimate of disk
68     * bandwidth is calculated as:
69     * d_new = updateCoefficient * r + (1 - updateCoefficient) * d
70     */
71    private final double updateCoefficient;
72    /** Queue of all recent commands */
73    private final Queue<StatisticsEntry> commandHistory;
74    /**
75     * Command statistics for each type of IO command. This is the statistics of
76     * the recent commands in the history we keep track of (with 'maxHistorySize'
77     * command in the history).
78     */
79    private final Map<IOCommandType, StatisticsEntry> aggregateStats;
80    /** How many IO command completed? */
81    private int numUpdates = 0;
82  
83    /**
84     * Constructor
85     *
86     * @param conf configuration
87     * @param numIOThreads number of disks/IO threads
88     */
89    public OutOfCoreIOStatistics(ImmutableClassesGiraphConfiguration conf,
90                                 int numIOThreads) {
91      this.diskBandwidthEstimate =
92          new AtomicLong(DISK_BANDWIDTH_ESTIMATE.get(conf) *
93              (long) GiraphConstants.ONE_MB);
94      this.maxHistorySize = IO_COMMAND_HISTORY_SIZE.get(conf);
95      this.updateCoefficient = 1.0 / maxHistorySize;
96      // Adding more entry to the capacity of the queue to have a wiggle room
97      // if all IO threads are adding/removing entries from the queue
98      this.commandHistory =
99          new ArrayBlockingQueue<>(maxHistorySize + numIOThreads);
100     this.aggregateStats = Maps.newConcurrentMap();
101     for (IOCommandType type : IOCommandType.values()) {
102       aggregateStats.put(type, new StatisticsEntry(type, 0, 0, 0));
103     }
104   }
105 
106   /**
107    * Update statistics with the last IO command that is executed.
108    *
109    * @param type type of the IO command that is executed
110    * @param bytesTransferred number of bytes transferred in the last IO command
111    * @param duration duration it took for the last IO command to complete
112    *                 (milliseconds)
113    */
114   public void update(IOCommandType type, long bytesTransferred,
115                      long duration) {
116     StatisticsEntry entry = aggregateStats.get(type);
117     synchronized (entry) {
118       entry.setOccurrence(entry.getOccurrence() + 1);
119       entry.setDuration(duration + entry.getDuration());
120       entry.setBytesTransferred(bytesTransferred + entry.getBytesTransferred());
121     }
122     commandHistory.offer(
123         new StatisticsEntry(type, bytesTransferred, duration, 0));
124     if (type != IOCommandType.WAIT) {
125       // If the current estimate is 'd', the new rate is 'r', and the size of
126       // history is 'n', we can simply model all the past command's rate as:
127       // d, d, d, ..., d, r
128       // where 'd' happens for 'n-1' times. Hence the new estimate of the
129       // bandwidth would be:
130       // d_new = (d * (n-1) + r) / n = (1-1/n)*d + 1/n*r
131       // where updateCoefficient = 1/n
132       diskBandwidthEstimate.set((long)
133           (updateCoefficient * (bytesTransferred / duration * 1000) +
134               (1 - updateCoefficient) * diskBandwidthEstimate.get()));
135     }
136     if (commandHistory.size() > maxHistorySize) {
137       StatisticsEntry removedEntry = commandHistory.poll();
138       entry = aggregateStats.get(removedEntry.getType());
139       synchronized (entry) {
140         entry.setOccurrence(entry.getOccurrence() - 1);
141         entry.setDuration(entry.getDuration() - removedEntry.getDuration());
142         entry.setBytesTransferred(
143             entry.getBytesTransferred() - removedEntry.getBytesTransferred());
144       }
145     }
146     numUpdates++;
147     // Outputting log every so many commands
148     if (numUpdates % 10 == 0) {
149       if (LOG.isInfoEnabled()) {
150         LOG.info(this);
151       }
152     }
153   }
154 
155   @Override
156   public String toString() {
157     StringBuffer sb = new StringBuffer();
158     long waitTime = 0;
159     long loadTime = 0;
160     long bytesRead = 0;
161     long storeTime = 0;
162     long bytesWritten = 0;
163     for (Map.Entry<IOCommandType, StatisticsEntry> entry :
164         aggregateStats.entrySet()) {
165       synchronized (entry.getValue()) {
166         sb.append(entry.getKey() + ": " + entry.getValue() + ", ");
167         if (entry.getKey() == IOCommandType.WAIT) {
168           waitTime += entry.getValue().getDuration();
169         } else if (entry.getKey() == IOCommandType.LOAD_PARTITION) {
170           loadTime += entry.getValue().getDuration();
171           bytesRead += entry.getValue().getBytesTransferred();
172         } else {
173           storeTime += entry.getValue().getDuration();
174           bytesWritten += entry.getValue().getBytesTransferred();
175         }
176       }
177     }
178     sb.append(String.format("Average STORE: %.2f MB/s, ",
179         (double) bytesWritten / storeTime * 1000 / 1024 / 1024));
180     sb.append(String.format("DATA_INJECTION: %.2f MB/s, ",
181         (double) (bytesRead - bytesWritten) /
182             (waitTime + loadTime + storeTime) * 1000 / 1024 / 1024));
183     sb.append(String.format("DISK_BANDWIDTH: %.2f MB/s",
184         (double) diskBandwidthEstimate.get() / 1024 / 1024));
185 
186     return sb.toString();
187   }
188 
189   /**
190    * @return most recent estimate of the disk bandwidth
191    */
192   public long getDiskBandwidth() {
193     return diskBandwidthEstimate.get();
194   }
195 
196   /**
197    * Get aggregate statistics of a given command type in the command history
198    *
199    * @param type type of the command to get the statistics for
200    * @return aggregate statistics for the given command type
201    */
202   public BytesDuration getCommandTypeStats(IOCommandType type) {
203     StatisticsEntry entry = aggregateStats.get(type);
204     synchronized (entry) {
205       return new BytesDuration(entry.getBytesTransferred(), entry.getDuration(),
206           entry.getOccurrence());
207     }
208   }
209 
210   /**
211    * Helper class to return results of statistics collector for a certain
212    * command type
213    */
214   public static class BytesDuration {
215     /** Number of bytes transferred in a few commands of the same type */
216     private long bytes;
217     /** Duration of it took to execute a few commands of the same type */
218     private long duration;
219     /** Number of commands executed of the same type */
220     private int occurrence;
221 
222     /**
223      * Constructor
224      * @param bytes number of bytes transferred
225      * @param duration duration it took to execute commands
226      * @param occurrence number of commands
227      */
228     BytesDuration(long bytes, long duration, int occurrence) {
229       this.bytes = bytes;
230       this.duration = duration;
231       this.occurrence = occurrence;
232     }
233 
234     /**
235      * @return number of bytes transferred for the same command type
236      */
237     public long getBytes() {
238       return bytes;
239     }
240 
241     /**
242      * @return duration it took to execute a few commands of the same type
243      */
244     public long getDuration() {
245       return duration;
246     }
247 
248     /**
249      * @return number of commands that are executed of the same type
250      */
251     public int getOccurrence() {
252       return occurrence;
253     }
254   }
255 
256   /**
257    * Helper class to keep statistics for a certain command type
258    */
259   private static class StatisticsEntry {
260     /** Type of the command */
261     private IOCommandType type;
262     /**
263      * Aggregate number of bytes transferred executing the particular command
264      * type in the history we keep
265      */
266     private long bytesTransferred;
267     /**
268      * Aggregate duration it took executing the particular command type in the
269      * history we keep
270      */
271     private long duration;
272     /**
273      * Number of occurrences of the particular command type in the history we
274      * keep
275      */
276     private int occurrence;
277 
278     /**
279      * Constructor
280      *
281      * @param type type of the command
282      * @param bytesTransferred aggregate number of bytes transferred
283      * @param duration aggregate execution time
284      * @param occurrence number of occurrences of the particular command type
285      */
286     public StatisticsEntry(IOCommandType type, long bytesTransferred,
287                            long duration, int occurrence) {
288       this.type = type;
289       this.bytesTransferred = bytesTransferred;
290       this.duration = duration;
291       this.occurrence = occurrence;
292     }
293 
294     /**
295      * @return type of the command
296      */
297     public IOCommandType getType() {
298       return type;
299     }
300 
301     /**
302      * @return aggregate number of bytes transferred in the particular command
303      *         type
304      */
305     public long getBytesTransferred() {
306       return bytesTransferred;
307     }
308 
309     /**
310      * Update the aggregate number of bytes transferred
311      *
312      * @param bytesTransferred aggregate number of bytes
313      */
314     public void setBytesTransferred(long bytesTransferred) {
315       this.bytesTransferred = bytesTransferred;
316     }
317 
318     /**
319      * @return aggregate duration it took to execute the particular command type
320      */
321     public long getDuration() {
322       return duration;
323     }
324 
325     /**
326      * Update the aggregate duration
327      *
328      * @param duration aggregate duration
329      */
330     public void setDuration(long duration) {
331       this.duration = duration;
332     }
333 
334     /**
335      * @return number of occurrences of the particular command type
336      */
337     public int getOccurrence() {
338       return occurrence;
339     }
340 
341     /**
342      * Update the number of occurrences of the particular command type
343      *
344      * @param occurrence number of occurrences
345      */
346     public void setOccurrence(int occurrence) {
347       this.occurrence = occurrence;
348     }
349 
350     @Override
351     public String toString() {
352       if (type == IOCommandType.WAIT) {
353         return String.format("%.2f sec", duration / 1000.0);
354       } else {
355         return String.format("%.2f MB/s",
356             (double) bytesTransferred / duration * 1000 / 1024 / 2014);
357       }
358     }
359   }
360 }