View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc;
20  
21  import com.sun.management.GarbageCollectionNotificationInfo;
22  import com.yammer.metrics.core.Gauge;
23  import org.apache.giraph.bsp.BspService;
24  import org.apache.giraph.bsp.CentralizedServiceWorker;
25  import org.apache.giraph.comm.ServerData;
26  import org.apache.giraph.comm.flow_control.FlowControl;
27  import org.apache.giraph.conf.GiraphConstants;
28  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
29  import org.apache.giraph.metrics.GiraphMetrics;
30  import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
31  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
32  import org.apache.giraph.ooc.data.MetaPartitionManager;
33  import org.apache.giraph.ooc.command.IOCommand;
34  import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
35  import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
36  import org.apache.giraph.ooc.policy.FixedPartitionsOracle;
37  import org.apache.giraph.ooc.policy.OutOfCoreOracle;
38  import org.apache.giraph.utils.AdjustableSemaphore;
39  import org.apache.giraph.worker.BspServiceWorker;
40  import org.apache.log4j.Logger;
41  
42  import java.lang.reflect.Constructor;
43  import java.lang.reflect.InvocationTargetException;
44  import java.util.concurrent.locks.ReadWriteLock;
45  import java.util.concurrent.locks.ReentrantReadWriteLock;
46  
47  import static com.google.common.base.Preconditions.checkState;
48  
49  /**
50   * Class to represent an out-of-core engine.
51   */
52  public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
53    /**
54     * Number of 'units of processing' after which an active thread should
55     * check-in with the out-of-core engine in order to re-claim its permission to
56     * stay active. For a compute thread, the 'unit of processing' is processing
57     * of one vertex, and for an input thread, the 'unit of processing' is reading
58     * a row of input data.
59     */
60    public static final int CHECK_IN_INTERVAL = (1 << 10) - 1;
61    /** Name of metric for percentage of graph on disk */
62    public static final String GRAPH_PERCENTAGE_IN_MEMORY = "ooc-graph-in-mem-%";
63    /** Class logger. */
64    private static final Logger LOG = Logger.getLogger(OutOfCoreEngine.class);
65    /**
66     * When getting partitions, how many milliseconds to wait if no partition was
67     * available in memory
68     */
69    private static final long MSEC_TO_WAIT = 10000;
70    /** Service worker */
71    private final CentralizedServiceWorker<?, ?, ?> service;
72    /** Flow control used in sending requests */
73    private FlowControl flowControl;
74    /** Scheduler for IO threads */
75    private final OutOfCoreIOScheduler ioScheduler;
76    /** Data structure to keep meta partition information */
77    private final MetaPartitionManager metaPartitionManager;
78    /** Out-of-core oracle (brain of out-of-core mechanism) */
79    private final OutOfCoreOracle oracle;
80    /** IO statistics collector */
81    private final OutOfCoreIOStatistics statistics;
82    /**
83     * Global lock for entire superstep. This lock helps to avoid overlapping of
84     * out-of-core decisions (what to do next to help the out-of-core mechanism)
85     * with out-of-core operations (actual IO operations).
86     */
87    private final ReadWriteLock superstepLock = new ReentrantReadWriteLock();
88    /** Data accessor object (DAO) used as persistence layer in out-of-core */
89    private final OutOfCoreDataAccessor dataAccessor;
90    /** Callable factory for IO threads */
91    private final OutOfCoreIOCallableFactory oocIOCallableFactory;
92    /**
93     * Dummy object to wait on until a partition becomes available in memory
94     * for processing
95     */
96    private final Object partitionAvailable = new Object();
97    /** How many compute threads do we have? */
98    private int numComputeThreads;
99    /** How many threads (input/compute) are processing data? */
100   private volatile int numProcessingThreads;
101   /** Semaphore used for controlling number of active threads at each moment */
102   private final AdjustableSemaphore activeThreadsPermit;
103   /**
104    * Generally, the logic in Giraph for change of the superstep happens in the
105    * following order:
106    *   (1) Compute threads are done processing all partitions
107    *   (2) Superstep number increases
108    *   (3) New message store is created and message stores are prepared
109    *   (4) Iteration over partitions starts
110    * Note that there are other operations happening at the same time as well as
111    * the above operations, but the above operations are the ones which may
112    * interfere with out-of-core operations. The goal of `superstepLock` is to
113    * isolate operations 2, 3, and 4 from the rest of computations and IO
114    * operations. Specifically, increasing the superstep counter (operation 2)
115    * should be exclusive and no IO operation should happen at the same time.
116    * This is due to the fact that prefetching mechanism uses superstep counter
117    * as a mean to identify which data should be read. That being said, superstep
118    * counter should be cached in out-of-core engine, and all IO operations and
119    * out-of-core logic should access superstep counter through this cached
120    * value.
121    */
122   private long superstep;
123   /**
124    * Generally, the logic of a graph computations happens in the following order
125    * with respect to `startIteration` and `reset` method:
126    * ...
127    * startIteration (for moving edges)
128    * ...
129    * reset (to prepare messages/partitions for superstep 0)
130    * ...
131    * startIteration (superstep 0)
132    * ...
133    * reset (to prepare messages/partitions for superstep 1)
134    * ...
135    *
136    * However, in the unit tests, we usually consider only one superstep (usually
137    * INPUT_SUPERSTEP), and we move through partitions multiple times. Out-of-
138    * core mechanism works only if partitions are reset in a proper way. So,
139    * we keep the following flag to reset partitions if necessary.
140    */
141   private boolean resetDone;
142 
143   /**
144    * Constructor
145    *
146    * @param conf Configuration
147    * @param service Service worker
148    */
149   public OutOfCoreEngine(ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
150                          CentralizedServiceWorker<?, ?, ?> service) {
151     this.service = service;
152     Class<? extends OutOfCoreDataAccessor> accessorClass =
153         GiraphConstants.OUT_OF_CORE_DATA_ACCESSOR.get(conf);
154     try {
155       Constructor<?> constructor = accessorClass.getConstructor(
156           ImmutableClassesGiraphConfiguration.class);
157       this.dataAccessor = (OutOfCoreDataAccessor) constructor.newInstance(conf);
158     } catch (NoSuchMethodException | InstantiationException |
159         InvocationTargetException | IllegalAccessException e) {
160       throw new IllegalStateException("OutOfCoreEngine: caught exception " +
161           "while creating the data accessor instance!", e);
162     }
163     int numIOThreads = dataAccessor.getNumAccessorThreads();
164     this.oocIOCallableFactory =
165         new OutOfCoreIOCallableFactory(this, numIOThreads,
166             service.getGraphTaskManager().createUncaughtExceptionHandler());
167     this.ioScheduler = new OutOfCoreIOScheduler(conf, this, numIOThreads);
168     this.metaPartitionManager = new MetaPartitionManager(numIOThreads, this);
169     this.statistics = new OutOfCoreIOStatistics(conf, numIOThreads);
170     int maxPartitionsInMemory =
171         GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
172     Class<? extends OutOfCoreOracle> oracleClass =
173         GiraphConstants.OUT_OF_CORE_ORACLE.get(conf);
174     if (maxPartitionsInMemory != 0 &&
175         oracleClass != FixedPartitionsOracle.class) {
176       LOG.warn("OutOfCoreEngine: Max number of partitions in memory is set " +
177           "but the out-of-core oracle used is not tailored for fixed " +
178           "out-of-core policy. Setting the oracle to be FixedPartitionsOracle");
179       oracleClass = FixedPartitionsOracle.class;
180     }
181     try {
182       Constructor<?> constructor = oracleClass.getConstructor(
183           ImmutableClassesGiraphConfiguration.class, OutOfCoreEngine.class);
184       this.oracle = (OutOfCoreOracle) constructor.newInstance(conf, this);
185     } catch (NoSuchMethodException | IllegalAccessException |
186         InstantiationException | InvocationTargetException e) {
187       throw new IllegalStateException("OutOfCoreEngine: caught exception " +
188           "while creating the oracle!", e);
189     }
190     this.numComputeThreads = conf.getNumComputeThreads();
191     // At the beginning of the execution, only input threads are processing data
192     this.numProcessingThreads = conf.getNumInputSplitsThreads();
193     this.activeThreadsPermit = new AdjustableSemaphore(numProcessingThreads);
194     this.superstep = BspService.INPUT_SUPERSTEP;
195     this.resetDone = false;
196     GiraphMetrics.get().addSuperstepResetObserver(this);
197   }
198 
199   /**
200    * Initialize/Start the out-of-core engine.
201    */
202   public void initialize() {
203     dataAccessor.initialize();
204     oocIOCallableFactory.createCallable();
205   }
206 
207   /**
208    * Shutdown/Stop the out-of-core engine.
209    */
210   public void shutdown() {
211     if (LOG.isInfoEnabled()) {
212       LOG.info("shutdown: out-of-core engine shutting down, signalling IO " +
213           "threads to shutdown");
214     }
215     ioScheduler.shutdown();
216     oocIOCallableFactory.shutdown();
217     dataAccessor.shutdown();
218   }
219 
220   /**
221    * Get a reference to the server data
222    *
223    * @return ServerData
224    */
225   public ServerData getServerData() {
226     return service.getServerData();
227   }
228 
229   /**
230    * Get a reference to the service worker
231    *
232    * @return CentralizedServiceWorker
233    */
234   public CentralizedServiceWorker getServiceWorker() {
235     return service;
236   }
237 
238   /**
239    * Get a reference to IO scheduler
240    *
241    * @return OutOfCoreIOScheduler
242    */
243   public OutOfCoreIOScheduler getIOScheduler() {
244     return ioScheduler;
245   }
246 
247   /**
248    * Get a reference to meta partition information
249    *
250    * @return MetaPartitionManager
251    */
252   public MetaPartitionManager getMetaPartitionManager() {
253     return metaPartitionManager;
254   }
255 
256   /**
257    * Get a reference to superstep lock
258    *
259    * @return read/write lock used for global superstep lock
260    */
261   public ReadWriteLock getSuperstepLock() {
262     return superstepLock;
263   }
264 
265   /**
266    * Get a reference to IO statistics collector
267    *
268    * @return IO statistics collector
269    */
270   public OutOfCoreIOStatistics getIOStatistics() {
271     return statistics;
272   }
273 
274   /**
275    * Get a reference to out-of-core oracle
276    *
277    * @return out-of-core oracle
278    */
279   public OutOfCoreOracle getOracle() {
280     return oracle;
281   }
282 
283   /**
284    * Get the id of the next partition to process in the current iteration over
285    * all the partitions. If all partitions are already processed, this method
286    * returns null.
287    *
288    * @return id of a partition to process. 'null' if all partitions are
289    *         processed in current iteration over partitions.
290    */
291   public Integer getNextPartition() {
292     Integer partitionId;
293     synchronized (partitionAvailable) {
294       while ((partitionId = metaPartitionManager.getNextPartition()) == null) {
295         try {
296           if (LOG.isInfoEnabled()) {
297             LOG.info("getNextPartition: waiting until a partition becomes " +
298                 "available!");
299           }
300           partitionAvailable.wait(MSEC_TO_WAIT);
301         } catch (InterruptedException e) {
302           throw new IllegalStateException("getNextPartition: caught " +
303               "InterruptedException while waiting to retrieve a partition to " +
304               "process");
305         }
306       }
307       if (partitionId == MetaPartitionManager.NO_PARTITION_TO_PROCESS) {
308         partitionAvailable.notifyAll();
309         partitionId = null;
310       }
311     }
312     return partitionId;
313   }
314 
315   /**
316    * Notify out-of-core engine that processing of a particular partition is done
317    *
318    * @param partitionId id of the partition that its processing is done
319    */
320   public void doneProcessingPartition(int partitionId) {
321     metaPartitionManager.setPartitionIsProcessed(partitionId);
322     if (LOG.isInfoEnabled()) {
323       LOG.info("doneProcessingPartition: processing partition " + partitionId +
324           " is done!");
325     }
326   }
327 
328   /**
329    * Notify out-of-core engine that iteration cycle over all partitions is about
330    * to begin.
331    */
332   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
333       "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
334   public void startIteration() {
335     if (!resetDone) {
336       superstepLock.writeLock().lock();
337       metaPartitionManager.resetPartitions();
338       superstepLock.writeLock().unlock();
339     }
340     if (superstep != BspServiceWorker.INPUT_SUPERSTEP &&
341         numProcessingThreads != numComputeThreads) {
342       // This method is only executed by the main thread, and at this point
343       // no other input/compute thread is alive. So, all the permits in
344       // `activeThreadsPermit` is available. However, now that we are changing
345       // the maximum number of active threads, we need to adjust the number
346       // of available permits on `activeThreadsPermit`.
347       activeThreadsPermit.setMaxPermits(activeThreadsPermit.availablePermits() *
348           numComputeThreads / numProcessingThreads);
349       numProcessingThreads = numComputeThreads;
350     }
351     if (LOG.isInfoEnabled()) {
352       LOG.info("startIteration: with " +
353           metaPartitionManager.getNumInMemoryPartitions() +
354           " partitions in memory and " +
355           activeThreadsPermit.availablePermits() + " active threads");
356     }
357     resetDone = false;
358   }
359 
360   /**
361    * Retrieve a particular partition. After this method is complete the
362    * requested partition should be in memory.
363    *
364    * @param partitionId id of the partition to retrieve
365    */
366   public void retrievePartition(int partitionId) {
367     if (metaPartitionManager.isPartitionOnDisk(partitionId)) {
368       ioScheduler.addIOCommand(new LoadPartitionIOCommand(this, partitionId,
369           superstep));
370       synchronized (partitionAvailable) {
371         while (metaPartitionManager.isPartitionOnDisk(partitionId)) {
372           try {
373             if (LOG.isInfoEnabled()) {
374               LOG.info("retrievePartition: waiting until partition " +
375                   partitionId + " becomes available");
376             }
377             partitionAvailable.wait();
378           } catch (InterruptedException e) {
379             throw new IllegalStateException("retrievePartition: caught " +
380                 "InterruptedException while waiting to retrieve partition " +
381                 partitionId);
382           }
383         }
384       }
385     }
386   }
387 
388   /**
389    * Notify out-of-core engine that an IO command is completed by an IO thread
390    *
391    * @param command the IO command that is completed
392    */
393   public void ioCommandCompleted(IOCommand command) {
394     oracle.commandCompleted(command);
395     if (command instanceof LoadPartitionIOCommand) {
396       // Notifying compute threads who are waiting for a partition to become
397       // available in memory to process.
398       synchronized (partitionAvailable) {
399         partitionAvailable.notifyAll();
400       }
401     }
402   }
403 
404   /**
405    * Update the fraction of processing threads that should remain active. It is
406    * the responsibility of out-of-core oracle to update the number of active
407    * threads.
408    *
409    * @param fraction the fraction of processing threads to remain active. This
410    *                 number is in range [0, 1]
411    */
412   public void updateActiveThreadsFraction(double fraction) {
413     checkState(fraction >= 0 && fraction <= 1);
414     int numActiveThreads = (int) (numProcessingThreads * fraction);
415     if (LOG.isInfoEnabled()) {
416       LOG.info("updateActiveThreadsFraction: updating the number of active " +
417           "threads to " + numActiveThreads);
418     }
419     activeThreadsPermit.setMaxPermits(numActiveThreads);
420   }
421 
422   /**
423    * A processing thread would check in with out-of-core engine every once in a
424    * while to make sure that it can still remain active. It is the
425    * responsibility of the out-of-core oracle to update the number of active
426    * threads in a way that the computation never fails, and yet achieve the
427    * optimal performance it can achieve.
428    */
429   public void activeThreadCheckIn() {
430     activeThreadsPermit.release();
431     try {
432       activeThreadsPermit.acquire();
433     } catch (InterruptedException e) {
434       LOG.error("activeThreadCheckIn: exception while acquiring a permit to " +
435           "remain an active thread");
436       throw new IllegalStateException(e);
437     }
438   }
439 
440   /**
441    * Notify the out-of-core engine that a processing (input/compute) thread has
442    * started.
443    */
444   public void processingThreadStart() {
445     try {
446       activeThreadsPermit.acquire();
447     } catch (InterruptedException e) {
448       LOG.error("processingThreadStart: exception while acquiring a permit to" +
449           " start the processing thread!");
450       throw new IllegalStateException(e);
451     }
452   }
453 
454   /**
455    * Notify the out-of-core engine that a processing (input/compute) thread has
456    * finished.
457    */
458   public void processingThreadFinish() {
459     activeThreadsPermit.release();
460   }
461 
462   /**
463    * Reset partitions and messages meta data. Also, reset the cached value of
464    * superstep counter.
465    */
466   public void reset() {
467     metaPartitionManager.resetPartitions();
468     metaPartitionManager.resetMessages();
469     superstep = service.getSuperstep();
470     resetDone = true;
471   }
472 
473   /**
474    * @return cached value of the superstep counter
475    */
476   public long getSuperstep() {
477     return superstep;
478   }
479 
480   /**
481    * Notify the out-of-core engine that a GC has just been completed
482    *
483    * @param info GC information
484    */
485   public void gcCompleted(GarbageCollectionNotificationInfo info) {
486     oracle.gcCompleted(info);
487   }
488 
489   @Override
490   public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
491     superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new Gauge<Double>() {
492       @Override
493       public Double value() {
494         return metaPartitionManager.getGraphFractionInMemory() * 100;
495       }
496     });
497   }
498 
499   public FlowControl getFlowControl() {
500     return flowControl;
501   }
502 
503   public void setFlowControl(FlowControl flowControl) {
504     this.flowControl = flowControl;
505   }
506 
507   public OutOfCoreDataAccessor getDataAccessor() {
508     return dataAccessor;
509   }
510 }