/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.ooc.policy;

import com.sun.management.GarbageCollectionNotificationInfo;
import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.conf.FloatConfOption;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.LongConfOption;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.ooc.command.IOCommand;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ThreadUtils;
import org.apache.log4j.Logger;

import java.util.Locale;

import static com.google.common.base.Preconditions.checkState;

3334/**35 * Out-of-core oracle to adaptively control data kept in memory, with the goal36 * of keeping the memory usage at a desired state. Out-of-core policy in this37 * oracle is based on several user-defined thresholds. Also, this oracle spawns38 * a thread to periodically check the memory usage. This thread would issue39 * manual GC calls if JVM fails to call major/full GC for a while and the amount40 * of used memory is about to cause high-memory pressure. This oracle, also,41 * monitors GC activities. The monitoring mechanism looks for major/full GC42 * calls, and updates out-of-core decisions based on the amount of available43 * memory after such GCs. There are three out-of-core decisions:44 * - Which IO operations should be done (load/offload of partitions and45 * messages)46 * - What the incoming messages rate should be (updating credits announced by47 * this worker in credit-based flow-control mechanism)48 * - How many processing threads should remain active (tethering rate of49 * data generation)50 *51 * The following table shows the relationship of these decisions and52 * used-defined thresholds.53 * --------------------------------------------------------------54 * Memory Pressure | Manual | IO | Credit | Active |55 * (memory usage) | GC? 
| Action | | Threads |56 * --------------------------------------------------------------57 * | Yes | hard | 0 | 0 |58 * | | store | | |59 * failPressure -------------------------------------------------60 * | Yes | hard | 0 | fraction |61 * | | store | | |62 * emergencyPressure --------------------------------------------63 * | Yes | hard | fraction | max |64 * | | store | | |65 * highPressure -------------------------------------------------66 * | No | soft | fraction | max |67 * | | store | | |68 * optimalPressure ----------------------------------------------69 * | No | soft | max | max |70 * | | load | | |71 * lowPressure --------------------------------------------------72 * | No | hard | max | max |73 * | | load | | |74 * --------------------------------------------------------------75 *76 */77publicclassThresholdBasedOracleimplementsOutOfCoreOracle {
78/** The memory pressure at/above which the job would fail */79publicstaticfinalFloatConfOption FAIL_MEMORY_PRESSURE =
80newFloatConfOption("giraph.threshold.failPressure", 0.975f,
81"The memory pressure (fraction of used memory) at/above which the " +
82"job would fail.");
83/**84 * The memory pressure at which the job is cloe to fail, even though we were85 * using maximal disk bandwidth and minimal network rate. We should reduce86 * job processing rate.87 */88publicstaticfinalFloatConfOption EMERGENCY_MEMORY_PRESSURE =
89newFloatConfOption("giraph.threshold.emergencyPressure", 0.925f,
90"The memory pressure (fraction of used memory) at which the job " +
91"is close to fail, hence we should reduce its processing rate " +
92"as much as possible.");
93/** The memory pressure at which the job is suffering from GC overhead. */94publicstaticfinalFloatConfOption HIGH_MEMORY_PRESSURE =
95newFloatConfOption("giraph.threshold.highPressure", 0.875f,
96"The memory pressure (fraction of used memory) at which the job " +
97"is suffering from GC overhead.");
98/**99 * The memory pressure at which we expect GC to perform optimally for a100 * memory intensive job.101 */102publicstaticfinalFloatConfOption OPTIMAL_MEMORY_PRESSURE =
103newFloatConfOption("giraph.threshold.optimalPressure", 0.8f,
104"The memory pressure (fraction of used memory) at which a " +
105"memory-intensive job shows the optimal GC behavior.");
106/**107 * The memory pressure at/below which the job can use more memory without108 * suffering from GC overhead.109 */110publicstaticfinalFloatConfOption LOW_MEMORY_PRESSURE =
111newFloatConfOption("giraph.threshold.lowPressure", 0.7f,
112"The memory pressure (fraction of used memory) at/below which the " +
113"job can use more memory without suffering the performance.");
114/** The interval at which memory observer thread wakes up. */115publicstaticfinalLongConfOption CHECK_MEMORY_INTERVAL =
116newLongConfOption("giraph.threshold.checkMemoryInterval", 2500,
117"The interval/period where memory observer thread wakes up and " +
118"monitors memory footprint (in milliseconds)");
119/**120 * Memory observer thread would manually call GC if major/full GC has not121 * been called for a while. The period where we expect GC to be happened in122 * past is specified in this parameter123 */124publicstaticfinalLongConfOption LAST_GC_CALL_INTERVAL =
125newLongConfOption("giraph.threshold.lastGcCallInterval", 10 * 1000,
126"How long after last major/full GC should we call manual GC?");
127128/** Class logger */129privatestaticfinal Logger LOG =
130 Logger.getLogger(ThresholdBasedOracle.class);
131/** Cached value for FAIL_MEMORY_PRESSURE */132privatefinalfloat failMemoryPressure;
133/** Cached value for EMERGENCY_MEMORY_PRESSURE */134privatefinalfloat emergencyMemoryPressure;
135/** Cached value for HIGH_MEMORY_PRESSURE */136privatefinalfloat highMemoryPressure;
137/** Cached value for OPTIMAL_MEMORY_PRESSURE */138privatefinalfloat optimalMemoryPressure;
139/** Cached value for LOW_MEMORY_PRESSURE */140privatefinalfloat lowMemoryPressure;
141/** Cached value for CHECK_MEMORY_INTERVAL */142privatefinallong checkMemoryInterval;
143/** Cached value for LAST_GC_CALL_INTERVAL */144privatefinallong lastGCCallInterval;
145/** Out-of-core engine */146privatefinalOutOfCoreEngine oocEngine;
147/** Last time a major/full GC has been called (in milliseconds) */148privatevolatilelong lastMajorGCTime;
149/** Last time a non major/full GC has been called (in milliseconds) */150privatevolatilelong lastMinorGCTime;
151152/**153 * Constructor154 *155 * @param conf configuration156 * @param oocEngine out-of-core engine157 */158publicThresholdBasedOracle(ImmutableClassesGiraphConfiguration conf,
159OutOfCoreEngine oocEngine) {
160this.failMemoryPressure = FAIL_MEMORY_PRESSURE.get(conf);
161this.emergencyMemoryPressure = EMERGENCY_MEMORY_PRESSURE.get(conf);
162this.highMemoryPressure = HIGH_MEMORY_PRESSURE.get(conf);
163this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
164this.lowMemoryPressure = LOW_MEMORY_PRESSURE.get(conf);
165this.checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
166this.lastGCCallInterval = LAST_GC_CALL_INTERVAL.get(conf);
167 NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.setIfUnset(conf, true);
168boolean useCredit = NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
169 checkState(useCredit, "ThresholdBasedOracle: credit-based flow control " +
170"must be enabled. Use giraph.waitForPerWorkerRequests=true");
171this.oocEngine = oocEngine;
172this.lastMajorGCTime = 0;
173174 ThreadUtils.startThread(new Runnable() {
175 @Override
176publicvoid run() {
177while (true) {
178double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
179long time = System.currentTimeMillis();
180if ((usedMemoryFraction > highMemoryPressure &&
181 time - lastMajorGCTime >= lastGCCallInterval) ||
182 (usedMemoryFraction > optimalMemoryPressure &&
183 time - lastMajorGCTime >= lastGCCallInterval &&
184 time - lastMinorGCTime >= lastGCCallInterval)) {
185if (LOG.isInfoEnabled()) {
186 LOG.info("call: last GC happened a while ago and the " +
187"amount of used memory is high (used memory " +
188"fraction is " +
189 String.format("%.2f", usedMemoryFraction) + "). " +
190"Calling GC manually");
191 }
192 System.gc();
193 time = System.currentTimeMillis() - time;
194 usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
195if (LOG.isInfoEnabled()) {
196 LOG.info("call: manual GC is done. It took " +
197 String.format("%.2f", (double) time / 1000) +
198" seconds. Used memory fraction is " +
199 String.format("%.2f", usedMemoryFraction));
200 }
201 }
202 updateRates(usedMemoryFraction);
203try {
204 Thread.sleep(checkMemoryInterval);
205 } catch (InterruptedException e) {
206 LOG.warn("run: exception occurred!", e);
207return;
208 }
209 }
210 }
211 }, "memory-checker", oocEngine.getServiceWorker().getGraphTaskManager().
212 createUncaughtExceptionHandler());
213 }
214215/**216 * Update statistics and rate regarding communication credits and number of217 * active threads.218 *219 * @param usedMemoryFraction the fraction of used memory over max memory220 */221publicvoid updateRates(double usedMemoryFraction) {
222// Update the fraction of processing threads that should remain active223if (usedMemoryFraction >= failMemoryPressure) {
224 oocEngine.updateActiveThreadsFraction(0);
225 } elseif (usedMemoryFraction < emergencyMemoryPressure) {
226 oocEngine.updateActiveThreadsFraction(1);
227 } else {
228 oocEngine.updateActiveThreadsFraction(1 -
229 (usedMemoryFraction - emergencyMemoryPressure) /
230 (failMemoryPressure - emergencyMemoryPressure));
231 }
232233// Update the fraction of credit that should be used in credit-based flow-234// control235if (usedMemoryFraction >= emergencyMemoryPressure) {
236 oocEngine.updateRequestsCreditFraction(0);
237 } elseif (usedMemoryFraction < optimalMemoryPressure) {
238 oocEngine.updateRequestsCreditFraction(1);
239 } else {
240 oocEngine.updateRequestsCreditFraction(1 -
241 (usedMemoryFraction - optimalMemoryPressure) /
242 (emergencyMemoryPressure - optimalMemoryPressure));
243 }
244 }
245246 @Override
247publicIOAction[] getNextIOActions() {
248double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
249if (LOG.isDebugEnabled()) {
250 LOG.debug(String.format("getNextIOActions: usedMemoryFraction = %.2f",
251 usedMemoryFraction));
252 }
253if (usedMemoryFraction > highMemoryPressure) {
254returnnewIOAction[]{
255 IOAction.STORE_MESSAGES_AND_BUFFERS,
256 IOAction.STORE_PARTITION};
257 } elseif (usedMemoryFraction > optimalMemoryPressure) {
258returnnewIOAction[]{
259 IOAction.LOAD_UNPROCESSED_PARTITION,
260 IOAction.STORE_MESSAGES_AND_BUFFERS,
261 IOAction.STORE_PROCESSED_PARTITION};
262 } elseif (usedMemoryFraction > lowMemoryPressure) {
263returnnewIOAction[]{
264 IOAction.LOAD_UNPROCESSED_PARTITION,
265 IOAction.STORE_MESSAGES_AND_BUFFERS,
266 IOAction.LOAD_PARTITION};
267 } else {
268returnnewIOAction[]{IOAction.LOAD_PARTITION};
269 }
270 }
271272 @Override
273publicboolean approve(IOCommand command) {
274returntrue;
275 }
276277 @Override
278publicvoid commandCompleted(IOCommand command) {
279// Do nothing280 }
281282 @Override
283publicvoid gcCompleted(GarbageCollectionNotificationInfo gcInfo) {
284 String gcAction = gcInfo.getGcAction().toLowerCase();
285if (gcAction.contains("full") || gcAction.contains("major")) {
286if (!gcInfo.getGcCause().contains("No GC")) {
287 lastMajorGCTime = System.currentTimeMillis();
288 }
289 } else {
290 lastMinorGCTime = System.currentTimeMillis();
291 }
292 }
293294 @Override
295publicvoid startIteration() {
296 }
297 }