This project has retired. For details please refer to its Attic page.
SimpleGCMonitoringOracle xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.policy;
20  
21  import com.google.common.collect.Maps;
22  import com.sun.management.GarbageCollectionNotificationInfo;
23  import org.apache.giraph.conf.FloatConfOption;
24  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
25  import org.apache.giraph.ooc.OutOfCoreEngine;
26  import org.apache.giraph.ooc.OutOfCoreIOStatistics;
27  import org.apache.giraph.ooc.command.IOCommand;
28  import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
29  import org.apache.giraph.ooc.command.WaitIOCommand;
30  import org.apache.log4j.Logger;
31  
32  import java.lang.management.MemoryUsage;
33  import java.util.Map;
34  import java.util.concurrent.atomic.AtomicInteger;
35  import java.util.concurrent.atomic.AtomicLong;
36  
37  /**
38   * Out-of-core oracle to adaptively control data kept in memory, with the goal
39   * of keeping the memory state constantly at a desired state. This oracle
40   * monitors GC behavior to keep track of memory pressure.
41   *
42   * After each GC is done, this oracle retrieves statistics about the memory
43   * pressure (memory used, max memory, and how far away memory is compared to a
44   * max optimal pressure). Based on the two most recent memory statistics,
45   * the oracle predicts the status of the memory, and sets the rate of load/store
46   * of data from/to disk. If the rate of loading data from disk is 'l', and the
47   * rate of storing data to disk is 's', the rate of data injection to memory
48   * from disk can be denoted as 'l-s'. This oracle determines what 'l-s' should
49   * be based on the prediction of memory status.
50   *
51   * Assume that based on the previous GC call the memory usage at time t_0 is
52   * m_0, and based on the most recent GC call the memory usage at time t_1 is
53   * m_1. So, the rate of memory increase is alpha = (m_1 - m_0) / (t_1 - t_0).
54   * Assume that the ideal memory pressure happens when the memory usage is
55   * m_ideal. So, at time 't_2 = t_1 + (t_1 - t_0)', we want m_ideal. That means
56   * the ideal rate would be beta = (m_ideal - m_1) / (t_2 - t_1). If the data
57   * injection rate to memory so far was i, the new injection rate should be:
58   * i_new = i - (alpha - beta)
59   */
60  public class SimpleGCMonitoringOracle implements OutOfCoreOracle {
61    /**
62     * The optimal memory pressure at which GC behavior is close to ideal. This
63     * fraction may be dependant on the GC strategy used for running a job, but
64     * generally should not be dependent on the graph processing application.
65     */
66    public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
67        new FloatConfOption("giraph.optimalMemoryPressure", 0.8f,
68            "The memory pressure (fraction of used memory) at which the job " +
69                "shows the optimal GC behavior. This fraction may be dependent " +
70                "on the GC strategy used in running the job.");
71  
72    /** Class logger */
73    private static final Logger LOG =
74        Logger.getLogger(SimpleGCMonitoringOracle.class);
75    /** Cached value for OPTIMAL_MEMORY_PRESSURE */
76    private final float optimalMemoryPressure;
77    /** Out-of-core engine */
78    private final OutOfCoreEngine oocEngine;
79    /** Status of memory from the last GC call */
80    private GCObservation lastGCObservation;
81    /** Desired rate of data injection to memory */
82    private final AtomicLong desiredDiskToMemoryDataRate =
83        new AtomicLong(0);
84    /** Number of on the fly (outstanding) IO commands for each command type */
85    private final Map<IOCommand.IOCommandType, AtomicInteger> commandOccurrences =
86        Maps.newConcurrentMap();
87  
88    /**
89     * Constructor
90     *
91     * @param conf configuration
92     * @param oocEngine out-of-core engine
93     */
94    public SimpleGCMonitoringOracle(ImmutableClassesGiraphConfiguration conf,
95                                    OutOfCoreEngine oocEngine) {
96      this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
97      this.oocEngine = oocEngine;
98      this.lastGCObservation = new GCObservation(-1, 0, 0);
99      for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
100       commandOccurrences.put(type, new AtomicInteger(0));
101     }
102   }
103 
104   @Override
105   public synchronized void gcCompleted(GarbageCollectionNotificationInfo
106                                              gcInfo) {
107     long time = System.currentTimeMillis();
108     Map<String, MemoryUsage> memAfter = gcInfo.getGcInfo()
109         .getMemoryUsageAfterGc();
110     long usedMemory = 0;
111     long maxMemory = 0;
112     for (MemoryUsage memDetail : memAfter.values()) {
113       usedMemory += memDetail.getUsed();
114       maxMemory += memDetail.getMax();
115     }
116     GCObservation observation = new GCObservation(time, usedMemory, maxMemory);
117     if (LOG.isInfoEnabled()) {
118       LOG.info("gcCompleted: GC completed with: " + observation);
119     }
120     // Whether this is not the first GC call in the application
121     if (lastGCObservation.isValid()) {
122       long deltaDataRate =
123           lastGCObservation.getDesiredDeltaDataRate(observation);
124       long diskBandwidthEstimate =
125           oocEngine.getIOStatistics().getDiskBandwidth();
126       // Update the desired data injection rate to memory. The data injection
127       // rate cannot be less than -disk_bandwidth (the extreme case happens if
128       // we only do 'store'), and cannot be more than disk_bandwidth (the
129       // extreme case happens if we only do 'load').
130       long dataInjectionRate = desiredDiskToMemoryDataRate.get();
131       desiredDiskToMemoryDataRate.set(Math.max(
132           Math.min(desiredDiskToMemoryDataRate.get() - deltaDataRate,
133               diskBandwidthEstimate), -diskBandwidthEstimate));
134       if (LOG.isInfoEnabled()) {
135         LOG.info("gcCompleted: changing data injection rate from " +
136             String.format("%.2f", dataInjectionRate / 1024.0 / 1024.0) +
137             " to " + String.format("%.2f", desiredDiskToMemoryDataRate.get() /
138             1024.0 / 1024.0));
139       }
140     }
141     lastGCObservation = observation;
142   }
143 
144   @Override
145   public void startIteration() {
146   }
147 
148   /**
149    * Get the current data injection rate to memory based on the commands ran
150    * in the history (retrieved from statistics collector), and outstanding
151    * commands issued by the IO scheduler.
152    *
153    * @return the current data injection rate to memory
154    */
155   private long getCurrentDataInjectionRate() {
156     long effectiveBytesTransferred = 0;
157     long effectiveDuration = 0;
158     for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
159       OutOfCoreIOStatistics.BytesDuration stats =
160           oocEngine.getIOStatistics().getCommandTypeStats(type);
161       int occurrence = commandOccurrences.get(type).get();
162       long typeBytesTransferred = stats.getBytes();
163       long typeDuration = stats.getDuration();
164       // If there is an outstanding command, we still do not know how many bytes
165       // it will transfer, and how long it will take. So, we guesstimate these
166       // numbers based on other similar commands happened in the history. We
167       // simply take the average number of bytes transferred for the particular
168       // command, and we take average duration for the particular command. We
169       // should multiply these numbers by the number of outstanding commands of
170       // this particular command type.
171       if (stats.getOccurrence() != 0) {
172         typeBytesTransferred += stats.getBytes() / stats.getOccurrence() *
173             occurrence;
174         typeDuration += stats.getDuration() / stats.getOccurrence() *
175             occurrence;
176       }
177       if (type == IOCommand.IOCommandType.LOAD_PARTITION) {
178         effectiveBytesTransferred += typeBytesTransferred;
179       } else {
180         // Store (data going out of memory), or wait (no data transferred)
181         effectiveBytesTransferred -= typeBytesTransferred;
182       }
183       effectiveDuration += typeDuration;
184     }
185     if (effectiveDuration == 0) {
186       return 0;
187     } else {
188       return effectiveBytesTransferred / effectiveDuration;
189     }
190   }
191 
192   @Override
193   public IOAction[] getNextIOActions() {
194     long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
195     long desiredRate = desiredDiskToMemoryDataRate.get();
196     long currentRate = getCurrentDataInjectionRate();
197     if (desiredRate > error) {
198       // 'l-s' is positive, we should do more load than store.
199       if (currentRate > desiredRate + error) {
200         // We should decrease 'l-s'. This can be done either by increasing 's'
201         // or issuing wait command. We prioritize wait over hard store.
202         return new IOAction[]{
203           IOAction.STORE_MESSAGES_AND_BUFFERS,
204           IOAction.STORE_PROCESSED_PARTITION};
205       } else if (currentRate < desiredRate - error) {
206         // We should increase 'l-s'. We can simply load partitions/data.
207         return new IOAction[]{IOAction.LOAD_PARTITION};
208       } else {
209         // We are in a proper state and we should keep up with the rate. We can
210         // either soft store data or load data (hard load, since we desired rate
211         // is positive).
212         return new IOAction[]{
213           IOAction.STORE_MESSAGES_AND_BUFFERS,
214           IOAction.STORE_PROCESSED_PARTITION,
215           IOAction.LOAD_PARTITION};
216       }
217     } else if (desiredRate < -error) {
218       // 'l-s' is negative, we should do more store than load.
219       if (currentRate < desiredRate - error) {
220         // We should increase 'l-s', but we should be cautious. We only do soft
221         // load, or wait.
222         return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
223       } else if (currentRate > desiredRate + error) {
224         // We should reduce 'l-s', we do hard store.
225         return new IOAction[]{
226           IOAction.STORE_MESSAGES_AND_BUFFERS,
227           IOAction.STORE_PARTITION};
228       } else {
229         // We should keep up with the rate. We can either soft store data, or
230         // soft load data.
231         return new IOAction[]{
232           IOAction.STORE_MESSAGES_AND_BUFFERS,
233           IOAction.STORE_PROCESSED_PARTITION,
234           IOAction.LOAD_UNPROCESSED_PARTITION};
235       }
236     } else {
237       // 'l-s' is almost zero. If current rate is over the desired rate, we do
238       // soft store. If the current rate is below the desired rate, we do soft
239       // load.
240       if (currentRate > desiredRate + error) {
241         return new IOAction[]{
242           IOAction.STORE_MESSAGES_AND_BUFFERS,
243           IOAction.STORE_PROCESSED_PARTITION};
244       } else if (currentRate < desiredRate - error) {
245         return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
246       } else {
247         return new IOAction[]{
248           IOAction.STORE_MESSAGES_AND_BUFFERS,
249           IOAction.STORE_PROCESSED_PARTITION,
250           IOAction.LOAD_UNPROCESSED_PARTITION};
251       }
252     }
253   }
254 
255   @Override
256   public synchronized boolean approve(IOCommand command) {
257     long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
258     long desiredRate = desiredDiskToMemoryDataRate.get();
259     long currentRate = getCurrentDataInjectionRate();
260     // The command is denied iff the current rate is above the desired rate and
261     // we are doing load (instead of store), or the current rate is below the
262     // desired rate and we are doing store (instead of loading).
263     if (currentRate > desiredRate + error &&
264         command instanceof LoadPartitionIOCommand) {
265       return false;
266     }
267     if (currentRate < desiredRate - error &&
268         !(command instanceof LoadPartitionIOCommand) &&
269         !(command instanceof WaitIOCommand)) {
270       return false;
271     }
272     commandOccurrences.get(command.getType()).getAndIncrement();
273     return true;
274   }
275 
276   @Override
277   public void commandCompleted(IOCommand command) {
278     commandOccurrences.get(command.getType()).getAndDecrement();
279   }
280 
281   /** Helper class to record memory status after GC calls */
282   private class GCObservation {
283     /** The time at which the GC happened (in milliseconds) */
284     private long time;
285     /** Amount of memory used after the GC call */
286     private long usedMemory;
287     /** Maximum amounts of memory reported by GC listener */
288     private long maxMemory;
289 
290     /**
291      * Constructor
292      *
293      * @param time time of GC
294      * @param usedMemory amount of used memory after GC
295      * @param maxMemory amount of all available memory based on GC observation
296      */
297     public GCObservation(long time, long usedMemory, long maxMemory) {
298       this.time = time;
299       this.usedMemory = usedMemory;
300       this.maxMemory = maxMemory;
301     }
302 
303     /**
304      * Is this a valid observation?
305      *
306      * @return true iff it is a valid observation
307      */
308     public boolean isValid() {
309       return time > 0;
310     }
311 
312     /**
313      * Considering a new observation of memory status after the most recent GC,
314      * what is the desired rate for data injection to memory.
315      *
316      * @param newObservation the most recent GC observation
317      * @return desired rate of data injection to memory
318      */
319     public long getDesiredDeltaDataRate(GCObservation newObservation) {
320       long newUsedMemory = newObservation.usedMemory;
321       long newMaxMemory = newObservation.maxMemory;
322       long lastUsedMemory = usedMemory;
323       long lastMaxMemory = maxMemory;
324       // Scale the memory status of two GC observation to be the same
325       long scaledMaxMemory = Math.min(lastMaxMemory, newMaxMemory);
326       newUsedMemory =
327           (long) (((double) scaledMaxMemory / newMaxMemory) * newUsedMemory);
328       lastUsedMemory =
329           (long) (((double) scaledMaxMemory / lastMaxMemory) * lastUsedMemory);
330       long desiredUsedMemory = (long) (optimalMemoryPressure * scaledMaxMemory);
331       if (LOG.isInfoEnabled()) {
332         LOG.info("getDesiredDeltaDataRate: " + String.format("previous usage " +
333             "= %.2f MB, ", lastUsedMemory / 1024.0 / 1024.0) + String.format(
334             "current usage = %.2f MB, ", newUsedMemory / 1024.0 / 1024.0) +
335             String.format("ideal usage = %.2f MB", desiredUsedMemory / 1024.0 /
336                 1024.0));
337       }
338       long interval = newObservation.time - time;
339       if (interval == 0) {
340         interval = 1;
341         LOG.warn("getDesiredDeltaRate: two GC happened almost at the same " +
342             "time!");
343       }
344       long currentDataRate = (long) ((double) (newUsedMemory -
345           lastUsedMemory) / interval * 1000);
346       long desiredDataRate = (long) ((double) (desiredUsedMemory -
347           newUsedMemory) / interval * 1000);
348       return currentDataRate - desiredDataRate;
349     }
350 
351     @Override
352     public String toString() {
353       return String.format("(usedMemory: %.2f MB, maxMemory: %.2f MB at " +
354           "time: %d ms)", usedMemory / 1024.0 / 1024.0,
355           maxMemory / 1024.0 / 1024.0, time);
356     }
357   }
358 }