/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.io.formats;
2021import org.apache.giraph.io.VertexValueInputFormat;
22import org.apache.giraph.io.VertexValueReader;
23import org.apache.hadoop.conf.Configuration;
24import org.apache.hadoop.io.LongWritable;
25import org.apache.hadoop.io.Text;
26import org.apache.hadoop.io.Writable;
27import org.apache.hadoop.io.WritableComparable;
28import org.apache.hadoop.mapreduce.InputSplit;
29import org.apache.hadoop.mapreduce.JobContext;
30import org.apache.hadoop.mapreduce.RecordReader;
31import org.apache.hadoop.mapreduce.TaskAttemptContext;
3233import java.io.IOException;
34import java.util.List;
3536/**37 * Abstract class that users should subclass to use their own text based38 * vertex value input format.39 *40 * @param <I> Vertex index value41 * @param <V> Vertex value42 * @param <E> Edge value43 */44 @SuppressWarnings("rawtypes")
45publicabstractclass TextVertexValueInputFormat<I extends WritableComparable,
46 V extends Writable, E extends Writable>
47extends VertexValueInputFormat<I, V> {
48/** Uses the GiraphTextInputFormat to do everything */49protectedGiraphTextInputFormat textInputFormat = newGiraphTextInputFormat();
5051 @Override publicvoid checkInputSpecs(Configuration conf) { }
5253 @Override
54public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
55throws IOException, InterruptedException {
56// Ignore the hint of numWorkers here since we are using57// GiraphTextInputFormat to do this for us58return textInputFormat.getVertexSplits(context);
59 }
6061 @Override
62publicabstractTextVertexValueReader createVertexValueReader(
63 InputSplit split, TaskAttemptContext context) throws IOException;
6465/**66 * {@link VertexValueReader} for {@link VertexValueInputFormat}.67 */68protectedabstractclassTextVertexValueReaderextends69 VertexValueReader<I, V> {
70/** Internal line record reader */71private RecordReader<LongWritable, Text> lineRecordReader;
72/** Context passed to initialize */73private TaskAttemptContext context;
7475 @Override
76publicvoid initialize(InputSplit inputSplit, TaskAttemptContext context)
77throws IOException, InterruptedException {
78super.initialize(inputSplit, context);
79this.context = context;
80 lineRecordReader = createLineRecordReader(inputSplit, context);
81 lineRecordReader.initialize(inputSplit, context);
82 }
8384/**85 * Create the line record reader. Override this to use a different86 * underlying record reader (useful for testing).87 *88 * @param inputSplit the split to read89 * @param context the context passed to initialize90 * @return the record reader to be used91 * @throws IOException exception that can be thrown during creation92 * @throws InterruptedException exception that can be thrown during creation93 */94protected RecordReader<LongWritable, Text>
95 createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
96throws IOException, InterruptedException {
97return textInputFormat.createRecordReader(inputSplit, context);
98 }
99100 @Override
101publicvoid close() throws IOException {
102 lineRecordReader.close();
103 }
104105 @Override
106publicfloat getProgress() throws IOException, InterruptedException {
107return lineRecordReader.getProgress();
108 }
109110/**111 * Get the line record reader.112 *113 * @return Record reader to be used for reading.114 */115protected RecordReader<LongWritable, Text> getRecordReader() {
116return lineRecordReader;
117 }
118119/**120 * Get the context.121 *122 * @return Context passed to initialize.123 */124protected TaskAttemptContext getContext() {
125return context;
126 }
127 }
128129/**130 * Abstract class to be implemented by the user to read a vertex value from131 * each text line.132 */133protectedabstractclassTextVertexValueReaderFromEachLineextends134TextVertexValueReader {
135 @Override
136publicfinal I getCurrentVertexId() throws IOException,
137 InterruptedException {
138return getId(getRecordReader().getCurrentValue());
139 }
140141 @Override
142publicfinal V getCurrentVertexValue() throws IOException,
143 InterruptedException {
144return getValue(getRecordReader().getCurrentValue());
145 }
146147 @Override
148publicfinalboolean nextVertex() throws IOException, InterruptedException {
149return getRecordReader().nextKeyValue();
150 }
151152/**153 * Reads vertex id from the current line.154 *155 * @param line the current line156 * @return the vertex id corresponding to the line157 * @throws IOException exception that can be thrown while reading158 */159protectedabstract I getId(Text line) throws IOException;
160161/**162 * Reads vertex value from the current line.163 *164 * @param line the current line165 * @return the vertex value corresponding to the line166 * @throws IOException167 * exception that can be thrown while reading168 */169protectedabstract V getValue(Text line) throws IOException;
170 }
171172/**173 * Abstract class to be implemented by the user to read a vertex value from174 * each text line after preprocessing it.175 *176 * @param <T> The resulting type of preprocessing.177 */178protectedabstractclass TextVertexValueReaderFromEachLineProcessed<T>
179extendsTextVertexValueReader {
180/** Last preprocessed line. */181private T processedLine = null;
182183/** Get last preprocessed line. Generate it if missing.184 *185 * @return The last preprocessed line186 * @throws IOException187 * @throws InterruptedException188 */189private T getProcessedLine() throws IOException, InterruptedException {
190if (processedLine == null) {
191 processedLine = preprocessLine(getRecordReader().getCurrentValue());
192 }
193return processedLine;
194 }
195196 @Override
197public I getCurrentVertexId() throws IOException,
198 InterruptedException {
199return getId(getProcessedLine());
200 }
201202 @Override
203public V getCurrentVertexValue() throws IOException,
204 InterruptedException {
205return getValue(getProcessedLine());
206 }
207208 @Override
209publicfinalboolean nextVertex() throws IOException, InterruptedException {
210 processedLine = null;
211return getRecordReader().nextKeyValue();
212 }
213214/**215 * Preprocess the line so other methods can easily read necessary216 * information for creating vertex.217 *218 * @param line the current line to be read219 * @return the preprocessed object220 * @throws IOException exception that can be thrown while reading221 */222protectedabstract T preprocessLine(Text line) throws IOException;
223224/**225 * Reads vertex id from the preprocessed line.226 *227 * @param line228 * the object obtained by preprocessing the line229 * @return the vertex id230 * @throws IOException exception that can be thrown while reading231 */232protectedabstract I getId(T line) throws IOException;
233234/**235 * Reads vertex value from the preprocessed line.236 *237 * @param line the object obtained by preprocessing the line238 * @return the vertex value239 * @throws IOException exception that can be thrown while reading240 */241protectedabstract V getValue(T line) throws IOException;
242 }
243 }