1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.ooc.policy;
1920import com.sun.management.GarbageCollectionNotificationInfo;
21import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
22import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression;
23import org.apache.giraph.comm.NetworkMetrics;
24import org.apache.giraph.conf.FloatConfOption;
25import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26import org.apache.giraph.conf.LongConfOption;
27import org.apache.giraph.edge.AbstractEdgeStore;
28import org.apache.giraph.ooc.OutOfCoreEngine;
29import org.apache.giraph.ooc.command.IOCommand;
30import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
31import org.apache.giraph.ooc.command.WaitIOCommand;
32import org.apache.giraph.utils.ThreadUtils;
33import org.apache.giraph.worker.EdgeInputSplitsCallable;
34import org.apache.giraph.worker.VertexInputSplitsCallable;
35import org.apache.giraph.worker.WorkerProgress;
36import org.apache.log4j.Logger;
3738import javax.annotation.Nullable;
39import java.lang.management.ManagementFactory;
40import java.lang.management.MemoryPoolMXBean;
41import java.lang.management.MemoryUsage;
42import java.util.ArrayList;
43import java.util.Arrays;
44import java.util.List;
45import java.util.Map;
46import java.util.concurrent.atomic.AtomicLong;
47import java.util.concurrent.locks.Lock;
48import java.util.concurrent.locks.ReentrantLock;
4950importstatic com.google.common.base.Preconditions.checkState;
5152/**53 * Implementation of {@link OutOfCoreOracle} that uses a linear regression model54 * to estimate actual memory usage based on the current state of computation.55 * The model takes into consideration 5 parameters:56 *57 * y = c0 + c1*x1 + c2*x2 + c3*x3 + c4*x4 + c5*x558 *59 * y: memory usage60 * x1: edges loaded61 * x2: vertices loaded62 * x3: vertices processed63 * x4: bytes received due to messages64 * x5: bytes loaded/stored from/to disk due to OOC.65 *66 */67publicclassMemoryEstimatorOracleimplementsOutOfCoreOracle {
68/** Memory check interval in msec */69publicstaticfinalLongConfOption CHECK_MEMORY_INTERVAL =
70newLongConfOption("giraph.garbageEstimator.checkMemoryInterval", 1000,
71"The interval where memory checker thread wakes up and " +
72"monitors memory footprint (in milliseconds)");
73/**74 * If mem-usage is above this threshold and no Full GC has been called,75 * we call it manually76 */77publicstaticfinalFloatConfOption MANUAL_GC_MEMORY_PRESSURE =
78newFloatConfOption("giraph.garbageEstimator.manualGCPressure", 0.95f,
79"The threshold above which GC is called manually if Full GC has not " +
80"happened in a while");
81/** Used to detect a high memory pressure situation */82publicstaticfinalFloatConfOption GC_MINIMUM_RECLAIM_FRACTION =
83newFloatConfOption("giraph.garbageEstimator.gcReclaimFraction", 0.05f,
84"Minimum percentage of memory we expect to be reclaimed after a Full " +
85"GC. If less than this amount is reclaimed, it is sage to say " +
86"we are in a high memory situation and the estimation mechanism " +
87"has not recognized it yet!");
88/** If mem-usage is above this threshold, active threads are set to 0 */89publicstaticfinalFloatConfOption AM_HIGH_THRESHOLD =
90newFloatConfOption("giraph.amHighThreshold", 0.95f,
91"If mem-usage is above this threshold, all active threads " +
92"(compute/input) are paused.");
93/** If mem-usage is below this threshold, active threads are set to max */94publicstaticfinalFloatConfOption AM_LOW_THRESHOLD =
95newFloatConfOption("giraph.amLowThreshold", 0.90f,
96"If mem-usage is below this threshold, all active threads " +
97"(compute/input) are running.");
98/** If mem-usage is above this threshold, credit is set to 0 */99publicstaticfinalFloatConfOption CREDIT_HIGH_THRESHOLD =
100newFloatConfOption("giraph.creditHighThreshold", 0.95f,
101"If mem-usage is above this threshold, credit is set to 0");
102/** If mem-usage is below this threshold, credit is set to max */103publicstaticfinalFloatConfOption CREDIT_LOW_THRESHOLD =
104newFloatConfOption("giraph.creditLowThreshold", 0.90f,
105"If mem-usage is below this threshold, credit is set to max");
106/** OOC starts if mem-usage is above this threshold */107publicstaticfinalFloatConfOption OOC_THRESHOLD =
108newFloatConfOption("giraph.oocThreshold", 0.90f,
109"If mem-usage is above this threshold, out of core threads starts " +
110"writing data to disk");
111112/** Logger */113privatestaticfinal Logger LOG =
114 Logger.getLogger(MemoryEstimatorOracle.class);
115116/** Cached value for {@link #MANUAL_GC_MEMORY_PRESSURE} */117privatefinalfloat manualGCMemoryPressure;
118/** Cached value for {@link #GC_MINIMUM_RECLAIM_FRACTION} */119privatefinalfloat gcReclaimFraction;
120/** Cached value for {@link #AM_HIGH_THRESHOLD} */121privatefinalfloat amHighThreshold;
122/** Cached value for {@link #AM_LOW_THRESHOLD} */123privatefinalfloat amLowThreshold;
124/** Cached value for {@link #CREDIT_HIGH_THRESHOLD} */125privatefinalfloat creditHighThreshold;
126/** Cached value for {@link #CREDIT_LOW_THRESHOLD} */127privatefinalfloat creditLowThreshold;
128/** Cached value for {@link #OOC_THRESHOLD} */129privatefinalfloat oocThreshold;
130131/** Reference to running OOC engine */132privatefinalOutOfCoreEngine oocEngine;
133/** Memory estimator instance */134privatefinalMemoryEstimator memoryEstimator;
135/** Keeps track of the number of bytes stored/loaded by OOC */136privatefinal AtomicLong oocBytesInjected = new AtomicLong(0);
137/** How many bytes to offload */138privatefinal AtomicLong numBytesToOffload = new AtomicLong(0);
139/** Current state of the OOC */140privatevolatileState state = State.STABLE;
141/** Timestamp of the last major GC */142privatevolatilelong lastMajorGCTime = 0;
143144/**145 * Different states the OOC can be in.146 */147private enum State {
148/** No offloading */149 STABLE,
150/** Current offloading */151 OFFLOADING,
152 }
153154/**155 * Constructor.156 * @param conf Configuration157 * @param oocEngine OOC engine.:w158 *159 */160publicMemoryEstimatorOracle(ImmutableClassesGiraphConfiguration conf,
161finalOutOfCoreEngine oocEngine) {
162this.oocEngine = oocEngine;
163this.memoryEstimator = newMemoryEstimator(this.oocBytesInjected,
164 oocEngine.getNetworkMetrics());
165166this.manualGCMemoryPressure = MANUAL_GC_MEMORY_PRESSURE.get(conf);
167this.gcReclaimFraction = GC_MINIMUM_RECLAIM_FRACTION.get(conf);
168this.amHighThreshold = AM_HIGH_THRESHOLD.get(conf);
169this.amLowThreshold = AM_LOW_THRESHOLD.get(conf);
170this.creditHighThreshold = CREDIT_HIGH_THRESHOLD.get(conf);
171this.creditLowThreshold = CREDIT_LOW_THRESHOLD.get(conf);
172this.oocThreshold = OOC_THRESHOLD.get(conf);
173174finallong checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
175176 ThreadUtils.startThread(new Runnable() {
177 @Override
178publicvoid run() {
179while (true) {
180long oldGenUsageEstimate = memoryEstimator.getUsageEstimate();
181 MemoryUsage usage = getOldGenUsed();
182if (oldGenUsageEstimate > 0) {
183 updateRates(oldGenUsageEstimate, usage.getMax());
184 } else {
185long time = System.currentTimeMillis();
186if (time - lastMajorGCTime >= 10000) {
187double used = (double) usage.getUsed() / usage.getMax();
188if (used > manualGCMemoryPressure) {
189if (LOG.isInfoEnabled()) {
190 LOG.info(
191"High memory pressure with no full GC from the JVM. " +
192"Calling GC manually. Used fraction of old-gen is " +
193 String.format("%.2f", used) + ".");
194 }
195 System.gc();
196 time = System.currentTimeMillis() - time;
197 usage = getOldGenUsed();
198 used = (double) usage.getUsed() / usage.getMax();
199if (LOG.isInfoEnabled()) {
200 LOG.info("Manual GC done. It took " +
201 String.format("%.2f", time / 1000.0) +
202" seconds. Used fraction of old-gen is " +
203 String.format("%.2f", used) + ".");
204 }
205 }
206 }
207 }
208try {
209 Thread.sleep(checkMemoryInterval);
210 } catch (InterruptedException e) {
211 LOG.warn("run: exception occurred!", e);
212return;
213 }
214 }
215 }
216 }, "ooc-memory-checker", oocEngine.getServiceWorker().getGraphTaskManager()
217 .createUncaughtExceptionHandler());
218 }
219220/**221 * Resets all the counters used in the memory estimation. This is called at222 * the beginning of a new superstep.223 * <p>224 * The number of vertices to compute in the next superstep gets reset in225 * {@link org.apache.giraph.graph.GraphTaskManager#processGraphPartitions}226 * right before227 * {@link org.apache.giraph.partition.PartitionStore#startIteration()} gets228 * called.229 */230 @Override
231publicvoid startIteration() {
232 AbstractEdgeStore.PROGRESS_COUNTER.reset();
233 oocBytesInjected.set(0);
234 memoryEstimator.clear();
235 memoryEstimator.setCurrentSuperstep(oocEngine.getSuperstep());
236 oocEngine.updateRequestsCreditFraction(1);
237 oocEngine.updateActiveThreadsFraction(1);
238 }
239240241 @Override
242publicIOAction[] getNextIOActions() {
243if (state == State.OFFLOADING) {
244returnnewIOAction[]{
245 IOAction.STORE_MESSAGES_AND_BUFFERS, IOAction.STORE_PARTITION};
246 }
247long oldGenUsage = memoryEstimator.getUsageEstimate();
248 MemoryUsage usage = getOldGenUsed();
249if (oldGenUsage > 0) {
250double usageEstimate = (double) oldGenUsage / usage.getMax();
251if (usageEstimate > oocThreshold) {
252returnnewIOAction[]{
253 IOAction.STORE_MESSAGES_AND_BUFFERS,
254 IOAction.STORE_PARTITION};
255 } else {
256returnnewIOAction[]{IOAction.LOAD_PARTITION};
257 }
258 } else {
259returnnewIOAction[]{IOAction.LOAD_PARTITION};
260 }
261 }
262263 @Override
264publicboolean approve(IOCommand command) {
265returntrue;
266 }
267268 @Override
269publicvoid commandCompleted(IOCommand command) {
270if (command instanceof LoadPartitionIOCommand) {
271 oocBytesInjected.getAndAdd(command.bytesTransferred());
272if (state == State.OFFLOADING) {
273 numBytesToOffload.getAndAdd(command.bytesTransferred());
274 }
275 } elseif (!(command instanceof WaitIOCommand)) {
276 oocBytesInjected.getAndAdd(0 - command.bytesTransferred());
277if (state == State.OFFLOADING) {
278 numBytesToOffload.getAndAdd(0 - command.bytesTransferred());
279 }
280 }
281282if (state == State.OFFLOADING && numBytesToOffload.get() <= 0) {
283 numBytesToOffload.set(0);
284 state = State.STABLE;
285 updateRates(-1, 1);
286 }
287 }
288289/**290 * When a new GC has completed, we can get an accurate measurement of the291 * memory usage. We use this to update the linear regression model.292 *293 * @param gcInfo GC information294 */295 @Override
296publicsynchronizedvoid gcCompleted(
297 GarbageCollectionNotificationInfo gcInfo) {
298 String action = gcInfo.getGcAction().toLowerCase();
299 String cause = gcInfo.getGcCause().toLowerCase();
300if (action.contains("major") &&
301 (cause.contains("ergo") || cause.contains("system"))) {
302 lastMajorGCTime = System.currentTimeMillis();
303 MemoryUsage before = null;
304 MemoryUsage after = null;
305306for (Map.Entry<String, MemoryUsage> entry :
307 gcInfo.getGcInfo().getMemoryUsageBeforeGc().entrySet()) {
308 String poolName = entry.getKey();
309if (poolName.toLowerCase().contains("old")) {
310 before = entry.getValue();
311 after = gcInfo.getGcInfo().getMemoryUsageAfterGc().get(poolName);
312break;
313 }
314 }
315if (after == null) {
316thrownew IllegalStateException("Missing Memory Usage After GC info");
317 }
318if (before == null) {
319thrownew IllegalStateException("Missing Memory Usage Before GC info");
320 }
321322// Compare the estimation with the actual value323long usedMemoryEstimate = memoryEstimator.getUsageEstimate();
324long usedMemoryReal = after.getUsed();
325if (usedMemoryEstimate >= 0) {
326if (LOG.isInfoEnabled()) {
327 LOG.info("gcCompleted: estimate=" + usedMemoryEstimate + " real=" +
328 usedMemoryReal + " error=" +
329 ((double) Math.abs(usedMemoryEstimate - usedMemoryReal) /
330 usedMemoryReal * 100));
331 }
332 }
333334// Number of edges loaded so far (if in input superstep)335long edgesLoaded = oocEngine.getSuperstep() >= 0 ? 0 :
336 EdgeInputSplitsCallable.getTotalEdgesLoadedMeter().count();
337// Number of vertices loaded so far (if in input superstep)338long verticesLoaded = oocEngine.getSuperstep() >= 0 ? 0 :
339 VertexInputSplitsCallable.getTotalVerticesLoadedMeter().count();
340// Number of vertices computed (if either in compute or store phase)341long verticesComputed = WorkerProgress.get().getVerticesComputed() +
342 WorkerProgress.get().getVerticesStored() +
343 AbstractEdgeStore.PROGRESS_COUNTER.getProgress();
344// Number of bytes received345long receivedBytes =
346 oocEngine.getNetworkMetrics().getBytesReceivedPerSuperstep();
347// Number of OOC bytes348long oocBytes = oocBytesInjected.get();
349350 memoryEstimator.addRecord(getOldGenUsed().getUsed(), edgesLoaded,
351 verticesLoaded, verticesComputed, receivedBytes, oocBytes);
352353long garbage = before.getUsed() - after.getUsed();
354long maxMem = after.getMax();
355long memUsed = after.getUsed();
356boolean isTight = (maxMem - memUsed) < 2 * gcReclaimFraction * maxMem &&
357 garbage < gcReclaimFraction * maxMem;
358boolean predictionExist = memoryEstimator.getUsageEstimate() > 0;
359if (isTight && !predictionExist) {
360if (LOG.isInfoEnabled()) {
361 LOG.info("gcCompleted: garbage=" + garbage + " memUsed=" +
362 memUsed + " maxMem=" + maxMem);
363 }
364 numBytesToOffload.set((long) (2 * gcReclaimFraction * maxMem) -
365 (maxMem - memUsed));
366if (LOG.isInfoEnabled()) {
367 LOG.info("gcCompleted: tight memory usage. Starting to offload " +
368"until " + numBytesToOffload.get() + " bytes are offloaded");
369 }
370 state = State.OFFLOADING;
371 updateRates(1, 1);
372 }
373 }
374 }
375376/**377 * Given an estimate for the current memory usage and the maximum available378 * memory, it updates the active threads and flow control credit in the379 * OOC engine.380 *381 * @param usageEstimateMem Estimate of memory usage.382 * @param maxMemory Maximum memory.383 */384privatevoid updateRates(long usageEstimateMem, long maxMemory) {
385double usageEstimate = (double) usageEstimateMem / maxMemory;
386if (usageEstimate > 0) {
387if (usageEstimate >= amHighThreshold) {
388 oocEngine.updateActiveThreadsFraction(0);
389 } elseif (usageEstimate < amLowThreshold) {
390 oocEngine.updateActiveThreadsFraction(1);
391 } else {
392 oocEngine.updateActiveThreadsFraction(1 -
393 (usageEstimate - amLowThreshold) /
394 (amHighThreshold - amLowThreshold));
395 }
396397if (usageEstimate >= creditHighThreshold) {
398 oocEngine.updateRequestsCreditFraction(0);
399 } elseif (usageEstimate < creditLowThreshold) {
400 oocEngine.updateRequestsCreditFraction(1);
401 } else {
402 oocEngine.updateRequestsCreditFraction(1 -
403 (usageEstimate - creditLowThreshold) /
404 (creditHighThreshold - creditLowThreshold));
405 }
406 } else {
407 oocEngine.updateActiveThreadsFraction(1);
408 oocEngine.updateRequestsCreditFraction(1);
409 }
410 }
411412/**413 * Returns statistics about the old gen pool.414 * @return {@link MemoryUsage}.415 */416private MemoryUsage getOldGenUsed() {
417 List<MemoryPoolMXBean> memoryPoolList =
418 ManagementFactory.getMemoryPoolMXBeans();
419for (MemoryPoolMXBean pool : memoryPoolList) {
420 String normalName = pool.getName().toLowerCase();
421if (normalName.contains("old") || normalName.contains("tenured")) {
422return pool.getUsage();
423 }
424 }
425thrownew IllegalStateException("Bad Memory Pool");
426 }
427428/**429 * Maintains statistics about the current state and progress of the430 * computation and produces estimates of memory usage using a technique431 * based on linear regression.432 *433 * Upon a GC events, it gets updated with the most recent statistics through434 * the {@link #addRecord} method.435 */436privatestaticclassMemoryEstimator {
437/** Stores the (x1,x2,...,x5) arrays of data samples, one for each sample */438private List<double[]> dataSamples = new ArrayList<>();
439/** Stores the y memory usage dataSamples, one for each sample */440private DoubleArrayList memorySamples = new DoubleArrayList();
441/** Stores the coefficients computed by the linear regression model */442privatedouble[] coefficient = newdouble[6];
443/** Stores the column indices that can be used in the regression model */444private List<Integer> validColumnIndices = new ArrayList<>();
445/** Potentially out-of-range coefficient values */446privatedouble[] extreme = newdouble[6];
447/** Indicates whether current coefficients can be used for estimation */448privateboolean isValid = false;
449/** Implementation of linear regression */450private OLSMultipleLinearRegression mlr = new OLSMultipleLinearRegression();
451/** Used to synchronize access to the data samples */452private Lock lock = new ReentrantLock();
453/** The estimation method depends on the current superstep. */454privatelong currentSuperstep = -1;
455/** The estimation method depends on the bytes injected. */456privatefinal AtomicLong oocBytesInjected;
457/** Provides network statistics */458privatefinalNetworkMetrics networkMetrics;
/**
 * Creates an estimator that reads its OOC and network inputs from the
 * given objects.
 *
 * @param oocBytesInjected Reference to {@link AtomicLong} object
 *                         maintaining the number of OOC bytes stored.
 * @param networkMetrics Interface to get network stats.
 */
public MemoryEstimator(AtomicLong oocBytesInjected,
    NetworkMetrics networkMetrics) {
  this.networkMetrics = networkMetrics;
  this.oocBytesInjected = oocBytesInjected;
}
471472473/**474 * Clear data structure (called from single threaded program).475 */476publicvoid clear() {
477 dataSamples.clear();
478 memorySamples.clear();
479 isValid = false;
480 }
481482publicvoid setCurrentSuperstep(long superstep) {
483this.currentSuperstep = superstep;
484 }
485486/**487 * Given the current state of computation (i.e. current edges loaded,488 * vertices computed etc) and the current model (i.e. the regression489 * coefficient), it returns a prediction about the memory usage in bytes.490 *491 * @return Memory estimate in bytes.492 */493publiclong getUsageEstimate() {
494long usage = -1;
495 lock.lock();
496try {
497if (isValid) {
498// Number of edges loaded so far (if in input superstep)499long edgesLoaded = currentSuperstep >= 0 ? 0 :
500 EdgeInputSplitsCallable.getTotalEdgesLoadedMeter().count();
501// Number of vertices loaded so far (if in input superstep)502long verticesLoaded = currentSuperstep >= 0 ? 0 :
503 VertexInputSplitsCallable.getTotalVerticesLoadedMeter().count();
504// Number of vertices computed (if either in compute or store phase)505long verticesComputed = WorkerProgress.get().getVerticesComputed() +
506 WorkerProgress.get().getVerticesStored() +
507 AbstractEdgeStore.PROGRESS_COUNTER.getProgress();
508// Number of bytes received509long receivedBytes = networkMetrics.getBytesReceivedPerSuperstep();
510// Number of OOC bytes511long oocBytes = this.oocBytesInjected.get();
512513 usage = (long) (edgesLoaded * coefficient[0] +
514 verticesLoaded * coefficient[1] +
515 verticesComputed * coefficient[2] +
516 receivedBytes * coefficient[3] +
517 oocBytes * coefficient[4] +
518 coefficient[5]);
519 }
520 } finally {
521 lock.unlock();
522 }
523return usage;
524 }
525526/**527 * Updates the linear regression model with a new data point.528 *529 * @param memUsed Current real value of memory usage.530 * @param edges Number of edges loaded.531 * @param vertices Number of vertices loaded.532 * @param verticesProcessed Number of vertices processed.533 * @param bytesReceived Number of bytes received.534 * @param oocBytesInjected Number of bytes stored/loaded due to OOC.535 */536publicvoid addRecord(long memUsed, long edges, long vertices,
537long verticesProcessed,
538long bytesReceived, long oocBytesInjected) {
539 checkState(memUsed > 0, "Memory Usage cannot be negative");
540if (dataSamples.size() > 0) {
541double[] last = dataSamples.get(dataSamples.size() - 1);
542if (edges == last[0] && vertices == last[1] &&
543 verticesProcessed == last[2] && bytesReceived == last[3] &&
544 oocBytesInjected == last[4]) {
545if (LOG.isDebugEnabled()) {
546 LOG.debug(
547"addRecord: avoiding to add the same entry as the last one!");
548 }
549return;
550 }
551 }
552 dataSamples.add(newdouble[] {edges, vertices, verticesProcessed,
553 bytesReceived, oocBytesInjected});
554 memorySamples.add((double) memUsed);
555556// Weed out the columns that are all zero557 validColumnIndices.clear();
558for (int i = 0; i < 5; ++i) {
559boolean validIndex = false;
560// Check if there is a non-zero entry in the column561for (double[] value : dataSamples) {
562if (value[i] != 0) {
563 validIndex = true;
564break;
565 }
566 }
567if (validIndex) {
568// check if all entries are not equal to each other569double firstValue = -1;
570boolean allEqual = true;
571for (double[] value : dataSamples) {
572if (firstValue == -1) {
573 firstValue = value[i];
574 } else {
575if (Math.abs((value[i] - firstValue) / firstValue) > 0.01) {
576 allEqual = false;
577break;
578 }
579 }
580 }
581 validIndex = !allEqual;
582if (validIndex) {
583// Check if the column has linear dependency with another column584for (int col = i + 1; col < 5; ++col) {
585if (isLinearDependence(dataSamples, i, col)) {
586 validIndex = false;
587break;
588 }
589 }
590 }
591 }
592593if (validIndex) {
594 validColumnIndices.add(i);
595 }
596 }
597598// If we filtered out columns in the previous step, we are going to run599// the regression without those columns.600601// Create the coefficient table602boolean setIsValid = false;
603 lock.lock();
604try {
605if (validColumnIndices.size() >= 1 &&
606 dataSamples.size() >= validColumnIndices.size() + 1) {
607608double[][] xValues = newdouble[dataSamples.size()][];
609 fillXMatrix(dataSamples, validColumnIndices, xValues);
610double[] yValues =
611 memorySamples.toDoubleArray(newdouble[memorySamples.size()]);
612 mlr.newSampleData(yValues, xValues);
613boolean isRegressionValid =
614 calculateRegression(coefficient, validColumnIndices, mlr);
615616if (!isRegressionValid) { // invalid regression result617return; // The finally-block at the end will release any locks.618 }
619620// After the computation of the regression, some coefficients may have621// values outside the valid value range. In this case, we set the622// coefficient to the minimum or maximum value allowed, and re-run the623// regression.624// We only care about coefficients of two of the variables:625// bytes received due to messages (receivedBytes -- index 3 in626// `coefficient` array) and bytes transferred due to OOC (oocBytes --627// index 4 in `coefficient` array).628//629// receivedByte's coefficient cannot be negative, meaning that it does630// not make sense that memory footprint decreases because of receipt631// of messages. We either have message combiner or we do not have632// combiner. If message combiner is used, the memory footprint633// will not change much due to messages leading to the coefficient for634// oocBytes to be close to 0. If message combiner is not used, the635// memory only increase with messages, and the coefficient should be636// positive. In this case, a message is usually deserialized and then637// written to the message store. We assume that the process of638// deserializing the message and putting it into the message store639// will not result in more than twice the size of the serialized form640// of the message (meaning that the memory footprint for message store641// will not be more than 2*receivedBytes). Based on this assumption642// the upper bound for coefficient of receivedBytes should be 2.643//644// "oocBytes" represents the size of the serialized form of data that645// has transferred to/from secondary storage. On the other hand, in646// our estimation mechanism, we are estimating the aggregate size of647// all live objects in memory, meaning that we are estimating the size648// of deserialized for of data in memory. 
Since we are not using any649// (de)compression for (de)serialization of data, we assume that650// size of serialized data <= size of deserialized data <=651// 2 * (size of serialized dat)652// This basically means that the coefficient for oocBytes should be653// between 1 and 2.654655boolean changed;
656 extreme[3] = -1;
657 extreme[4] = -1;
658do {
659 Boolean result = null;
660661 result = refineCoefficient(4, 1, 2, xValues, yValues);
662if (result == null) { // invalid regression result663return; // finally-block will release lock664 }
665 changed = result;
666667 result = refineCoefficient(3, 0, 2, xValues, yValues);
668if (result == null) { // invalid regression result669return; // finally-block will release lock670 }
671 changed |= result;
672 } while (changed);
673if (extreme[3] != -1) {
674 coefficient[3] = extreme[3];
675 }
676if (extreme[4] != -1) {
677 coefficient[4] = extreme[4];
678 }
679 setIsValid = true;
680return; // the finally-block will execute before return681 }
682 } finally {
683// This inner try-finally block is necessary to ensure that the684// lock is always released.685try {
686 isValid = setIsValid;
687 printStats();
688 } finally {
689 lock.unlock();
690 }
691 }
692 }
693694/**695 * Certain coefficients need to be within a specific range.696 * If the coefficient is not in this range, we set it to the closest bound697 * and re-run the linear regression. In this case, we keep the possible698 * extremum coefficient in an intermediate array ("extreme"). Also, if699 * we choose the extremum coefficient for an index, that index is removed700 * from the regression calculation as well as the rest of the refinement701 * process.702 *703 * Note that the regression calculation here is based on the method of704 * least square for minimizing the error. The sum of squares of errors for705 * all points is a convex function. This means if we solve the706 * non-constrained linear regression and then refine the coefficient to707 * apply their bounds, we will achieve a solution to our constrained708 * linear regression problem.709 *710 * This method is called in a loop to refine certain coefficients. The loop711 * should continue until all coefficients are refined and are within their712 * range.713 *714 * @param coefIndex Coefficient index715 * @param lowerBound Lower bound716 * @param upperBound Upper bound717 * @param xValues double[][] matrix with data samples718 * @param yValues double[] matrix with y samples719 * @return True if coefficients were out-of-range, false otherwise. A null720 * value means the regression result was invalid and the result of721 * this method is invalid too.722 */723 @Nullable
724private Boolean refineCoefficient(int coefIndex, double lowerBound,
725double upperBound, double[][] xValues, double[] yValues) {
726727boolean result = false;
728if (coefficient[coefIndex] < lowerBound ||
729 coefficient[coefIndex] > upperBound) {
730731double value;
732if (coefficient[coefIndex] < lowerBound) {
733 value = lowerBound;
734 } else {
735 value = upperBound;
736 }
737int ptr = -1;
738// Finding the 'coefIndex' in the valid indices. Since this method is739// used only for the variables with higher indices, we use a reverse740// loop to lookup the 'coefIndex' for faster termination of the loop.741for (int i = validColumnIndices.size() - 1; i >= 0; --i) {
742if (validColumnIndices.get(i) == coefIndex) {
743 ptr = i;
744break;
745 }
746 }
747if (ptr != -1) {
748if (LOG.isDebugEnabled()) {
749 LOG.debug("addRecord: coefficient at index " + coefIndex +
750" is wrong in the regression, setting it to " + value);
751 }
752// remove from valid column753 validColumnIndices.remove(ptr);
754// re-create the X matrix755 fillXMatrix(dataSamples, validColumnIndices, xValues);
756// adjust Y values757for (int i = 0; i < memorySamples.size(); ++i) {
758 yValues[i] -= value * dataSamples.get(i)[coefIndex];
759 }
760// save new coefficient value in intermediate array761 extreme[coefIndex] = value;
762// re-run regression763 mlr.newSampleData(yValues, xValues);
764 result = calculateRegression(coefficient, validColumnIndices, mlr);
765if (!result) { // invalid regression result766returnnull;
767 }
768 } else {
769if (LOG.isDebugEnabled()) {
770 LOG.debug(
771"addRecord: coefficient was not in the regression, " +
772"setting it to the extreme of the bound");
773 }
774 result = false;
775 }
776 coefficient[coefIndex] = value;
777 }
778return result;
779 }
780781/**782 * Calculates the regression.783 * @param coefficient Array of coefficients784 * @param validColumnIndices List of valid columns785 * @param mlr {@link OLSMultipleLinearRegression} instance.786 * @return True if the result is valid, false otherwise.787 */788privatestaticboolean calculateRegression(double[] coefficient,
789 List<Integer> validColumnIndices, OLSMultipleLinearRegression mlr) {
790791if (coefficient.length != validColumnIndices.size()) {
792 LOG.info("There are " + coefficient.length +
793" coefficients, and " + validColumnIndices.size() +
794" valid columns in the regression");
795 }
796797double[] beta = mlr.estimateRegressionParameters();
798 Arrays.fill(coefficient, 0);
799for (int i = 0; i < validColumnIndices.size(); ++i) {
800 coefficient[validColumnIndices.get(i)] = beta[i];
801 }
802 coefficient[5] = beta[validColumnIndices.size()];
803returntrue;
804 }
805806/**807 * Copies values from a List of double[] to a double[][]. Takes into808 * consideration the list of valid column indices.809 * @param sourceValues Source List of double[]810 * @param validColumnIndices Valid column indices811 * @param xValues Target double[][] matrix.812 */813privatestaticvoid fillXMatrix(List<double[]> sourceValues,
814 List<Integer> validColumnIndices,
815double[][] xValues) {
816817for (int i = 0; i < sourceValues.size(); ++i) {
818 xValues[i] = newdouble[validColumnIndices.size() + 1];
819for (int j = 0; j < validColumnIndices.size(); ++j) {
820 xValues[i][j] = sourceValues.get(i)[validColumnIndices.get(j)];
821 }
822 xValues[i][validColumnIndices.size()] = 1;
823 }
824 }
825826/**827 * Utility function that checks whether two doubles are equals given an828 * accuracy tolerance.829 *830 * @param val1 First value831 * @param val2 Second value832 * @return True if within a threshold833 */834privatestaticboolean equal(double val1, double val2) {
835return Math.abs(val1 - val2) < 0.01;
836 }
837838/**839 * Utility function that checks if two columns have linear dependence.840 *841 * @param values Matrix in the form of a List of double[] values.842 * @param col1 First column index843 * @param col2 Second column index844 * @return True if there is linear dependence.845 */846privatestaticboolean isLinearDependence(List<double[]> values,
847int col1, int col2) {
848boolean firstValSeen = false;
849double firstVal = 0;
850for (double[] value : values) {
851double val1 = value[col1];
852double val2 = value[col2];
853if (equal(val1, 0)) {
854if (equal(val2, 0)) {
855continue;
856 } else {
857return false;
858 }
859 }
860if (equal(val2, 0)) {
861return false;
862 }
863if (!firstValSeen) {
864 firstVal = val1 / val2;
865 firstValSeen = true;
866 } else {
867if (!equal((val1 / val2 - firstVal) / firstVal, 0)) {
868return false;
869 }
870 }
871 }
872returntrue;
873 }
874875/**876 * Prints statistics about the regression model.877 */878privatevoid printStats() {
879if (LOG.isDebugEnabled()) {
880 StringBuilder sb = new StringBuilder();
881 sb.append(
882"\nEDGES\t\tVERTICES\t\tV_PROC\t\tRECEIVED\t\tOOC\t\tMEM_USED\n");
883for (int i = 0; i < dataSamples.size(); ++i) {
884for (int j = 0; j < dataSamples.get(i).length; ++j) {
885 sb.append(String.format("%.2f\t\t", dataSamples.get(i)[j]));
886 }
887 sb.append(memorySamples.get(i));
888 sb.append("\n");
889 }
890 sb.append("COEFFICIENT:\n");
891for (int i = 0; i < coefficient.length; ++i) {
892 sb.append(String.format("%.2f\t\t", coefficient[i]));
893 }
894 sb.append("\n");
895 LOG.debug("printStats: isValid=" + isValid + sb.toString());
896 }
897 }
898 }
899 }