1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.ooc.persistence;
2021import com.esotericsoftware.kryo.io.Input;
22import com.esotericsoftware.kryo.io.KryoDataInput;
23import com.esotericsoftware.kryo.io.KryoDataOutput;
24import com.esotericsoftware.kryo.io.Output;
25import com.esotericsoftware.kryo.io.UnsafeInput;
26import com.esotericsoftware.kryo.io.UnsafeOutput;
27import org.apache.giraph.conf.GiraphConstants;
28import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
29import org.apache.giraph.conf.IntConfOption;
30import org.apache.log4j.Logger;
3132import java.io.DataInput;
33import java.io.DataOutput;
34import java.io.File;
35import java.io.FileInputStream;
36import java.io.FileOutputStream;
37import java.io.IOException;
38import java.io.RandomAccessFile;
3940importstatic com.google.common.base.Preconditions.checkState;
41importstatic org.apache.giraph.conf.GiraphConstants.ONE_MB;
4243/**44 * Data accessor object to read/write data in local disk.45 * Note: This class assumes that the data are partitioned across IO threads,46 * i.e. each part of data can be accessed by one and only one IO thread47 * throughout the execution. Also, each IO thread reads a particular48 * type of data completely and, only then, it can read other type of data;49 * i.e. an IO thread cannot be used to read two different files at the50 * same time. These assumptions are based on the assumptions that the51 * current out-of-core mechanism is designed for.52 */53publicclassLocalDiskDataAccessorimplementsOutOfCoreDataAccessor {
54/**55 * Size of the buffer used for (de)serializing data when reading/writing56 * from/to disk57 */58publicstaticfinalIntConfOption OOC_DISK_BUFFER_SIZE =
59newIntConfOption("graph.oocDiskBufferSize", 4 * ONE_MB,
60"size of the buffer when (de)serializing data for reading/writing " +
61"from/to disk");
6263/** Class logger */64privatestaticfinal Logger LOG =
65 Logger.getLogger(LocalDiskDataAccessor.class);
66/**67 * In-memory buffer used for (de)serializing data when reading/writing68 * from/to disk using Kryo69 */70privatefinal byte[][] perThreadBuffers;
71/** Path prefix for different disks */72privatefinal String[] basePaths;
73/** How many disks (i.e. IO threads) do we have? */74privatefinalint numDisks;
7576/**77 * Constructor78 *79 * @param conf Configuration80 */81publicLocalDiskDataAccessor(
82 ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
83// Take advantage of multiple disks84 String[] userPaths = GiraphConstants.PARTITIONS_DIRECTORY.getArray(conf);
85this.numDisks = userPaths.length;
86if (!GiraphConstants.NUM_OUT_OF_CORE_THREADS.isDefaultValue(conf) ||
87 GiraphConstants.NUM_OUT_OF_CORE_THREADS.get(conf) != numDisks) {
88 LOG.warn("LocalDiskDataAccessor: with this data accessor, number of " +
89"out-of-core threads is only specified by the number of " +
90"directories given by 'giraph.partitionsDirectory' flag! Now using " +
91 numDisks + " IO threads!");
92 }
93this.basePaths = new String[numDisks];
94int ptr = 0;
95 String jobId = conf.getJobId();
96for (String path : userPaths) {
97 String jobDirectory = path + "/" + jobId;
98 File file = new File(jobDirectory);
99 checkState(file.mkdirs(), "LocalDiskDataAccessor: cannot create " +
100"directory " + file.getAbsolutePath());
101 basePaths[ptr] = jobDirectory + "/";
102 ptr++;
103 }
104finalint diskBufferSize = OOC_DISK_BUFFER_SIZE.get(conf);
105this.perThreadBuffers = new byte[numDisks][diskBufferSize];
106 }
107108 @Override
109publicvoid initialize() { }
110111 @Override
112 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
113"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE")
114publicvoid shutdown() {
115for (String path : basePaths) {
116 File file = new File(path);
117for (String subFileName : file.list()) {
118 File subFile = new File(file.getPath(), subFileName);
119 checkState(subFile.delete(), "shutdown: cannot delete file %s",
120 subFile.getAbsoluteFile());
121 }
122 checkState(file.delete(), "shutdown: cannot delete directory %s",
123 file.getAbsoluteFile());
124 }
125 }
126127 @Override
128publicint getNumAccessorThreads() {
129return numDisks;
130 }
131132 @Override
133publicDataInputWrapper prepareInput(int threadId, DataIndex index)
134throws IOException {
135returnnewLocalDiskDataInputWrapper(basePaths[threadId] + index.toString(),
136 perThreadBuffers[threadId]);
137 }
138139 @Override
140publicDataOutputWrapper prepareOutput(
141int threadId, DataIndex index, boolean shouldAppend) throws IOException {
142returnnewLocalDiskDataOutputWrapper(
143 basePaths[threadId] + index.toString(), shouldAppend,
144 perThreadBuffers[threadId]);
145 }
146147 @Override
148publicboolean dataExist(int threadId, DataIndex index) {
149returnnew File(basePaths[threadId] + index.toString()).exists();
150 }
151152/** Implementation of <code>DataInput</code> wrapper for local disk reader */153privatestaticclassLocalDiskDataInputWrapperimplementsDataInputWrapper {
154/** File used to read the data from */155privatefinal File file;
156/** Kryo's handle to read the data */157privatefinal Input input;
158159/**160 * Constructor161 *162 * @param fileName file name163 * @param buffer reusable byte buffer that will be used in Kryo's Input164 * reader165 * @throws IOException166 */167 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
168"OBL_UNSATISFIED_OBLIGATION")
169LocalDiskDataInputWrapper(String fileName, byte[] buffer)
170throws IOException {
171 file = new File(fileName);
172if (LOG.isDebugEnabled()) {
173 LOG.debug("LocalDiskDataInputWrapper: obtaining a data input from " +
174"local file " + file.getAbsolutePath());
175 }
176 input = new UnsafeInput(buffer);
177 input.setInputStream(new FileInputStream(
178new RandomAccessFile(file, "r").getFD()));
179 }
180181 @Override
182public DataInput getDataInput() {
183returnnew KryoDataInput(input);
184 }
185186 @Override
187publiclong finalizeInput(boolean deleteOnClose) {
188 input.close();
189long count = input.total();
190 checkState(!deleteOnClose || file.delete(),
191"finalizeInput: failed to delete %s.", file.getAbsoluteFile());
192return count;
193 }
194 }
195196/** Implementation of <code>DataOutput</code> wrapper for local disk writer */197privatestaticclassLocalDiskDataOutputWrapperimplementsDataOutputWrapper {
198/** File used to write the data to */199privatefinal File file;
200/** Kryo's handle to write the date */201privatefinalOutput output;
202203/**204 * Constructor205 *206 * @param fileName file name207 * @param shouldAppend whether the <code>DataOutput</code> should be used208 * for appending to already existing files209 * @param buffer reusable byte buffer that will be used in Kryo's Output210 * writer211 * @throws IOException212 */213 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
214"OBL_UNSATISFIED_OBLIGATION")
215LocalDiskDataOutputWrapper(String fileName, boolean shouldAppend,
216 byte[] buffer) throws IOException {
217 file = new File(fileName);
218if (LOG.isDebugEnabled()) {
219 LOG.debug("LocalDiskDataOutputWrapper: obtaining a data output from " +
220"local file " + file.getAbsolutePath());
221if (!shouldAppend) {
222 checkState(!file.exists(), "LocalDiskDataOutputWrapper: file %s " +
223"already exist", file.getAbsoluteFile());
224 checkState(file.createNewFile(), "LocalDiskDataOutputWrapper: " +
225"cannot create file %s", file.getAbsolutePath());
226 }
227 }
228 output = new UnsafeOutput(buffer);
229 RandomAccessFile raf = new RandomAccessFile(file, "rw");
230if (shouldAppend) {
231 raf.seek(file.length());
232 }
233 output.setOutputStream(new FileOutputStream(raf.getFD()));
234 }
235236 @Override
237public DataOutput getDataOutput() {
238returnnew KryoDataOutput(output);
239 }
240241242 @Override
243publiclong finalizeOutput() {
244 output.close();
245long count = output.total();
246return count;
247 }
248 }
249 }