View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.persistence;
20  
21  import com.esotericsoftware.kryo.io.Input;
22  import com.esotericsoftware.kryo.io.KryoDataInput;
23  import com.esotericsoftware.kryo.io.KryoDataOutput;
24  import com.esotericsoftware.kryo.io.Output;
25  import com.esotericsoftware.kryo.io.UnsafeInput;
26  import com.esotericsoftware.kryo.io.UnsafeOutput;
27  import org.apache.giraph.conf.GiraphConstants;
28  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
29  import org.apache.giraph.conf.IntConfOption;
30  import org.apache.log4j.Logger;
31  
32  import java.io.DataInput;
33  import java.io.DataOutput;
34  import java.io.File;
35  import java.io.FileInputStream;
36  import java.io.FileOutputStream;
37  import java.io.IOException;
38  import java.io.RandomAccessFile;
39  
40  import static com.google.common.base.Preconditions.checkState;
41  import static org.apache.giraph.conf.GiraphConstants.ONE_MB;
42  
43  /**
44   * Data accessor object to read/write data in local disk.
45   * Note: This class assumes that the data are partitioned across IO threads,
46   *       i.e. each part of data can be accessed by one and only one IO thread
47   *       throughout the execution. Also, each IO thread reads a particular
48   *       type of data completely and, only then, it can read other type of data;
49   *       i.e. an IO thread cannot be used to read two different files at the
50   *       same time. These assumptions are based on the assumptions that the
51   *       current out-of-core mechanism is designed for.
52   */
53  public class LocalDiskDataAccessor implements OutOfCoreDataAccessor {
54    /**
55     * Size of the buffer used for (de)serializing data when reading/writing
56     * from/to disk
57     */
58    public static final IntConfOption OOC_DISK_BUFFER_SIZE =
59        new IntConfOption("graph.oocDiskBufferSize", 4 * ONE_MB,
60            "size of the buffer when (de)serializing data for reading/writing " +
61                "from/to disk");
62  
63    /** Class logger */
64    private static final Logger LOG =
65        Logger.getLogger(LocalDiskDataAccessor.class);
66    /**
67     * In-memory buffer used for (de)serializing data when reading/writing
68     * from/to disk using Kryo
69     */
70    private final byte[][] perThreadBuffers;
71    /** Path prefix for different disks */
72    private final String[] basePaths;
73    /** How many disks (i.e. IO threads) do we have? */
74    private final int numDisks;
75  
76    /**
77     * Constructor
78     *
79     * @param conf Configuration
80     */
81    public LocalDiskDataAccessor(
82        ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
83      // Take advantage of multiple disks
84      String[] userPaths = GiraphConstants.PARTITIONS_DIRECTORY.getArray(conf);
85      this.numDisks = userPaths.length;
86      if (!GiraphConstants.NUM_OUT_OF_CORE_THREADS.isDefaultValue(conf) ||
87          GiraphConstants.NUM_OUT_OF_CORE_THREADS.get(conf) != numDisks) {
88        LOG.warn("LocalDiskDataAccessor: with this data accessor, number of " +
89            "out-of-core threads is only specified by the number of " +
90            "directories given by 'giraph.partitionsDirectory' flag! Now using " +
91            numDisks + " IO threads!");
92      }
93      this.basePaths = new String[numDisks];
94      int ptr = 0;
95      String jobId = conf.getJobId();
96      for (String path : userPaths) {
97        File file = new File(path);
98        if (!file.exists()) {
99          checkState(file.mkdirs(), "LocalDiskDataAccessor: cannot create " +
100             "directory " + file.getAbsolutePath());
101       }
102       basePaths[ptr] = path + "/" + jobId;
103       ptr++;
104     }
105     final int diskBufferSize = OOC_DISK_BUFFER_SIZE.get(conf);
106     this.perThreadBuffers = new byte[numDisks][diskBufferSize];
107   }
108 
109   @Override
110   public void initialize() { }
111 
112   @Override
113   public void shutdown() {
114     for (String path : basePaths) {
115       File file = new File(path).getParentFile();
116       for (String subFileName : file.list()) {
117         File subFile = new File(file.getPath(), subFileName);
118         checkState(subFile.delete(), "shutdown: cannot delete file %s",
119             subFile.getAbsoluteFile());
120       }
121       checkState(file.delete(), "shutdown: cannot delete directory %s",
122           file.getAbsoluteFile());
123     }
124   }
125 
126   @Override
127   public int getNumAccessorThreads() {
128     return numDisks;
129   }
130 
131   @Override
132   public DataInputWrapper prepareInput(int threadId, DataIndex index)
133       throws IOException {
134     return new LocalDiskDataInputWrapper(basePaths[threadId] + index.toString(),
135         perThreadBuffers[threadId]);
136   }
137 
138   @Override
139   public DataOutputWrapper prepareOutput(
140       int threadId, DataIndex index, boolean shouldAppend) throws IOException {
141     return new LocalDiskDataOutputWrapper(
142         basePaths[threadId] + index.toString(), shouldAppend,
143         perThreadBuffers[threadId]);
144   }
145 
146   @Override
147   public boolean dataExist(int threadId, DataIndex index) {
148     return new File(basePaths[threadId] + index.toString()).exists();
149   }
150 
151   /** Implementation of <code>DataInput</code> wrapper for local disk reader */
152   private static class LocalDiskDataInputWrapper implements DataInputWrapper {
153     /** File used to read the data from */
154     private final File file;
155     /** Kryo's handle to read the data */
156     private final Input input;
157 
158     /**
159      * Constructor
160      *
161      * @param fileName file name
162      * @param buffer reusable byte buffer that will be used in Kryo's Input
163      *               reader
164      * @throws IOException
165      */
166     @edu.umd.cs.findbugs.annotations.SuppressWarnings(
167         "OBL_UNSATISFIED_OBLIGATION")
168     LocalDiskDataInputWrapper(String fileName, byte[] buffer)
169         throws IOException {
170       file = new File(fileName);
171       if (LOG.isDebugEnabled()) {
172         LOG.debug("LocalDiskDataInputWrapper: obtaining a data input from " +
173             "local file " + file.getAbsolutePath());
174       }
175       input = new UnsafeInput(buffer);
176       input.setInputStream(new FileInputStream(
177           new RandomAccessFile(file, "r").getFD()));
178     }
179 
180     @Override
181     public DataInput getDataInput() {
182       return new KryoDataInput(input);
183     }
184 
185     @Override
186     public long finalizeInput(boolean deleteOnClose) {
187       input.close();
188       long count = input.total();
189       checkState(!deleteOnClose || file.delete(),
190           "finalizeInput: failed to delete %s.", file.getAbsoluteFile());
191       return count;
192     }
193   }
194 
195   /** Implementation of <code>DataOutput</code> wrapper for local disk writer */
196   private static class LocalDiskDataOutputWrapper implements DataOutputWrapper {
197     /** File used to write the data to */
198     private final File file;
199     /** Kryo's handle to write the date */
200     private final Output output;
201 
202     /**
203      * Constructor
204      *
205      * @param fileName file name
206      * @param shouldAppend whether the <code>DataOutput</code> should be used
207      *                     for appending to already existing files
208      * @param buffer reusable byte buffer that will be used in Kryo's Output
209      *               writer
210      * @throws IOException
211      */
212     @edu.umd.cs.findbugs.annotations.SuppressWarnings(
213         "OBL_UNSATISFIED_OBLIGATION")
214     LocalDiskDataOutputWrapper(String fileName, boolean shouldAppend,
215                                byte[] buffer) throws IOException {
216       file = new File(fileName);
217       if (LOG.isDebugEnabled()) {
218         LOG.debug("LocalDiskDataOutputWrapper: obtaining a data output from " +
219             "local file " + file.getAbsolutePath());
220         if (!shouldAppend) {
221           checkState(!file.exists(), "LocalDiskDataOutputWrapper: file %s " +
222               "already exist", file.getAbsoluteFile());
223           checkState(file.createNewFile(), "LocalDiskDataOutputWrapper: " +
224               "cannot create file %s", file.getAbsolutePath());
225         }
226       }
227       output = new UnsafeOutput(buffer);
228       RandomAccessFile raf = new RandomAccessFile(file, "rw");
229       if (shouldAppend) {
230         raf.seek(file.length());
231       }
232       output.setOutputStream(new FileOutputStream(raf.getFD()));
233     }
234 
235     @Override
236     public DataOutput getDataOutput() {
237       return new KryoDataOutput(output);
238     }
239 
240 
241     @Override
242     public long finalizeOutput() {
243       output.close();
244       long count = output.total();
245       return count;
246     }
247   }
248 }