View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.persistence;
20  
21  import com.esotericsoftware.kryo.io.Input;
22  import com.esotericsoftware.kryo.io.KryoDataInput;
23  import com.esotericsoftware.kryo.io.KryoDataOutput;
24  import com.esotericsoftware.kryo.io.Output;
25  import com.esotericsoftware.kryo.io.UnsafeInput;
26  import com.esotericsoftware.kryo.io.UnsafeOutput;
27  import org.apache.giraph.conf.GiraphConstants;
28  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
29  import org.apache.giraph.conf.IntConfOption;
30  import org.apache.log4j.Logger;
31  
32  import java.io.DataInput;
33  import java.io.DataOutput;
34  import java.io.File;
35  import java.io.FileInputStream;
36  import java.io.FileOutputStream;
37  import java.io.IOException;
38  import java.io.RandomAccessFile;
39  
40  import static com.google.common.base.Preconditions.checkState;
41  import static org.apache.giraph.conf.GiraphConstants.ONE_MB;
42  
43  /**
44   * Data accessor object to read/write data in local disk.
45   * Note: This class assumes that the data are partitioned across IO threads,
46   *       i.e. each part of data can be accessed by one and only one IO thread
47   *       throughout the execution. Also, each IO thread reads a particular
48   *       type of data completely and, only then, it can read other type of data;
49   *       i.e. an IO thread cannot be used to read two different files at the
50   *       same time. These assumptions are based on the assumptions that the
51   *       current out-of-core mechanism is designed for.
52   */
53  public class LocalDiskDataAccessor implements OutOfCoreDataAccessor {
54    /**
55     * Size of the buffer used for (de)serializing data when reading/writing
56     * from/to disk
57     */
58    public static final IntConfOption OOC_DISK_BUFFER_SIZE =
59        new IntConfOption("graph.oocDiskBufferSize", 4 * ONE_MB,
60            "size of the buffer when (de)serializing data for reading/writing " +
61                "from/to disk");
62  
63    /** Class logger */
64    private static final Logger LOG =
65        Logger.getLogger(LocalDiskDataAccessor.class);
66    /**
67     * In-memory buffer used for (de)serializing data when reading/writing
68     * from/to disk using Kryo
69     */
70    private final byte[][] perThreadBuffers;
71    /** Path prefix for different disks */
72    private final String[] basePaths;
73    /** How many disks (i.e. IO threads) do we have? */
74    private final int numDisks;
75  
76    /**
77     * Constructor
78     *
79     * @param conf Configuration
80     */
81    public LocalDiskDataAccessor(
82        ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
83      // Take advantage of multiple disks
84      String[] userPaths = GiraphConstants.PARTITIONS_DIRECTORY.getArray(conf);
85      this.numDisks = userPaths.length;
86      if (!GiraphConstants.NUM_OUT_OF_CORE_THREADS.isDefaultValue(conf) ||
87          GiraphConstants.NUM_OUT_OF_CORE_THREADS.get(conf) != numDisks) {
88        LOG.warn("LocalDiskDataAccessor: with this data accessor, number of " +
89            "out-of-core threads is only specified by the number of " +
90            "directories given by 'giraph.partitionsDirectory' flag! Now using " +
91            numDisks + " IO threads!");
92      }
93      this.basePaths = new String[numDisks];
94      int ptr = 0;
95      String jobId = conf.getJobId();
96      for (String path : userPaths) {
97        String jobDirectory = path + "/" + jobId;
98        File file = new File(jobDirectory);
99        checkState(file.mkdirs(), "LocalDiskDataAccessor: cannot create " +
100           "directory " + file.getAbsolutePath());
101       basePaths[ptr] = jobDirectory + "/";
102       ptr++;
103     }
104     final int diskBufferSize = OOC_DISK_BUFFER_SIZE.get(conf);
105     this.perThreadBuffers = new byte[numDisks][diskBufferSize];
106   }
107 
108   @Override
109   public void initialize() { }
110 
111   @Override
112   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
113     "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE")
114   public void shutdown() {
115     for (String path : basePaths) {
116       File file = new File(path);
117       for (String subFileName : file.list()) {
118         File subFile = new File(file.getPath(), subFileName);
119         checkState(subFile.delete(), "shutdown: cannot delete file %s",
120             subFile.getAbsoluteFile());
121       }
122       checkState(file.delete(), "shutdown: cannot delete directory %s",
123           file.getAbsoluteFile());
124     }
125   }
126 
127   @Override
128   public int getNumAccessorThreads() {
129     return numDisks;
130   }
131 
132   @Override
133   public DataInputWrapper prepareInput(int threadId, DataIndex index)
134       throws IOException {
135     return new LocalDiskDataInputWrapper(basePaths[threadId] + index.toString(),
136         perThreadBuffers[threadId]);
137   }
138 
139   @Override
140   public DataOutputWrapper prepareOutput(
141       int threadId, DataIndex index, boolean shouldAppend) throws IOException {
142     return new LocalDiskDataOutputWrapper(
143         basePaths[threadId] + index.toString(), shouldAppend,
144         perThreadBuffers[threadId]);
145   }
146 
147   @Override
148   public boolean dataExist(int threadId, DataIndex index) {
149     return new File(basePaths[threadId] + index.toString()).exists();
150   }
151 
152   /** Implementation of <code>DataInput</code> wrapper for local disk reader */
153   private static class LocalDiskDataInputWrapper implements DataInputWrapper {
154     /** File used to read the data from */
155     private final File file;
156     /** Kryo's handle to read the data */
157     private final Input input;
158 
159     /**
160      * Constructor
161      *
162      * @param fileName file name
163      * @param buffer reusable byte buffer that will be used in Kryo's Input
164      *               reader
165      * @throws IOException
166      */
167     @edu.umd.cs.findbugs.annotations.SuppressWarnings(
168         "OBL_UNSATISFIED_OBLIGATION")
169     LocalDiskDataInputWrapper(String fileName, byte[] buffer)
170         throws IOException {
171       file = new File(fileName);
172       if (LOG.isDebugEnabled()) {
173         LOG.debug("LocalDiskDataInputWrapper: obtaining a data input from " +
174             "local file " + file.getAbsolutePath());
175       }
176       input = new UnsafeInput(buffer);
177       input.setInputStream(new FileInputStream(
178           new RandomAccessFile(file, "r").getFD()));
179     }
180 
181     @Override
182     public DataInput getDataInput() {
183       return new KryoDataInput(input);
184     }
185 
186     @Override
187     public long finalizeInput(boolean deleteOnClose) {
188       input.close();
189       long count = input.total();
190       checkState(!deleteOnClose || file.delete(),
191           "finalizeInput: failed to delete %s.", file.getAbsoluteFile());
192       return count;
193     }
194   }
195 
196   /** Implementation of <code>DataOutput</code> wrapper for local disk writer */
197   private static class LocalDiskDataOutputWrapper implements DataOutputWrapper {
198     /** File used to write the data to */
199     private final File file;
200     /** Kryo's handle to write the date */
201     private final Output output;
202 
203     /**
204      * Constructor
205      *
206      * @param fileName file name
207      * @param shouldAppend whether the <code>DataOutput</code> should be used
208      *                     for appending to already existing files
209      * @param buffer reusable byte buffer that will be used in Kryo's Output
210      *               writer
211      * @throws IOException
212      */
213     @edu.umd.cs.findbugs.annotations.SuppressWarnings(
214         "OBL_UNSATISFIED_OBLIGATION")
215     LocalDiskDataOutputWrapper(String fileName, boolean shouldAppend,
216                                byte[] buffer) throws IOException {
217       file = new File(fileName);
218       if (LOG.isDebugEnabled()) {
219         LOG.debug("LocalDiskDataOutputWrapper: obtaining a data output from " +
220             "local file " + file.getAbsolutePath());
221         if (!shouldAppend) {
222           checkState(!file.exists(), "LocalDiskDataOutputWrapper: file %s " +
223               "already exist", file.getAbsoluteFile());
224           checkState(file.createNewFile(), "LocalDiskDataOutputWrapper: " +
225               "cannot create file %s", file.getAbsolutePath());
226         }
227       }
228       output = new UnsafeOutput(buffer);
229       RandomAccessFile raf = new RandomAccessFile(file, "rw");
230       if (shouldAppend) {
231         raf.seek(file.length());
232       }
233       output.setOutputStream(new FileOutputStream(raf.getFD()));
234     }
235 
236     @Override
237     public DataOutput getDataOutput() {
238       return new KryoDataOutput(output);
239     }
240 
241 
242     @Override
243     public long finalizeOutput() {
244       output.close();
245       long count = output.total();
246       return count;
247     }
248   }
249 }