This project has retired. For details please refer to its Attic page.
TextVertexValueInputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.formats;
20  
21  import org.apache.giraph.io.VertexValueInputFormat;
22  import org.apache.giraph.io.VertexValueReader;
23  import org.apache.hadoop.conf.Configuration;
24  import org.apache.hadoop.io.LongWritable;
25  import org.apache.hadoop.io.Text;
26  import org.apache.hadoop.io.Writable;
27  import org.apache.hadoop.io.WritableComparable;
28  import org.apache.hadoop.mapreduce.InputSplit;
29  import org.apache.hadoop.mapreduce.JobContext;
30  import org.apache.hadoop.mapreduce.RecordReader;
31  import org.apache.hadoop.mapreduce.TaskAttemptContext;
32  
33  import java.io.IOException;
34  import java.util.List;
35  
36  /**
37   * Abstract class that users should subclass to use their own text based
38   * vertex value input format.
39   *
40   * @param <I> Vertex index value
41   * @param <V> Vertex value
42   * @param <E> Edge value
43   */
44  @SuppressWarnings("rawtypes")
45  public abstract class TextVertexValueInputFormat<I extends WritableComparable,
46      V extends Writable, E extends Writable>
47      extends VertexValueInputFormat<I, V> {
48    /** Uses the GiraphTextInputFormat to do everything */
49    protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
50  
51    @Override public void checkInputSpecs(Configuration conf) { }
52  
53    @Override
54    public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
55      throws IOException, InterruptedException {
56      // Ignore the hint of numWorkers here since we are using
57      // GiraphTextInputFormat to do this for us
58      return textInputFormat.getVertexSplits(context);
59    }
60  
61    @Override
62    public abstract TextVertexValueReader createVertexValueReader(
63        InputSplit split, TaskAttemptContext context) throws IOException;
64  
65    /**
66     * {@link VertexValueReader} for {@link VertexValueInputFormat}.
67     */
68    protected abstract class TextVertexValueReader extends
69        VertexValueReader<I, V> {
70      /** Internal line record reader */
71      private RecordReader<LongWritable, Text> lineRecordReader;
72      /** Context passed to initialize */
73      private TaskAttemptContext context;
74  
75      @Override
76      public void initialize(InputSplit inputSplit, TaskAttemptContext context)
77        throws IOException, InterruptedException {
78        super.initialize(inputSplit, context);
79        this.context = context;
80        lineRecordReader = createLineRecordReader(inputSplit, context);
81        lineRecordReader.initialize(inputSplit, context);
82      }
83  
84      /**
85       * Create the line record reader. Override this to use a different
86       * underlying record reader (useful for testing).
87       *
88       * @param inputSplit the split to read
89       * @param context the context passed to initialize
90       * @return the record reader to be used
91       * @throws IOException exception that can be thrown during creation
92       * @throws InterruptedException exception that can be thrown during creation
93       */
94      protected RecordReader<LongWritable, Text>
95      createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
96        throws IOException, InterruptedException {
97        return textInputFormat.createRecordReader(inputSplit, context);
98      }
99  
100     @Override
101     public void close() throws IOException {
102       lineRecordReader.close();
103     }
104 
105     @Override
106     public float getProgress() throws IOException, InterruptedException {
107       return lineRecordReader.getProgress();
108     }
109 
110     /**
111      * Get the line record reader.
112      *
113      * @return Record reader to be used for reading.
114      */
115     protected RecordReader<LongWritable, Text> getRecordReader() {
116       return lineRecordReader;
117     }
118 
119     /**
120      * Get the context.
121      *
122      * @return Context passed to initialize.
123      */
124     protected TaskAttemptContext getContext() {
125       return context;
126     }
127   }
128 
129   /**
130    * Abstract class to be implemented by the user to read a vertex value from
131    * each text line.
132    */
133   protected abstract class TextVertexValueReaderFromEachLine extends
134       TextVertexValueReader {
135     @Override
136     public final I getCurrentVertexId() throws IOException,
137         InterruptedException {
138       return getId(getRecordReader().getCurrentValue());
139     }
140 
141     @Override
142     public final V getCurrentVertexValue() throws IOException,
143         InterruptedException {
144       return getValue(getRecordReader().getCurrentValue());
145     }
146 
147     @Override
148     public final boolean nextVertex() throws IOException, InterruptedException {
149       return getRecordReader().nextKeyValue();
150     }
151 
152     /**
153      * Reads vertex id from the current line.
154      *
155      * @param line the current line
156      * @return the vertex id corresponding to the line
157      * @throws IOException exception that can be thrown while reading
158      */
159     protected abstract I getId(Text line) throws IOException;
160 
161     /**
162      * Reads vertex value from the current line.
163      *
164      * @param line the current line
165      * @return the vertex value corresponding to the line
166      * @throws IOException
167      *           exception that can be thrown while reading
168      */
169     protected abstract V getValue(Text line) throws IOException;
170   }
171 
172   /**
173    * Abstract class to be implemented by the user to read a vertex value from
174    * each text line after preprocessing it.
175    *
176    * @param <T> The resulting type of preprocessing.
177    */
178   protected abstract class TextVertexValueReaderFromEachLineProcessed<T>
179       extends TextVertexValueReader {
180     /** Last preprocessed line. */
181     private T processedLine = null;
182 
183     /** Get last preprocessed line. Generate it if missing.
184      *
185      * @return The last preprocessed line
186      * @throws IOException
187      * @throws InterruptedException
188      */
189     private T getProcessedLine() throws IOException, InterruptedException {
190       if (processedLine == null) {
191         processedLine = preprocessLine(getRecordReader().getCurrentValue());
192       }
193       return processedLine;
194     }
195 
196     @Override
197     public I getCurrentVertexId() throws IOException,
198         InterruptedException {
199       return getId(getProcessedLine());
200     }
201 
202     @Override
203     public V getCurrentVertexValue() throws IOException,
204         InterruptedException {
205       return getValue(getProcessedLine());
206     }
207 
208     @Override
209     public final boolean nextVertex() throws IOException, InterruptedException {
210       processedLine = null;
211       return getRecordReader().nextKeyValue();
212     }
213 
214     /**
215      * Preprocess the line so other methods can easily read necessary
216      * information for creating vertex.
217      *
218      * @param line the current line to be read
219      * @return the preprocessed object
220      * @throws IOException exception that can be thrown while reading
221      */
222     protected abstract T preprocessLine(Text line) throws IOException;
223 
224     /**
225      * Reads vertex id from the preprocessed line.
226      *
227      * @param line
228      *          the object obtained by preprocessing the line
229      * @return the vertex id
230      * @throws IOException exception that can be thrown while reading
231      */
232     protected abstract I getId(T line) throws IOException;
233 
234     /**
235      * Reads vertex value from the preprocessed line.
236      *
237      * @param line the object obtained by preprocessing the line
238      * @return the vertex value
239      * @throws IOException exception that can be thrown while reading
240      */
241     protected abstract V getValue(T line) throws IOException;
242   }
243 }