GiraphFileInputFormat

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.io.formats;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/*if[HADOOP_NON_SECURE]
else[HADOOP_NON_SECURE]
import org.apache.hadoop.mapreduce.security.TokenCache;
end[HADOOP_NON_SECURE]*/

/**
 * Provides functionality similar to {@link FileInputFormat},
 * but allows for different data sources (vertex and edge data).
 *
 * @param <K> Key
 * @param <V> Value
 */
public abstract class GiraphFileInputFormat<K, V>
    extends FileInputFormat<K, V> {
  /** Vertex input file paths. */
  public static final String VERTEX_INPUT_DIR = "giraph.vertex.input.dir";
  /** Edge input file paths. */
  public static final String EDGE_INPUT_DIR = "giraph.edge.input.dir";
  /** Number of vertex input files. */
  public static final String NUM_VERTEX_INPUT_FILES =
      "giraph.input.vertex.num.files";
  /** Number of edge input files. */
  public static final String NUM_EDGE_INPUT_FILES =
      "giraph.input.edge.num.files";

  /** Split slop. */
  private static final double SPLIT_SLOP = 1.1; // 10% slop

  /** Filter for hidden files. */
  private static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  /** Class logger. */
  private static final Logger LOG =
      Logger.getLogger(GiraphFileInputFormat.class);

  /**
   * Add a {@link org.apache.hadoop.fs.Path} to the list of vertex inputs.
   *
   * @param conf the Configuration to store the input paths
   * @param path {@link org.apache.hadoop.fs.Path} to be added to the list of
   *             vertex inputs
   * @throws IOException on I/O errors
   */
  public static void addVertexInputPath(Configuration conf,
      Path path) throws IOException {
    String dirStr = pathToDirString(conf, path);
    String dirs = conf.get(VERTEX_INPUT_DIR);
    conf.set(VERTEX_INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
  }

  /**
   * Set the {@link Path} for vertex input.
   *
   * @param conf Configuration to store in
   * @param path {@link Path} to set
   * @throws IOException on I/O errors
   */
  public static void setVertexInputPath(Configuration conf,
      Path path) throws IOException {
    conf.set(VERTEX_INPUT_DIR, pathToDirString(conf, path));
  }

  /**
   * Add a {@link org.apache.hadoop.fs.Path} to the list of edge inputs.
   *
   * @param conf the Configuration to store the input paths
   * @param path {@link org.apache.hadoop.fs.Path} to be added to the list of
   *             edge inputs
   * @throws IOException on I/O errors
   */
  public static void addEdgeInputPath(Configuration conf,
      Path path) throws IOException {
    String dirStr = pathToDirString(conf, path);
    String dirs = conf.get(EDGE_INPUT_DIR);
    conf.set(EDGE_INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
  }

  /**
   * Set the {@link Path} for edge input.
   *
   * @param conf Configuration to store in
   * @param path {@link Path} to set
   * @throws IOException on I/O errors
   */
  public static void setEdgeInputPath(Configuration conf,
      Path path) throws IOException {
    conf.set(EDGE_INPUT_DIR, pathToDirString(conf, path));
  }

  /**
   * Convert from a Path to a string.
   * This makes the path fully qualified and does escaping.
   *
   * @param conf Configuration to use
   * @param path Path to convert
   * @return String of the escaped, fully qualified directory
   * @throws IOException on I/O errors
   */
  private static String pathToDirString(Configuration conf, Path path)
    throws IOException {
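    // makeQualified resolves the path against the FileSystem's default URI
    // and working directory, and escapeString backslash-escapes commas so
    // multiple paths can later be joined into one comma-separated list.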
    path = path.getFileSystem(conf).makeQualified(path);
    return StringUtils.escapeString(path.toString());
  }

  /**
   * Get the list of vertex input {@link Path}s.
   *
   * @param context The job
   * @return The list of input {@link Path}s
   */
  public static Path[] getVertexInputPaths(JobContext context) {
    String dirs = context.getConfiguration().get(VERTEX_INPUT_DIR, "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }

  /**
   * Get the list of edge input {@link Path}s.
   *
   * @param context The job
   * @return The list of input {@link Path}s
   */
  public static Path[] getEdgeInputPaths(JobContext context) {
    String dirs = context.getConfiguration().get(EDGE_INPUT_DIR, "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * HIDDEN_FILE_FILTER together with a user-provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    /** List of filters. */
    private List<PathFilter> filters;

    /**
     * Constructor.
     *
     * @param filters The list of filters
     */
    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    /**
     * True iff all filters accept the given path.
     *
     * @param path The path to check
     * @return Whether the path is accepted
     */
    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Common method for listing vertex/edge input directories.
   *
   * @param job The job
   * @param dirs Array of vertex/edge input paths
   * @return List of FileStatus objects for all matching files
   * @throws IOException if no input paths are specified, a path does not
   *         exist, or a pattern matches no files
   */
  private List<FileStatus> listStatus(JobContext job, Path[] dirs)
    throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

/*if[HADOOP_NON_SECURE]
else[HADOOP_NON_SECURE]
    // get tokens for all the required FileSystems..
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
        job.getConfiguration());
end[HADOOP_NON_SECURE]*/

    List<IOException> errors = new ArrayList<IOException>();

    // Create a MultiPathFilter with the HIDDEN_FILE_FILTER and the
    // user-provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(HIDDEN_FILE_FILTER);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    for (Path p : dirs) {
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      FileStatus[] matches = fs.globStatus(p, inputFilter);
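      // globStatus returns null when nothing exists at the given path, and
      // an empty array when a glob pattern matches no files; both cases are
      // collected as errors and reported together below.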
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat : matches) {
          if (globStat.isDir()) {
            Collections.addAll(result, fs.listStatus(globStat.getPath(),
                inputFilter));
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    LOG.info("Total input paths to process : " + result.size());
    return result;
  }

  /**
   * List vertex input directories.
   *
   * @param job the job to list vertex input paths for
   * @return List of FileStatus objects for the vertex input files
   * @throws IOException if no vertex input paths are specified or a path
   *         does not exist
   */
  protected List<FileStatus> listVertexStatus(JobContext job)
    throws IOException {
    return listStatus(job, getVertexInputPaths(job));
  }

  /**
   * List edge input directories.
   *
   * @param job the job to list edge input paths for
   * @return List of FileStatus objects for the edge input files
   * @throws IOException if no edge input paths are specified or a path
   *         does not exist
   */
  protected List<FileStatus> listEdgeStatus(JobContext job)
    throws IOException {
    return listStatus(job, getEdgeInputPaths(job));
  }

  /**
   * Common method for generating the list of vertex/edge input splits.
   *
   * @param job The job
   * @param files List of FileStatus objects for vertex/edge input files
   * @return The list of vertex/edge input splits
   * @throws IOException on I/O errors
   */
  private List<InputSplit> getSplits(JobContext job, List<FileStatus> files)
    throws IOException {
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();

    for (FileStatus file : files) {
      Path path = file.getPath();
      FileSystem fs = path.getFileSystem(job.getConfiguration());
      long length = file.getLen();
      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
      if ((length != 0) && isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
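        // Inherited from FileInputFormat:
        // splitSize = max(minSize, min(maxSize, blockSize)).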
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
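        // Emit full-size splits while more than SPLIT_SLOP (1.1) splits'
        // worth of data remains; the final split can thus be up to 10%
        // larger than splitSize, avoiding a tiny trailing split.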
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
          splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
              blkLocations[blkIndex].getHosts()));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          splits.add(new FileSplit(path, length - bytesRemaining,
              bytesRemaining,
              blkLocations[blkLocations.length - 1].getHosts()));
        }
      } else if (length != 0) {
        splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
      } else {
        // Create empty hosts array for zero-length files
        splits.add(new FileSplit(path, 0, length, new String[0]));
      }
    }
    return splits;
  }

  /**
   * Generate the list of vertex input splits.
   *
   * @param job The job
   * @return The list of vertex input splits
   * @throws IOException on I/O errors
   */
  public List<InputSplit> getVertexSplits(JobContext job) throws IOException {
    List<FileStatus> files = listVertexStatus(job);
    List<InputSplit> splits = getSplits(job, files);
    // Save the number of input files in the job-conf
    job.getConfiguration().setLong(NUM_VERTEX_INPUT_FILES, files.size());
    LOG.debug("Total # of vertex splits: " + splits.size());
    return splits;
  }

  /**
   * Generate the list of edge input splits.
   *
   * @param job The job
   * @return The list of edge input splits
   * @throws IOException on I/O errors
   */
  public List<InputSplit> getEdgeSplits(JobContext job) throws IOException {
    List<FileStatus> files = listEdgeStatus(job);
    List<InputSplit> splits = getSplits(job, files);
    // Save the number of input files in the job-conf
    job.getConfiguration().setLong(NUM_EDGE_INPUT_FILES, files.size());
    LOG.debug("Total # of edge splits: " + splits.size());
    return splits;
  }
}
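
Usage sketch (not part of the source above): the snippet below illustrates how a driver could register vertex and edge inputs through this class. The input paths are placeholders, and the commented-out calls assume a concrete GiraphFileInputFormat subclass and a JobContext, which this file does not provide.

import org.apache.giraph.io.formats.GiraphFileInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class GiraphInputSetup {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Each call qualifies and escapes the path, then appends it to the
    // comma-separated list stored under giraph.vertex.input.dir or
    // giraph.edge.input.dir. Paths here are illustrative placeholders.
    GiraphFileInputFormat.addVertexInputPath(conf, new Path("/graph/vertices"));
    GiraphFileInputFormat.addEdgeInputPath(conf, new Path("/graph/edges/a"));
    GiraphFileInputFormat.addEdgeInputPath(conf, new Path("/graph/edges/b"));

    // The registered paths can be read back, unescaped, as Path objects:
    //   Path[] vertexPaths = GiraphFileInputFormat.getVertexInputPaths(ctx);
    // and a concrete subclass, given a JobContext, can enumerate splits for
    // each data source independently via getVertexSplits / getEdgeSplits.
  }
}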