This project has retired. For details please refer to its Attic page.
ThresholdBasedOracle xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.policy;
20  
21  import com.sun.management.GarbageCollectionNotificationInfo;
22  import org.apache.giraph.comm.netty.NettyClient;
23  import org.apache.giraph.conf.FloatConfOption;
24  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
25  import org.apache.giraph.conf.LongConfOption;
26  import org.apache.giraph.ooc.OutOfCoreEngine;
27  import org.apache.giraph.ooc.command.IOCommand;
28  import org.apache.giraph.utils.MemoryUtils;
29  import org.apache.giraph.utils.ThreadUtils;
30  import org.apache.log4j.Logger;
31  
32  import static com.google.common.base.Preconditions.checkState;
33  
34  /**
35   * Out-of-core oracle to adaptively control data kept in memory, with the goal
36   * of keeping the memory usage at a desired state. Out-of-core policy in this
37   * oracle is based on several user-defined thresholds. Also, this oracle spawns
38   * a thread to periodically check the memory usage. This thread would issue
39   * manual GC calls if JVM fails to call major/full GC for a while and the amount
40   * of used memory is about to cause high-memory pressure. This oracle, also,
41   * monitors GC activities. The monitoring mechanism looks for major/full GC
42   * calls, and updates out-of-core decisions based on the amount of available
43   * memory after such GCs. There are three out-of-core decisions:
44   *  - Which IO operations should be done (load/offload of partitions and
45   *    messages)
46   *  - What the incoming messages rate should be (updating credits announced by
47   *    this worker in credit-based flow-control mechanism)
48   *  - How many processing threads should remain active (tethering rate of
49   *    data generation)
50   *
51   * The following table shows the relationship of these decisions and
52   * used-defined thresholds.
53   * --------------------------------------------------------------
54   * Memory Pressure     |  Manual |   IO   | Credit   | Active   |
55   * (memory usage)      |   GC?   | Action |          | Threads  |
56   * --------------------------------------------------------------
57   *                     |  Yes    | hard   |  0       |  0       |
58   *                     |         | store  |          |          |
59   * failPressure -------------------------------------------------
60   *                     |  Yes    | hard   |  0       | fraction |
61   *                     |         | store  |          |          |
62   * emergencyPressure --------------------------------------------
63   *                     |  Yes    | hard   | fraction |  max     |
64   *                     |         | store  |          |          |
65   * highPressure -------------------------------------------------
66   *                     |  No     | soft   | fraction |  max     |
67   *                     |         | store  |          |          |
68   * optimalPressure ----------------------------------------------
69   *                     |  No     | soft   |  max     |  max     |
70   *                     |         | load   |          |          |
71   * lowPressure --------------------------------------------------
72   *                     |  No     | hard   |  max     |  max     |
73   *                     |         | load   |          |          |
74   * --------------------------------------------------------------
75   *
76   */
77  public class ThresholdBasedOracle implements OutOfCoreOracle {
78    /** The memory pressure at/above which the job would fail */
79    public static final FloatConfOption FAIL_MEMORY_PRESSURE =
80        new FloatConfOption("giraph.threshold.failPressure", 0.975f,
81            "The memory pressure (fraction of used memory) at/above which the " +
82                "job would fail.");
83    /**
84     * The memory pressure at which the job is cloe to fail, even though we were
85     * using maximal disk bandwidth and minimal network rate. We should reduce
86     * job processing rate.
87     */
88    public static final FloatConfOption EMERGENCY_MEMORY_PRESSURE =
89        new FloatConfOption("giraph.threshold.emergencyPressure", 0.925f,
90            "The memory pressure (fraction of used memory) at which the job " +
91                "is close to fail, hence we should reduce its processing rate " +
92                "as much as possible.");
93    /** The memory pressure at which the job is suffering from GC overhead. */
94    public static final FloatConfOption HIGH_MEMORY_PRESSURE =
95        new FloatConfOption("giraph.threshold.highPressure", 0.875f,
96            "The memory pressure (fraction of used memory) at which the job " +
97                "is suffering from GC overhead.");
98    /**
99     * The memory pressure at which we expect GC to perform optimally for a
100    * memory intensive job.
101    */
102   public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
103       new FloatConfOption("giraph.threshold.optimalPressure", 0.8f,
104           "The memory pressure (fraction of used memory) at which a " +
105               "memory-intensive job shows the optimal GC behavior.");
106   /**
107    * The memory pressure at/below which the job can use more memory without
108    * suffering from GC overhead.
109    */
110   public static final FloatConfOption LOW_MEMORY_PRESSURE =
111       new FloatConfOption("giraph.threshold.lowPressure", 0.7f,
112           "The memory pressure (fraction of used memory) at/below which the " +
113               "job can use more memory without suffering the performance.");
114   /** The interval at which memory observer thread wakes up. */
115   public static final LongConfOption CHECK_MEMORY_INTERVAL =
116       new LongConfOption("giraph.threshold.checkMemoryInterval", 2500,
117           "The interval/period where memory observer thread wakes up and " +
118               "monitors memory footprint (in milliseconds)");
119   /**
120    * Memory observer thread would manually call GC if major/full GC has not
121    * been called for a while. The period where we expect GC to be happened in
122    * past is specified in this parameter
123    */
124   public static final LongConfOption LAST_GC_CALL_INTERVAL =
125       new LongConfOption("giraph.threshold.lastGcCallInterval", 10 * 1000,
126           "How long after last major/full GC should we call manual GC?");
127 
128   /** Class logger */
129   private static final Logger LOG =
130       Logger.getLogger(ThresholdBasedOracle.class);
131   /** Cached value for FAIL_MEMORY_PRESSURE */
132   private final float failMemoryPressure;
133   /** Cached value for EMERGENCY_MEMORY_PRESSURE */
134   private final float emergencyMemoryPressure;
135   /** Cached value for HIGH_MEMORY_PRESSURE */
136   private final float highMemoryPressure;
137   /** Cached value for OPTIMAL_MEMORY_PRESSURE */
138   private final float optimalMemoryPressure;
139   /** Cached value for LOW_MEMORY_PRESSURE */
140   private final float lowMemoryPressure;
141   /** Cached value for CHECK_MEMORY_INTERVAL */
142   private final long checkMemoryInterval;
143   /** Cached value for LAST_GC_CALL_INTERVAL */
144   private final long lastGCCallInterval;
145   /** Out-of-core engine */
146   private final OutOfCoreEngine oocEngine;
147   /** Last time a major/full GC has been called (in milliseconds) */
148   private volatile long lastMajorGCTime;
149   /** Last time a non major/full GC has been called (in milliseconds) */
150   private volatile long lastMinorGCTime;
151 
152   /**
153    * Constructor
154    *
155    * @param conf configuration
156    * @param oocEngine out-of-core engine
157    */
158   public ThresholdBasedOracle(ImmutableClassesGiraphConfiguration conf,
159                               OutOfCoreEngine oocEngine) {
160     this.failMemoryPressure = FAIL_MEMORY_PRESSURE.get(conf);
161     this.emergencyMemoryPressure = EMERGENCY_MEMORY_PRESSURE.get(conf);
162     this.highMemoryPressure = HIGH_MEMORY_PRESSURE.get(conf);
163     this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
164     this.lowMemoryPressure = LOW_MEMORY_PRESSURE.get(conf);
165     this.checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
166     this.lastGCCallInterval = LAST_GC_CALL_INTERVAL.get(conf);
167     NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.setIfUnset(conf, true);
168     boolean useCredit = NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
169     checkState(useCredit, "ThresholdBasedOracle: credit-based flow control " +
170         "must be enabled. Use giraph.waitForPerWorkerRequests=true");
171     this.oocEngine = oocEngine;
172     this.lastMajorGCTime = 0;
173 
174     ThreadUtils.startThread(new Runnable() {
175       @Override
176       public void run() {
177         while (true) {
178           double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
179           long time = System.currentTimeMillis();
180           if ((usedMemoryFraction > highMemoryPressure &&
181               time - lastMajorGCTime >= lastGCCallInterval) ||
182               (usedMemoryFraction > optimalMemoryPressure &&
183                   time - lastMajorGCTime >= lastGCCallInterval &&
184                   time - lastMinorGCTime >= lastGCCallInterval)) {
185             if (LOG.isInfoEnabled()) {
186               LOG.info("call: last GC happened a while ago and the " +
187                   "amount of used memory is high (used memory " +
188                   "fraction is " +
189                   String.format("%.2f", usedMemoryFraction) + "). " +
190                   "Calling GC manually");
191             }
192             System.gc();
193             time = System.currentTimeMillis() - time;
194             usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
195             if (LOG.isInfoEnabled()) {
196               LOG.info("call: manual GC is done. It took " +
197                   String.format("%.2f", (double) time / 1000) +
198                   " seconds. Used memory fraction is " +
199                   String.format("%.2f", usedMemoryFraction));
200             }
201           }
202           updateRates(usedMemoryFraction);
203           try {
204             Thread.sleep(checkMemoryInterval);
205           } catch (InterruptedException e) {
206             LOG.warn("run: exception occurred!", e);
207             return;
208           }
209         }
210       }
211     }, "memory-checker", oocEngine.getServiceWorker().getGraphTaskManager().
212         createUncaughtExceptionHandler());
213   }
214 
215   /**
216    * Update statistics and rate regarding communication credits and number of
217    * active threads.
218    *
219    * @param usedMemoryFraction the fraction of used memory over max memory
220    */
221   public void updateRates(double usedMemoryFraction) {
222     // Update the fraction of processing threads that should remain active
223     if (usedMemoryFraction >= failMemoryPressure) {
224       oocEngine.updateActiveThreadsFraction(0);
225     } else if (usedMemoryFraction < emergencyMemoryPressure) {
226       oocEngine.updateActiveThreadsFraction(1);
227     } else {
228       oocEngine.updateActiveThreadsFraction(1 -
229           (usedMemoryFraction - emergencyMemoryPressure) /
230               (failMemoryPressure - emergencyMemoryPressure));
231     }
232 
233     // Update the fraction of credit that should be used in credit-based flow-
234     // control
235     if (usedMemoryFraction >= emergencyMemoryPressure) {
236       oocEngine.updateRequestsCreditFraction(0);
237     } else if (usedMemoryFraction < optimalMemoryPressure) {
238       oocEngine.updateRequestsCreditFraction(1);
239     } else {
240       oocEngine.updateRequestsCreditFraction(1 -
241           (usedMemoryFraction - optimalMemoryPressure) /
242               (emergencyMemoryPressure - optimalMemoryPressure));
243     }
244   }
245 
246   @Override
247   public IOAction[] getNextIOActions() {
248     double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
249     if (LOG.isDebugEnabled()) {
250       LOG.debug(String.format("getNextIOActions: usedMemoryFraction = %.2f",
251           usedMemoryFraction));
252     }
253     if (usedMemoryFraction > highMemoryPressure) {
254       return new IOAction[]{
255         IOAction.STORE_MESSAGES_AND_BUFFERS,
256         IOAction.STORE_PARTITION};
257     } else if (usedMemoryFraction > optimalMemoryPressure) {
258       return new IOAction[]{
259         IOAction.LOAD_UNPROCESSED_PARTITION,
260         IOAction.STORE_MESSAGES_AND_BUFFERS,
261         IOAction.STORE_PROCESSED_PARTITION};
262     } else if (usedMemoryFraction > lowMemoryPressure) {
263       return new IOAction[]{
264         IOAction.LOAD_UNPROCESSED_PARTITION,
265         IOAction.STORE_MESSAGES_AND_BUFFERS,
266         IOAction.LOAD_PARTITION};
267     } else {
268       return new IOAction[]{IOAction.LOAD_PARTITION};
269     }
270   }
271 
272   @Override
273   public boolean approve(IOCommand command) {
274     return true;
275   }
276 
277   @Override
278   public void commandCompleted(IOCommand command) {
279     // Do nothing
280   }
281 
282   @Override
283   public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) {
284     String gcAction = gcInfo.getGcAction().toLowerCase();
285     if (gcAction.contains("full") || gcAction.contains("major")) {
286       if (!gcInfo.getGcCause().contains("No GC")) {
287         lastMajorGCTime = System.currentTimeMillis();
288       }
289     } else {
290       lastMinorGCTime = System.currentTimeMillis();
291     }
292   }
293 
294   @Override
295   public void startIteration() {
296   }
297 }