This project has retired. For details please refer to its Attic page.
MemoryEstimatorOracle xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.ooc.policy;
19  
20  import com.sun.management.GarbageCollectionNotificationInfo;
21  import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
22  import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression;
23  import org.apache.giraph.comm.NetworkMetrics;
24  import org.apache.giraph.conf.FloatConfOption;
25  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26  import org.apache.giraph.conf.LongConfOption;
27  import org.apache.giraph.edge.AbstractEdgeStore;
28  import org.apache.giraph.ooc.OutOfCoreEngine;
29  import org.apache.giraph.ooc.command.IOCommand;
30  import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
31  import org.apache.giraph.ooc.command.WaitIOCommand;
32  import org.apache.giraph.utils.ThreadUtils;
33  import org.apache.giraph.worker.EdgeInputSplitsCallable;
34  import org.apache.giraph.worker.VertexInputSplitsCallable;
35  import org.apache.giraph.worker.WorkerProgress;
36  import org.apache.log4j.Logger;
37  
38  import javax.annotation.Nullable;
39  import java.lang.management.ManagementFactory;
40  import java.lang.management.MemoryPoolMXBean;
41  import java.lang.management.MemoryUsage;
42  import java.util.ArrayList;
43  import java.util.Arrays;
44  import java.util.List;
45  import java.util.Map;
46  import java.util.concurrent.atomic.AtomicLong;
47  import java.util.concurrent.locks.Lock;
48  import java.util.concurrent.locks.ReentrantLock;
49  
50  import static com.google.common.base.Preconditions.checkState;
51  
52  /**
53   * Implementation of {@link OutOfCoreOracle} that uses a linear regression model
54   * to estimate actual memory usage based on the current state of computation.
55   * The model takes into consideration 5 parameters:
56   *
57   * y = c0 + c1*x1 + c2*x2 + c3*x3 + c4*x4 + c5*x5
58   *
59   * y: memory usage
60   * x1: edges loaded
61   * x2: vertices loaded
62   * x3: vertices processed
63   * x4: bytes received due to messages
64   * x5: bytes loaded/stored from/to disk due to OOC.
65   *
66   */
67  public class MemoryEstimatorOracle implements OutOfCoreOracle {
68    /** Memory check interval in msec */
69    public static final LongConfOption CHECK_MEMORY_INTERVAL =
70      new LongConfOption("giraph.garbageEstimator.checkMemoryInterval", 1000,
71          "The interval where memory checker thread wakes up and " +
72              "monitors memory footprint (in milliseconds)");
73    /**
74     * If mem-usage is above this threshold and no Full GC has been called,
75     * we call it manually
76     */
77    public static final FloatConfOption MANUAL_GC_MEMORY_PRESSURE =
78      new FloatConfOption("giraph.garbageEstimator.manualGCPressure", 0.95f,
79          "The threshold above which GC is called manually if Full GC has not " +
80              "happened in a while");
81    /** Used to detect a high memory pressure situation */
82    public static final FloatConfOption GC_MINIMUM_RECLAIM_FRACTION =
83      new FloatConfOption("giraph.garbageEstimator.gcReclaimFraction", 0.05f,
84          "Minimum percentage of memory we expect to be reclaimed after a Full " +
85              "GC. If less than this amount is reclaimed, it is sage to say " +
86              "we are in a high memory situation and the estimation mechanism " +
87              "has not recognized it yet!");
88    /** If mem-usage is above this threshold, active threads are set to 0 */
89    public static final FloatConfOption AM_HIGH_THRESHOLD =
90      new FloatConfOption("giraph.amHighThreshold", 0.95f,
91          "If mem-usage is above this threshold, all active threads " +
92              "(compute/input) are paused.");
93    /** If mem-usage is below this threshold, active threads are set to max */
94    public static final FloatConfOption AM_LOW_THRESHOLD =
95      new FloatConfOption("giraph.amLowThreshold", 0.90f,
96          "If mem-usage is below this threshold, all active threads " +
97              "(compute/input) are running.");
98    /** If mem-usage is above this threshold, credit is set to 0 */
99    public static final FloatConfOption CREDIT_HIGH_THRESHOLD =
100     new FloatConfOption("giraph.creditHighThreshold", 0.95f,
101         "If mem-usage is above this threshold, credit is set to 0");
102   /** If mem-usage is below this threshold, credit is set to max */
103   public static final FloatConfOption CREDIT_LOW_THRESHOLD =
104     new FloatConfOption("giraph.creditLowThreshold", 0.90f,
105         "If mem-usage is below this threshold, credit is set to max");
106   /** OOC starts if mem-usage is above this threshold */
107   public static final FloatConfOption OOC_THRESHOLD =
108     new FloatConfOption("giraph.oocThreshold", 0.90f,
109         "If mem-usage is above this threshold, out of core threads starts " +
110             "writing data to disk");
111 
112   /** Logger */
113   private static final Logger LOG =
114     Logger.getLogger(MemoryEstimatorOracle.class);
115 
116   /** Cached value for {@link #MANUAL_GC_MEMORY_PRESSURE} */
117   private final float manualGCMemoryPressure;
118   /** Cached value for {@link #GC_MINIMUM_RECLAIM_FRACTION} */
119   private final float gcReclaimFraction;
120   /** Cached value for {@link #AM_HIGH_THRESHOLD} */
121   private final float amHighThreshold;
122   /** Cached value for {@link #AM_LOW_THRESHOLD} */
123   private final float amLowThreshold;
124   /** Cached value for {@link #CREDIT_HIGH_THRESHOLD} */
125   private final float creditHighThreshold;
126   /** Cached value for {@link #CREDIT_LOW_THRESHOLD} */
127   private final float creditLowThreshold;
128   /** Cached value for {@link #OOC_THRESHOLD} */
129   private final float oocThreshold;
130 
131   /** Reference to running OOC engine */
132   private final OutOfCoreEngine oocEngine;
133   /** Memory estimator instance */
134   private final MemoryEstimator memoryEstimator;
135   /**
       * Keeps track of the net number of bytes injected by OOC: completed
       * partition loads add to it, every other completed non-wait command
       * subtracts from it. Reset to 0 at the start of each iteration.
       */
136   private final AtomicLong oocBytesInjected = new AtomicLong(0);
137   /**
       * How many bytes are still to be offloaded while in the OFFLOADING state.
       * Set when a GC reveals tight memory, adjusted as IO commands complete.
       */
138   private final AtomicLong numBytesToOffload = new AtomicLong(0);
139   /** Current state of the OOC */
140   private volatile State state = State.STABLE;
141   /**
       * Timestamp (from System.currentTimeMillis()) of the last major GC
       * observed by gcCompleted; 0 until the first one is seen.
       */
142   private volatile long lastMajorGCTime = 0;
143 
144   /**
145    * Different states the OOC can be in.
146    */
147   private enum State {
148     /** No offloading */
149     STABLE,
150     /** Currently offloading data until enough bytes have been moved out */
151     OFFLOADING,
152   }
153 
154   /**
155    * Constructor.
156    * @param conf Configuration
157    * @param oocEngine OOC engine.:w
158    *
159    */
160   public MemoryEstimatorOracle(ImmutableClassesGiraphConfiguration conf,
161                                final OutOfCoreEngine oocEngine) {
162     this.oocEngine = oocEngine;
163     this.memoryEstimator = new MemoryEstimator(this.oocBytesInjected,
164       oocEngine.getNetworkMetrics());
165 
166     this.manualGCMemoryPressure = MANUAL_GC_MEMORY_PRESSURE.get(conf);
167     this.gcReclaimFraction = GC_MINIMUM_RECLAIM_FRACTION.get(conf);
168     this.amHighThreshold = AM_HIGH_THRESHOLD.get(conf);
169     this.amLowThreshold = AM_LOW_THRESHOLD.get(conf);
170     this.creditHighThreshold = CREDIT_HIGH_THRESHOLD.get(conf);
171     this.creditLowThreshold = CREDIT_LOW_THRESHOLD.get(conf);
172     this.oocThreshold = OOC_THRESHOLD.get(conf);
173 
174     final long checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
175 
176     ThreadUtils.startThread(new Runnable() {
177       @Override
178       public void run() {
179         while (true) {
180           long oldGenUsageEstimate = memoryEstimator.getUsageEstimate();
181           MemoryUsage usage = getOldGenUsed();
182           if (oldGenUsageEstimate > 0) {
183             updateRates(oldGenUsageEstimate, usage.getMax());
184           } else {
185             long time = System.currentTimeMillis();
186             if (time - lastMajorGCTime >= 10000) {
187               double used = (double) usage.getUsed() / usage.getMax();
188               if (used > manualGCMemoryPressure) {
189                 if (LOG.isInfoEnabled()) {
190                   LOG.info(
191                     "High memory pressure with no full GC from the JVM. " +
192                       "Calling GC manually. Used fraction of old-gen is " +
193                       String.format("%.2f", used) + ".");
194                 }
195                 System.gc();
196                 time = System.currentTimeMillis() - time;
197                 usage = getOldGenUsed();
198                 used = (double) usage.getUsed() / usage.getMax();
199                 if (LOG.isInfoEnabled()) {
200                   LOG.info("Manual GC done. It took " +
201                     String.format("%.2f", time / 1000.0) +
202                     " seconds. Used fraction of old-gen is " +
203                     String.format("%.2f", used) + ".");
204                 }
205               }
206             }
207           }
208           try {
209             Thread.sleep(checkMemoryInterval);
210           } catch (InterruptedException e) {
211             LOG.warn("run: exception occurred!", e);
212             return;
213           }
214         }
215       }
216     }, "ooc-memory-checker", oocEngine.getServiceWorker().getGraphTaskManager()
217         .createUncaughtExceptionHandler());
218   }
219 
220   /**
221    * Resets all the counters used in the memory estimation. This is called at
222    * the beginning of a new superstep.
       * <p>
       * Besides resetting the edge-store progress counter, the OOC byte counter
       * and the collected regression samples, it records the engine's current
       * superstep in the estimator and restores both the request credit and
       * the active-thread fraction to 1.
223    * <p>
224    * The number of vertices to compute in the next superstep gets reset in
225    * {@link org.apache.giraph.graph.GraphTaskManager#processGraphPartitions}
226    * right before
227    * {@link org.apache.giraph.partition.PartitionStore#startIteration()} gets
228    * called.
229    */
230   @Override
231   public void startIteration() {
232     AbstractEdgeStore.PROGRESS_COUNTER.reset();
233     oocBytesInjected.set(0);
234     memoryEstimator.clear();
235     memoryEstimator.setCurrentSuperstep(oocEngine.getSuperstep());
        // Start the new iteration unthrottled
236     oocEngine.updateRequestsCreditFraction(1);
237     oocEngine.updateActiveThreadsFraction(1);
238   }
239 
240 
241   @Override
242   public IOAction[] getNextIOActions() {
243     if (state == State.OFFLOADING) {
244       return new IOAction[]{
245         IOAction.STORE_MESSAGES_AND_BUFFERS, IOAction.STORE_PARTITION};
246     }
247     long oldGenUsage = memoryEstimator.getUsageEstimate();
248     MemoryUsage usage = getOldGenUsed();
249     if (oldGenUsage > 0) {
250       double usageEstimate = (double) oldGenUsage / usage.getMax();
251       if (usageEstimate > oocThreshold) {
252         return new IOAction[]{
253           IOAction.STORE_MESSAGES_AND_BUFFERS,
254           IOAction.STORE_PARTITION};
255       } else {
256         return new IOAction[]{IOAction.LOAD_PARTITION};
257       }
258     } else {
259       return new IOAction[]{IOAction.LOAD_PARTITION};
260     }
261   }
262 
      /**
       * Approves every IO command unconditionally.
       *
       * @param command IO command to approve (not inspected)
       * @return always true
       */
263   @Override
264   public boolean approve(IOCommand command) {
265     return true;
266   }
267 
268   @Override
269   public void commandCompleted(IOCommand command) {
270     if (command instanceof LoadPartitionIOCommand) {
271       oocBytesInjected.getAndAdd(command.bytesTransferred());
272       if (state == State.OFFLOADING) {
273         numBytesToOffload.getAndAdd(command.bytesTransferred());
274       }
275     } else if (!(command instanceof WaitIOCommand)) {
276       oocBytesInjected.getAndAdd(0 - command.bytesTransferred());
277       if (state == State.OFFLOADING) {
278         numBytesToOffload.getAndAdd(0 - command.bytesTransferred());
279       }
280     }
281 
282     if (state == State.OFFLOADING && numBytesToOffload.get() <= 0) {
283       numBytesToOffload.set(0);
284       state = State.STABLE;
285       updateRates(-1, 1);
286     }
287   }
288 
289   /**
290    * When a new GC has completed, we can get an accurate measurement of the
291    * memory usage. We use this to update the linear regression model.
292    *
293    * @param gcInfo GC information
294    */
295   @Override
296   public synchronized void gcCompleted(
297     GarbageCollectionNotificationInfo gcInfo) {
298     String action = gcInfo.getGcAction().toLowerCase();
299     String cause = gcInfo.getGcCause().toLowerCase();
300     if (action.contains("major") &&
301       (cause.contains("ergo") || cause.contains("system"))) {
302       lastMajorGCTime = System.currentTimeMillis();
303       MemoryUsage before = null;
304       MemoryUsage after = null;
305 
306       for (Map.Entry<String, MemoryUsage> entry :
307         gcInfo.getGcInfo().getMemoryUsageBeforeGc().entrySet()) {
308         String poolName = entry.getKey();
309         if (poolName.toLowerCase().contains("old")) {
310           before = entry.getValue();
311           after = gcInfo.getGcInfo().getMemoryUsageAfterGc().get(poolName);
312           break;
313         }
314       }
315       if (after == null) {
316         throw new IllegalStateException("Missing Memory Usage After GC info");
317       }
318       if (before == null) {
319         throw new IllegalStateException("Missing Memory Usage Before GC info");
320       }
321 
322       // Compare the estimation with the actual value
323       long usedMemoryEstimate = memoryEstimator.getUsageEstimate();
324       long usedMemoryReal = after.getUsed();
325       if (usedMemoryEstimate >= 0) {
326         if (LOG.isInfoEnabled()) {
327           LOG.info("gcCompleted: estimate=" + usedMemoryEstimate + " real=" +
328             usedMemoryReal + " error=" +
329             ((double) Math.abs(usedMemoryEstimate - usedMemoryReal) /
330               usedMemoryReal * 100));
331         }
332       }
333 
334       // Number of edges loaded so far (if in input superstep)
335       long edgesLoaded = oocEngine.getSuperstep() >= 0 ? 0 :
336         EdgeInputSplitsCallable.getTotalEdgesLoadedMeter().count();
337       // Number of vertices loaded so far (if in input superstep)
338       long verticesLoaded = oocEngine.getSuperstep() >= 0 ? 0 :
339         VertexInputSplitsCallable.getTotalVerticesLoadedMeter().count();
340       // Number of vertices computed (if either in compute or store phase)
341       long verticesComputed = WorkerProgress.get().getVerticesComputed() +
342         WorkerProgress.get().getVerticesStored() +
343         AbstractEdgeStore.PROGRESS_COUNTER.getProgress();
344       // Number of bytes received
345       long receivedBytes =
346         oocEngine.getNetworkMetrics().getBytesReceivedPerSuperstep();
347       // Number of OOC bytes
348       long oocBytes = oocBytesInjected.get();
349 
350       memoryEstimator.addRecord(getOldGenUsed().getUsed(), edgesLoaded,
351         verticesLoaded, verticesComputed, receivedBytes, oocBytes);
352 
353       long garbage = before.getUsed() - after.getUsed();
354       long maxMem = after.getMax();
355       long memUsed = after.getUsed();
356       boolean isTight = (maxMem - memUsed) < 2 * gcReclaimFraction * maxMem &&
357         garbage < gcReclaimFraction * maxMem;
358       boolean predictionExist = memoryEstimator.getUsageEstimate() > 0;
359       if (isTight && !predictionExist) {
360         if (LOG.isInfoEnabled()) {
361           LOG.info("gcCompleted: garbage=" + garbage + " memUsed=" +
362             memUsed + " maxMem=" + maxMem);
363         }
364         numBytesToOffload.set((long) (2 * gcReclaimFraction * maxMem) -
365           (maxMem - memUsed));
366         if (LOG.isInfoEnabled()) {
367           LOG.info("gcCompleted: tight memory usage. Starting to offload " +
368             "until " + numBytesToOffload.get() + " bytes are offloaded");
369         }
370         state = State.OFFLOADING;
371         updateRates(1, 1);
372       }
373     }
374   }
375 
376   /**
377    * Given an estimate for the current memory usage and the maximum available
378    * memory, it updates the active threads and flow control credit in the
379    * OOC engine.
380    *
381    * @param usageEstimateMem Estimate of memory usage.
382    * @param maxMemory Maximum memory.
383    */
384   private void updateRates(long usageEstimateMem, long maxMemory) {
385     double usageEstimate = (double) usageEstimateMem / maxMemory;
386     if (usageEstimate > 0) {
387       if (usageEstimate >= amHighThreshold) {
388         oocEngine.updateActiveThreadsFraction(0);
389       } else if (usageEstimate < amLowThreshold) {
390         oocEngine.updateActiveThreadsFraction(1);
391       } else {
392         oocEngine.updateActiveThreadsFraction(1 -
393           (usageEstimate - amLowThreshold) /
394             (amHighThreshold - amLowThreshold));
395       }
396 
397       if (usageEstimate >= creditHighThreshold) {
398         oocEngine.updateRequestsCreditFraction(0);
399       } else if (usageEstimate < creditLowThreshold) {
400         oocEngine.updateRequestsCreditFraction(1);
401       } else {
402         oocEngine.updateRequestsCreditFraction(1 -
403           (usageEstimate - creditLowThreshold) /
404             (creditHighThreshold - creditLowThreshold));
405       }
406     } else {
407       oocEngine.updateActiveThreadsFraction(1);
408       oocEngine.updateRequestsCreditFraction(1);
409     }
410   }
411 
412   /**
413    * Returns statistics about the old gen pool.
414    * @return {@link MemoryUsage}.
415    */
416   private MemoryUsage getOldGenUsed() {
417     List<MemoryPoolMXBean> memoryPoolList =
418       ManagementFactory.getMemoryPoolMXBeans();
419     for (MemoryPoolMXBean pool : memoryPoolList) {
420       String normalName = pool.getName().toLowerCase();
421       if (normalName.contains("old") || normalName.contains("tenured")) {
422         return pool.getUsage();
423       }
424     }
425     throw new IllegalStateException("Bad Memory Pool");
426   }
427 
428   /**
429    * Maintains statistics about the current state and progress of the
430    * computation and produces estimates of memory usage using a technique
431    * based on linear regression.
432    *
433    * Upon a GC event, it gets updated with the most recent statistics through
434    * the {@link #addRecord} method.
435    */
436   private static class MemoryEstimator {
437     /**
         * Stores the (x1,x2,...,x5) arrays of data samples, one for each sample.
         * Each array holds, in order: edges loaded, vertices loaded, vertices
         * processed, bytes received and OOC bytes injected.
         */
438     private List<double[]> dataSamples = new ArrayList<>();
439     /** Stores the y memory usage dataSamples, one for each sample */
440     private DoubleArrayList memorySamples = new DoubleArrayList();
441     /**
         * Stores the coefficients computed by the linear regression model.
         * Indices 0-4 are the multipliers of the five variables (same order as
         * in a data sample); index 5 is the constant term.
         */
442     private double[] coefficient = new double[6];
443     /**
         * Stores the column indices that can be used in the regression model.
         * Recomputed every time a record is added.
         */
444     private List<Integer> validColumnIndices = new ArrayList<>();
445     /**
         * Potentially out-of-range coefficient values. Only indices 3 and 4 are
         * used when a record is added; -1 there means "no override".
         */
446     private double[] extreme = new double[6];
447     /** Indicates whether current coefficients can be used for estimation */
448     private boolean isValid = false;
449     /** Implementation of linear regression */
450     private OLSMultipleLinearRegression mlr = new OLSMultipleLinearRegression();
451     /**
         * Used to synchronize access to the data samples. It is held while an
         * estimate is computed and while the regression is recalculated.
         */
452     private Lock lock = new ReentrantLock();
453     /**
         * The estimation method depends on the current superstep: a negative
         * value is treated as the input superstep.
         */
454     private long currentSuperstep = -1;
455     /** The estimation method depends on the bytes injected. */
456     private final AtomicLong oocBytesInjected;
457     /** Provides network statistics */
458     private final NetworkMetrics networkMetrics;
459 
460     /**
461      * Constructor. Only stores the two references; the model starts out
         * invalid until enough records have been added.
         *
462      * @param oocBytesInjected Reference to {@link AtomicLong} object
463      *                         maintaining the number of OOC bytes stored.
464      * @param networkMetrics Interface to get network stats.
465      */
466     public MemoryEstimator(AtomicLong oocBytesInjected,
467                            NetworkMetrics networkMetrics) {
468       this.oocBytesInjected = oocBytesInjected;
469       this.networkMetrics = networkMetrics;
470     }
471 
472 
473     /**
474      * Clear data structure (called from single threaded program).
         * Drops all collected samples and marks the model as unusable, so that
         * estimates return -1 until the regression is recomputed.
         * <p>
         * NOTE(review): this method does not take the lock that guards the
         * estimate and regression code paths; it presumably relies on the
         * single-threaded calling context mentioned above - confirm.
475      */
476     public void clear() {
477       dataSamples.clear();
478       memorySamples.clear();
479       isValid = false;
480     }
481 
        /**
         * Sets the superstep used when computing estimates. With a negative
         * superstep the edges/vertices loaded counters are taken into account;
         * otherwise they are treated as 0.
         *
         * @param superstep Current superstep
         */
482     public void setCurrentSuperstep(long superstep) {
483       this.currentSuperstep = superstep;
484     }
485 
486     /**
487      * Given the current state of computation (i.e. current edges loaded,
488      * vertices computed etc) and the current model (i.e. the regression
489      * coefficient), it returns a prediction about the memory usage in bytes.
490      *
491      * @return Memory estimate in bytes.
492      */
493     public long getUsageEstimate() {
494       long usage = -1;
495       lock.lock();
496       try {
497         if (isValid) {
498           // Number of edges loaded so far (if in input superstep)
499           long edgesLoaded = currentSuperstep >= 0 ? 0 :
500             EdgeInputSplitsCallable.getTotalEdgesLoadedMeter().count();
501           // Number of vertices loaded so far (if in input superstep)
502           long verticesLoaded = currentSuperstep >= 0 ? 0 :
503             VertexInputSplitsCallable.getTotalVerticesLoadedMeter().count();
504           // Number of vertices computed (if either in compute or store phase)
505           long verticesComputed = WorkerProgress.get().getVerticesComputed() +
506             WorkerProgress.get().getVerticesStored() +
507             AbstractEdgeStore.PROGRESS_COUNTER.getProgress();
508           // Number of bytes received
509           long receivedBytes = networkMetrics.getBytesReceivedPerSuperstep();
510           // Number of OOC bytes
511           long oocBytes = this.oocBytesInjected.get();
512 
513           usage = (long) (edgesLoaded * coefficient[0] +
514             verticesLoaded * coefficient[1] +
515             verticesComputed * coefficient[2] +
516             receivedBytes * coefficient[3] +
517             oocBytes * coefficient[4] +
518             coefficient[5]);
519         }
520       } finally {
521         lock.unlock();
522       }
523       return usage;
524     }
525 
526     /**
527      * Updates the linear regression model with a new data point.
         * <p>
         * A record whose five variables equal those of the previous record is
         * ignored. Otherwise the sample is stored, the set of usable columns is
         * recomputed, and - if there are enough samples - the regression is
         * recalculated and the coefficients for bytes received and OOC bytes are
         * forced into their allowed ranges. The model is marked valid only when
         * all of these steps succeed; on any early exit after the sample has
         * been stored it is marked invalid.
528      *
529      * @param memUsed Current real value of memory usage.
530      * @param edges Number of edges loaded.
531      * @param vertices Number of vertices loaded.
532      * @param verticesProcessed Number of vertices processed.
533      * @param bytesReceived Number of bytes received.
534      * @param oocBytesInjected Number of bytes stored/loaded due to OOC.
         * @throws IllegalStateException if memUsed is not strictly positive
535      */
536     public void addRecord(long memUsed, long edges, long vertices,
537                           long verticesProcessed,
538                           long bytesReceived, long oocBytesInjected) {
539       checkState(memUsed > 0, "Memory Usage cannot be negative");
540       if (dataSamples.size() > 0) {
541         double[] last = dataSamples.get(dataSamples.size() - 1);
542         if (edges == last[0] && vertices == last[1] &&
543           verticesProcessed == last[2] && bytesReceived == last[3] &&
544           oocBytesInjected == last[4]) {
545           if (LOG.isDebugEnabled()) {
546             LOG.debug(
547               "addRecord: avoiding to add the same entry as the last one!");
548           }
549           return;
550         }
551       }
552       dataSamples.add(new double[] {edges, vertices, verticesProcessed,
553         bytesReceived, oocBytesInjected});
554       memorySamples.add((double) memUsed);
555 
556       // Weed out the columns that are all zero
557       validColumnIndices.clear();
558       for (int i = 0; i < 5; ++i) {
559         boolean validIndex = false;
560         // Check if there is a non-zero entry in the column
561         for (double[] value : dataSamples) {
562           if (value[i] != 0) {
563             validIndex = true;
564             break;
565           }
566         }
567         if (validIndex) {
568           // check if all entries are not equal to each other
              // (-1 marks "first value not seen yet"; entries within 1% of the
              // first value are considered equal to it)
569           double firstValue = -1;
570           boolean allEqual = true;
571           for (double[] value : dataSamples) {
572             if (firstValue == -1) {
573               firstValue = value[i];
574             } else {
575               if (Math.abs((value[i] - firstValue) / firstValue) > 0.01) {
576                 allEqual = false;
577                 break;
578               }
579             }
580           }
581           validIndex = !allEqual;
582           if (validIndex) {
583             // Check if the column has linear dependency with another column
                // (only columns with a higher index are checked)
584             for (int col = i + 1; col < 5; ++col) {
585               if (isLinearDependence(dataSamples, i, col)) {
586                 validIndex = false;
587                 break;
588               }
589             }
590           }
591         }
592 
593         if (validIndex) {
594           validColumnIndices.add(i);
595         }
596       }
597 
598       // If we filtered out columns in the previous step, we are going to run
599       // the regression without those columns.
600 
601       // Create the coefficient table
602       boolean setIsValid = false;
603       lock.lock();
604       try {
            // The regression needs at least one usable column and more samples
            // than usable columns
605         if (validColumnIndices.size() >= 1 &&
606           dataSamples.size() >= validColumnIndices.size() + 1) {
607 
608           double[][] xValues = new double[dataSamples.size()][];
609           fillXMatrix(dataSamples, validColumnIndices, xValues);
610           double[] yValues =
611               memorySamples.toDoubleArray(new double[memorySamples.size()]);
612           mlr.newSampleData(yValues, xValues);
613           boolean isRegressionValid =
614             calculateRegression(coefficient, validColumnIndices, mlr);
615 
616           if (!isRegressionValid) { // invalid regression result
617             return; // The finally-block at the end will release any locks.
618           }
619 
620           // After the computation of the regression, some coefficients may have
621           // values outside the valid value range. In this case, we set the
622           // coefficient to the minimum or maximum value allowed, and re-run the
623           // regression.
624           // We only care about coefficients of two of the variables:
625           // bytes received due to messages (receivedBytes -- index 3 in
626           // `coefficient` array) and bytes transferred due to OOC (oocBytes --
627           // index 4 in `coefficient` array).
628           //
629           // receivedBytes' coefficient cannot be negative, meaning that it does
630           // not make sense that memory footprint decreases because of receipt
631           // of messages. We either have message combiner or we do not have
632           // combiner. If message combiner is used, the memory footprint
633           // will not change much due to messages leading to the coefficient for
634           // receivedBytes to be close to 0. If message combiner is not used, the
635           // memory only increases with messages, and the coefficient should be
636           // positive. In this case, a message is usually deserialized and then
637           // written to the message store. We assume that the process of
638           // deserializing the message and putting it into the message store
639           // will not result in more than twice the size of the serialized form
640           // of the message (meaning that the memory footprint for message store
641           // will not be more than 2*receivedBytes). Based on this assumption
642           // the upper bound for coefficient of receivedBytes should be 2.
643           //
644           // "oocBytes" represents the size of the serialized form of data that
645           // has transferred to/from secondary storage. On the other hand, in
646           // our estimation mechanism, we are estimating the aggregate size of
647           // all live objects in memory, meaning that we are estimating the size
648           // of deserialized form of data in memory. Since we are not using any
649           // (de)compression for (de)serialization of data, we assume that
650           // size of serialized data <= size of deserialized data <=
651           // 2 * (size of serialized data)
652           // This basically means that the coefficient for oocBytes should be
653           // between 1 and 2.
654 
655           boolean changed;
              // -1 means "no out-of-range value recorded for this coefficient"
656           extreme[3] = -1;
657           extreme[4] = -1;
658           do {
659             Boolean result = null;
660 
                // oocBytes coefficient must be within [1, 2]
661             result = refineCoefficient(4, 1, 2, xValues, yValues);
662             if (result == null) { // invalid regression result
663               return;  // finally-block will release lock
664             }
665             changed = result;
666 
                // receivedBytes coefficient must be within [0, 2]
667             result = refineCoefficient(3, 0, 2, xValues, yValues);
668             if (result == null) { // invalid regression result
669               return;  // finally-block will release lock
670             }
671             changed |= result;
672           } while (changed);
673           if (extreme[3] != -1) {
674             coefficient[3] = extreme[3];
675           }
676           if (extreme[4] != -1) {
677             coefficient[4] = extreme[4];
678           }
679           setIsValid = true;
680           return; // the finally-block will execute before return
681         }
682       } finally {
683         // This inner try-finally block is necessary to ensure that the
684         // lock is always released.
685         try {
              // Publishes the outcome: stays false on every early return above
686           isValid = setIsValid;
687           printStats();
688         } finally {
689           lock.unlock();
690         }
691       }
692     }
693 
694     /**
695      * Certain coefficients need to be within a specific range.
696      * If the coefficient is not in this range, we set it to the closest bound
697      * and re-run the linear regression. In this case, we keep the possible
698      * extremum coefficient in an intermediate array ("extreme"). Also, if
699      * we choose the extremum coefficient for an index, that index is removed
700      * from the regression calculation as well as the rest of the refinement
701      * process.
702      *
703      * Note that the regression calculation here is based on the method of
704      * least square for minimizing the error. The sum of squares of errors for
705      * all points is a convex function. This means if we solve the
706      * non-constrained linear regression and then refine the coefficient to
707      * apply their bounds, we will achieve a solution to our constrained
708      * linear regression problem.
709      *
710      * This method is called in a loop to refine certain coefficients. The loop
711      * should continue until all coefficients are refined and are within their
712      * range.
713      *
714      * @param coefIndex Coefficient index
715      * @param lowerBound Lower bound
716      * @param upperBound Upper bound
717      * @param xValues double[][] matrix with data samples
718      * @param yValues double[] matrix with y samples
719      * @return True if coefficients were out-of-range, false otherwise. A null
720      *         value means the regression result was invalid and the result of
721      *         this method is invalid too.
722      */
723     @Nullable
724     private Boolean refineCoefficient(int coefIndex, double lowerBound,
725       double upperBound, double[][] xValues, double[] yValues) {
726 
727       boolean result = false;
728       if (coefficient[coefIndex] < lowerBound ||
729         coefficient[coefIndex] > upperBound) {
730 
731         double value;
732         if (coefficient[coefIndex] < lowerBound) {
733           value = lowerBound;
734         } else {
735           value = upperBound;
736         }
737         int ptr = -1;
738         // Finding the 'coefIndex' in the valid indices. Since this method is
739         // used only for the variables with higher indices, we use a reverse
740         // loop to lookup the 'coefIndex' for faster termination of the loop.
741         for (int i = validColumnIndices.size() - 1; i >= 0; --i) {
742           if (validColumnIndices.get(i) == coefIndex) {
743             ptr = i;
744             break;
745           }
746         }
747         if (ptr != -1) {
748           if (LOG.isDebugEnabled()) {
749             LOG.debug("addRecord: coefficient at index " + coefIndex +
750               " is wrong in the regression, setting it to " + value);
751           }
752           // remove from valid column
753           validColumnIndices.remove(ptr);
754           // re-create the X matrix
755           fillXMatrix(dataSamples, validColumnIndices, xValues);
756           // adjust Y values
757           for (int i = 0; i < memorySamples.size(); ++i) {
758             yValues[i] -= value * dataSamples.get(i)[coefIndex];
759           }
760           // save new coefficient value in intermediate array
761           extreme[coefIndex] = value;
762           // re-run regression
763           mlr.newSampleData(yValues, xValues);
764           result = calculateRegression(coefficient, validColumnIndices, mlr);
765           if (!result) { // invalid regression result
766             return null;
767           }
768         } else {
769           if (LOG.isDebugEnabled()) {
770             LOG.debug(
771               "addRecord: coefficient was not in the regression, " +
772                 "setting it to the extreme of the bound");
773           }
774           result = false;
775         }
776         coefficient[coefIndex] = value;
777       }
778       return result;
779     }
780 
781     /**
782      * Calculates the regression.
783      * @param coefficient Array of coefficients
784      * @param validColumnIndices List of valid columns
785      * @param mlr {@link OLSMultipleLinearRegression} instance.
786      * @return True if the result is valid, false otherwise.
787      */
788     private static boolean calculateRegression(double[] coefficient,
789       List<Integer> validColumnIndices, OLSMultipleLinearRegression mlr) {
790 
791       if (coefficient.length != validColumnIndices.size()) {
792         LOG.info("There are " + coefficient.length +
793           " coefficients, and " + validColumnIndices.size() +
794           " valid columns in the regression");
795       }
796 
797       double[] beta = mlr.estimateRegressionParameters();
798       Arrays.fill(coefficient, 0);
799       for (int i = 0; i < validColumnIndices.size(); ++i) {
800         coefficient[validColumnIndices.get(i)] = beta[i];
801       }
802       coefficient[5] = beta[validColumnIndices.size()];
803       return true;
804     }
805 
806     /**
807      * Copies values from a List of double[] to a double[][]. Takes into
808      * consideration the list of valid column indices.
809      * @param sourceValues Source List of double[]
810      * @param validColumnIndices Valid column indices
811      * @param xValues Target double[][] matrix.
812      */
813     private static void fillXMatrix(List<double[]> sourceValues,
814                                     List<Integer> validColumnIndices,
815                                     double[][] xValues) {
816 
817       for (int i = 0; i < sourceValues.size(); ++i) {
818         xValues[i] = new double[validColumnIndices.size() + 1];
819         for (int j = 0; j < validColumnIndices.size(); ++j) {
820           xValues[i][j] = sourceValues.get(i)[validColumnIndices.get(j)];
821         }
822         xValues[i][validColumnIndices.size()] = 1;
823       }
824     }
825 
826     /**
827      * Utility function that checks whether two doubles are equals given an
828      * accuracy tolerance.
829      *
830      * @param val1 First value
831      * @param val2 Second value
832      * @return True if within a threshold
833      */
834     private static boolean equal(double val1, double val2) {
835       return Math.abs(val1 - val2) < 0.01;
836     }
837 
838     /**
839      * Utility function that checks if two columns have linear dependence.
840      *
841      * @param values Matrix in the form of a List of double[] values.
842      * @param col1 First column index
843      * @param col2 Second column index
844      * @return True if there is linear dependence.
845      */
846     private static boolean isLinearDependence(List<double[]> values,
847                                               int col1, int col2) {
848       boolean firstValSeen = false;
849       double firstVal = 0;
850       for (double[] value : values) {
851         double val1 = value[col1];
852         double val2 = value[col2];
853         if (equal(val1, 0)) {
854           if (equal(val2, 0)) {
855             continue;
856           } else {
857             return false;
858           }
859         }
860         if (equal(val2, 0)) {
861           return false;
862         }
863         if (!firstValSeen) {
864           firstVal = val1 / val2;
865           firstValSeen = true;
866         } else {
867           if (!equal((val1 / val2 - firstVal) / firstVal, 0)) {
868             return false;
869           }
870         }
871       }
872       return true;
873     }
874 
875     /**
876      * Prints statistics about the regression model.
877      */
878     private void printStats() {
879       if (LOG.isDebugEnabled()) {
880         StringBuilder sb = new StringBuilder();
881         sb.append(
882           "\nEDGES\t\tVERTICES\t\tV_PROC\t\tRECEIVED\t\tOOC\t\tMEM_USED\n");
883         for (int i = 0; i < dataSamples.size(); ++i) {
884           for (int j = 0; j < dataSamples.get(i).length; ++j) {
885             sb.append(String.format("%.2f\t\t", dataSamples.get(i)[j]));
886           }
887           sb.append(memorySamples.get(i));
888           sb.append("\n");
889         }
890         sb.append("COEFFICIENT:\n");
891         for (int i = 0; i < coefficient.length; ++i) {
892           sb.append(String.format("%.2f\t\t", coefficient[i]));
893         }
894         sb.append("\n");
895         LOG.debug("printStats: isValid=" + isValid + sb.toString());
896       }
897     }
898   }
899 }