View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.persistence;
20  
21  import com.esotericsoftware.kryo.io.Input;
22  import com.esotericsoftware.kryo.io.KryoDataInput;
23  import com.esotericsoftware.kryo.io.KryoDataOutput;
24  import com.esotericsoftware.kryo.io.Output;
25  import com.esotericsoftware.kryo.io.UnsafeInput;
26  import com.esotericsoftware.kryo.io.UnsafeOutput;
27  import org.apache.giraph.conf.GiraphConstants;
28  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
29  import org.apache.giraph.conf.IntConfOption;
30  import org.apache.log4j.Logger;
31  
32  import java.io.DataInput;
33  import java.io.DataOutput;
34  import java.io.File;
35  import java.io.FileInputStream;
36  import java.io.FileOutputStream;
37  import java.io.IOException;
38  import java.io.RandomAccessFile;
39  
40  import static com.google.common.base.Preconditions.checkState;
41  import static org.apache.giraph.conf.GiraphConstants.ONE_MB;
42  
43  /**
44   * Data accessor object to read/write data in local disk.
45   * Note: This class assumes that the data are partitioned across IO threads,
46   *       i.e. each part of data can be accessed by one and only one IO thread
47   *       throughout the execution. Also, each IO thread reads a particular
48   *       type of data completely and, only then, it can read other type of data;
49   *       i.e. an IO thread cannot be used to read two different files at the
50   *       same time. These assumptions are based on the assumptions that the
51   *       current out-of-core mechanism is designed for.
52   */
53  public class LocalDiskDataAccessor implements OutOfCoreDataAccessor {
54    /**
55     * Size of the buffer used for (de)serializing data when reading/writing
56     * from/to disk
57     */
58    public static final IntConfOption OOC_DISK_BUFFER_SIZE =
59        new IntConfOption("graph.oocDiskBufferSize", 4 * ONE_MB,
60            "size of the buffer when (de)serializing data for reading/writing " +
61                "from/to disk");
62  
63    /** Class logger */
64    private static final Logger LOG =
65        Logger.getLogger(LocalDiskDataAccessor.class);
66    /**
67     * In-memory buffer used for (de)serializing data when reading/writing
68     * from/to disk using Kryo
69     */
70    private final byte[][] perThreadBuffers;
71    /** Path prefix for different disks */
72    private final String[] basePaths;
73    /** How many disks (i.e. IO threads) do we have? */
74    private final int numDisks;
75  
76    /**
77     * Constructor
78     *
79     * @param conf Configuration
80     */
81    public LocalDiskDataAccessor(
82        ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
83      // Take advantage of multiple disks
84      String[] userPaths = GiraphConstants.PARTITIONS_DIRECTORY.getArray(conf);
85      this.numDisks = userPaths.length;
86      if (!GiraphConstants.NUM_OUT_OF_CORE_THREADS.isDefaultValue(conf) ||
87          GiraphConstants.NUM_OUT_OF_CORE_THREADS.get(conf) != numDisks) {
88        LOG.warn("LocalDiskDataAccessor: with this data accessor, number of " +
89            "out-of-core threads is only specified by the number of " +
90            "directories given by 'giraph.partitionsDirectory' flag! Now using " +
91            numDisks + " IO threads!");
92      }
93      this.basePaths = new String[numDisks];
94      int ptr = 0;
95      String jobId = conf.getJobId();
96      for (String path : userPaths) {
97        String jobDirectory = path + "/" + jobId;
98        File file = new File(jobDirectory);
99        checkState(file.mkdirs(), "LocalDiskDataAccessor: cannot create " +
100           "directory " + file.getAbsolutePath());
101       basePaths[ptr] = jobDirectory + "/";
102       ptr++;
103     }
104     final int diskBufferSize = OOC_DISK_BUFFER_SIZE.get(conf);
105     this.perThreadBuffers = new byte[numDisks][diskBufferSize];
106   }
107 
108   @Override
109   public void initialize() { }
110 
111   @Override
112   public void shutdown() {
113     for (String path : basePaths) {
114       File file = new File(path);
115       for (String subFileName : file.list()) {
116         File subFile = new File(file.getPath(), subFileName);
117         checkState(subFile.delete(), "shutdown: cannot delete file %s",
118             subFile.getAbsoluteFile());
119       }
120       checkState(file.delete(), "shutdown: cannot delete directory %s",
121           file.getAbsoluteFile());
122     }
123   }
124 
125   @Override
126   public int getNumAccessorThreads() {
127     return numDisks;
128   }
129 
130   @Override
131   public DataInputWrapper prepareInput(int threadId, DataIndex index)
132       throws IOException {
133     return new LocalDiskDataInputWrapper(basePaths[threadId] + index.toString(),
134         perThreadBuffers[threadId]);
135   }
136 
137   @Override
138   public DataOutputWrapper prepareOutput(
139       int threadId, DataIndex index, boolean shouldAppend) throws IOException {
140     return new LocalDiskDataOutputWrapper(
141         basePaths[threadId] + index.toString(), shouldAppend,
142         perThreadBuffers[threadId]);
143   }
144 
145   @Override
146   public boolean dataExist(int threadId, DataIndex index) {
147     return new File(basePaths[threadId] + index.toString()).exists();
148   }
149 
150   /** Implementation of <code>DataInput</code> wrapper for local disk reader */
151   private static class LocalDiskDataInputWrapper implements DataInputWrapper {
152     /** File used to read the data from */
153     private final File file;
154     /** Kryo's handle to read the data */
155     private final Input input;
156 
157     /**
158      * Constructor
159      *
160      * @param fileName file name
161      * @param buffer reusable byte buffer that will be used in Kryo's Input
162      *               reader
163      * @throws IOException
164      */
165     @edu.umd.cs.findbugs.annotations.SuppressWarnings(
166         "OBL_UNSATISFIED_OBLIGATION")
167     LocalDiskDataInputWrapper(String fileName, byte[] buffer)
168         throws IOException {
169       file = new File(fileName);
170       if (LOG.isDebugEnabled()) {
171         LOG.debug("LocalDiskDataInputWrapper: obtaining a data input from " +
172             "local file " + file.getAbsolutePath());
173       }
174       input = new UnsafeInput(buffer);
175       input.setInputStream(new FileInputStream(
176           new RandomAccessFile(file, "r").getFD()));
177     }
178 
179     @Override
180     public DataInput getDataInput() {
181       return new KryoDataInput(input);
182     }
183 
184     @Override
185     public long finalizeInput(boolean deleteOnClose) {
186       input.close();
187       long count = input.total();
188       checkState(!deleteOnClose || file.delete(),
189           "finalizeInput: failed to delete %s.", file.getAbsoluteFile());
190       return count;
191     }
192   }
193 
194   /** Implementation of <code>DataOutput</code> wrapper for local disk writer */
195   private static class LocalDiskDataOutputWrapper implements DataOutputWrapper {
196     /** File used to write the data to */
197     private final File file;
198     /** Kryo's handle to write the date */
199     private final Output output;
200 
201     /**
202      * Constructor
203      *
204      * @param fileName file name
205      * @param shouldAppend whether the <code>DataOutput</code> should be used
206      *                     for appending to already existing files
207      * @param buffer reusable byte buffer that will be used in Kryo's Output
208      *               writer
209      * @throws IOException
210      */
211     @edu.umd.cs.findbugs.annotations.SuppressWarnings(
212         "OBL_UNSATISFIED_OBLIGATION")
213     LocalDiskDataOutputWrapper(String fileName, boolean shouldAppend,
214                                byte[] buffer) throws IOException {
215       file = new File(fileName);
216       if (LOG.isDebugEnabled()) {
217         LOG.debug("LocalDiskDataOutputWrapper: obtaining a data output from " +
218             "local file " + file.getAbsolutePath());
219         if (!shouldAppend) {
220           checkState(!file.exists(), "LocalDiskDataOutputWrapper: file %s " +
221               "already exist", file.getAbsoluteFile());
222           checkState(file.createNewFile(), "LocalDiskDataOutputWrapper: " +
223               "cannot create file %s", file.getAbsolutePath());
224         }
225       }
226       output = new UnsafeOutput(buffer);
227       RandomAccessFile raf = new RandomAccessFile(file, "rw");
228       if (shouldAppend) {
229         raf.seek(file.length());
230       }
231       output.setOutputStream(new FileOutputStream(raf.getFD()));
232     }
233 
234     @Override
235     public DataOutput getDataOutput() {
236       return new KryoDataOutput(output);
237     }
238 
239 
240     @Override
241     public long finalizeOutput() {
242       output.close();
243       long count = output.total();
244       return count;
245     }
246   }
247 }