1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.io.formats;
2021import java.io.IOException;
22import java.util.List;
23import org.apache.giraph.edge.Edge;
24import org.apache.giraph.edge.EdgeFactory;
25import org.apache.giraph.io.EdgeInputFormat;
26import org.apache.giraph.io.EdgeReader;
27import org.apache.hadoop.conf.Configuration;
28import org.apache.hadoop.io.LongWritable;
29import org.apache.hadoop.io.Text;
30import org.apache.hadoop.io.Writable;
31import org.apache.hadoop.io.WritableComparable;
32import org.apache.hadoop.mapreduce.InputSplit;
33import org.apache.hadoop.mapreduce.JobContext;
34import org.apache.hadoop.mapreduce.RecordReader;
35import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
 * Abstract class that users should subclass to use their own text-based
 * edge input format.
 *
 * @param <I> Vertex id
 * @param <E> Edge data
 */
@SuppressWarnings("rawtypes")
45publicabstractclass TextEdgeInputFormat<I extends WritableComparable,
46 E extends Writable> extends EdgeInputFormat<I, E> {
47/** Underlying GiraphTextInputFormat. */48protectedGiraphTextInputFormat textInputFormat = newGiraphTextInputFormat();
4950 @Override publicvoid checkInputSpecs(Configuration conf) { }
5152 @Override
53public List<InputSplit> getSplits(
54 JobContext context, int minSplitCountHint) throws IOException,
55 InterruptedException {
56// Ignore the hint of numWorkers here since we are using57// GiraphTextInputFormat to do this for us58return textInputFormat.getEdgeSplits(context);
59 }
6061/**62 * {@link EdgeReader} for {@link TextEdgeInputFormat}.63 */64protectedabstractclassTextEdgeReaderextends EdgeReader<I, E> {
65/** Internal line record reader */66private RecordReader<LongWritable, Text> lineRecordReader;
67/** Context passed to initialize */68private TaskAttemptContext context;
6970 @Override
71publicvoid initialize(InputSplit inputSplit, TaskAttemptContext context)
72throws IOException, InterruptedException {
73this.context = context;
74 lineRecordReader = createLineRecordReader(inputSplit, context);
75 lineRecordReader.initialize(inputSplit, context);
76 }
7778/**79 * Create the line record reader. Override this to use a different80 * underlying record reader (useful for testing).81 *82 * @param inputSplit83 * the split to read84 * @param context85 * the context passed to initialize86 * @return87 * the record reader to be used88 * @throws IOException89 * exception that can be thrown during creation90 * @throws InterruptedException91 * exception that can be thrown during creation92 */93protected RecordReader<LongWritable, Text>
94 createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
95throws IOException, InterruptedException {
96return textInputFormat.createRecordReader(inputSplit, context);
97 }
9899 @Override
100publicvoid close() throws IOException {
101 lineRecordReader.close();
102 }
103104 @Override
105publicfloat getProgress() throws IOException, InterruptedException {
106return lineRecordReader.getProgress();
107 }
108109/**110 * Get the line record reader.111 *112 * @return Record reader to be used for reading.113 */114protected RecordReader<LongWritable, Text> getRecordReader() {
115return lineRecordReader;
116 }
117118/**119 * Get the context.120 *121 * @return Context passed to initialize.122 */123protected TaskAttemptContext getContext() {
124return context;
125 }
126 }
127128/**129 * Abstract class to be implemented by the user to read an edge from each130 * text line.131 */132protectedabstractclassTextEdgeReaderFromEachLineextendsTextEdgeReader {
133 @Override
134publicfinal I getCurrentSourceId() throws IOException,
135 InterruptedException {
136 Text line = getRecordReader().getCurrentValue();
137return getSourceVertexId(line);
138 }
139140 @Override
141publicfinal Edge<I, E> getCurrentEdge() throws IOException,
142 InterruptedException {
143 Text line = getRecordReader().getCurrentValue();
144 I targetVertexId = getTargetVertexId(line);
145 E edgeValue = getValue(line);
146return EdgeFactory.create(targetVertexId, edgeValue);
147 }
148149 @Override
150publicfinalboolean nextEdge() throws IOException, InterruptedException {
151return getRecordReader().nextKeyValue();
152 }
153154/**155 * Reads source vertex id from the current line.156 *157 * @param line158 * the current line159 * @return160 * the source vertex id corresponding to the line161 * @throws IOException162 * exception that can be thrown while reading163 */164protectedabstract I getSourceVertexId(Text line) throws IOException;
165166167/**168 * Reads target vertex id from the current line.169 *170 * @param line171 * the current line172 * @return173 * the target vertex id corresponding to the line174 * @throws IOException175 * exception that can be thrown while reading176 */177protectedabstract I getTargetVertexId(Text line) throws IOException;
178179/**180 * Reads edge value from the current line.181 *182 * @param line183 * the current line184 * @return185 * the edge value corresponding to the line186 * @throws IOException187 * exception that can be thrown while reading188 */189protectedabstract E getValue(Text line) throws IOException;
190 }
191192/**193 * Abstract class to be implemented by the user to read an edge from each194 * text line after preprocessing it.195 *196 * @param <T>197 * The resulting type of preprocessing.198 */199protectedabstractclass TextEdgeReaderFromEachLineProcessed<T> extends200TextEdgeReader {
201/** Generic type holding processed line */202private T processedLine;
203204 @Override
205public I getCurrentSourceId() throws IOException, InterruptedException {
206 T processed = processCurrentLine();
207return getSourceVertexId(processed);
208 }
209210 @Override
211publicfinal Edge<I, E> getCurrentEdge() throws IOException,
212 InterruptedException {
213 T processed = processCurrentLine();
214 I targetVertexId = getTargetVertexId(processed);
215 E edgeValue = getValue(processed);
216return EdgeFactory.create(targetVertexId, edgeValue);
217 }
218219/**220 * Process the current line to the user's type.221 *222 * @return T processed line223 * @throws IOException on I/O error224 * @throws InterruptedException on interruption225 */226private T processCurrentLine() throws IOException, InterruptedException {
227if (processedLine == null) {
228 Text line = getRecordReader().getCurrentValue();
229 processedLine = preprocessLine(line);
230 }
231return processedLine;
232 }
233234 @Override
235publicfinalboolean nextEdge() throws IOException, InterruptedException {
236 processedLine = null;
237return getRecordReader().nextKeyValue();
238 }
239240/**241 * Preprocess the line so other methods can easily read necessary242 * information for creating edge243 *244 * @param line245 * the current line to be read246 * @return247 * the preprocessed object248 * @throws IOException249 * exception that can be thrown while reading250 */251protectedabstract T preprocessLine(Text line) throws IOException;
252253/**254 * Reads target vertex id from the preprocessed line.255 *256 * @param line257 * the object obtained by preprocessing the line258 * @return259 * the target vertex id260 * @throws IOException261 * exception that can be thrown while reading262 */263protectedabstract I getTargetVertexId(T line) throws IOException;
264265/**266 * Reads source vertex id from the preprocessed line.267 *268 * @param line269 * the object obtained by preprocessing the line270 * @return271 * the source vertex id272 * @throws IOException273 * exception that can be thrown while reading274 */275protectedabstract I getSourceVertexId(T line) throws IOException;
276277/**278 * Reads edge value from the preprocessed line.279 *280 * @param line281 * the object obtained by preprocessing the line282 * @return283 * the edge value284 * @throws IOException285 * exception that can be thrown while reading286 */287protectedabstract E getValue(T line) throws IOException;
288 }
289 }