View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.policy;
20  
21  import com.sun.management.GarbageCollectionNotificationInfo;
22  import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
23  import org.apache.giraph.comm.flow_control.FlowControl;
24  import org.apache.giraph.comm.netty.NettyClient;
25  import org.apache.giraph.conf.FloatConfOption;
26  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
27  import org.apache.giraph.conf.LongConfOption;
28  import org.apache.giraph.ooc.OutOfCoreEngine;
29  import org.apache.giraph.ooc.command.IOCommand;
30  import org.apache.giraph.utils.CallableFactory;
31  import org.apache.giraph.utils.LogStacktraceCallable;
32  import org.apache.giraph.utils.MemoryUtils;
33  import org.apache.giraph.utils.ThreadUtils;
34  import org.apache.log4j.Logger;
35  
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.CountDownLatch;
38  import java.util.concurrent.ExecutionException;
39  import java.util.concurrent.ExecutorService;
40  import java.util.concurrent.Executors;
41  import java.util.concurrent.Future;
42  import java.util.concurrent.TimeUnit;
43  
44  import static com.google.common.base.Preconditions.checkState;
45  
46  /**
47   * Out-of-core oracle to adaptively control data kept in memory, with the goal
48   * of keeping the memory usage at a desired state. Out-of-core policy in this
49   * oracle is based on several user-defined thresholds. Also, this oracle spawns
50   * a thread to periodically check the memory usage. This thread would issue
51   * manual GC calls if JVM fails to call major/full GC for a while and the amount
52   * of used memory is about to cause high-memory pressure. This oracle, also,
53   * monitors GC activities. The monitoring mechanism looks for major/full GC
54   * calls, and updates out-of-core decisions based on the amount of available
55   * memory after such GCs. There are three out-of-core decisions:
56   *  - Which IO operations should be done (load/offload of partitions and
57   *    messages)
58   *  - What the incoming messages rate should be (updating credits announced by
59   *    this worker in credit-based flow-control mechanism)
60   *  - How many processing threads should remain active (tethering rate of
61   *    data generation)
62   *
63   * The following table shows the relationship of these decisions and
64   * used-defined thresholds.
65   * --------------------------------------------------------------
66   * Memory Pressure     |  Manual |   IO   | Credit   | Active   |
67   * (memory usage)      |   GC?   | Action |          | Threads  |
68   * --------------------------------------------------------------
69   *                     |  Yes    | hard   |  0       |  0       |
70   *                     |         | store  |          |          |
71   * failPressure -------------------------------------------------
72   *                     |  Yes    | hard   |  0       | fraction |
73   *                     |         | store  |          |          |
74   * emergencyPressure --------------------------------------------
75   *                     |  Yes    | hard   | fraction |  max     |
76   *                     |         | store  |          |          |
77   * highPressure -------------------------------------------------
78   *                     |  No     | soft   | fraction |  max     |
79   *                     |         | store  |          |          |
80   * optimalPressure ----------------------------------------------
81   *                     |  No     | soft   |  max     |  max     |
82   *                     |         | load   |          |          |
83   * lowPressure --------------------------------------------------
84   *                     |  No     | hard   |  max     |  max     |
85   *                     |         | load   |          |          |
86   * --------------------------------------------------------------
87   *
88   */
89  public class ThresholdBasedOracle implements OutOfCoreOracle {
90    /** The memory pressure at/above which the job would fail */
91    public static final FloatConfOption FAIL_MEMORY_PRESSURE =
92        new FloatConfOption("giraph.memory.failPressure", 0.975f,
93            "The memory pressure (fraction of used memory) at/above which the " +
94                "job would fail.");
95    /**
96     * The memory pressure at which the job is cloe to fail, even though we were
97     * using maximal disk bandwidth and minimal network rate. We should reduce
98     * job processing rate.
99     */
100   public static final FloatConfOption EMERGENCY_MEMORY_PRESSURE =
101       new FloatConfOption("giraph.memory.emergencyPressure", 0.925f,
102           "The memory pressure (fraction of used memory) at which the job " +
103               "is close to fail, hence we should reduce its processing rate " +
104               "as much as possible.");
105   /** The memory pressure at which the job is suffering from GC overhead. */
106   public static final FloatConfOption HIGH_MEMORY_PRESSURE =
107       new FloatConfOption("giraph.memory.highPressure", 0.875f,
108           "The memory pressure (fraction of used memory) at which the job " +
109               "is suffering from GC overhead.");
110   /**
111    * The memory pressure at which we expect GC to perform optimally for a
112    * memory intensive job.
113    */
114   public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
115       new FloatConfOption("giraph.memory.optimalPressure", 0.8f,
116           "The memory pressure (fraction of used memory) at which a " +
117               "memory-intensive job shows the optimal GC behavior.");
118   /**
119    * The memory pressure at/below which the job can use more memory without
120    * suffering from GC overhead.
121    */
122   public static final FloatConfOption LOW_MEMORY_PRESSURE =
123       new FloatConfOption("giraph.memory.lowPressure", 0.7f,
124           "The memory pressure (fraction of used memory) at/below which the " +
125               "job can use more memory without suffering the performance.");
126   /** The interval at which memory observer thread wakes up. */
127   public static final LongConfOption CHECK_MEMORY_INTERVAL =
128       new LongConfOption("giraph.checkMemoryInterval", 2500,
129           "The interval/period where memory observer thread wakes up and " +
130               "monitors memory footprint (in milliseconds)");
131   /**
132    * Memory observer thread would manually call GC if major/full GC has not
133    * been called for a while. The period where we expect GC to be happened in
134    * past is specified in this parameter
135    */
136   public static final LongConfOption LAST_GC_CALL_INTERVAL =
137       new LongConfOption("giraph.lastGcCallInterval", 10 * 1000,
138           "How long after last major/full GC should we call manual GC?");
139 
140   /** Class logger */
141   private static final Logger LOG =
142       Logger.getLogger(ThresholdBasedOracle.class);
143   /** Cached value for FAIL_MEMORY_PRESSURE */
144   private final float failMemoryPressure;
145   /** Cached value for EMERGENCY_MEMORY_PRESSURE */
146   private final float emergencyMemoryPressure;
147   /** Cached value for HIGH_MEMORY_PRESSURE */
148   private final float highMemoryPressure;
149   /** Cached value for OPTIMAL_MEMORY_PRESSURE */
150   private final float optimalMemoryPressure;
151   /** Cached value for LOW_MEMORY_PRESSURE */
152   private final float lowMemoryPressure;
153   /** Cached value for CHECK_MEMORY_INTERVAL */
154   private final long checkMemoryInterval;
155   /** Cached value for LAST_GC_CALL_INTERVAL */
156   private final long lastGCCallInterval;
157   /**
158    * Cached value for NettyClient.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER (max
159    * credit used for credit-based flow-control mechanism)
160    */
161   private final short maxRequestsCredit;
162   /**
163    * Whether the job is shutting down. Used for terminating the memory
164    * observer thread.
165    */
166   private final CountDownLatch shouldTerminate;
167   /** Result of memory observer thread */
168   private final Future<Void> checkMemoryThreadResult;
169   /** Out-of-core engine */
170   private final OutOfCoreEngine oocEngine;
171   /** Last time a major/full GC has been called (in milliseconds) */
172   private volatile long lastMajorGCTime;
173   /** Last time a non major/full GC has been called (in milliseconds) */
174   private volatile long lastMinorGCTime;
175 
176   /**
177    * Constructor
178    *
179    * @param conf configuration
180    * @param oocEngine out-of-core engine
181    */
182   public ThresholdBasedOracle(ImmutableClassesGiraphConfiguration conf,
183                               OutOfCoreEngine oocEngine) {
184     this.failMemoryPressure = FAIL_MEMORY_PRESSURE.get(conf);
185     this.emergencyMemoryPressure = EMERGENCY_MEMORY_PRESSURE.get(conf);
186     this.highMemoryPressure = HIGH_MEMORY_PRESSURE.get(conf);
187     this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
188     this.lowMemoryPressure = LOW_MEMORY_PRESSURE.get(conf);
189     this.checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
190     this.lastGCCallInterval = LAST_GC_CALL_INTERVAL.get(conf);
191     this.maxRequestsCredit = (short)
192         CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
193     NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.setIfUnset(conf, true);
194     boolean useCredit = NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
195     checkState(useCredit, "ThresholdBasedOracle: credit-based flow control " +
196         "must be enabled. Use giraph.waitForPerWorkerRequests=true");
197     this.shouldTerminate = new CountDownLatch(1);
198     this.oocEngine = oocEngine;
199     this.lastMajorGCTime = 0;
200 
201     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
202       @Override
203       public Callable<Void> newCallable(int callableId) {
204         return new Callable<Void>() {
205           @Override
206           public Void call() throws Exception {
207             while (true) {
208               boolean done = shouldTerminate.await(checkMemoryInterval,
209                   TimeUnit.MILLISECONDS);
210               if (done) {
211                 break;
212               }
213               double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
214               long time = System.currentTimeMillis();
215               if ((usedMemoryFraction > highMemoryPressure &&
216                   time - lastMajorGCTime >= lastGCCallInterval) ||
217                   (usedMemoryFraction > optimalMemoryPressure &&
218                   time - lastMajorGCTime >= lastGCCallInterval &&
219                   time - lastMinorGCTime >= lastGCCallInterval)) {
220                 if (LOG.isInfoEnabled()) {
221                   LOG.info("call: last GC happened a while ago and the " +
222                       "amount of used memory is high (used memory " +
223                       "fraction is " +
224                       String.format("%.2f", usedMemoryFraction) + "). " +
225                       "Calling GC manually");
226                 }
227                 System.gc();
228                 time = System.currentTimeMillis() - time;
229                 usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
230                 if (LOG.isInfoEnabled()) {
231                   LOG.info("call: manual GC is done. It took " +
232                       String.format("%.2f", (double) time / 1000) +
233                       " seconds. Used memory fraction is " +
234                       String.format("%.2f", usedMemoryFraction));
235                 }
236               }
237               updateRates(usedMemoryFraction);
238             }
239             return null;
240           }
241         };
242       }
243     };
244     ExecutorService executor = Executors.newSingleThreadExecutor(
245         ThreadUtils.createThreadFactory("check-memory"));
246     this.checkMemoryThreadResult = executor.submit(new LogStacktraceCallable<>(
247         callableFactory.newCallable(0)));
248     executor.shutdown();
249   }
250 
251   /**
252    * upon major/full GC calls.
253    */
254   /**
255    * Update statistics and rate regarding communication credits and number of
256    * active threads.
257    *
258    * @param usedMemoryFraction the fraction of used memory over max memory
259    */
260   public void updateRates(double usedMemoryFraction) {
261     // Update the fraction of processing threads that should remain active
262     if (usedMemoryFraction >= failMemoryPressure) {
263       oocEngine.updateActiveThreadsFraction(0);
264     } else if (usedMemoryFraction < emergencyMemoryPressure) {
265       oocEngine.updateActiveThreadsFraction(1);
266     } else {
267       oocEngine.updateActiveThreadsFraction(1 -
268           (usedMemoryFraction - emergencyMemoryPressure) /
269               (failMemoryPressure - emergencyMemoryPressure));
270     }
271 
272     // Update the fraction of credit that should be used in credit-based flow-
273     // control
274     if (usedMemoryFraction >= emergencyMemoryPressure) {
275       updateRequestsCredit((short) 0);
276     } else if (usedMemoryFraction < optimalMemoryPressure) {
277       updateRequestsCredit(maxRequestsCredit);
278     } else {
279       updateRequestsCredit((short) (maxRequestsCredit *
280           (1 - (usedMemoryFraction - optimalMemoryPressure) /
281               (emergencyMemoryPressure - optimalMemoryPressure))));
282     }
283   }
284 
285   @Override
286   public IOAction[] getNextIOActions() {
287     double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
288     if (LOG.isInfoEnabled()) {
289       LOG.info(String.format("getNextIOActions: usedMemoryFraction = %.2f",
290           usedMemoryFraction));
291     }
292     if (usedMemoryFraction > highMemoryPressure) {
293       return new IOAction[]{
294         IOAction.STORE_MESSAGES_AND_BUFFERS,
295         IOAction.STORE_PARTITION};
296     } else if (usedMemoryFraction > optimalMemoryPressure) {
297       return new IOAction[]{
298         IOAction.LOAD_UNPROCESSED_PARTITION,
299         IOAction.STORE_MESSAGES_AND_BUFFERS,
300         IOAction.STORE_PROCESSED_PARTITION};
301     } else if (usedMemoryFraction > lowMemoryPressure) {
302       return new IOAction[]{
303         IOAction.LOAD_UNPROCESSED_PARTITION,
304         IOAction.STORE_MESSAGES_AND_BUFFERS,
305         IOAction.LOAD_PARTITION};
306     } else {
307       return new IOAction[]{IOAction.LOAD_PARTITION};
308     }
309   }
310 
311   @Override
312   public boolean approve(IOCommand command) {
313     return true;
314   }
315 
316   @Override
317   public void commandCompleted(IOCommand command) {
318     // Do nothing
319   }
320 
321   @Override
322   public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) {
323     String gcAction = gcInfo.getGcAction().toLowerCase();
324     if (gcAction.contains("full") || gcAction.contains("major")) {
325       if (!gcInfo.getGcCause().contains("No GC")) {
326         lastMajorGCTime = System.currentTimeMillis();
327       }
328     } else {
329       lastMinorGCTime = System.currentTimeMillis();
330     }
331   }
332 
333   @Override
334   public void shutdown() {
335     shouldTerminate.countDown();
336     try {
337       checkMemoryThreadResult.get();
338     } catch (InterruptedException | ExecutionException e) {
339       LOG.error("shutdown: caught exception while waiting on check-memory " +
340           "thread to terminate!");
341       throw new IllegalStateException(e);
342     }
343     if (LOG.isInfoEnabled()) {
344       LOG.info("shutdown: ThresholdBasedOracle shutdown complete!");
345     }
346   }
347 
348   /**
349    * Update the credit announced for this worker in Netty. The lower the credit
350    * is, the lower rate incoming messages arrive at this worker. Thus, credit
351    * is an indirect way of controlling amount of memory incoming messages would
352    * take.
353    *
354    * @param newCredit the new credit to announce to other workers
355    */
356   private void updateRequestsCredit(short newCredit) {
357     if (LOG.isInfoEnabled()) {
358       LOG.info("updateRequestsCredit: updating the credit to " + newCredit);
359     }
360     FlowControl flowControl = oocEngine.getFlowControl();
361     if (flowControl != null) {
362       ((CreditBasedFlowControl) flowControl).updateCredit(newCredit);
363     }
364   }
365 }