1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.ooc.data;
20
21 import com.google.common.collect.Maps;
22 import com.google.common.collect.Sets;
23 import org.apache.commons.lang3.tuple.MutablePair;
24 import org.apache.commons.lang3.tuple.Pair;
25 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26 import org.apache.giraph.conf.IntConfOption;
27 import org.apache.giraph.ooc.OutOfCoreEngine;
28 import org.apache.giraph.ooc.persistence.DataIndex;
29 import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry;
30 import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
31 import org.apache.log4j.Logger;
32
33 import java.io.DataInput;
34 import java.io.DataOutput;
35 import java.io.IOException;
36 import java.util.ArrayList;
37 import java.util.HashSet;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Set;
41 import java.util.concurrent.ConcurrentMap;
42 import java.util.concurrent.locks.ReadWriteLock;
43 import java.util.concurrent.locks.ReentrantReadWriteLock;
44
45 import static com.google.common.base.Preconditions.checkNotNull;
46 import static com.google.common.base.Preconditions.checkState;
47 import static org.apache.giraph.conf.GiraphConstants.ONE_MB;
48
49 /**
50 * This class provides basic operations for data structures that have to
51 * participate in out-of-core mechanism. Essential subclasses of this class are:
52 * - DiskBackedPartitionStore (for partition data)
53 * - DiskBackedMessageStore (for messages)
54 * - DiskBackedEdgeStore (for edges read in INPUT_SUPERSTEP)
55 * Basically, any data structure that may cause OOM to happen can be implemented
56 * as a subclass of this class.
57 *
58 * There are two different terms used in the rest of this class:
59 * - "data store" refers to in-memory representation of data. Usually this is
60 * stored per-partition in in-memory implementations of data structures. For
61 * instance, "data store" of a DiskBackedPartitionStore would collection of
62 * all partitions kept in the in-memory partition store within the
63 * DiskBackedPartitionStore.
64 * - "raw data buffer" refers to raw data which were supposed to be
65 * de-serialized and added to the data store, but they remain 'as is' in the
66 * memory because their corresponding partition is offloaded to disk and is
67 * not available in the data store.
68 *
69 * @param <T> raw data format of the data store subclassing this class
70 */
71 public abstract class DiskBackedDataStore<T> {
72 /**
73 * Minimum size of a buffer (in bytes) to flush to disk. This is used to
74 * decide whether vertex/edge buffers are large enough to flush to disk.
75 */
76 public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH =
77 new IntConfOption("giraph.flushBufferSize", 8 * ONE_MB,
78 "Minimum size of a buffer (in bytes) to flush to disk.");
79
80 /** Class logger. */
81 private static final Logger LOG = Logger.getLogger(
82 DiskBackedDataStore.class);
83 /** Out-of-core engine */
84 protected final OutOfCoreEngine oocEngine;
85 /**
86 * Set containing ids of all partitions where the partition data is in some
87 * file on disk.
88 * Note that the out-of-core mechanism may decide to put the data for a
89 * partition on disk, while the partition data is empty. For instance, at the
90 * beginning of a superstep, out-of-core mechanism may decide to put incoming
91 * messages of a partition on disk, while the partition has not received any
92 * messages. In such scenarios, the "out-of-core mechanism" thinks that the
93 * partition data is on disk, while disk-backed data stores may want to
94 * optimize for IO/metadata accesses and decide not to create/write anything
95 * on files on disk.
96 * In summary, there is a subtle difference between this field and
97 * `hasPartitionOnDisk` field. Basically, this field is used for optimizing
98 * IO (mainly metadata) accesses by disk-backed stores, while
99 * `hasPartitionDataOnDisk` is the view that out-of-core mechanism has
100 * regarding partition storage statuses. Since out-of-core mechanism does not
101 * know about the actual data for a partition, these two fields have to be
102 * separate.
103 */
104 protected final Set<Integer> hasPartitionDataOnFile =
105 Sets.newConcurrentHashSet();
106 /** Cached value for MINIMUM_BUFFER_SIZE_TO_FLUSH */
107 private final int minBufferSizeToOffload;
108 /** Set containing ids of all out-of-core partitions */
109 private final Set<Integer> hasPartitionDataOnDisk =
110 Sets.newConcurrentHashSet();
111 /**
112 * Map of partition ids to list of raw data buffers. The map will have entries
113 * only for partitions that their in-memory data structures are currently
114 * offloaded to disk. We keep the aggregate size of buffers for each partition
115 * as part of the values in the map to estimate how much memory we can free up
116 * if we offload data buffers of a particular partition to disk.
117 */
118 private final ConcurrentMap<Integer, Pair<Integer, List<T>>> dataBuffers =
119 Maps.newConcurrentMap();
120 /**
121 * Map of partition ids to number of raw data buffers offloaded to disk for
122 * each partition. The map will have entries only for partitions that their
123 * in-memory data structures are currently out of core. It is necessary to
124 * know the number of data buffers on disk for a particular partition when we
125 * are loading all these buffers back in memory.
126 */
127 private final ConcurrentMap<Integer, Integer> numDataBuffersOnDisk =
128 Maps.newConcurrentMap();
129 /**
130 * Lock to avoid overlapping of read and write on data associated with each
131 * partition.
132 * */
133 private final ConcurrentMap<Integer, ReadWriteLock> locks =
134 Maps.newConcurrentMap();
135
136 /**
137 * Constructor.
138 *
139 * @param conf Configuration
140 * @param oocEngine Out-of-core engine
141 */
142 DiskBackedDataStore(ImmutableClassesGiraphConfiguration conf,
143 OutOfCoreEngine oocEngine) {
144 this.minBufferSizeToOffload = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf);
145 this.oocEngine = oocEngine;
146 }
147
148 /**
149 * Retrieves a lock for a given partition. If the lock for the given partition
150 * does not exist, creates a new lock.
151 *
152 * @param partitionId id of the partition the lock is needed for
153 * @return lock for a given partition
154 */
155 private ReadWriteLock getPartitionLock(int partitionId) {
156 ReadWriteLock readWriteLock = locks.get(partitionId);
157 if (readWriteLock == null) {
158 readWriteLock = new ReentrantReadWriteLock();
159 ReadWriteLock temp = locks.putIfAbsent(partitionId, readWriteLock);
160 if (temp != null) {
161 readWriteLock = temp;
162 }
163 }
164 return readWriteLock;
165 }
166
167 /**
168 * Adds a data entry for a given partition to the current data store. If data
169 * of a given partition in data store is already offloaded to disk, adds the
170 * data entry to appropriate raw data buffer list.
171 *
172 * @param partitionId id of the partition to add the data entry to
173 * @param entry data entry to add
174 */
175 protected void addEntry(int partitionId, T entry) {
176 // Addition of data entries to a data store is much more common than
177 // out-of-core operations. Besides, in-memory data store implementations
178 // existing in the code base already account for parallel addition to data
179 // stores. Therefore, using read lock would optimize for parallel addition
180 // to data stores, specially for cases where the addition should happen for
181 // partitions that are entirely in memory.
182 ReadWriteLock rwLock = getPartitionLock(partitionId);
183 rwLock.readLock().lock();
184 if (hasPartitionDataOnDisk.contains(partitionId)) {
185 List<T> entryList = new ArrayList<>();
186 entryList.add(entry);
187 int entrySize = entrySerializedSize(entry);
188 MutablePair<Integer, List<T>> newPair =
189 new MutablePair<>(entrySize, entryList);
190 Pair<Integer, List<T>> oldPair =
191 dataBuffers.putIfAbsent(partitionId, newPair);
192 if (oldPair != null) {
193 synchronized (oldPair) {
194 newPair = (MutablePair<Integer, List<T>>) oldPair;
195 newPair.setLeft(oldPair.getLeft() + entrySize);
196 newPair.getRight().add(entry);
197 }
198 }
199 } else {
200 addEntryToInMemoryPartitionData(partitionId, entry);
201 }
202 rwLock.readLock().unlock();
203 }
204
205 /**
206 * Loads and assembles all data for a given partition, and put it into the
207 * data store. Returns the number of bytes transferred from disk to memory in
208 * the loading process.
209 *
210 * @param partitionId id of the partition to load and assemble all data for
211 * @return number of bytes loaded from disk to memory
212 * @throws IOException
213 */
214 public abstract long loadPartitionData(int partitionId) throws IOException;
215
216 /**
217 * The proxy method that does the actual operation for `loadPartitionData`,
218 * but uses the data index given by the caller.
219 *
220 * @param partitionId id of the partition to load and assemble all data for
221 * @param index data index chain for the data to load
222 * @return number of bytes loaded from disk to memory
223 * @throws IOException
224 */
225 protected long loadPartitionDataProxy(int partitionId, DataIndex index)
226 throws IOException {
227 long numBytes = 0;
228 ReadWriteLock rwLock = getPartitionLock(partitionId);
229 rwLock.writeLock().lock();
230 if (hasPartitionDataOnDisk.contains(partitionId)) {
231 int ioThreadId =
232 oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
233 numBytes += loadInMemoryPartitionData(partitionId, ioThreadId,
234 index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId)));
235 hasPartitionDataOnDisk.remove(partitionId);
236 // Loading raw data buffers from disk if there is any and applying those
237 // to already loaded in-memory data.
238 Integer numBuffers = numDataBuffersOnDisk.remove(partitionId);
239 if (numBuffers != null) {
240 checkState(numBuffers > 0);
241 index.addIndex(DataIndex.TypeIndexEntry.BUFFER);
242 OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
243 oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
244 DataInput dataInput = inputWrapper.getDataInput();
245 for (int i = 0; i < numBuffers; ++i) {
246 T entry = readNextEntry(dataInput);
247 addEntryToInMemoryPartitionData(partitionId, entry);
248 }
249 numBytes += inputWrapper.finalizeInput(true);
250 index.removeLastIndex();
251 }
252 index.removeLastIndex();
253 // Applying in-memory raw data buffers to in-memory partition data.
254 Pair<Integer, List<T>> pair = dataBuffers.remove(partitionId);
255 if (pair != null) {
256 for (T entry : pair.getValue()) {
257 addEntryToInMemoryPartitionData(partitionId, entry);
258 }
259 }
260 }
261 rwLock.writeLock().unlock();
262 return numBytes;
263 }
264
265 /**
266 * Offloads partition data of a given partition in the data store to disk, and
267 * returns the number of bytes offloaded from memory to disk.
268 *
269 * @param partitionId id of the partition to offload its data
270 * @return number of bytes offloaded from memory to disk
271 * @throws IOException
272 */
273 public abstract long offloadPartitionData(int partitionId) throws IOException;
274
275 /**
276 * The proxy method that does the actual operation for `offloadPartitionData`,
277 * but uses the data index given by the caller.
278 *
279 * @param partitionId id of the partition to offload its data
280 * @param index data index chain for the data to offload
281 * @return number of bytes offloaded from memory to disk
282 * @throws IOException
283 */
284 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
285 "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
286 protected long offloadPartitionDataProxy(
287 int partitionId, DataIndex index) throws IOException {
288 ReadWriteLock rwLock = getPartitionLock(partitionId);
289 rwLock.writeLock().lock();
290 hasPartitionDataOnDisk.add(partitionId);
291 rwLock.writeLock().unlock();
292 int ioThreadId =
293 oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
294 long numBytes = offloadInMemoryPartitionData(partitionId, ioThreadId,
295 index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId)));
296 index.removeLastIndex();
297 return numBytes;
298 }
299
300 /**
301 * Offloads raw data buffers of a given partition to disk, and returns the
302 * number of bytes offloaded from memory to disk.
303 *
304 * @param partitionId id of the partition to offload its raw data buffers
305 * @return number of bytes offloaded from memory to disk
306 * @throws IOException
307 */
308 public abstract long offloadBuffers(int partitionId) throws IOException;
309
310 /**
311 * The proxy method that does the actual operation for `offloadBuffers`,
312 * but uses the data index given by the caller.
313 *
314 * @param partitionId id of the partition to offload its raw data buffers
315 * @param index data index chain for the data to offload its buffers
316 * @return number of bytes offloaded from memory to disk
317 * @throws IOException
318 */
319 protected long offloadBuffersProxy(int partitionId, DataIndex index)
320 throws IOException {
321 Pair<Integer, List<T>> pair = dataBuffers.get(partitionId);
322 if (pair == null || pair.getLeft() < minBufferSizeToOffload) {
323 return 0;
324 }
325 ReadWriteLock rwLock = getPartitionLock(partitionId);
326 rwLock.writeLock().lock();
327 pair = dataBuffers.remove(partitionId);
328 rwLock.writeLock().unlock();
329 checkNotNull(pair);
330 checkState(!pair.getRight().isEmpty());
331 int ioThreadId =
332 oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
333 index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId))
334 .addIndex(DataIndex.TypeIndexEntry.BUFFER);
335 OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
336 oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
337 true);
338 for (T entry : pair.getRight()) {
339 writeEntry(entry, outputWrapper.getDataOutput());
340 }
341 long numBytes = outputWrapper.finalizeOutput();
342 index.removeLastIndex().removeLastIndex();
343 int numBuffers = pair.getRight().size();
344 Integer oldNumBuffersOnDisk =
345 numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers);
346 if (oldNumBuffersOnDisk != null) {
347 numDataBuffersOnDisk.replace(partitionId,
348 oldNumBuffersOnDisk + numBuffers);
349 }
350 return numBytes;
351 }
352
353 /**
354 * Looks through all partitions that their data is not in the data store (is
355 * offloaded to disk), and sees if any of them has enough raw data buffer in
356 * memory. If so, puts that partition in a list to return.
357 *
358 * @param ioThreadId Id of the IO thread who would offload the buffers
359 * @return Set of partition ids of all partition raw buffers where the
360 * aggregate size of buffers are large enough and it is worth flushing
361 * those buffers to disk
362 */
363 public Set<Integer> getCandidateBuffersToOffload(int ioThreadId) {
364 Set<Integer> result = new HashSet<>();
365 for (Map.Entry<Integer, Pair<Integer, List<T>>> entry :
366 dataBuffers.entrySet()) {
367 int partitionId = entry.getKey();
368 long aggregateBufferSize = entry.getValue().getLeft();
369 if (aggregateBufferSize > minBufferSizeToOffload &&
370 oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId) ==
371 ioThreadId) {
372 result.add(partitionId);
373 }
374 }
375 return result;
376 }
377
378 /**
379 * Writes a single raw entry to a given output stream.
380 *
381 * @param entry entry to write to output
382 * @param out output stream to write the entry to
383 * @throws IOException
384 */
385 protected abstract void writeEntry(T entry, DataOutput out)
386 throws IOException;
387
388 /**
389 * Reads the next available raw entry from a given input stream.
390 *
391 * @param in input stream to read the entry from
392 * @return entry read from an input stream
393 * @throws IOException
394 */
395 protected abstract T readNextEntry(DataInput in) throws IOException;
396
397 /**
398 * Loads data of a partition into data store. Returns number of bytes loaded.
399 *
400 * @param partitionId id of the partition to load its data
401 * @param ioThreadId id of the IO thread performing the load
402 * @param index data index chain for the data to load
403 * @return number of bytes loaded from disk to memory
404 * @throws IOException
405 */
406 protected abstract long loadInMemoryPartitionData(
407 int partitionId, int ioThreadId, DataIndex index) throws IOException;
408
409 /**
410 * Offloads data of a partition in data store to disk. Returns the number of
411 * bytes offloaded to disk
412 *
413 * @param partitionId id of the partition to offload to disk
414 * @param ioThreadId id of the IO thread performing the offload
415 * @param index data index chain for the data to offload
416 * @return number of bytes offloaded from memory to disk
417 * @throws IOException
418 */
419 protected abstract long offloadInMemoryPartitionData(
420 int partitionId, int ioThreadId, DataIndex index) throws IOException;
421
422 /**
423 * Gets the size of a given entry in bytes.
424 *
425 * @param entry input entry to find its size
426 * @return size of given input entry in bytes
427 */
428 protected abstract int entrySerializedSize(T entry);
429
430 /**
431 * Adds a single entry for a given partition to the in-memory data store.
432 *
433 * @param partitionId id of the partition to add the data to
434 * @param entry input entry to add to the data store
435 */
436 protected abstract void addEntryToInMemoryPartitionData(int partitionId,
437 T entry);
438 }