This project has retired. For details, please refer to its Attic page.
GiraphFileInputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats;
20
21 import org.apache.hadoop.conf.Configuration;
22 import org.apache.hadoop.fs.BlockLocation;
23 import org.apache.hadoop.fs.FileStatus;
24 import org.apache.hadoop.fs.FileSystem;
25 import org.apache.hadoop.fs.Path;
26 import org.apache.hadoop.fs.PathFilter;
27 import org.apache.hadoop.mapreduce.InputSplit;
28 import org.apache.hadoop.mapreduce.JobContext;
29 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
30 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
31 import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
32 import org.apache.hadoop.util.StringUtils;
33 import org.apache.log4j.Logger;
34
35 import java.io.IOException;
36 import java.util.ArrayList;
37 import java.util.Collections;
38 import java.util.List;
39
40
41
42
43
44
45
46
47
48
49
50
51
52 public abstract class GiraphFileInputFormat<K, V>
53 extends FileInputFormat<K, V> {
54
55 public static final String VERTEX_INPUT_DIR = "giraph.vertex.input.dir";
56
57 public static final String EDGE_INPUT_DIR = "giraph.edge.input.dir";
58
59 public static final String NUM_VERTEX_INPUT_FILES =
60 "giraph.input.vertex.num.files";
61
62 public static final String NUM_EDGE_INPUT_FILES =
63 "giraph.input.edge.num.files";
64
65
66 private static final double SPLIT_SLOP = 1.1;
67
68
69 private static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
70 public boolean accept(Path p) {
71 String name = p.getName();
72 return !name.startsWith("_") && !name.startsWith(".");
73 }
74 };
75
76
77 private static final Logger LOG =
78 Logger.getLogger(GiraphFileInputFormat.class);
79
80
81
82
83
84
85
86
87 public static void addVertexInputPath(Configuration conf,
88 Path path) throws IOException {
89 String dirStr = pathToDirString(conf, path);
90 String dirs = conf.get(VERTEX_INPUT_DIR);
91 conf.set(VERTEX_INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
92 }
93
94
95
96
97
98
99
100 public static void setVertexInputPath(Configuration conf,
101 Path path) throws IOException {
102 conf.set(VERTEX_INPUT_DIR, pathToDirString(conf, path));
103 }
104
105
106
107
108
109
110
111
112 public static void addEdgeInputPath(Configuration conf,
113 Path path) throws IOException {
114 String dirStr = pathToDirString(conf, path);
115 String dirs = conf.get(EDGE_INPUT_DIR);
116 conf.set(EDGE_INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
117 }
118
119
120
121
122
123
124
125 public static void setEdgeInputPath(Configuration conf,
126 Path path) throws IOException {
127 conf.set(EDGE_INPUT_DIR, pathToDirString(conf, path));
128 }
129
130
131
132
133
134
135
136
137
138
139 private static String pathToDirString(Configuration conf, Path path)
140 throws IOException {
141 path = path.getFileSystem(conf).makeQualified(path);
142 return StringUtils.escapeString(path.toString());
143 }
144
145
146
147
148
149
150
151 public static Path[] getVertexInputPaths(JobContext context) {
152 String dirs = context.getConfiguration().get(VERTEX_INPUT_DIR, "");
153 String [] list = StringUtils.split(dirs);
154 Path[] result = new Path[list.length];
155 for (int i = 0; i < list.length; i++) {
156 result[i] = new Path(StringUtils.unEscapeString(list[i]));
157 }
158 return result;
159 }
160
161
162
163
164
165
166
167 public static Path[] getEdgeInputPaths(JobContext context) {
168 String dirs = context.getConfiguration().get(EDGE_INPUT_DIR, "");
169 String [] list = StringUtils.split(dirs);
170 Path[] result = new Path[list.length];
171 for (int i = 0; i < list.length; i++) {
172 result[i] = new Path(StringUtils.unEscapeString(list[i]));
173 }
174 return result;
175 }
176
177
178
179
180
181
182 private static class MultiPathFilter implements PathFilter {
183
184 private List<PathFilter> filters;
185
186
187
188
189
190
191 public MultiPathFilter(List<PathFilter> filters) {
192 this.filters = filters;
193 }
194
195
196
197
198
199
200
201 public boolean accept(Path path) {
202 for (PathFilter filter : filters) {
203 if (!filter.accept(path)) {
204 return false;
205 }
206 }
207 return true;
208 }
209 }
210
211
212
213
214
215
216
217
218
219 private List<FileStatus> listStatus(JobContext job, Path[] dirs)
220 throws IOException {
221 List<FileStatus> result = new ArrayList<FileStatus>();
222 if (dirs.length == 0) {
223 throw new IOException("No input paths specified in job");
224 }
225
226
227
228
229
230
231
232
233 List<IOException> errors = new ArrayList<IOException>();
234
235
236
237 List<PathFilter> filters = new ArrayList<PathFilter>();
238 filters.add(HIDDEN_FILE_FILTER);
239 PathFilter jobFilter = getInputPathFilter(job);
240 if (jobFilter != null) {
241 filters.add(jobFilter);
242 }
243 PathFilter inputFilter = new MultiPathFilter(filters);
244
245 for (Path p : dirs) {
246 FileSystem fs = p.getFileSystem(job.getConfiguration());
247 FileStatus[] matches = fs.globStatus(p, inputFilter);
248 if (matches == null) {
249 errors.add(new IOException("Input path does not exist: " + p));
250 } else if (matches.length == 0) {
251 errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
252 } else {
253 for (FileStatus globStat: matches) {
254 if (globStat.isDir()) {
255 Collections.addAll(result, fs.listStatus(globStat.getPath(),
256 inputFilter));
257 } else {
258 result.add(globStat);
259 }
260 }
261 }
262 }
263
264 if (!errors.isEmpty()) {
265 throw new InvalidInputException(errors);
266 }
267 LOG.info("Total input paths to process : " + result.size());
268 return result;
269 }
270
271
272
273
274
275
276
277
278 protected List<FileStatus> listVertexStatus(JobContext job)
279 throws IOException {
280 return listStatus(job, getVertexInputPaths(job));
281 }
282
283
284
285
286
287
288
289
290 protected List<FileStatus> listEdgeStatus(JobContext job)
291 throws IOException {
292 return listStatus(job, getEdgeInputPaths(job));
293 }
294
295
296
297
298
299
300
301
302
303 private List<InputSplit> getSplits(JobContext job, List<FileStatus> files)
304 throws IOException {
305 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
306 long maxSize = getMaxSplitSize(job);
307
308
309 List<InputSplit> splits = new ArrayList<InputSplit>();
310
311 for (FileStatus file: files) {
312 Path path = file.getPath();
313 FileSystem fs = path.getFileSystem(job.getConfiguration());
314 long length = file.getLen();
315 BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
316 if ((length != 0) && isSplitable(job, path)) {
317 long blockSize = file.getBlockSize();
318 long splitSize = computeSplitSize(blockSize, minSize, maxSize);
319
320 long bytesRemaining = length;
321 while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
322 int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
323 splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
324 blkLocations[blkIndex].getHosts()));
325 bytesRemaining -= splitSize;
326 }
327
328 if (bytesRemaining != 0) {
329 splits.add(new FileSplit(path, length - bytesRemaining,
330 bytesRemaining,
331 blkLocations[blkLocations.length - 1].getHosts()));
332 }
333 } else if (length != 0) {
334 splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
335 } else {
336
337 splits.add(new FileSplit(path, 0, length, new String[0]));
338 }
339 }
340 return splits;
341 }
342
343
344
345
346
347
348
349
350 public List<InputSplit> getVertexSplits(JobContext job) throws IOException {
351 List<FileStatus> files = listVertexStatus(job);
352 List<InputSplit> splits = getSplits(job, files);
353
354 job.getConfiguration().setLong(NUM_VERTEX_INPUT_FILES, files.size());
355 LOG.debug("Total # of vertex splits: " + splits.size());
356 return splits;
357 }
358
359
360
361
362
363
364
365
366 public List<InputSplit> getEdgeSplits(JobContext job) throws IOException {
367 List<FileStatus> files = listEdgeStatus(job);
368 List<InputSplit> splits = getSplits(job, files);
369
370 job.getConfiguration().setLong(NUM_EDGE_INPUT_FILES, files.size());
371 LOG.debug("Total # of edge splits: " + splits.size());
372 return splits;
373 }
374 }