1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.ooc.data;
2021import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
23import org.apache.commons.lang3.tuple.MutablePair;
24import org.apache.commons.lang3.tuple.Pair;
25import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26import org.apache.giraph.conf.IntConfOption;
27import org.apache.giraph.ooc.OutOfCoreEngine;
28import org.apache.giraph.ooc.persistence.DataIndex;
29import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry;
30import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
31import org.apache.log4j.Logger;
3233import java.io.DataInput;
34import java.io.DataOutput;
35import java.io.IOException;
36import java.util.ArrayList;
37import java.util.HashSet;
38import java.util.List;
39import java.util.Map;
40import java.util.Set;
41import java.util.concurrent.ConcurrentMap;
42import java.util.concurrent.locks.ReadWriteLock;
43import java.util.concurrent.locks.ReentrantReadWriteLock;
4445importstatic com.google.common.base.Preconditions.checkNotNull;
46importstatic com.google.common.base.Preconditions.checkState;
47importstatic org.apache.giraph.conf.GiraphConstants.ONE_MB;
4849/**50 * This class provides basic operations for data structures that have to51 * participate in out-of-core mechanism. Essential subclasses of this class are:52 * - DiskBackedPartitionStore (for partition data)53 * - DiskBackedMessageStore (for messages)54 * - DiskBackedEdgeStore (for edges read in INPUT_SUPERSTEP)55 * Basically, any data structure that may cause OOM to happen can be implemented56 * as a subclass of this class.57 *58 * There are two different terms used in the rest of this class:59 * - "data store" refers to in-memory representation of data. Usually this is60 * stored per-partition in in-memory implementations of data structures. For61 * instance, "data store" of a DiskBackedPartitionStore would collection of62 * all partitions kept in the in-memory partition store within the63 * DiskBackedPartitionStore.64 * - "raw data buffer" refers to raw data which were supposed to be65 * de-serialized and added to the data store, but they remain 'as is' in the66 * memory because their corresponding partition is offloaded to disk and is67 * not available in the data store.68 *69 * @param <T> raw data format of the data store subclassing this class70 */71publicabstractclass DiskBackedDataStore<T> {
72/**73 * Minimum size of a buffer (in bytes) to flush to disk. This is used to74 * decide whether vertex/edge buffers are large enough to flush to disk.75 */76publicstaticfinalIntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH =
77newIntConfOption("giraph.flushBufferSize", 8 * ONE_MB,
78"Minimum size of a buffer (in bytes) to flush to disk.");
7980/** Class logger. */81privatestaticfinal Logger LOG = Logger.getLogger(
82 DiskBackedDataStore.class);
83/** Out-of-core engine */84protectedfinalOutOfCoreEngine oocEngine;
85/**86 * Set containing ids of all partitions where the partition data is in some87 * file on disk.88 * Note that the out-of-core mechanism may decide to put the data for a89 * partition on disk, while the partition data is empty. For instance, at the90 * beginning of a superstep, out-of-core mechanism may decide to put incoming91 * messages of a partition on disk, while the partition has not received any92 * messages. In such scenarios, the "out-of-core mechanism" thinks that the93 * partition data is on disk, while disk-backed data stores may want to94 * optimize for IO/metadata accesses and decide not to create/write anything95 * on files on disk.96 * In summary, there is a subtle difference between this field and97 * `hasPartitionOnDisk` field. Basically, this field is used for optimizing98 * IO (mainly metadata) accesses by disk-backed stores, while99 * `hasPartitionDataOnDisk` is the view that out-of-core mechanism has100 * regarding partition storage statuses. Since out-of-core mechanism does not101 * know about the actual data for a partition, these two fields have to be102 * separate.103 */104protectedfinal Set<Integer> hasPartitionDataOnFile =
105 Sets.newConcurrentHashSet();
106/** Cached value for MINIMUM_BUFFER_SIZE_TO_FLUSH */107privatefinalint minBufferSizeToOffload;
108/** Set containing ids of all out-of-core partitions */109privatefinal Set<Integer> hasPartitionDataOnDisk =
110 Sets.newConcurrentHashSet();
111/**112 * Map of partition ids to list of raw data buffers. The map will have entries113 * only for partitions that their in-memory data structures are currently114 * offloaded to disk. We keep the aggregate size of buffers for each partition115 * as part of the values in the map to estimate how much memory we can free up116 * if we offload data buffers of a particular partition to disk.117 */118privatefinal ConcurrentMap<Integer, Pair<Integer, List<T>>> dataBuffers =
119 Maps.newConcurrentMap();
120/**121 * Map of partition ids to number of raw data buffers offloaded to disk for122 * each partition. The map will have entries only for partitions that their123 * in-memory data structures are currently out of core. It is necessary to124 * know the number of data buffers on disk for a particular partition when we125 * are loading all these buffers back in memory.126 */127privatefinal ConcurrentMap<Integer, Integer> numDataBuffersOnDisk =
128 Maps.newConcurrentMap();
129/**130 * Lock to avoid overlapping of read and write on data associated with each131 * partition.132 * */133privatefinal ConcurrentMap<Integer, ReadWriteLock> locks =
134 Maps.newConcurrentMap();
135136/**137 * Constructor.138 *139 * @param conf Configuration140 * @param oocEngine Out-of-core engine141 */142DiskBackedDataStore(ImmutableClassesGiraphConfiguration conf,
143OutOfCoreEngine oocEngine) {
144this.minBufferSizeToOffload = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf);
145this.oocEngine = oocEngine;
146 }
147148/**149 * Retrieves a lock for a given partition. If the lock for the given partition150 * does not exist, creates a new lock.151 *152 * @param partitionId id of the partition the lock is needed for153 * @return lock for a given partition154 */155private ReadWriteLock getPartitionLock(int partitionId) {
156 ReadWriteLock readWriteLock = locks.get(partitionId);
157if (readWriteLock == null) {
158 readWriteLock = new ReentrantReadWriteLock();
159 ReadWriteLock temp = locks.putIfAbsent(partitionId, readWriteLock);
160if (temp != null) {
161 readWriteLock = temp;
162 }
163 }
164return readWriteLock;
165 }
166167/**168 * Adds a data entry for a given partition to the current data store. If data169 * of a given partition in data store is already offloaded to disk, adds the170 * data entry to appropriate raw data buffer list.171 *172 * @param partitionId id of the partition to add the data entry to173 * @param entry data entry to add174 */175protectedvoid addEntry(int partitionId, T entry) {
176// Addition of data entries to a data store is much more common than177// out-of-core operations. Besides, in-memory data store implementations178// existing in the code base already account for parallel addition to data179// stores. Therefore, using read lock would optimize for parallel addition180// to data stores, specially for cases where the addition should happen for181// partitions that are entirely in memory.182 ReadWriteLock rwLock = getPartitionLock(partitionId);
183 rwLock.readLock().lock();
184if (hasPartitionDataOnDisk.contains(partitionId)) {
185 List<T> entryList = new ArrayList<>();
186 entryList.add(entry);
187int entrySize = entrySerializedSize(entry);
188 MutablePair<Integer, List<T>> newPair =
189new MutablePair<>(entrySize, entryList);
190 Pair<Integer, List<T>> oldPair =
191 dataBuffers.putIfAbsent(partitionId, newPair);
192if (oldPair != null) {
193synchronized (oldPair) {
194 newPair = (MutablePair<Integer, List<T>>) oldPair;
195 newPair.setLeft(oldPair.getLeft() + entrySize);
196 newPair.getRight().add(entry);
197 }
198 }
199 } else {
200 addEntryToInMemoryPartitionData(partitionId, entry);
201 }
202 rwLock.readLock().unlock();
203 }
204205/**206 * Loads and assembles all data for a given partition, and put it into the207 * data store. Returns the number of bytes transferred from disk to memory in208 * the loading process.209 *210 * @param partitionId id of the partition to load and assemble all data for211 * @return number of bytes loaded from disk to memory212 * @throws IOException213 */214publicabstractlong loadPartitionData(int partitionId) throws IOException;
215216/**217 * The proxy method that does the actual operation for `loadPartitionData`,218 * but uses the data index given by the caller.219 *220 * @param partitionId id of the partition to load and assemble all data for221 * @param index data index chain for the data to load222 * @return number of bytes loaded from disk to memory223 * @throws IOException224 */225protectedlong loadPartitionDataProxy(int partitionId, DataIndex index)
226throws IOException {
227long numBytes = 0;
228 ReadWriteLock rwLock = getPartitionLock(partitionId);
229 rwLock.writeLock().lock();
230if (hasPartitionDataOnDisk.contains(partitionId)) {
231int ioThreadId =
232 oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
233 numBytes += loadInMemoryPartitionData(partitionId, ioThreadId,
234 index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId)));
235 hasPartitionDataOnDisk.remove(partitionId);
236// Loading raw data buffers from disk if there is any and applying those237// to already loaded in-memory data.238 Integer numBuffers = numDataBuffersOnDisk.remove(partitionId);
239if (numBuffers != null) {
240 checkState(numBuffers > 0);
241 index.addIndex(DataIndex.TypeIndexEntry.BUFFER);
242 OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
243 oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
244 DataInput dataInput = inputWrapper.getDataInput();
245for (int i = 0; i < numBuffers; ++i) {
246 T entry = readNextEntry(dataInput);
247 addEntryToInMemoryPartitionData(partitionId, entry);
248 }
249 numBytes += inputWrapper.finalizeInput(true);
250 index.removeLastIndex();
251 }
252 index.removeLastIndex();
253// Applying in-memory raw data buffers to in-memory partition data.254 Pair<Integer, List<T>> pair = dataBuffers.remove(partitionId);
255if (pair != null) {
256for (T entry : pair.getValue()) {
257 addEntryToInMemoryPartitionData(partitionId, entry);
258 }
259 }
260 }
261 rwLock.writeLock().unlock();
262return numBytes;
263 }
264265/**266 * Offloads partition data of a given partition in the data store to disk, and267 * returns the number of bytes offloaded from memory to disk.268 *269 * @param partitionId id of the partition to offload its data270 * @return number of bytes offloaded from memory to disk271 * @throws IOException272 */273publicabstractlong offloadPartitionData(int partitionId) throws IOException;
274275/**276 * The proxy method that does the actual operation for `offloadPartitionData`,277 * but uses the data index given by the caller.278 *279 * @param partitionId id of the partition to offload its data280 * @param index data index chain for the data to offload281 * @return number of bytes offloaded from memory to disk282 * @throws IOException283 */284 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
285"UL_UNRELEASED_LOCK_EXCEPTION_PATH")
286protectedlong offloadPartitionDataProxy(
287int partitionId, DataIndex index) throws IOException {
288 ReadWriteLock rwLock = getPartitionLock(partitionId);
289 rwLock.writeLock().lock();
290 hasPartitionDataOnDisk.add(partitionId);
291 rwLock.writeLock().unlock();
292int ioThreadId =
293 oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
294long numBytes = offloadInMemoryPartitionData(partitionId, ioThreadId,
295 index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId)));
296 index.removeLastIndex();
297return numBytes;
298 }
299300/**301 * Offloads raw data buffers of a given partition to disk, and returns the302 * number of bytes offloaded from memory to disk.303 *304 * @param partitionId id of the partition to offload its raw data buffers305 * @return number of bytes offloaded from memory to disk306 * @throws IOException307 */308publicabstractlong offloadBuffers(int partitionId) throws IOException;
309310/**311 * The proxy method that does the actual operation for `offloadBuffers`,312 * but uses the data index given by the caller.313 *314 * @param partitionId id of the partition to offload its raw data buffers315 * @param index data index chain for the data to offload its buffers316 * @return number of bytes offloaded from memory to disk317 * @throws IOException318 */319protectedlong offloadBuffersProxy(int partitionId, DataIndex index)
320throws IOException {
321 Pair<Integer, List<T>> pair = dataBuffers.get(partitionId);
322if (pair == null || pair.getLeft() < minBufferSizeToOffload) {
323return 0;
324 }
325 ReadWriteLock rwLock = getPartitionLock(partitionId);
326 rwLock.writeLock().lock();
327 pair = dataBuffers.remove(partitionId);
328 rwLock.writeLock().unlock();
329 checkNotNull(pair);
330 checkState(!pair.getRight().isEmpty());
331int ioThreadId =
332 oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
333 index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId))
334 .addIndex(DataIndex.TypeIndexEntry.BUFFER);
335 OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
336 oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
337true);
338for (T entry : pair.getRight()) {
339 writeEntry(entry, outputWrapper.getDataOutput());
340 }
341long numBytes = outputWrapper.finalizeOutput();
342 index.removeLastIndex().removeLastIndex();
343int numBuffers = pair.getRight().size();
344 Integer oldNumBuffersOnDisk =
345 numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers);
346if (oldNumBuffersOnDisk != null) {
347 numDataBuffersOnDisk.replace(partitionId,
348 oldNumBuffersOnDisk + numBuffers);
349 }
350return numBytes;
351 }
352353/**354 * Looks through all partitions that their data is not in the data store (is355 * offloaded to disk), and sees if any of them has enough raw data buffer in356 * memory. If so, puts that partition in a list to return.357 *358 * @param ioThreadId Id of the IO thread who would offload the buffers359 * @return Set of partition ids of all partition raw buffers where the360 * aggregate size of buffers are large enough and it is worth flushing361 * those buffers to disk362 */363public Set<Integer> getCandidateBuffersToOffload(int ioThreadId) {
364 Set<Integer> result = new HashSet<>();
365for (Map.Entry<Integer, Pair<Integer, List<T>>> entry :
366 dataBuffers.entrySet()) {
367int partitionId = entry.getKey();
368long aggregateBufferSize = entry.getValue().getLeft();
369if (aggregateBufferSize > minBufferSizeToOffload &&
370 oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId) ==
371 ioThreadId) {
372 result.add(partitionId);
373 }
374 }
375return result;
376 }
377378/**379 * Writes a single raw entry to a given output stream.380 *381 * @param entry entry to write to output382 * @param out output stream to write the entry to383 * @throws IOException384 */385protectedabstractvoid writeEntry(T entry, DataOutput out)
386throws IOException;
387388/**389 * Reads the next available raw entry from a given input stream.390 *391 * @param in input stream to read the entry from392 * @return entry read from an input stream393 * @throws IOException394 */395protectedabstract T readNextEntry(DataInput in) throws IOException;
396397/**398 * Loads data of a partition into data store. Returns number of bytes loaded.399 *400 * @param partitionId id of the partition to load its data401 * @param ioThreadId id of the IO thread performing the load402 * @param index data index chain for the data to load403 * @return number of bytes loaded from disk to memory404 * @throws IOException405 */406protectedabstractlong loadInMemoryPartitionData(
407int partitionId, int ioThreadId, DataIndex index) throws IOException;
408409/**410 * Offloads data of a partition in data store to disk. Returns the number of411 * bytes offloaded to disk412 *413 * @param partitionId id of the partition to offload to disk414 * @param ioThreadId id of the IO thread performing the offload415 * @param index data index chain for the data to offload416 * @return number of bytes offloaded from memory to disk417 * @throws IOException418 */419protectedabstractlong offloadInMemoryPartitionData(
420int partitionId, int ioThreadId, DataIndex index) throws IOException;
421422/**423 * Gets the size of a given entry in bytes.424 *425 * @param entry input entry to find its size426 * @return size of given input entry in bytes427 */428protectedabstractint entrySerializedSize(T entry);
429430/**431 * Adds a single entry for a given partition to the in-memory data store.432 *433 * @param partitionId id of the partition to add the data to434 * @param entry input entry to add to the data store435 */436protectedabstractvoid addEntryToInMemoryPartitionData(int partitionId,
437 T entry);
438 }