This project has retired. For details please refer to its Attic page.
TextEdgeInputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.formats;
20  
21  import java.io.IOException;
22  import java.util.List;
23  import org.apache.giraph.edge.Edge;
24  import org.apache.giraph.edge.EdgeFactory;
25  import org.apache.giraph.io.EdgeInputFormat;
26  import org.apache.giraph.io.EdgeReader;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.io.LongWritable;
29  import org.apache.hadoop.io.Text;
30  import org.apache.hadoop.io.Writable;
31  import org.apache.hadoop.io.WritableComparable;
32  import org.apache.hadoop.mapreduce.InputSplit;
33  import org.apache.hadoop.mapreduce.JobContext;
34  import org.apache.hadoop.mapreduce.RecordReader;
35  import org.apache.hadoop.mapreduce.TaskAttemptContext;
36  
37  /**
38   * Abstract class that users should subclass to use their own text based
39   * edge output format.
40   *
41   * @param <I> Vertex id
42   * @param <E> Edge data
43   */
44  @SuppressWarnings("rawtypes")
45  public abstract class TextEdgeInputFormat<I extends WritableComparable,
46      E extends Writable> extends EdgeInputFormat<I, E> {
47    /** Underlying GiraphTextInputFormat. */
48    protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
49  
50    @Override public void checkInputSpecs(Configuration conf) { }
51  
52    @Override
53    public List<InputSplit> getSplits(
54        JobContext context, int minSplitCountHint) throws IOException,
55        InterruptedException {
56      // Ignore the hint of numWorkers here since we are using
57      // GiraphTextInputFormat to do this for us
58      return textInputFormat.getEdgeSplits(context);
59    }
60  
61    /**
62     * {@link EdgeReader} for {@link TextEdgeInputFormat}.
63     */
64    protected abstract class TextEdgeReader extends EdgeReader<I, E> {
65      /** Internal line record reader */
66      private RecordReader<LongWritable, Text> lineRecordReader;
67      /** Context passed to initialize */
68      private TaskAttemptContext context;
69  
70      @Override
71      public void initialize(InputSplit inputSplit, TaskAttemptContext context)
72        throws IOException, InterruptedException {
73        this.context = context;
74        lineRecordReader = createLineRecordReader(inputSplit, context);
75        lineRecordReader.initialize(inputSplit, context);
76      }
77  
78      /**
79       * Create the line record reader. Override this to use a different
80       * underlying record reader (useful for testing).
81       *
82       * @param inputSplit
83       *          the split to read
84       * @param context
85       *          the context passed to initialize
86       * @return
87       *         the record reader to be used
88       * @throws IOException
89       *           exception that can be thrown during creation
90       * @throws InterruptedException
91       *           exception that can be thrown during creation
92       */
93      protected RecordReader<LongWritable, Text>
94      createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
95        throws IOException, InterruptedException {
96        return textInputFormat.createRecordReader(inputSplit, context);
97      }
98  
99      @Override
100     public void close() throws IOException {
101       lineRecordReader.close();
102     }
103 
104     @Override
105     public float getProgress() throws IOException, InterruptedException {
106       return lineRecordReader.getProgress();
107     }
108 
109     /**
110      * Get the line record reader.
111      *
112      * @return Record reader to be used for reading.
113      */
114     protected RecordReader<LongWritable, Text> getRecordReader() {
115       return lineRecordReader;
116     }
117 
118     /**
119      * Get the context.
120      *
121      * @return Context passed to initialize.
122      */
123     protected TaskAttemptContext getContext() {
124       return context;
125     }
126   }
127 
128   /**
129    * Abstract class to be implemented by the user to read an edge from each
130    * text line.
131    */
132   protected abstract class TextEdgeReaderFromEachLine extends TextEdgeReader {
133     @Override
134     public final I getCurrentSourceId() throws IOException,
135         InterruptedException {
136       Text line = getRecordReader().getCurrentValue();
137       return getSourceVertexId(line);
138     }
139 
140     @Override
141     public final Edge<I, E> getCurrentEdge() throws IOException,
142         InterruptedException {
143       Text line = getRecordReader().getCurrentValue();
144       I targetVertexId = getTargetVertexId(line);
145       E edgeValue = getValue(line);
146       return EdgeFactory.create(targetVertexId, edgeValue);
147     }
148 
149     @Override
150     public final boolean nextEdge() throws IOException, InterruptedException {
151       return getRecordReader().nextKeyValue();
152     }
153 
154     /**
155      * Reads source vertex id from the current line.
156      *
157      * @param line
158      *          the current line
159      * @return
160      *         the source vertex id corresponding to the line
161      * @throws IOException
162      *           exception that can be thrown while reading
163      */
164     protected abstract I getSourceVertexId(Text line) throws IOException;
165 
166 
167     /**
168      * Reads target vertex id from the current line.
169      *
170      * @param line
171      *          the current line
172      * @return
173      *         the target vertex id corresponding to the line
174      * @throws IOException
175      *           exception that can be thrown while reading
176      */
177     protected abstract I getTargetVertexId(Text line) throws IOException;
178 
179     /**
180      * Reads edge value from the current line.
181      *
182      * @param line
183      *          the current line
184      * @return
185      *         the edge value corresponding to the line
186      * @throws IOException
187      *           exception that can be thrown while reading
188      */
189     protected abstract E getValue(Text line) throws IOException;
190   }
191 
192   /**
193    * Abstract class to be implemented by the user to read an edge from each
194    * text line after preprocessing it.
195    *
196    * @param <T>
197    *          The resulting type of preprocessing.
198    */
199   protected abstract class TextEdgeReaderFromEachLineProcessed<T> extends
200       TextEdgeReader {
201     /** Generic type holding processed line */
202     private T processedLine;
203 
204     @Override
205     public I getCurrentSourceId() throws IOException, InterruptedException {
206       T processed = processCurrentLine();
207       return getSourceVertexId(processed);
208     }
209 
210     @Override
211     public final Edge<I, E> getCurrentEdge() throws IOException,
212         InterruptedException {
213       T processed = processCurrentLine();
214       I targetVertexId = getTargetVertexId(processed);
215       E edgeValue = getValue(processed);
216       return EdgeFactory.create(targetVertexId, edgeValue);
217     }
218 
219     /**
220      * Process the current line to the user's type.
221      *
222      * @return T processed line
223      * @throws IOException on I/O error
224      * @throws InterruptedException on interruption
225      */
226     private T processCurrentLine() throws IOException, InterruptedException {
227       if (processedLine == null) {
228         Text line = getRecordReader().getCurrentValue();
229         processedLine = preprocessLine(line);
230       }
231       return processedLine;
232     }
233 
234     @Override
235     public final boolean nextEdge() throws IOException, InterruptedException {
236       processedLine = null;
237       return getRecordReader().nextKeyValue();
238     }
239 
240     /**
241      * Preprocess the line so other methods can easily read necessary
242      * information for creating edge
243      *
244      * @param line
245      *          the current line to be read
246      * @return
247      *         the preprocessed object
248      * @throws IOException
249      *           exception that can be thrown while reading
250      */
251     protected abstract T preprocessLine(Text line) throws IOException;
252 
253     /**
254      * Reads target vertex id from the preprocessed line.
255      *
256      * @param line
257      *          the object obtained by preprocessing the line
258      * @return
259      *         the target vertex id
260      * @throws IOException
261      *           exception that can be thrown while reading
262      */
263     protected abstract I getTargetVertexId(T line) throws IOException;
264 
265     /**
266      * Reads source vertex id from the preprocessed line.
267      *
268      * @param line
269      *          the object obtained by preprocessing the line
270      * @return
271      *         the source vertex id
272      * @throws IOException
273      *           exception that can be thrown while reading
274      */
275     protected abstract I getSourceVertexId(T line) throws IOException;
276 
277     /**
278      * Reads edge value from the preprocessed line.
279      *
280      * @param line
281      *          the object obtained by preprocessing the line
282      * @return
283      *         the edge value
284      * @throws IOException
285      *           exception that can be thrown while reading
286      */
287     protected abstract E getValue(T line) throws IOException;
288   }
289 }