/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.io.formats;
2021import org.apache.giraph.edge.Edge;
22import org.apache.giraph.graph.Vertex;
23import org.apache.giraph.io.VertexInputFormat;
24import org.apache.giraph.io.VertexReader;
25import org.apache.hadoop.conf.Configuration;
26import org.apache.hadoop.io.LongWritable;
27import org.apache.hadoop.io.Text;
28import org.apache.hadoop.io.Writable;
29import org.apache.hadoop.io.WritableComparable;
30import org.apache.hadoop.mapreduce.InputSplit;
31import org.apache.hadoop.mapreduce.JobContext;
32import org.apache.hadoop.mapreduce.RecordReader;
33import org.apache.hadoop.mapreduce.TaskAttemptContext;
3435import java.io.IOException;
36import java.util.List;
3738/**39 * Abstract class that users should subclass to use their own text based40 * vertex input format.41 *42 * @param <I> Vertex index value43 * @param <V> Vertex value44 * @param <E> Edge value45 */46 @SuppressWarnings("rawtypes")
47publicabstractclass TextVertexInputFormat<I extends WritableComparable,
48 V extends Writable, E extends Writable>
49extends VertexInputFormat<I, V, E> {
50/** Uses the GiraphTextInputFormat to do everything */51protectedGiraphTextInputFormat textInputFormat = newGiraphTextInputFormat();
5253 @Override publicvoid checkInputSpecs(Configuration conf) { }
5455 @Override
56public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
57throws IOException, InterruptedException {
58// Ignore the hint of numWorkers here since we are using59// GiraphTextInputFormat to do this for us60return textInputFormat.getVertexSplits(context);
61 }
6263/**64 * The factory method which produces the {@link TextVertexReader} used by this65 * input format.66 *67 * @param split68 * the split to be read69 * @param context70 * the information about the task71 * @return72 * the text vertex reader to be used73 */74 @Override
75publicabstractTextVertexReader createVertexReader(InputSplit split,
76 TaskAttemptContext context) throws IOException;
7778/**79 * Abstract class to be implemented by the user based on their specific80 * vertex input. Easiest to ignore the key value separator and only use81 * key instead.82 *83 * When reading a vertex from each line, extend84 * {@link TextVertexReaderFromEachLine}. If you need to preprocess each line85 * first, then extend {@link TextVertexReaderFromEachLineProcessed}. If you86 * need common exception handling while preprocessing, then extend87 * {@link TextVertexReaderFromEachLineProcessedHandlingExceptions}.88 */89protectedabstractclassTextVertexReaderextends VertexReader<I, V, E> {
90/** Internal line record reader */91private RecordReader<LongWritable, Text> lineRecordReader;
92/** Context passed to initialize */93private TaskAttemptContext context;
9495 @Override
96publicvoid initialize(InputSplit inputSplit, TaskAttemptContext context)
97throws IOException, InterruptedException {
98this.context = context;
99 lineRecordReader = createLineRecordReader(inputSplit, context);
100 lineRecordReader.initialize(inputSplit, context);
101 }
102103/**104 * Create the line record reader. Override this to use a different105 * underlying record reader (useful for testing).106 *107 * @param inputSplit108 * the split to read109 * @param context110 * the context passed to initialize111 * @return112 * the record reader to be used113 * @throws IOException114 * exception that can be thrown during creation115 * @throws InterruptedException116 * exception that can be thrown during creation117 */118protected RecordReader<LongWritable, Text>
119 createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
120throws IOException, InterruptedException {
121return textInputFormat.createRecordReader(inputSplit, context);
122 }
123124 @Override
125publicvoid close() throws IOException {
126 lineRecordReader.close();
127 }
128129 @Override
130publicfloat getProgress() throws IOException, InterruptedException {
131return lineRecordReader.getProgress();
132 }
133134/**135 * Get the line record reader.136 *137 * @return Record reader to be used for reading.138 */139protected RecordReader<LongWritable, Text> getRecordReader() {
140return lineRecordReader;
141 }
142143/**144 * Get the context.145 *146 * @return Context passed to initialize.147 */148protected TaskAttemptContext getContext() {
149return context;
150 }
151 }
152153/**154 * Abstract class to be implemented by the user to read a vertex from each155 * text line.156 */157protectedabstractclassTextVertexReaderFromEachLineextends158TextVertexReader {
159160 @Override
161publicfinal Vertex<I, V, E> getCurrentVertex() throws IOException,
162 InterruptedException {
163 Text line = getRecordReader().getCurrentValue();
164 Vertex<I, V, E> vertex = getConf().createVertex();
165 vertex.initialize(getId(line), getValue(line), getEdges(line));
166return vertex;
167 }
168169 @Override
170publicfinalboolean nextVertex() throws IOException, InterruptedException {
171return getRecordReader().nextKeyValue();
172 }
173174/**175 * Reads vertex id from the current line.176 *177 * @param line178 * the current line179 * @return180 * the vertex id corresponding to the line181 * @throws IOException182 * exception that can be thrown while reading183 */184protectedabstract I getId(Text line) throws IOException;
185186/**187 * Reads vertex value from the current line.188 *189 * @param line190 * the current line191 * @return192 * the vertex value corresponding to the line193 * @throws IOException194 * exception that can be thrown while reading195 */196protectedabstract V getValue(Text line) throws IOException;
197198/**199 * Reads edges value from the current line.200 *201 * @param line202 * the current line203 * @return204 * the edges205 * @throws IOException206 * exception that can be thrown while reading207 */208protectedabstract Iterable<Edge<I, E>> getEdges(Text line) throws209 IOException;
210211 }
212213/**214 * Abstract class to be implemented by the user to read a vertex from each215 * text line after preprocessing it.216 *217 * @param <T>218 * The resulting type of preprocessing.219 */220protectedabstractclass TextVertexReaderFromEachLineProcessed<T> extends221TextVertexReader {
222223 @Override
224publicfinalboolean nextVertex() throws IOException, InterruptedException {
225return getRecordReader().nextKeyValue();
226 }
227228 @Override
229publicfinal Vertex<I, V, E> getCurrentVertex() throws IOException,
230 InterruptedException {
231 Text line = getRecordReader().getCurrentValue();
232 Vertex<I, V, E> vertex;
233 T processed = preprocessLine(line);
234 vertex = getConf().createVertex();
235 vertex.initialize(getId(processed), getValue(processed),
236 getEdges(processed));
237return vertex;
238 }
239240/**241 * Preprocess the line so other methods can easily read necessary242 * information for creating vertex.243 *244 * @param line245 * the current line to be read246 * @return247 * the preprocessed object248 * @throws IOException249 * exception that can be thrown while reading250 */251protectedabstract T preprocessLine(Text line) throws IOException;
252253/**254 * Reads vertex id from the preprocessed line.255 *256 * @param line257 * the object obtained by preprocessing the line258 * @return259 * the vertex id260 * @throws IOException261 * exception that can be thrown while reading262 */263protectedabstract I getId(T line) throws IOException;
264265/**266 * Reads vertex value from the preprocessed line.267 *268 * @param line269 * the object obtained by preprocessing the line270 * @return271 * the vertex value272 * @throws IOException273 * exception that can be thrown while reading274 */275protectedabstract V getValue(T line) throws IOException;
276277/**278 * Reads edges from the preprocessed line.279 *280 *281 * @param line282 * the object obtained by preprocessing the line283 * @return284 * the edges285 * @throws IOException286 * exception that can be thrown while reading287 */288protectedabstract Iterable<Edge<I, E>> getEdges(T line) throws IOException;
289290 }
291292// CHECKSTYLE: stop RedundantThrows293/**294 * Abstract class to be implemented by the user to read a vertex from each295 * text line after preprocessing it with exception handling.296 *297 * @param <T>298 * The resulting type of preprocessing.299 * @param <X>300 * The exception type that can be thrown due to preprocessing.301 */302protectedabstractclass303 TextVertexReaderFromEachLineProcessedHandlingExceptions<T, X extends304 Throwable> extendsTextVertexReader {
305306 @Override
307publicfinalboolean nextVertex() throws IOException, InterruptedException {
308return getRecordReader().nextKeyValue();
309 }
310311 @SuppressWarnings("unchecked")
312 @Override
313publicfinal Vertex<I, V, E> getCurrentVertex() throws IOException,
314 InterruptedException {
315// Note we are reading from value only since key is the line number316 Text line = getRecordReader().getCurrentValue();
317 Vertex<I, V, E> vertex;
318 T processed = null;
319try {
320 processed = preprocessLine(line);
321 vertex = getConf().createVertex();
322 vertex.initialize(getId(processed), getValue(processed),
323 getEdges(processed));
324 } catch (IOException e) {
325throw e;
326// CHECKSTYLE: stop IllegalCatch327 } catch (Throwable t) {
328return handleException(line, processed, (X) t);
329// CHECKSTYLE: resume IllegalCatch330 }
331return vertex;
332 }
333334/**335 * Preprocess the line so other methods can easily read necessary336 * information for creating vertex.337 *338 * @param line339 * the current line to be read340 * @return341 * the preprocessed object342 * @throws X343 * exception that can be thrown while preprocessing the line344 * @throws IOException345 * exception that can be thrown while reading346 */347protectedabstract T preprocessLine(Text line) throws X, IOException;
348349/**350 * Reads vertex id from the preprocessed line.351 *352 * @param line353 * the object obtained by preprocessing the line354 * @return355 * the vertex id356 * @throws X357 * exception that can be thrown while reading the preprocessed358 * object359 * @throws IOException360 * exception that can be thrown while reading361 */362protectedabstract I getId(T line) throws X, IOException;
363364/**365 * Reads vertex value from the preprocessed line.366 *367 * @param line368 * the object obtained by preprocessing the line369 * @return370 * the vertex value371 * @throws X372 * exception that can be thrown while reading the preprocessed373 * object374 * @throws IOException375 * exception that can be thrown while reading376 */377protectedabstract V getValue(T line) throws X, IOException;
378379/**380 * Reads edges from the preprocessed line.381 *382 *383 * @param line384 * the object obtained by preprocessing the line385 * @return386 * the edges387 * @throws X388 * exception that can be thrown while reading the preprocessed389 * object390 * @throws IOException391 * exception that can be thrown while reading392 */393protectedabstract Iterable<Edge<I, E>> getEdges(T line) throws X,
394 IOException;
395396/**397 * Handles exceptions while reading vertex from each line.398 *399 * @param line400 * the line that was being read when the exception was thrown401 * @param processed402 * the object obtained by preprocessing the line. Can be null if403 * exception was thrown during preprocessing.404 * @param e405 * the exception thrown while reading the line406 * @return the recovered/alternative vertex to be used407 */408protected Vertex<I, V, E> handleException(Text line, T processed, X e) {
409thrownew IllegalArgumentException(e);
410 }
411412 }
413// CHECKSTYLE: resume RedundantThrows414415 }