This project has retired. For details please refer to its Attic page.
OutOfCoreIOStatistics xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc;
20  
21  import com.google.common.collect.Maps;
22  import org.apache.giraph.conf.GiraphConstants;
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.conf.IntConfOption;
25  import org.apache.giraph.ooc.command.IOCommand.IOCommandType;
26  import org.apache.log4j.Logger;
27  
28  import java.util.Map;
29  import java.util.Queue;
30  import java.util.concurrent.ArrayBlockingQueue;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  /**
34   * Class to collect statistics regarding IO operations done in out-of-core
35   * mechanism.
36   */
37  public class OutOfCoreIOStatistics {
38    /**
39     * An estimate of disk bandwidth. This number is only used just at the start
40     * of the computation, and will be updated/replaced later on once a few disk
41     * operations happen.
42     */
43    public static final IntConfOption DISK_BANDWIDTH_ESTIMATE =
44        new IntConfOption("giraph.diskBandwidthEstimate", 125,
45            "An estimate of disk bandwidth (MB/s). This number is used just at " +
46                "the beginning of the computation, and it will be " +
47                "updated/replaced once a few disk operations happen.");
48    /**
49     * How many recent IO operations should we keep track of? Any report given out
50     * of this statistics collector is only based on most recent IO operations.
51     */
52    public static final IntConfOption IO_COMMAND_HISTORY_SIZE =
53        new IntConfOption("giraph.ioCommandHistorySize", 50,
54            "Number of most recent IO operations to consider for reporting the" +
55                "statistics.");
56  
57    /**
58     * Use this option to control how frequently to print OOC statistics.
59     */
60    public static final IntConfOption STATS_PRINT_FREQUENCY =
61        new IntConfOption("giraph.oocStatPrintFrequency", 200,
62            "Number of updates before stats are printed.");
63  
64    /** Class logger */
65    private static final Logger LOG =
66        Logger.getLogger(OutOfCoreIOStatistics.class);
67    /** Estimate of disk bandwidth (bytes/second) */
68    private final AtomicLong diskBandwidthEstimate;
69    /** Cached value for IO_COMMAND_HISTORY_SIZE */
70    private final int maxHistorySize;
71    /**
72     * Coefficient/Weight of the most recent IO operation toward the disk
73     * bandwidth estimate. Basically if the disk bandwidth estimate if d, and the
74     * latest IO command happened at the rate of r, the new estimate of disk
75     * bandwidth is calculated as:
76     * d_new = updateCoefficient * r + (1 - updateCoefficient) * d
77     */
78    private final double updateCoefficient;
79    /** Queue of all recent commands */
80    private final Queue<StatisticsEntry> commandHistory;
81    /**
82     * Command statistics for each type of IO command. This is the statistics of
83     * the recent commands in the history we keep track of (with 'maxHistorySize'
84     * command in the history).
85     */
86    private final Map<IOCommandType, StatisticsEntry> aggregateStats;
87    /** How many IO command completed? */
88    private int numUpdates = 0;
89    /** Cached value for {@link #STATS_PRINT_FREQUENCY} */
90    private int statsPrintFrequency = 0;
91  
92    /**
93     * Constructor
94     *
95     * @param conf configuration
96     * @param numIOThreads number of disks/IO threads
97     */
98    public OutOfCoreIOStatistics(ImmutableClassesGiraphConfiguration conf,
99                                 int numIOThreads) {
100     this.diskBandwidthEstimate =
101         new AtomicLong(DISK_BANDWIDTH_ESTIMATE.get(conf) *
102             (long) GiraphConstants.ONE_MB);
103     this.maxHistorySize = IO_COMMAND_HISTORY_SIZE.get(conf);
104     this.updateCoefficient = 1.0 / maxHistorySize;
105     // Adding more entry to the capacity of the queue to have a wiggle room
106     // if all IO threads are adding/removing entries from the queue
107     this.commandHistory =
108         new ArrayBlockingQueue<>(maxHistorySize + numIOThreads);
109     this.aggregateStats = Maps.newConcurrentMap();
110     for (IOCommandType type : IOCommandType.values()) {
111       aggregateStats.put(type, new StatisticsEntry(type, 0, 0, 0));
112     }
113     this.statsPrintFrequency = STATS_PRINT_FREQUENCY.get(conf);
114   }
115 
116   /**
117    * Update statistics with the last IO command that is executed.
118    *
119    * @param type type of the IO command that is executed
120    * @param bytesTransferred number of bytes transferred in the last IO command
121    * @param duration duration it took for the last IO command to complete
122    *                 (milliseconds)
123    */
124   public void update(IOCommandType type, long bytesTransferred,
125                      long duration) {
126     StatisticsEntry entry = aggregateStats.get(type);
127     synchronized (entry) {
128       entry.setOccurrence(entry.getOccurrence() + 1);
129       entry.setDuration(duration + entry.getDuration());
130       entry.setBytesTransferred(bytesTransferred + entry.getBytesTransferred());
131     }
132     commandHistory.offer(
133         new StatisticsEntry(type, bytesTransferred, duration, 0));
134     if (type != IOCommandType.WAIT) {
135       // If the current estimate is 'd', the new rate is 'r', and the size of
136       // history is 'n', we can simply model all the past command's rate as:
137       // d, d, d, ..., d, r
138       // where 'd' happens for 'n-1' times. Hence the new estimate of the
139       // bandwidth would be:
140       // d_new = (d * (n-1) + r) / n = (1-1/n)*d + 1/n*r
141       // where updateCoefficient = 1/n
142       diskBandwidthEstimate.set((long)
143           (updateCoefficient * (bytesTransferred / duration * 1000) +
144               (1 - updateCoefficient) * diskBandwidthEstimate.get()));
145     }
146     if (commandHistory.size() > maxHistorySize) {
147       StatisticsEntry removedEntry = commandHistory.poll();
148       entry = aggregateStats.get(removedEntry.getType());
149       synchronized (entry) {
150         entry.setOccurrence(entry.getOccurrence() - 1);
151         entry.setDuration(entry.getDuration() - removedEntry.getDuration());
152         entry.setBytesTransferred(
153             entry.getBytesTransferred() - removedEntry.getBytesTransferred());
154       }
155     }
156     numUpdates++;
157     // Outputting log every so many commands
158     if (numUpdates % statsPrintFrequency == 0) {
159       if (LOG.isInfoEnabled()) {
160         LOG.info(this);
161       }
162     }
163   }
164 
165   @Override
166   public String toString() {
167     StringBuffer sb = new StringBuffer();
168     long waitTime = 0;
169     long loadTime = 0;
170     long bytesRead = 0;
171     long storeTime = 0;
172     long bytesWritten = 0;
173     for (Map.Entry<IOCommandType, StatisticsEntry> entry :
174         aggregateStats.entrySet()) {
175       synchronized (entry.getValue()) {
176         sb.append(entry.getKey() + ": " + entry.getValue() + ", ");
177         if (entry.getKey() == IOCommandType.WAIT) {
178           waitTime += entry.getValue().getDuration();
179         } else if (entry.getKey() == IOCommandType.LOAD_PARTITION) {
180           loadTime += entry.getValue().getDuration();
181           bytesRead += entry.getValue().getBytesTransferred();
182         } else {
183           storeTime += entry.getValue().getDuration();
184           bytesWritten += entry.getValue().getBytesTransferred();
185         }
186       }
187     }
188     sb.append(String.format("Average STORE: %.2f MB/s, ",
189         (double) bytesWritten / storeTime * 1000 / 1024 / 1024));
190     sb.append(String.format("DATA_INJECTION: %.2f MB/s, ",
191         (double) (bytesRead - bytesWritten) /
192             (waitTime + loadTime + storeTime) * 1000 / 1024 / 1024));
193     sb.append(String.format("DISK_BANDWIDTH: %.2f MB/s",
194         (double) diskBandwidthEstimate.get() / 1024 / 1024));
195 
196     return sb.toString();
197   }
198 
199   /**
200    * @return most recent estimate of the disk bandwidth
201    */
202   public long getDiskBandwidth() {
203     return diskBandwidthEstimate.get();
204   }
205 
206   /**
207    * Get aggregate statistics of a given command type in the command history
208    *
209    * @param type type of the command to get the statistics for
210    * @return aggregate statistics for the given command type
211    */
212   public BytesDuration getCommandTypeStats(IOCommandType type) {
213     StatisticsEntry entry = aggregateStats.get(type);
214     synchronized (entry) {
215       return new BytesDuration(entry.getBytesTransferred(), entry.getDuration(),
216           entry.getOccurrence());
217     }
218   }
219 
220   /**
221    * Helper class to return results of statistics collector for a certain
222    * command type
223    */
224   public static class BytesDuration {
225     /** Number of bytes transferred in a few commands of the same type */
226     private long bytes;
227     /** Duration of it took to execute a few commands of the same type */
228     private long duration;
229     /** Number of commands executed of the same type */
230     private int occurrence;
231 
232     /**
233      * Constructor
234      * @param bytes number of bytes transferred
235      * @param duration duration it took to execute commands
236      * @param occurrence number of commands
237      */
238     BytesDuration(long bytes, long duration, int occurrence) {
239       this.bytes = bytes;
240       this.duration = duration;
241       this.occurrence = occurrence;
242     }
243 
244     /**
245      * @return number of bytes transferred for the same command type
246      */
247     public long getBytes() {
248       return bytes;
249     }
250 
251     /**
252      * @return duration it took to execute a few commands of the same type
253      */
254     public long getDuration() {
255       return duration;
256     }
257 
258     /**
259      * @return number of commands that are executed of the same type
260      */
261     public int getOccurrence() {
262       return occurrence;
263     }
264   }
265 
266   /**
267    * Helper class to keep statistics for a certain command type
268    */
269   private static class StatisticsEntry {
270     /** Type of the command */
271     private IOCommandType type;
272     /**
273      * Aggregate number of bytes transferred executing the particular command
274      * type in the history we keep
275      */
276     private long bytesTransferred;
277     /**
278      * Aggregate duration it took executing the particular command type in the
279      * history we keep
280      */
281     private long duration;
282     /**
283      * Number of occurrences of the particular command type in the history we
284      * keep
285      */
286     private int occurrence;
287 
288     /**
289      * Constructor
290      *
291      * @param type type of the command
292      * @param bytesTransferred aggregate number of bytes transferred
293      * @param duration aggregate execution time
294      * @param occurrence number of occurrences of the particular command type
295      */
296     public StatisticsEntry(IOCommandType type, long bytesTransferred,
297                            long duration, int occurrence) {
298       this.type = type;
299       this.bytesTransferred = bytesTransferred;
300       this.duration = duration;
301       this.occurrence = occurrence;
302     }
303 
304     /**
305      * @return type of the command
306      */
307     public IOCommandType getType() {
308       return type;
309     }
310 
311     /**
312      * @return aggregate number of bytes transferred in the particular command
313      *         type
314      */
315     public long getBytesTransferred() {
316       return bytesTransferred;
317     }
318 
319     /**
320      * Update the aggregate number of bytes transferred
321      *
322      * @param bytesTransferred aggregate number of bytes
323      */
324     public void setBytesTransferred(long bytesTransferred) {
325       this.bytesTransferred = bytesTransferred;
326     }
327 
328     /**
329      * @return aggregate duration it took to execute the particular command type
330      */
331     public long getDuration() {
332       return duration;
333     }
334 
335     /**
336      * Update the aggregate duration
337      *
338      * @param duration aggregate duration
339      */
340     public void setDuration(long duration) {
341       this.duration = duration;
342     }
343 
344     /**
345      * @return number of occurrences of the particular command type
346      */
347     public int getOccurrence() {
348       return occurrence;
349     }
350 
351     /**
352      * Update the number of occurrences of the particular command type
353      *
354      * @param occurrence number of occurrences
355      */
356     public void setOccurrence(int occurrence) {
357       this.occurrence = occurrence;
358     }
359 
360     @Override
361     public String toString() {
362       if (type == IOCommandType.WAIT) {
363         return String.format("%.2f sec", duration / 1000.0);
364       } else {
365         return String.format("%.2f MB/s",
366             (double) bytesTransferred / duration * 1000 / 1024 / 2014);
367       }
368     }
369   }
370 }