1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.ooc.policy;
2021import com.google.common.collect.Maps;
22import com.sun.management.GarbageCollectionNotificationInfo;
23import org.apache.giraph.conf.FloatConfOption;
24import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
25import org.apache.giraph.ooc.OutOfCoreEngine;
26import org.apache.giraph.ooc.OutOfCoreIOStatistics;
27import org.apache.giraph.ooc.command.IOCommand;
28import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
29import org.apache.giraph.ooc.command.WaitIOCommand;
30import org.apache.log4j.Logger;
3132import java.lang.management.MemoryUsage;
33import java.util.Map;
34import java.util.concurrent.atomic.AtomicInteger;
35import java.util.concurrent.atomic.AtomicLong;
3637/**38 * Out-of-core oracle to adaptively control data kept in memory, with the goal39 * of keeping the memory state constantly at a desired state. This oracle40 * monitors GC behavior to keep track of memory pressure.41 *42 * After each GC is done, this oracle retrieve statistics about the memory43 * pressure (memory used, max memory, and how far away memory is compared to a44 * max optimal pressure). Based on the the past 2 recent memory statistics,45 * the oracle predicts the status of the memory, and sets the rate of load/store46 * of data from/to disk. If the rate of loading data from disk is 'l', and the47 * rate of storing data to disk is 's', the rate of data injection to memory48 * from disk can be denoted as 'l-s'. This oracle determines what 'l-s' should49 * be based on the prediction of memory status.50 *51 * Assume that based on the previous GC call the memory usage at time t_0 is52 * m_0, and based on the most recent GC call the memory usage at time t_1 is53 * m_1. So, the rate of memory increase is alpha = (m_1 - m_0) / (t_1 - t_0).54 * Assume that the ideal memory pressure happens when the memory usage is55 * m_ideal. So, at time 't_2 = t_1 + (t_1 - t_0)', we want m_ideal. That means56 * the ideal rate would be beta = (m_ideal - m_1) / (t_2 - t_1). If the date57 * injection rate to memory so far was i, the new injection rate should be:58 * i_new = i - (alpha - beta)59 */60publicclassSimpleGCMonitoringOracleimplementsOutOfCoreOracle {
61/**62 * The optimal memory pressure at which GC behavior is close to ideal. This63 * fraction may be dependant on the GC strategy used for running a job, but64 * generally should not be dependent on the graph processing application.65 */66publicstaticfinalFloatConfOption OPTIMAL_MEMORY_PRESSURE =
67newFloatConfOption("giraph.optimalMemoryPressure", 0.8f,
68"The memory pressure (fraction of used memory) at which the job " +
69"shows the optimal GC behavior. This fraction may be dependent " +
70"on the GC strategy used in running the job.");
7172/** Class logger */73privatestaticfinal Logger LOG =
74 Logger.getLogger(SimpleGCMonitoringOracle.class);
75/** Cached value for OPTIMAL_MEMORY_PRESSURE */76privatefinalfloat optimalMemoryPressure;
77/** Out-of-core engine */78privatefinalOutOfCoreEngine oocEngine;
79/** Status of memory from the last GC call */80privateGCObservation lastGCObservation;
81/** Desired rate of data injection to memory */82privatefinal AtomicLong desiredDiskToMemoryDataRate =
83new AtomicLong(0);
84/** Number of on the fly (outstanding) IO commands for each command type */85privatefinal Map<IOCommand.IOCommandType, AtomicInteger> commandOccurrences =
86 Maps.newConcurrentMap();
8788/**89 * Constructor90 *91 * @param conf configuration92 * @param oocEngine out-of-core engine93 */94publicSimpleGCMonitoringOracle(ImmutableClassesGiraphConfiguration conf,
95OutOfCoreEngine oocEngine) {
96this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
97this.oocEngine = oocEngine;
98this.lastGCObservation = newGCObservation(-1, 0, 0);
99for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
100 commandOccurrences.put(type, new AtomicInteger(0));
101 }
102 }
103104 @Override
105publicsynchronizedvoid gcCompleted(GarbageCollectionNotificationInfo
106 gcInfo) {
107long time = System.currentTimeMillis();
108 Map<String, MemoryUsage> memAfter = gcInfo.getGcInfo()
109 .getMemoryUsageAfterGc();
110long usedMemory = 0;
111long maxMemory = 0;
112for (MemoryUsage memDetail : memAfter.values()) {
113 usedMemory += memDetail.getUsed();
114 maxMemory += memDetail.getMax();
115 }
116GCObservation observation = newGCObservation(time, usedMemory, maxMemory);
117if (LOG.isInfoEnabled()) {
118 LOG.info("gcCompleted: GC completed with: " + observation);
119 }
120// Whether this is not the first GC call in the application121if (lastGCObservation.isValid()) {
122long deltaDataRate =
123 lastGCObservation.getDesiredDeltaDataRate(observation);
124long diskBandwidthEstimate =
125 oocEngine.getIOStatistics().getDiskBandwidth();
126// Update the desired data injection rate to memory. The data injection127// rate cannot be less than -disk_bandwidth (the extreme case happens if128// we only do 'store'), and cannot be more than disk_bandwidth (the129// extreme case happens if we only do 'load').130long dataInjectionRate = desiredDiskToMemoryDataRate.get();
131 desiredDiskToMemoryDataRate.set(Math.max(
132 Math.min(desiredDiskToMemoryDataRate.get() - deltaDataRate,
133 diskBandwidthEstimate), -diskBandwidthEstimate));
134if (LOG.isInfoEnabled()) {
135 LOG.info("gcCompleted: changing data injection rate from " +
136 String.format("%.2f", dataInjectionRate / 1024.0 / 1024.0) +
137" to " + String.format("%.2f", desiredDiskToMemoryDataRate.get() /
138 1024.0 / 1024.0));
139 }
140 }
141 lastGCObservation = observation;
142 }
143144 @Override
145publicvoid startIteration() {
146 }
147148/**149 * Get the current data injection rate to memory based on the commands ran150 * in the history (retrieved from statistics collector), and outstanding151 * commands issued by the IO scheduler.152 *153 * @return the current data injection rate to memory154 */155privatelong getCurrentDataInjectionRate() {
156long effectiveBytesTransferred = 0;
157long effectiveDuration = 0;
158for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
159 OutOfCoreIOStatistics.BytesDuration stats =
160 oocEngine.getIOStatistics().getCommandTypeStats(type);
161int occurrence = commandOccurrences.get(type).get();
162long typeBytesTransferred = stats.getBytes();
163long typeDuration = stats.getDuration();
164// If there is an outstanding command, we still do not know how many bytes165// it will transfer, and how long it will take. So, we guesstimate these166// numbers based on other similar commands happened in the history. We167// simply take the average number of bytes transferred for the particular168// command, and we take average duration for the particular command. We169// should multiply these numbers by the number of outstanding commands of170// this particular command type.171if (stats.getOccurrence() != 0) {
172 typeBytesTransferred += stats.getBytes() / stats.getOccurrence() *
173 occurrence;
174 typeDuration += stats.getDuration() / stats.getOccurrence() *
175 occurrence;
176 }
177if (type == IOCommand.IOCommandType.LOAD_PARTITION) {
178 effectiveBytesTransferred += typeBytesTransferred;
179 } else {
180// Store (data going out of memory), or wait (no data transferred)181 effectiveBytesTransferred -= typeBytesTransferred;
182 }
183 effectiveDuration += typeDuration;
184 }
185if (effectiveDuration == 0) {
186return 0;
187 } else {
188return effectiveBytesTransferred / effectiveDuration;
189 }
190 }
191192 @Override
193publicIOAction[] getNextIOActions() {
194long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
195long desiredRate = desiredDiskToMemoryDataRate.get();
196long currentRate = getCurrentDataInjectionRate();
197if (desiredRate > error) {
198// 'l-s' is positive, we should do more load than store.199if (currentRate > desiredRate + error) {
200// We should decrease 'l-s'. This can be done either by increasing 's'201// or issuing wait command. We prioritize wait over hard store.202returnnewIOAction[]{
203 IOAction.STORE_MESSAGES_AND_BUFFERS,
204 IOAction.STORE_PROCESSED_PARTITION};
205 } elseif (currentRate < desiredRate - error) {
206// We should increase 'l-s'. We can simply load partitions/data.207returnnewIOAction[]{IOAction.LOAD_PARTITION};
208 } else {
209// We are in a proper state and we should keep up with the rate. We can210// either soft store data or load data (hard load, since we desired rate211// is positive).212returnnewIOAction[]{
213 IOAction.STORE_MESSAGES_AND_BUFFERS,
214 IOAction.STORE_PROCESSED_PARTITION,
215 IOAction.LOAD_PARTITION};
216 }
217 } elseif (desiredRate < -error) {
218// 'l-s' is negative, we should do more store than load.219if (currentRate < desiredRate - error) {
220// We should increase 'l-s', but we should be cautious. We only do soft221// load, or wait.222returnnewIOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
223 } elseif (currentRate > desiredRate + error) {
224// We should reduce 'l-s', we do hard store.225returnnewIOAction[]{
226 IOAction.STORE_MESSAGES_AND_BUFFERS,
227 IOAction.STORE_PARTITION};
228 } else {
229// We should keep up with the rate. We can either soft store data, or230// soft load data.231returnnewIOAction[]{
232 IOAction.STORE_MESSAGES_AND_BUFFERS,
233 IOAction.STORE_PROCESSED_PARTITION,
234 IOAction.LOAD_UNPROCESSED_PARTITION};
235 }
236 } else {
237// 'l-s' is almost zero. If current rate is over the desired rate, we do238// soft store. If the current rate is below the desired rate, we do soft239// load.240if (currentRate > desiredRate + error) {
241returnnewIOAction[]{
242 IOAction.STORE_MESSAGES_AND_BUFFERS,
243 IOAction.STORE_PROCESSED_PARTITION};
244 } elseif (currentRate < desiredRate - error) {
245returnnewIOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
246 } else {
247returnnewIOAction[]{
248 IOAction.STORE_MESSAGES_AND_BUFFERS,
249 IOAction.STORE_PROCESSED_PARTITION,
250 IOAction.LOAD_UNPROCESSED_PARTITION};
251 }
252 }
253 }
254255 @Override
256publicsynchronizedboolean approve(IOCommand command) {
257long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
258long desiredRate = desiredDiskToMemoryDataRate.get();
259long currentRate = getCurrentDataInjectionRate();
260// The command is denied iff the current rate is above the desired rate and261// we are doing load (instead of store), or the current rate is below the262// desired rate and we are doing store (instead of loading).263if (currentRate > desiredRate + error &&
264 command instanceof LoadPartitionIOCommand) {
265return false;
266 }
267if (currentRate < desiredRate - error &&
268 !(command instanceof LoadPartitionIOCommand) &&
269 !(command instanceof WaitIOCommand)) {
270return false;
271 }
272 commandOccurrences.get(command.getType()).getAndIncrement();
273returntrue;
274 }
275276 @Override
277publicvoid commandCompleted(IOCommand command) {
278 commandOccurrences.get(command.getType()).getAndDecrement();
279 }
280281/** Helper class to record memory status after GC calls */282privateclassGCObservation {
283/** The time at which the GC happened (in milliseconds) */284privatelong time;
285/** Amount of memory used after the GC call */286privatelong usedMemory;
287/** Maximum amounts of memory reported by GC listener */288privatelong maxMemory;
289290/**291 * Constructor292 *293 * @param time time of GC294 * @param usedMemory amount of used memory after GC295 * @param maxMemory amount of all available memory based on GC observation296 */297publicGCObservation(long time, long usedMemory, long maxMemory) {
298this.time = time;
299this.usedMemory = usedMemory;
300this.maxMemory = maxMemory;
301 }
302303/**304 * Is this a valid observation?305 *306 * @return true iff it is a valid observation307 */308publicboolean isValid() {
309return time > 0;
310 }
311312/**313 * Considering a new observation of memory status after the most recent GC,314 * what is the desired rate for data injection to memory.315 *316 * @param newObservation the most recent GC observation317 * @return desired rate of data injection to memory318 */319publiclong getDesiredDeltaDataRate(GCObservation newObservation) {
320long newUsedMemory = newObservation.usedMemory;
321long newMaxMemory = newObservation.maxMemory;
322long lastUsedMemory = usedMemory;
323long lastMaxMemory = maxMemory;
324// Scale the memory status of two GC observation to be the same325long scaledMaxMemory = Math.min(lastMaxMemory, newMaxMemory);
326 newUsedMemory =
327 (long) (((double) scaledMaxMemory / newMaxMemory) * newUsedMemory);
328 lastUsedMemory =
329 (long) (((double) scaledMaxMemory / lastMaxMemory) * lastUsedMemory);
330long desiredUsedMemory = (long) (optimalMemoryPressure * scaledMaxMemory);
331if (LOG.isInfoEnabled()) {
332 LOG.info("getDesiredDeltaDataRate: " + String.format("previous usage " +
333"= %.2f MB, ", lastUsedMemory / 1024.0 / 1024.0) + String.format(
334"current usage = %.2f MB, ", newUsedMemory / 1024.0 / 1024.0) +
335 String.format("ideal usage = %.2f MB", desiredUsedMemory / 1024.0 /
336 1024.0));
337 }
338long interval = newObservation.time - time;
339if (interval == 0) {
340 interval = 1;
341 LOG.warn("getDesiredDeltaRate: two GC happened almost at the same " +
342"time!");
343 }
344long currentDataRate = (long) ((double) (newUsedMemory -
345 lastUsedMemory) / interval * 1000);
346long desiredDataRate = (long) ((double) (desiredUsedMemory -
347 newUsedMemory) / interval * 1000);
348return currentDataRate - desiredDataRate;
349 }
350351 @Override
352public String toString() {
353return String.format("(usedMemory: %.2f MB, maxMemory: %.2f MB at " +
354"time: %d ms)", usedMemory / 1024.0 / 1024.0,
355 maxMemory / 1024.0 / 1024.0, time);
356 }
357 }
358 }