This project has retired. For details please refer to its Attic page.
OutOfCoreEngine xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc;
20  
21  import com.sun.management.GarbageCollectionNotificationInfo;
22  import com.yammer.metrics.core.Gauge;
23  import org.apache.giraph.bsp.BspService;
24  import org.apache.giraph.bsp.CentralizedServiceWorker;
25  import org.apache.giraph.comm.NetworkMetrics;
26  import org.apache.giraph.comm.ServerData;
27  import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
28  import org.apache.giraph.comm.flow_control.FlowControl;
29  import org.apache.giraph.conf.GiraphConstants;
30  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
31  import org.apache.giraph.metrics.GiraphMetrics;
32  import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
33  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
34  import org.apache.giraph.ooc.data.MetaPartitionManager;
35  import org.apache.giraph.ooc.command.IOCommand;
36  import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
37  import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
38  import org.apache.giraph.ooc.policy.FixedPartitionsOracle;
39  import org.apache.giraph.ooc.policy.OutOfCoreOracle;
40  import org.apache.giraph.utils.AdjustableSemaphore;
41  import org.apache.giraph.worker.BspServiceWorker;
42  import org.apache.log4j.Logger;
43  
44  import java.lang.reflect.Constructor;
45  import java.lang.reflect.InvocationTargetException;
46  import java.util.concurrent.locks.ReadWriteLock;
47  import java.util.concurrent.locks.ReentrantReadWriteLock;
48  
49  import static com.google.common.base.Preconditions.checkState;
50  
51  /**
52   * Class to represent an out-of-core engine.
53   */
54  public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
55    /**
56     * Number of 'units of processing' after which an active thread should
57     * check-in with the out-of-core engine in order to re-claim its permission to
58     * stay active. For a compute thread, the 'unit of processing' is processing
59     * of one vertex, and for an input thread, the 'unit of processing' is reading
60     * a row of input data.
61     */
62    public static final int CHECK_IN_INTERVAL = (1 << 10) - 1;
63    /** Name of metric for percentage of graph in memory */
64    public static final String GRAPH_PERCENTAGE_IN_MEMORY = "ooc-graph-in-mem-%";
65    /** Class logger. */
66    private static final Logger LOG = Logger.getLogger(OutOfCoreEngine.class);
67    /**
68     * When getting partitions, how many milliseconds to wait if no partition was
69     * available in memory
70     */
71    private static final long MSEC_TO_WAIT = 10000;
72    /** Service worker */
73    private final CentralizedServiceWorker<?, ?, ?> service;
74    /** Flow control used in sending requests */
75    private FlowControl flowControl;
76    /** Scheduler for IO threads */
77    private final OutOfCoreIOScheduler ioScheduler;
78    /** Data structure to keep meta partition information */
79    private final MetaPartitionManager metaPartitionManager;
80    /** Out-of-core oracle (brain of out-of-core mechanism) */
81    private final OutOfCoreOracle oracle;
82    /** IO statistics collector */
83    private final OutOfCoreIOStatistics statistics;
84    /**
85     * Global lock for entire superstep. This lock helps to avoid overlapping of
86     * out-of-core decisions (what to do next to help the out-of-core mechanism)
87     * with out-of-core operations (actual IO operations).
88     */
89    private final ReadWriteLock superstepLock = new ReentrantReadWriteLock();
90    /** Data accessor object (DAO) used as persistence layer in out-of-core */
91    private final OutOfCoreDataAccessor dataAccessor;
92    /** Callable factory for IO threads */
93    private final OutOfCoreIOCallableFactory oocIOCallableFactory;
94    /**
95     * Dummy object to wait on until a partition becomes available in memory
96     * for processing
97     */
98    private final Object partitionAvailable = new Object();
99    /** How many compute threads do we have? */
100   private int numComputeThreads;
101   /** How many threads (input/compute) are processing data? */
102   private volatile int numProcessingThreads;
103   /** Semaphore used for controlling number of active threads at each moment */
104   private final AdjustableSemaphore activeThreadsPermit;
105   /**
106    * Cached value for NettyClient.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER (max
107    * credit used for credit-based flow-control mechanism)
108    */
109   private final short maxRequestsCredit;
110   /**
111    * Generally, the logic in Giraph for change of the superstep happens in the
112    * following order:
113    *   (1) Compute threads are done processing all partitions
114    *   (2) Superstep number increases
115    *   (3) New message store is created and message stores are prepared
116    *   (4) Iteration over partitions starts
117    * Note that there are other operations happening at the same time as well as
118    * the above operations, but the above operations are the ones which may
119    * interfere with out-of-core operations. The goal of `superstepLock` is to
120    * isolate operations 2, 3, and 4 from the rest of computations and IO
121    * operations. Specifically, increasing the superstep counter (operation 2)
122    * should be exclusive and no IO operation should happen at the same time.
123    * This is due to the fact that prefetching mechanism uses superstep counter
124    * as a means to identify which data should be read. That being said, superstep
125    * counter should be cached in out-of-core engine, and all IO operations and
126    * out-of-core logic should access superstep counter through this cached
127    * value.
128    */
129   private long superstep;
130   /**
131    * Generally, the logic of a graph computation happens in the following order
132    * with respect to `startIteration` and `reset` method:
133    * ...
134    * startIteration (for moving edges)
135    * ...
136    * reset (to prepare messages/partitions for superstep 0)
137    * ...
138    * startIteration (superstep 0)
139    * ...
140    * reset (to prepare messages/partitions for superstep 1)
141    * ...
142    *
143    * However, in the unit tests, we usually consider only one superstep (usually
144    * INPUT_SUPERSTEP), and we move through partitions multiple times. Out-of-
145    * core mechanism works only if partitions are reset in a proper way. So,
146    * we keep the following flag to reset partitions if necessary.
147    */
148   private boolean resetDone;
149 
150   /**
151    * Provides statistics about network traffic (e.g. received bytes per
152    * superstep etc).
153    */
154   private final NetworkMetrics networkMetrics;
155 
156   /**
157    * Constructor
158    *
159    * @param conf Configuration
160    * @param service Service worker
161    * @param networkMetrics Interface for network stats
162    */
163   public OutOfCoreEngine(ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
164                          CentralizedServiceWorker<?, ?, ?> service,
165                          NetworkMetrics networkMetrics) {
166     this.service = service;
167     this.networkMetrics = networkMetrics;
168     Class<? extends OutOfCoreDataAccessor> accessorClass =
169         GiraphConstants.OUT_OF_CORE_DATA_ACCESSOR.get(conf);
170     try {
171       Constructor<?> constructor = accessorClass.getConstructor(
172           ImmutableClassesGiraphConfiguration.class);
173       this.dataAccessor = (OutOfCoreDataAccessor) constructor.newInstance(conf);
174     } catch (NoSuchMethodException | InstantiationException |
175         InvocationTargetException | IllegalAccessException e) {
176       throw new IllegalStateException("OutOfCoreEngine: caught exception " +
177           "while creating the data accessor instance!", e);
178     }
179     int numIOThreads = dataAccessor.getNumAccessorThreads();
180     this.oocIOCallableFactory =
181         new OutOfCoreIOCallableFactory(this, numIOThreads,
182             service.getGraphTaskManager().createUncaughtExceptionHandler());
183     this.ioScheduler = new OutOfCoreIOScheduler(conf, this, numIOThreads);
184     this.metaPartitionManager = new MetaPartitionManager(numIOThreads, this);
185     this.statistics = new OutOfCoreIOStatistics(conf, numIOThreads);
186     int maxPartitionsInMemory =
187         GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
188     Class<? extends OutOfCoreOracle> oracleClass =
189         GiraphConstants.OUT_OF_CORE_ORACLE.get(conf);
190     if (maxPartitionsInMemory != 0 &&
191         oracleClass != FixedPartitionsOracle.class) {
192       LOG.warn("OutOfCoreEngine: Max number of partitions in memory is set " +
193           "but the out-of-core oracle used is not tailored for fixed " +
194           "out-of-core policy. Setting the oracle to be FixedPartitionsOracle");
195       oracleClass = FixedPartitionsOracle.class;
196     }
197     this.numComputeThreads = conf.getNumComputeThreads();
198     // At the beginning of the execution, only input threads are processing data
199     this.numProcessingThreads = conf.getNumInputSplitsThreads();
200     this.activeThreadsPermit = new AdjustableSemaphore(numProcessingThreads);
201     this.maxRequestsCredit = (short)
202         CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
203     this.superstep = BspService.INPUT_SUPERSTEP;
204     this.resetDone = false;
205     GiraphMetrics.get().addSuperstepResetObserver(this);
206     try {
207       Constructor<?> constructor = oracleClass.getConstructor(
208         ImmutableClassesGiraphConfiguration.class, OutOfCoreEngine.class);
209       this.oracle = (OutOfCoreOracle) constructor.newInstance(conf, this);
210     } catch (NoSuchMethodException | IllegalAccessException |
211       InstantiationException | InvocationTargetException e) {
212       throw new IllegalStateException("OutOfCoreEngine: caught exception " +
213         "while creating the oracle!", e);
214     }
215   }
216 
217   /**
218    * Initialize/Start the out-of-core engine.
219    */
220   public void initialize() {
221     dataAccessor.initialize();
222     oocIOCallableFactory.createCallable();
223   }
224 
225   /**
226    * Shutdown/Stop the out-of-core engine.
227    */
228   public void shutdown() {
229     if (LOG.isInfoEnabled()) {
230       LOG.info("shutdown: out-of-core engine shutting down, signalling IO " +
231           "threads to shutdown");
232     }
233     ioScheduler.shutdown();
234     oocIOCallableFactory.shutdown();
235     dataAccessor.shutdown();
236   }
237 
238   /**
239    * Get a reference to the server data
240    *
241    * @return ServerData
242    */
243   public ServerData getServerData() {
244     return service.getServerData();
245   }
246 
247   /**
248    * Get a reference to the service worker
249    *
250    * @return CentralizedServiceWorker
251    */
252   public CentralizedServiceWorker getServiceWorker() {
253     return service;
254   }
255 
256   /**
257    * Get a reference to IO scheduler
258    *
259    * @return OutOfCoreIOScheduler
260    */
261   public OutOfCoreIOScheduler getIOScheduler() {
262     return ioScheduler;
263   }
264 
265   /**
266    * Get a reference to meta partition information
267    *
268    * @return MetaPartitionManager
269    */
270   public MetaPartitionManager getMetaPartitionManager() {
271     return metaPartitionManager;
272   }
273 
274   /**
275    * Get a reference to superstep lock
276    *
277    * @return read/write lock used for global superstep lock
278    */
279   public ReadWriteLock getSuperstepLock() {
280     return superstepLock;
281   }
282 
283   /**
284    * Get a reference to IO statistics collector
285    *
286    * @return IO statistics collector
287    */
288   public OutOfCoreIOStatistics getIOStatistics() {
289     return statistics;
290   }
291 
292   /**
293    * Get a reference to out-of-core oracle
294    *
295    * @return out-of-core oracle
296    */
297   public OutOfCoreOracle getOracle() {
298     return oracle;
299   }
300 
301   /**
302    * Get the id of the next partition to process in the current iteration over
303    * all the partitions. If all partitions are already processed, this method
304    * returns null.
305    *
306    * @return id of a partition to process. 'null' if all partitions are
307    *         processed in current iteration over partitions.
308    */
309   public Integer getNextPartition() {
310     Integer partitionId;
311     synchronized (partitionAvailable) {
312       while ((partitionId = metaPartitionManager.getNextPartition()) == null) {
313         try {
314           if (LOG.isInfoEnabled()) {
315             LOG.info("getNextPartition: waiting until a partition becomes " +
316                 "available!");
317           }
318           partitionAvailable.wait(MSEC_TO_WAIT);
319         } catch (InterruptedException e) {
320           throw new IllegalStateException("getNextPartition: caught " +
321               "InterruptedException while waiting to retrieve a partition to " +
322               "process");
323         }
324       }
325       if (partitionId == MetaPartitionManager.NO_PARTITION_TO_PROCESS) {
326         partitionAvailable.notifyAll();
327         partitionId = null;
328       }
329     }
330     return partitionId;
331   }
332 
333   /**
334    * Notify out-of-core engine that processing of a particular partition is done
335    *
336    * @param partitionId id of the partition that its processing is done
337    */
338   public void doneProcessingPartition(int partitionId) {
339     metaPartitionManager.setPartitionIsProcessed(partitionId);
340     if (LOG.isInfoEnabled()) {
341       LOG.info("doneProcessingPartition: processing partition " + partitionId +
342           " is done!");
343     }
344   }
345 
346   /**
347    * Notify out-of-core engine that iteration cycle over all partitions is about
348    * to begin.
349    */
350   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
351       "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
352   public void startIteration() {
353     oracle.startIteration();
354     if (!resetDone) {
355       superstepLock.writeLock().lock();
356       metaPartitionManager.resetPartitions();
357       superstepLock.writeLock().unlock();
358     }
359     if (superstep != BspServiceWorker.INPUT_SUPERSTEP &&
360         numProcessingThreads != numComputeThreads) {
361       // This method is only executed by the main thread, and at this point
362       // no other input/compute thread is alive. So, all the permits in
363       // `activeThreadsPermit` is available. However, now that we are changing
364       // the maximum number of active threads, we need to adjust the number
365       // of available permits on `activeThreadsPermit`.
366       activeThreadsPermit.setMaxPermits(activeThreadsPermit.availablePermits() *
367           numComputeThreads / numProcessingThreads);
368       numProcessingThreads = numComputeThreads;
369     }
370     if (LOG.isInfoEnabled()) {
371       LOG.info("startIteration: with " +
372           metaPartitionManager.getNumInMemoryPartitions() +
373           " partitions in memory and " +
374           activeThreadsPermit.availablePermits() + " active threads");
375     }
376     resetDone = false;
377   }
378 
379   /**
380    * Retrieve a particular partition. After this method is complete the
381    * requested partition should be in memory.
382    *
383    * @param partitionId id of the partition to retrieve
384    */
385   public void retrievePartition(int partitionId) {
386     if (metaPartitionManager.isPartitionOnDisk(partitionId)) {
387       ioScheduler.addIOCommand(new LoadPartitionIOCommand(this, partitionId,
388           superstep));
389       synchronized (partitionAvailable) {
390         while (metaPartitionManager.isPartitionOnDisk(partitionId)) {
391           try {
392             if (LOG.isInfoEnabled()) {
393               LOG.info("retrievePartition: waiting until partition " +
394                   partitionId + " becomes available");
395             }
396             partitionAvailable.wait();
397           } catch (InterruptedException e) {
398             throw new IllegalStateException("retrievePartition: caught " +
399                 "InterruptedException while waiting to retrieve partition " +
400                 partitionId);
401           }
402         }
403       }
404     }
405   }
406 
407   /**
408    * Notify out-of-core engine that an IO command is completed by an IO thread
409    *
410    * @param command the IO command that is completed
411    */
412   public void ioCommandCompleted(IOCommand command) {
413     oracle.commandCompleted(command);
414     if (command instanceof LoadPartitionIOCommand) {
415       // Notifying compute threads who are waiting for a partition to become
416       // available in memory to process.
417       synchronized (partitionAvailable) {
418         partitionAvailable.notifyAll();
419       }
420     }
421   }
422 
423   /**
424    * Update the fraction of processing threads that should remain active. It is
425    * the responsibility of out-of-core oracle to update the number of active
426    * threads.
427    *
428    * @param fraction the fraction of processing threads to remain active. This
429    *                 number is in range [0, 1]
430    */
431   public void updateActiveThreadsFraction(double fraction) {
432     checkState(fraction >= 0 && fraction <= 1);
433     int numActiveThreads = (int) (numProcessingThreads * fraction);
434     if (LOG.isInfoEnabled()) {
435       LOG.info("updateActiveThreadsFraction: updating the number of active " +
436           "threads to " + numActiveThreads);
437     }
438     activeThreadsPermit.setMaxPermits(numActiveThreads);
439   }
440 
441   /**
442    * A processing thread would check in with out-of-core engine every once in a
443    * while to make sure that it can still remain active. It is the
444    * responsibility of the out-of-core oracle to update the number of active
445    * threads in a way that the computation never fails, and yet achieve the
446    * optimal performance it can achieve.
447    */
448   public void activeThreadCheckIn() {
449     activeThreadsPermit.release();
450     try {
451       activeThreadsPermit.acquire();
452     } catch (InterruptedException e) {
453       LOG.error("activeThreadCheckIn: exception while acquiring a permit to " +
454           "remain an active thread");
455       throw new IllegalStateException(e);
456     }
457   }
458 
459   /**
460    * Notify the out-of-core engine that a processing (input/compute) thread has
461    * started.
462    */
463   public void processingThreadStart() {
464     try {
465       activeThreadsPermit.acquire();
466     } catch (InterruptedException e) {
467       LOG.error("processingThreadStart: exception while acquiring a permit to" +
468           " start the processing thread!");
469       throw new IllegalStateException(e);
470     }
471   }
472 
473   /**
474    * Notify the out-of-core engine that a processing (input/compute) thread has
475    * finished.
476    */
477   public void processingThreadFinish() {
478     activeThreadsPermit.release();
479   }
480 
481   /**
482    * Update the credit announced for this worker in Netty. The lower the credit
483    * is, the lower rate incoming messages arrive at this worker. Thus, credit
484    * is an indirect way of controlling amount of memory incoming messages would
485    * take.
486    *
487    * @param fraction the fraction of max credits others can use to send requests
488    *                 to this worker
489    */
490   public void updateRequestsCreditFraction(double fraction) {
491     checkState(fraction >= 0 && fraction <= 1);
492     short newCredit = (short) (maxRequestsCredit * fraction);
493     if (LOG.isInfoEnabled()) {
494       LOG.info("updateRequestsCreditFraction: updating the credit to " +
495           newCredit);
496     }
497     if (flowControl != null) {
498       ((CreditBasedFlowControl) flowControl).updateCredit(newCredit);
499     }
500   }
501 
502   /**
503    * Reset partitions and messages meta data. Also, reset the cached value of
504    * superstep counter.
505    */
506   public void reset() {
507     metaPartitionManager.resetPartitions();
508     metaPartitionManager.resetMessages();
509     superstep = service.getSuperstep();
510     resetDone = true;
511   }
512 
513   /**
514    * @return cached value of the superstep counter
515    */
516   public long getSuperstep() {
517     return superstep;
518   }
519 
520   /**
521    * Notify the out-of-core engine that a GC has just been completed
522    *
523    * @param info GC information
524    */
525   public void gcCompleted(GarbageCollectionNotificationInfo info) {
526     oracle.gcCompleted(info);
527   }
528 
529   @Override
530   public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
531     superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new Gauge<Double>() {
532       @Override
533       public Double value() {
534         return metaPartitionManager.getGraphFractionInMemory() * 100;
535       }
536     });
537   }
538 
539   public FlowControl getFlowControl() {
540     return flowControl;
541   }
542 
543   public void setFlowControl(FlowControl flowControl) {
544     this.flowControl = flowControl;
545   }
546 
547   public OutOfCoreDataAccessor getDataAccessor() {
548     return dataAccessor;
549   }
550 
551   public NetworkMetrics getNetworkMetrics() {
552     return networkMetrics;
553   }
554 }