View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.policy;
20  
21  import com.google.common.collect.Maps;
22  import com.sun.management.GarbageCollectionNotificationInfo;
23  import org.apache.giraph.conf.FloatConfOption;
24  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
25  import org.apache.giraph.ooc.OutOfCoreEngine;
26  import org.apache.giraph.ooc.OutOfCoreIOStatistics;
27  import org.apache.giraph.ooc.command.IOCommand;
28  import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
29  import org.apache.giraph.ooc.command.WaitIOCommand;
30  import org.apache.log4j.Logger;
31  
32  import java.lang.management.MemoryUsage;
33  import java.util.Map;
34  import java.util.concurrent.atomic.AtomicInteger;
35  import java.util.concurrent.atomic.AtomicLong;
36  
37  /**
38   * Out-of-core oracle to adaptively control data kept in memory, with the goal
39   * of keeping the memory state constantly at a desired state. This oracle
40   * monitors GC behavior to keep track of memory pressure.
41   *
 * After each GC is done, this oracle retrieves statistics about the memory
 * pressure (memory used, max memory, and how far away memory is compared to a
 * max optimal pressure). Based on the two most recent memory statistics,
 * the oracle predicts the status of the memory, and sets the rate of load/store
 * of data from/to disk. If the rate of loading data from disk is 'l', and the
 * rate of storing data to disk is 's', the rate of data injection to memory
 * from disk can be denoted as 'l-s'. This oracle determines what 'l-s' should
 * be based on the prediction of memory status.
50   *
 * Assume that based on the previous GC call the memory usage at time t_0 is
 * m_0, and based on the most recent GC call the memory usage at time t_1 is
 * m_1. So, the rate of memory increase is alpha = (m_1 - m_0) / (t_1 - t_0).
 * Assume that the ideal memory pressure happens when the memory usage is
 * m_ideal. So, at time 't_2 = t_1 + (t_1 - t_0)', we want m_ideal. That means
 * the ideal rate would be beta = (m_ideal - m_1) / (t_2 - t_1). If the data
 * injection rate to memory so far was i, the new injection rate should be:
 * i_new = i - (alpha - beta)
59   */
60  public class SimpleGCMonitoringOracle implements OutOfCoreOracle {
61    /**
62     * The optimal memory pressure at which GC behavior is close to ideal. This
63     * fraction may be dependant on the GC strategy used for running a job, but
64     * generally should not be dependent on the graph processing application.
65     */
66    public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
67        new FloatConfOption("giraph.optimalMemoryPressure", 0.8f,
68            "The memory pressure (fraction of used memory) at which the job " +
69                "shows the optimal GC behavior. This fraction may be dependent " +
70                "on the GC strategy used in running the job.");
71  
72    /** Class logger */
73    private static final Logger LOG =
74        Logger.getLogger(SimpleGCMonitoringOracle.class);
75    /** Cached value for OPTIMAL_MEMORY_PRESSURE */
76    private final float optimalMemoryPressure;
77    /** Out-of-core engine */
78    private final OutOfCoreEngine oocEngine;
79    /** Status of memory from the last GC call */
80    private GCObservation lastGCObservation;
81    /** Desired rate of data injection to memory */
82    private final AtomicLong desiredDiskToMemoryDataRate =
83        new AtomicLong(0);
84    /** Number of on the fly (outstanding) IO commands for each command type */
85    private final Map<IOCommand.IOCommandType, AtomicInteger> commandOccurrences =
86        Maps.newConcurrentMap();
87  
88    /**
89     * Constructor
90     *
91     * @param conf configuration
92     * @param oocEngine out-of-core engine
93     */
94    public SimpleGCMonitoringOracle(ImmutableClassesGiraphConfiguration conf,
95                                    OutOfCoreEngine oocEngine) {
96      this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
97      this.oocEngine = oocEngine;
98      this.lastGCObservation = new GCObservation(-1, 0, 0);
99      for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
100       commandOccurrences.put(type, new AtomicInteger(0));
101     }
102   }
103 
104   @Override
105   public synchronized void gcCompleted(GarbageCollectionNotificationInfo
106                                              gcInfo) {
107     long time = System.currentTimeMillis();
108     Map<String, MemoryUsage> memAfter = gcInfo.getGcInfo()
109         .getMemoryUsageAfterGc();
110     long usedMemory = 0;
111     long maxMemory = 0;
112     for (MemoryUsage memDetail : memAfter.values()) {
113       usedMemory += memDetail.getUsed();
114       maxMemory += memDetail.getMax();
115     }
116     GCObservation observation = new GCObservation(time, usedMemory, maxMemory);
117     if (LOG.isInfoEnabled()) {
118       LOG.info("gcCompleted: GC completed with: " + observation);
119     }
120     // Whether this is not the first GC call in the application
121     if (lastGCObservation.isValid()) {
122       long deltaDataRate =
123           lastGCObservation.getDesiredDeltaDataRate(observation);
124       long diskBandwidthEstimate =
125           oocEngine.getIOStatistics().getDiskBandwidth();
126       // Update the desired data injection rate to memory. The data injection
127       // rate cannot be less than -disk_bandwidth (the extreme case happens if
128       // we only do 'store'), and cannot be more than disk_bandwidth (the
129       // extreme case happens if we only do 'load').
130       long dataInjectionRate = desiredDiskToMemoryDataRate.get();
131       desiredDiskToMemoryDataRate.set(Math.max(
132           Math.min(desiredDiskToMemoryDataRate.get() - deltaDataRate,
133               diskBandwidthEstimate), -diskBandwidthEstimate));
134       if (LOG.isInfoEnabled()) {
135         LOG.info("gcCompleted: changing data injection rate from " +
136             String.format("%.2f", dataInjectionRate / 1024.0 / 1024.0) +
137             " to " + String.format("%.2f", desiredDiskToMemoryDataRate.get() /
138             1024.0 / 1024.0));
139       }
140     }
141     lastGCObservation = observation;
142   }
143 
144   /**
145    * Get the current data injection rate to memory based on the commands ran
146    * in the history (retrieved from statistics collector), and outstanding
147    * commands issued by the IO scheduler.
148    *
149    * @return the current data injection rate to memory
150    */
151   private long getCurrentDataInjectionRate() {
152     long effectiveBytesTransferred = 0;
153     long effectiveDuration = 0;
154     for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
155       OutOfCoreIOStatistics.BytesDuration stats =
156           oocEngine.getIOStatistics().getCommandTypeStats(type);
157       int occurrence = commandOccurrences.get(type).get();
158       long typeBytesTransferred = stats.getBytes();
159       long typeDuration = stats.getDuration();
160       // If there is an outstanding command, we still do not know how many bytes
161       // it will transfer, and how long it will take. So, we guesstimate these
162       // numbers based on other similar commands happened in the history. We
163       // simply take the average number of bytes transferred for the particular
164       // command, and we take average duration for the particular command. We
165       // should multiply these numbers by the number of outstanding commands of
166       // this particular command type.
167       if (stats.getOccurrence() != 0) {
168         typeBytesTransferred += stats.getBytes() / stats.getOccurrence() *
169             occurrence;
170         typeDuration += stats.getDuration() / stats.getOccurrence() *
171             occurrence;
172       }
173       if (type == IOCommand.IOCommandType.LOAD_PARTITION) {
174         effectiveBytesTransferred += typeBytesTransferred;
175       } else {
176         // Store (data going out of memory), or wait (no data transferred)
177         effectiveBytesTransferred -= typeBytesTransferred;
178       }
179       effectiveDuration += typeDuration;
180     }
181     if (effectiveDuration == 0) {
182       return 0;
183     } else {
184       return effectiveBytesTransferred / effectiveDuration;
185     }
186   }
187 
188   @Override
189   public IOAction[] getNextIOActions() {
190     long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
191     long desiredRate = desiredDiskToMemoryDataRate.get();
192     long currentRate = getCurrentDataInjectionRate();
193     if (desiredRate > error) {
194       // 'l-s' is positive, we should do more load than store.
195       if (currentRate > desiredRate + error) {
196         // We should decrease 'l-s'. This can be done either by increasing 's'
197         // or issuing wait command. We prioritize wait over hard store.
198         return new IOAction[]{
199           IOAction.STORE_MESSAGES_AND_BUFFERS,
200           IOAction.STORE_PROCESSED_PARTITION};
201       } else if (currentRate < desiredRate - error) {
202         // We should increase 'l-s'. We can simply load partitions/data.
203         return new IOAction[]{IOAction.LOAD_PARTITION};
204       } else {
205         // We are in a proper state and we should keep up with the rate. We can
206         // either soft store data or load data (hard load, since we desired rate
207         // is positive).
208         return new IOAction[]{
209           IOAction.STORE_MESSAGES_AND_BUFFERS,
210           IOAction.STORE_PROCESSED_PARTITION,
211           IOAction.LOAD_PARTITION};
212       }
213     } else if (desiredRate < -error) {
214       // 'l-s' is negative, we should do more store than load.
215       if (currentRate < desiredRate - error) {
216         // We should increase 'l-s', but we should be cautious. We only do soft
217         // load, or wait.
218         return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
219       } else if (currentRate > desiredRate + error) {
220         // We should reduce 'l-s', we do hard store.
221         return new IOAction[]{
222           IOAction.STORE_MESSAGES_AND_BUFFERS,
223           IOAction.STORE_PARTITION};
224       } else {
225         // We should keep up with the rate. We can either soft store data, or
226         // soft load data.
227         return new IOAction[]{
228           IOAction.STORE_MESSAGES_AND_BUFFERS,
229           IOAction.STORE_PROCESSED_PARTITION,
230           IOAction.LOAD_UNPROCESSED_PARTITION};
231       }
232     } else {
233       // 'l-s' is almost zero. If current rate is over the desired rate, we do
234       // soft store. If the current rate is below the desired rate, we do soft
235       // load.
236       if (currentRate > desiredRate + error) {
237         return new IOAction[]{
238           IOAction.STORE_MESSAGES_AND_BUFFERS,
239           IOAction.STORE_PROCESSED_PARTITION};
240       } else if (currentRate < desiredRate - error) {
241         return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
242       } else {
243         return new IOAction[]{
244           IOAction.STORE_MESSAGES_AND_BUFFERS,
245           IOAction.STORE_PROCESSED_PARTITION,
246           IOAction.LOAD_UNPROCESSED_PARTITION};
247       }
248     }
249   }
250 
251   @Override
252   public synchronized boolean approve(IOCommand command) {
253     long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
254     long desiredRate = desiredDiskToMemoryDataRate.get();
255     long currentRate = getCurrentDataInjectionRate();
256     // The command is denied iff the current rate is above the desired rate and
257     // we are doing load (instead of store), or the current rate is below the
258     // desired rate and we are doing store (instead of loading).
259     if (currentRate > desiredRate + error &&
260         command instanceof LoadPartitionIOCommand) {
261       return false;
262     }
263     if (currentRate < desiredRate - error &&
264         !(command instanceof LoadPartitionIOCommand) &&
265         !(command instanceof WaitIOCommand)) {
266       return false;
267     }
268     commandOccurrences.get(command.getType()).getAndIncrement();
269     return true;
270   }
271 
272   @Override
273   public void commandCompleted(IOCommand command) {
274     commandOccurrences.get(command.getType()).getAndDecrement();
275   }
276 
277   @Override
278   public void shutdown() { }
279 
280   /** Helper class to record memory status after GC calls */
281   private class GCObservation {
282     /** The time at which the GC happened (in milliseconds) */
283     private long time;
284     /** Amount of memory used after the GC call */
285     private long usedMemory;
286     /** Maximum amounts of memory reported by GC listener */
287     private long maxMemory;
288 
289     /**
290      * Constructor
291      *
292      * @param time time of GC
293      * @param usedMemory amount of used memory after GC
294      * @param maxMemory amount of all available memory based on GC observation
295      */
296     public GCObservation(long time, long usedMemory, long maxMemory) {
297       this.time = time;
298       this.usedMemory = usedMemory;
299       this.maxMemory = maxMemory;
300     }
301 
302     /**
303      * Is this a valid observation?
304      *
305      * @return true iff it is a valid observation
306      */
307     public boolean isValid() {
308       return time > 0;
309     }
310 
311     /**
312      * Considering a new observation of memory status after the most recent GC,
313      * what is the desired rate for data injection to memory.
314      *
315      * @param newObservation the most recent GC observation
316      * @return desired rate of data injection to memory
317      */
318     public long getDesiredDeltaDataRate(GCObservation newObservation) {
319       long newUsedMemory = newObservation.usedMemory;
320       long newMaxMemory = newObservation.maxMemory;
321       long lastUsedMemory = usedMemory;
322       long lastMaxMemory = maxMemory;
323       // Scale the memory status of two GC observation to be the same
324       long scaledMaxMemory = Math.min(lastMaxMemory, newMaxMemory);
325       newUsedMemory =
326           (long) (((double) scaledMaxMemory / newMaxMemory) * newUsedMemory);
327       lastUsedMemory =
328           (long) (((double) scaledMaxMemory / lastMaxMemory) * lastUsedMemory);
329       long desiredUsedMemory = (long) (optimalMemoryPressure * scaledMaxMemory);
330       if (LOG.isInfoEnabled()) {
331         LOG.info("getDesiredDeltaDataRate: " + String.format("previous usage " +
332             "= %.2f MB, ", lastUsedMemory / 1024.0 / 1024.0) + String.format(
333             "current usage = %.2f MB, ", newUsedMemory / 1024.0 / 1024.0) +
334             String.format("ideal usage = %.2f MB", desiredUsedMemory / 1024.0 /
335                 1024.0));
336       }
337       long interval = newObservation.time - time;
338       if (interval == 0) {
339         interval = 1;
340         LOG.warn("getDesiredDeltaRate: two GC happened almost at the same " +
341             "time!");
342       }
343       long currentDataRate = (long) ((double) (newUsedMemory -
344           lastUsedMemory) / interval * 1000);
345       long desiredDataRate = (long) ((double) (desiredUsedMemory -
346           newUsedMemory) / interval * 1000);
347       return currentDataRate - desiredDataRate;
348     }
349 
350     @Override
351     public String toString() {
352       return String.format("(usedMemory: %.2f MB, maxMemory: %.2f MB at " +
353           "time: %d ms)", usedMemory / 1024.0 / 1024.0,
354           maxMemory / 1024.0 / 1024.0, time);
355     }
356   }
357 }