/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.ooc.data;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.IntConfOption;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.ooc.persistence.DataIndex;
import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry;
import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
import org.apache.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.giraph.conf.GiraphConstants.ONE_MB;

/**
 * This class provides basic operations for data structures that have to
 * participate in the out-of-core mechanism. Essential subclasses of this
 * class are:
 *  - DiskBackedPartitionStore (for partition data)
 *  - DiskBackedMessageStore (for messages)
 *  - DiskBackedEdgeStore (for edges read in INPUT_SUPERSTEP)
 * Basically, any data structure that may cause OOM can be implemented as a
 * subclass of this class.
 *
 * There are two different terms used in the rest of this class:
 *  - "data store" refers to the in-memory representation of data. Usually
 *    this is stored per-partition in in-memory implementations of data
 *    structures. For instance, the "data store" of a DiskBackedPartitionStore
 *    would be the collection of all partitions kept in the in-memory
 *    partition store within the DiskBackedPartitionStore.
 *  - "raw data buffer" refers to raw data that was supposed to be
 *    de-serialized and added to the data store, but remains 'as is' in
 *    memory because its corresponding partition is offloaded to disk and is
 *    not available in the data store.
 *
 * @param <T> raw data format of the data store subclassing this class
 */
public abstract class DiskBackedDataStore<T> {
  /**
   * Minimum size of a buffer (in bytes) to flush to disk. This is used to
   * decide whether vertex/edge buffers are large enough to flush to disk.
   */
  public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH =
      new IntConfOption("giraph.flushBufferSize", 8 * ONE_MB,
          "Minimum size of a buffer (in bytes) to flush to disk.");

  /** Class logger. */
  private static final Logger LOG = Logger.getLogger(
      DiskBackedDataStore.class);
  /** Out-of-core engine */
  protected final OutOfCoreEngine oocEngine;
  /**
   * Set containing ids of all partitions where the partition data is in some
   * file on disk.
   * Note that the out-of-core mechanism may decide to put the data for a
   * partition on disk while the partition data is empty. For instance, at the
   * beginning of a superstep, the out-of-core mechanism may decide to put
   * incoming messages of a partition on disk while the partition has not
   * received any messages. In such scenarios, the out-of-core mechanism
   * thinks that the partition data is on disk, while disk-backed data stores
   * may want to optimize IO/metadata accesses and decide not to create/write
   * anything to files on disk.
   * In summary, there is a subtle difference between this field and the
   * `hasPartitionDataOnDisk` field. This field is used by disk-backed stores
   * to optimize IO (mainly metadata) accesses, while `hasPartitionDataOnDisk`
   * is the view that the out-of-core mechanism has of partition storage
   * statuses. Since the out-of-core mechanism does not know about the actual
   * data for a partition, these two fields have to be kept separate.
   */
  protected final Set<Integer> hasPartitionDataOnFile =
      Sets.newConcurrentHashSet();
  /** Cached value for MINIMUM_BUFFER_SIZE_TO_FLUSH */
  private final int minBufferSizeToOffload;
  /** Set containing ids of all out-of-core partitions */
  private final Set<Integer> hasPartitionDataOnDisk =
      Sets.newConcurrentHashSet();
  /**
   * Map of partition ids to lists of raw data buffers. The map has entries
   * only for partitions whose in-memory data structures are currently
   * offloaded to disk. We keep the aggregate size of buffers for each
   * partition as part of the values in the map to estimate how much memory
   * we can free up if we offload the data buffers of a particular partition
   * to disk.
   */
  private final ConcurrentMap<Integer, Pair<Integer, List<T>>> dataBuffers =
      Maps.newConcurrentMap();
  /**
   * Map of partition ids to the number of raw data buffers offloaded to disk
   * for each partition. The map has entries only for partitions whose
   * in-memory data structures are currently out of core. The number of data
   * buffers on disk for a particular partition is needed when we load all
   * those buffers back into memory.
   */
  private final ConcurrentMap<Integer, Integer> numDataBuffersOnDisk =
      Maps.newConcurrentMap();
  /**
   * Lock to avoid overlapping reads and writes on the data associated with
   * each partition.
   */
  private final ConcurrentMap<Integer, ReadWriteLock> locks =
      Maps.newConcurrentMap();

  /**
   * Constructor.
   *
   * @param conf Configuration
   * @param oocEngine Out-of-core engine
   */
  DiskBackedDataStore(ImmutableClassesGiraphConfiguration conf,
                      OutOfCoreEngine oocEngine) {
    this.minBufferSizeToOffload = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf);
    this.oocEngine = oocEngine;
  }

  /**
   * Retrieves a lock for a given partition. If the lock for the given
   * partition does not exist, creates a new lock.
   *
   * @param partitionId id of the partition the lock is needed for
   * @return lock for a given partition
   */
  private ReadWriteLock getPartitionLock(int partitionId) {
    ReadWriteLock readWriteLock = locks.get(partitionId);
    if (readWriteLock == null) {
      readWriteLock = new ReentrantReadWriteLock();
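      // putIfAbsent() may lose the race against another thread registering a
      // lock for the same partition; if so, use the lock already in the map.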
      ReadWriteLock temp = locks.putIfAbsent(partitionId, readWriteLock);
      if (temp != null) {
        readWriteLock = temp;
      }
    }
    return readWriteLock;
  }

  /**
   * Adds a data entry for a given partition to the current data store. If the
   * data of the given partition is already offloaded to disk, adds the data
   * entry to the appropriate raw data buffer list instead.
   *
   * @param partitionId id of the partition to add the data entry to
   * @param entry data entry to add
   */
  protected void addEntry(int partitionId, T entry) {
    // Addition of data entries to a data store is much more common than
    // out-of-core operations. Besides, the in-memory data store
    // implementations existing in the code base already account for parallel
    // addition to data stores. Therefore, using the read lock optimizes for
    // parallel addition to data stores, especially for cases where the
    // addition happens for partitions that are entirely in memory.
    ReadWriteLock rwLock = getPartitionLock(partitionId);
    rwLock.readLock().lock();
    if (hasPartitionDataOnDisk.contains(partitionId)) {
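      // The partition's data store is on disk, so keep the raw entry buffered
      // in memory rather than deserializing it into the in-memory store.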
      List<T> entryList = new ArrayList<>();
      entryList.add(entry);
      int entrySize = entrySerializedSize(entry);
      MutablePair<Integer, List<T>> newPair =
          new MutablePair<>(entrySize, entryList);
      Pair<Integer, List<T>> oldPair =
          dataBuffers.putIfAbsent(partitionId, newPair);
      if (oldPair != null) {
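        // Another thread already registered a buffer list for this partition;
        // append to that list under its monitor, since MutablePair itself is
        // not thread-safe.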
        synchronized (oldPair) {
          newPair = (MutablePair<Integer, List<T>>) oldPair;
          newPair.setLeft(oldPair.getLeft() + entrySize);
          newPair.getRight().add(entry);
        }
      }
    } else {
      addEntryToInMemoryPartitionData(partitionId, entry);
    }
    rwLock.readLock().unlock();
  }

  /**
   * Loads and assembles all data for a given partition, and puts it into the
   * data store. Returns the number of bytes transferred from disk to memory
   * in the loading process.
   *
   * @param partitionId id of the partition to load and assemble all data for
   * @return number of bytes loaded from disk to memory
   * @throws IOException
   */
  public abstract long loadPartitionData(int partitionId) throws IOException;

  /**
   * The proxy method that does the actual operation for `loadPartitionData`,
   * but uses the data index given by the caller.
   *
   * @param partitionId id of the partition to load and assemble all data for
   * @param index data index chain for the data to load
   * @return number of bytes loaded from disk to memory
   * @throws IOException
   */
  protected long loadPartitionDataProxy(int partitionId, DataIndex index)
      throws IOException {
    long numBytes = 0;
    ReadWriteLock rwLock = getPartitionLock(partitionId);
    rwLock.writeLock().lock();
    if (hasPartitionDataOnDisk.contains(partitionId)) {
      int ioThreadId =
          oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
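      // Extend the index chain with this partition's entry so the data
      // accessor reads from the location the partition data was offloaded to.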
      numBytes += loadInMemoryPartitionData(partitionId, ioThreadId,
          index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId)));
      hasPartitionDataOnDisk.remove(partitionId);
      // Load raw data buffers from disk, if there are any, and apply them to
      // the already loaded in-memory data.
      Integer numBuffers = numDataBuffersOnDisk.remove(partitionId);
      if (numBuffers != null) {
        checkState(numBuffers > 0);
        index.addIndex(DataIndex.TypeIndexEntry.BUFFER);
        OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
            oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
        DataInput dataInput = inputWrapper.getDataInput();
        for (int i = 0; i < numBuffers; ++i) {
          T entry = readNextEntry(dataInput);
          addEntryToInMemoryPartitionData(partitionId, entry);
        }
        numBytes += inputWrapper.finalizeInput(true);
        index.removeLastIndex();
      }
      index.removeLastIndex();
      // Apply in-memory raw data buffers to the in-memory partition data.
      Pair<Integer, List<T>> pair = dataBuffers.remove(partitionId);
      if (pair != null) {
        for (T entry : pair.getValue()) {
          addEntryToInMemoryPartitionData(partitionId, entry);
        }
      }
    }
    rwLock.writeLock().unlock();
    return numBytes;
  }

  /**
   * Offloads partition data of a given partition in the data store to disk,
   * and returns the number of bytes offloaded from memory to disk.
   *
   * @param partitionId id of the partition to offload its data
   * @return number of bytes offloaded from memory to disk
   * @throws IOException
   */
  public abstract long offloadPartitionData(int partitionId) throws IOException;

  /**
   * The proxy method that does the actual operation for
   * `offloadPartitionData`, but uses the data index given by the caller.
   *
   * @param partitionId id of the partition to offload its data
   * @param index data index chain for the data to offload
   * @return number of bytes offloaded from memory to disk
   * @throws IOException
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
  protected long offloadPartitionDataProxy(
      int partitionId, DataIndex index) throws IOException {
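    // Only flipping the "on disk" flag needs the write lock: once the
    // partition is marked as on disk, concurrent addEntry() calls divert new
    // entries into raw data buffers instead of the in-memory store, so the
    // in-memory data can be offloaded without holding the lock.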
    ReadWriteLock rwLock = getPartitionLock(partitionId);
    rwLock.writeLock().lock();
    hasPartitionDataOnDisk.add(partitionId);
    rwLock.writeLock().unlock();
    int ioThreadId =
        oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
    long numBytes = offloadInMemoryPartitionData(partitionId, ioThreadId,
        index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId)));
    index.removeLastIndex();
    return numBytes;
  }

  /**
   * Offloads raw data buffers of a given partition to disk, and returns the
   * number of bytes offloaded from memory to disk.
   *
   * @param partitionId id of the partition to offload its raw data buffers
   * @return number of bytes offloaded from memory to disk
   * @throws IOException
   */
  public abstract long offloadBuffers(int partitionId) throws IOException;

  /**
   * The proxy method that does the actual operation for `offloadBuffers`,
   * but uses the data index given by the caller.
   *
   * @param partitionId id of the partition to offload its raw data buffers
   * @param index data index chain for the data to offload its buffers
   * @return number of bytes offloaded from memory to disk
   * @throws IOException
   */
  protected long offloadBuffersProxy(int partitionId, DataIndex index)
      throws IOException {
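    // Cheap check without locking: skip partitions whose buffered data has
    // not reached the flush threshold yet.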
    Pair<Integer, List<T>> pair = dataBuffers.get(partitionId);
    if (pair == null || pair.getLeft() < minBufferSizeToOffload) {
      return 0;
    }
    ReadWriteLock rwLock = getPartitionLock(partitionId);
    rwLock.writeLock().lock();
    pair = dataBuffers.remove(partitionId);
    rwLock.writeLock().unlock();
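    // The removed pair is now private to this thread; a concurrent addEntry()
    // for the same partition will simply start a fresh buffer list.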
    checkNotNull(pair);
    checkState(!pair.getRight().isEmpty());
    int ioThreadId =
        oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
    index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId))
        .addIndex(DataIndex.TypeIndexEntry.BUFFER);
    OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
        oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
            true);
    for (T entry : pair.getRight()) {
      writeEntry(entry, outputWrapper.getDataOutput());
    }
    long numBytes = outputWrapper.finalizeOutput();
    index.removeLastIndex().removeLastIndex();
    int numBuffers = pair.getRight().size();
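    // Accumulate the number of buffers written for this partition, so that
    // loadPartitionDataProxy() knows how many entries to read back from disk.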
    Integer oldNumBuffersOnDisk =
        numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers);
    if (oldNumBuffersOnDisk != null) {
      numDataBuffersOnDisk.replace(partitionId,
          oldNumBuffersOnDisk + numBuffers);
    }
    return numBytes;
  }

  /**
   * Looks through all partitions whose data is not in the data store (i.e.,
   * is offloaded to disk), and checks whether any of them has enough raw data
   * buffered in memory. If so, puts that partition in the set to return.
   *
   * @param ioThreadId Id of the IO thread that would offload the buffers
   * @return Set of ids of partitions whose aggregate raw buffer size is large
   *         enough that it is worth flushing those buffers to disk
   */
  public Set<Integer> getCandidateBuffersToOffload(int ioThreadId) {
    Set<Integer> result = new HashSet<>();
    for (Map.Entry<Integer, Pair<Integer, List<T>>> entry :
        dataBuffers.entrySet()) {
      int partitionId = entry.getKey();
      long aggregateBufferSize = entry.getValue().getLeft();
      if (aggregateBufferSize > minBufferSizeToOffload &&
          oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId) ==
              ioThreadId) {
        result.add(partitionId);
      }
    }
    return result;
  }

  /**
   * Writes a single raw entry to a given output stream.
   *
   * @param entry entry to write to output
   * @param out output stream to write the entry to
   * @throws IOException
   */
  protected abstract void writeEntry(T entry, DataOutput out)
      throws IOException;

  /**
   * Reads the next available raw entry from a given input stream.
   *
   * @param in input stream to read the entry from
   * @return entry read from an input stream
   * @throws IOException
   */
  protected abstract T readNextEntry(DataInput in) throws IOException;

  /**
   * Loads data of a partition into the data store. Returns the number of
   * bytes loaded.
   *
   * @param partitionId id of the partition to load its data
   * @param ioThreadId id of the IO thread performing the load
   * @param index data index chain for the data to load
   * @return number of bytes loaded from disk to memory
   * @throws IOException
   */
  protected abstract long loadInMemoryPartitionData(
      int partitionId, int ioThreadId, DataIndex index) throws IOException;

  /**
   * Offloads data of a partition in the data store to disk. Returns the
   * number of bytes offloaded to disk.
   *
   * @param partitionId id of the partition to offload to disk
   * @param ioThreadId id of the IO thread performing the offload
   * @param index data index chain for the data to offload
   * @return number of bytes offloaded from memory to disk
   * @throws IOException
   */
  protected abstract long offloadInMemoryPartitionData(
      int partitionId, int ioThreadId, DataIndex index) throws IOException;

  /**
   * Gets the size of a given entry in bytes.
   *
   * @param entry input entry to find its size
   * @return size of given input entry in bytes
   */
  protected abstract int entrySerializedSize(T entry);

  /**
   * Adds a single entry for a given partition to the in-memory data store.
   *
   * @param partitionId id of the partition to add the data to
   * @param entry input entry to add to the data store
   */
  protected abstract void addEntryToInMemoryPartitionData(int partitionId,
                                                          T entry);
}