// This project has been retired. For details, please refer to its Attic page.
// Source cross-reference (xref) listing for TextVertexInputFormat.
// See also the generated Javadoc for this class.

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  
19  package org.apache.giraph.io.formats;
20  
21  import org.apache.giraph.edge.Edge;
22  import org.apache.giraph.graph.Vertex;
23  import org.apache.giraph.io.VertexInputFormat;
24  import org.apache.giraph.io.VertexReader;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.io.LongWritable;
27  import org.apache.hadoop.io.Text;
28  import org.apache.hadoop.io.Writable;
29  import org.apache.hadoop.io.WritableComparable;
30  import org.apache.hadoop.mapreduce.InputSplit;
31  import org.apache.hadoop.mapreduce.JobContext;
32  import org.apache.hadoop.mapreduce.RecordReader;
33  import org.apache.hadoop.mapreduce.TaskAttemptContext;
34  
35  import java.io.IOException;
36  import java.util.List;
37  
38  /**
39   * Abstract class that users should subclass to use their own text based
40   * vertex input format.
41   *
42   * @param <I> Vertex index value
43   * @param <V> Vertex value
44   * @param <E> Edge value
45   */
46  @SuppressWarnings("rawtypes")
47  public abstract class TextVertexInputFormat<I extends WritableComparable,
48      V extends Writable, E extends Writable>
49      extends VertexInputFormat<I, V, E> {
50    /** Uses the GiraphTextInputFormat to do everything */
51    protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
52  
53    @Override public void checkInputSpecs(Configuration conf) { }
54  
55    @Override
56    public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
57      throws IOException, InterruptedException {
58      // Ignore the hint of numWorkers here since we are using
59      // GiraphTextInputFormat to do this for us
60      return textInputFormat.getVertexSplits(context);
61    }
62  
63    /**
64     * The factory method which produces the {@link TextVertexReader} used by this
65     * input format.
66     *
67     * @param split
68     *          the split to be read
69     * @param context
70     *          the information about the task
71     * @return
72     *         the text vertex reader to be used
73     */
74    @Override
75    public abstract TextVertexReader createVertexReader(InputSplit split,
76        TaskAttemptContext context) throws IOException;
77  
78    /**
79     * Abstract class to be implemented by the user based on their specific
80     * vertex input. Easiest to ignore the key value separator and only use
81     * key instead.
82     *
83     * When reading a vertex from each line, extend
84     * {@link TextVertexReaderFromEachLine}. If you need to preprocess each line
85     * first, then extend {@link TextVertexReaderFromEachLineProcessed}. If you
86     * need common exception handling while preprocessing, then extend
87     * {@link TextVertexReaderFromEachLineProcessedHandlingExceptions}.
88     */
89    protected abstract class TextVertexReader extends VertexReader<I, V, E> {
90      /** Internal line record reader */
91      private RecordReader<LongWritable, Text> lineRecordReader;
92      /** Context passed to initialize */
93      private TaskAttemptContext context;
94  
95      @Override
96      public void initialize(InputSplit inputSplit, TaskAttemptContext context)
97        throws IOException, InterruptedException {
98        this.context = context;
99        lineRecordReader = createLineRecordReader(inputSplit, context);
100       lineRecordReader.initialize(inputSplit, context);
101     }
102 
103     /**
104      * Create the line record reader. Override this to use a different
105      * underlying record reader (useful for testing).
106      *
107      * @param inputSplit
108      *          the split to read
109      * @param context
110      *          the context passed to initialize
111      * @return
112      *         the record reader to be used
113      * @throws IOException
114      *           exception that can be thrown during creation
115      * @throws InterruptedException
116      *           exception that can be thrown during creation
117      */
118     protected RecordReader<LongWritable, Text>
119     createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
120       throws IOException, InterruptedException {
121       return textInputFormat.createRecordReader(inputSplit, context);
122     }
123 
124     @Override
125     public void close() throws IOException {
126       lineRecordReader.close();
127     }
128 
129     @Override
130     public float getProgress() throws IOException, InterruptedException {
131       return lineRecordReader.getProgress();
132     }
133 
134     /**
135      * Get the line record reader.
136      *
137      * @return Record reader to be used for reading.
138      */
139     protected RecordReader<LongWritable, Text> getRecordReader() {
140       return lineRecordReader;
141     }
142 
143     /**
144      * Get the context.
145      *
146      * @return Context passed to initialize.
147      */
148     protected TaskAttemptContext getContext() {
149       return context;
150     }
151   }
152 
153   /**
154    * Abstract class to be implemented by the user to read a vertex from each
155    * text line.
156    */
157   protected abstract class TextVertexReaderFromEachLine extends
158     TextVertexReader {
159 
160     @Override
161     public final Vertex<I, V, E> getCurrentVertex() throws IOException,
162     InterruptedException {
163       Text line = getRecordReader().getCurrentValue();
164       Vertex<I, V, E> vertex = getConf().createVertex();
165       vertex.initialize(getId(line), getValue(line), getEdges(line));
166       return vertex;
167     }
168 
169     @Override
170     public final boolean nextVertex() throws IOException, InterruptedException {
171       return getRecordReader().nextKeyValue();
172     }
173 
174     /**
175      * Reads vertex id from the current line.
176      *
177      * @param line
178      *          the current line
179      * @return
180      *         the vertex id corresponding to the line
181      * @throws IOException
182      *           exception that can be thrown while reading
183      */
184     protected abstract I getId(Text line) throws IOException;
185 
186     /**
187      * Reads vertex value from the current line.
188      *
189      * @param line
190      *          the current line
191      * @return
192      *         the vertex value corresponding to the line
193      * @throws IOException
194      *           exception that can be thrown while reading
195      */
196     protected abstract V getValue(Text line) throws IOException;
197 
198     /**
199      * Reads edges value from the current line.
200      *
201      * @param line
202      *          the current line
203      * @return
204      *         the edges
205      * @throws IOException
206      *           exception that can be thrown while reading
207      */
208     protected abstract Iterable<Edge<I, E>> getEdges(Text line) throws
209         IOException;
210 
211   }
212 
213   /**
214    * Abstract class to be implemented by the user to read a vertex from each
215    * text line after preprocessing it.
216    *
217    * @param <T>
218    *          The resulting type of preprocessing.
219    */
220   protected abstract class TextVertexReaderFromEachLineProcessed<T> extends
221       TextVertexReader {
222 
223     @Override
224     public final boolean nextVertex() throws IOException, InterruptedException {
225       return getRecordReader().nextKeyValue();
226     }
227 
228     @Override
229     public final Vertex<I, V, E> getCurrentVertex() throws IOException,
230     InterruptedException {
231       Text line = getRecordReader().getCurrentValue();
232       Vertex<I, V, E> vertex;
233       T processed = preprocessLine(line);
234       vertex = getConf().createVertex();
235       vertex.initialize(getId(processed), getValue(processed),
236           getEdges(processed));
237       return vertex;
238     }
239 
240     /**
241      * Preprocess the line so other methods can easily read necessary
242      * information for creating vertex.
243      *
244      * @param line
245      *          the current line to be read
246      * @return
247      *         the preprocessed object
248      * @throws IOException
249      *           exception that can be thrown while reading
250      */
251     protected abstract T preprocessLine(Text line) throws IOException;
252 
253     /**
254      * Reads vertex id from the preprocessed line.
255      *
256      * @param line
257      *          the object obtained by preprocessing the line
258      * @return
259      *         the vertex id
260      * @throws IOException
261      *           exception that can be thrown while reading
262      */
263     protected abstract I getId(T line) throws IOException;
264 
265     /**
266      * Reads vertex value from the preprocessed line.
267      *
268      * @param line
269      *          the object obtained by preprocessing the line
270      * @return
271      *         the vertex value
272      * @throws IOException
273      *           exception that can be thrown while reading
274      */
275     protected abstract V getValue(T line) throws IOException;
276 
277     /**
278      * Reads edges from the preprocessed line.
279      *
280      *
281      * @param line
282      *          the object obtained by preprocessing the line
283      * @return
284      *         the edges
285      * @throws IOException
286      *           exception that can be thrown while reading
287      */
288     protected abstract Iterable<Edge<I, E>> getEdges(T line) throws IOException;
289 
290   }
291 
292   // CHECKSTYLE: stop RedundantThrows
293   /**
294    * Abstract class to be implemented by the user to read a vertex from each
295    * text line after preprocessing it with exception handling.
296    *
297    * @param <T>
298    *          The resulting type of preprocessing.
299    * @param <X>
300    *          The exception type that can be thrown due to preprocessing.
301    */
302   protected abstract class
303   TextVertexReaderFromEachLineProcessedHandlingExceptions<T, X extends
304     Throwable> extends TextVertexReader {
305 
306     @Override
307     public final boolean nextVertex() throws IOException, InterruptedException {
308       return getRecordReader().nextKeyValue();
309     }
310 
311     @SuppressWarnings("unchecked")
312     @Override
313     public final Vertex<I, V, E> getCurrentVertex() throws IOException,
314         InterruptedException {
315       // Note we are reading from value only since key is the line number
316       Text line = getRecordReader().getCurrentValue();
317       Vertex<I, V, E> vertex;
318       T processed = null;
319       try {
320         processed = preprocessLine(line);
321         vertex = getConf().createVertex();
322         vertex.initialize(getId(processed), getValue(processed),
323             getEdges(processed));
324       } catch (IOException e) {
325         throw e;
326       // CHECKSTYLE: stop IllegalCatch
327       } catch (Throwable t) {
328         return handleException(line, processed, (X) t);
329       // CHECKSTYLE: resume IllegalCatch
330       }
331       return vertex;
332     }
333 
334     /**
335      * Preprocess the line so other methods can easily read necessary
336      * information for creating vertex.
337      *
338      * @param line
339      *          the current line to be read
340      * @return
341      *         the preprocessed object
342      * @throws X
343      *           exception that can be thrown while preprocessing the line
344      * @throws IOException
345      *           exception that can be thrown while reading
346      */
347     protected abstract T preprocessLine(Text line) throws X, IOException;
348 
349     /**
350      * Reads vertex id from the preprocessed line.
351      *
352      * @param line
353      *          the object obtained by preprocessing the line
354      * @return
355      *         the vertex id
356      * @throws X
357      *           exception that can be thrown while reading the preprocessed
358      *           object
359      * @throws IOException
360      *           exception that can be thrown while reading
361      */
362     protected abstract I getId(T line) throws X, IOException;
363 
364     /**
365      * Reads vertex value from the preprocessed line.
366      *
367      * @param line
368      *          the object obtained by preprocessing the line
369      * @return
370      *         the vertex value
371      * @throws X
372      *           exception that can be thrown while reading the preprocessed
373      *           object
374      * @throws IOException
375      *           exception that can be thrown while reading
376      */
377     protected abstract V getValue(T line) throws X, IOException;
378 
379     /**
380      * Reads edges from the preprocessed line.
381      *
382      *
383      * @param line
384      *          the object obtained by preprocessing the line
385      * @return
386      *         the edges
387      * @throws X
388      *           exception that can be thrown while reading the preprocessed
389      *           object
390      * @throws IOException
391      *           exception that can be thrown while reading
392      */
393     protected abstract Iterable<Edge<I, E>> getEdges(T line) throws X,
394         IOException;
395 
396     /**
397      * Handles exceptions while reading vertex from each line.
398      *
399      * @param line
400      *          the line that was being read when the exception was thrown
401      * @param processed
402      *          the object obtained by preprocessing the line. Can be null if
403      *          exception was thrown during preprocessing.
404      * @param e
405      *          the exception thrown while reading the line
406      * @return the recovered/alternative vertex to be used
407      */
408     protected Vertex<I, V, E> handleException(Text line, T processed, X e) {
409       throw new IllegalArgumentException(e);
410     }
411 
412   }
413   // CHECKSTYLE: resume RedundantThrows
414 
415 }