/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.io.formats;
1920import com.google.common.collect.Lists;
21import java.io.IOException;
22import java.util.List;
23import org.apache.giraph.edge.Edge;
24import org.apache.hadoop.io.Text;
25import org.apache.hadoop.io.Writable;
26import org.apache.hadoop.io.WritableComparable;
27import org.apache.hadoop.mapreduce.InputSplit;
28import org.apache.hadoop.mapreduce.TaskAttemptContext;
2930/**31 * VertexReader that readers lines of text with vertices encoded as adjacency32 * lists and converts each token to the correct type. For example, a graph33 * with vertices as integers and values as doubles could be encoded as:34 * 1 0.1 2 0.2 3 0.335 * to represent a vertex named 1, with 0.1 as its value and two edges, to36 * vertices 2 and 3, with edge values of 0.2 and 0.3, respectively.37 *38 * @param <I> Vertex index value39 * @param <V> Vertex value40 * @param <E> Edge value41 */42 @SuppressWarnings("rawtypes")
43publicabstractclass AdjacencyListTextVertexInputFormat<I extends44 WritableComparable, V extends Writable, E extends Writable> extends45 TextVertexInputFormat<I, V, E> {
46/** Delimiter for split */47publicstaticfinal String LINE_TOKENIZE_VALUE = "adj.list.input.delimiter";
48/** Default delimiter for split */49publicstaticfinal String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
5051/**52 * Utility for doing any cleaning of each line before it is tokenized.53 */54publicinterfaceLineSanitizer {
55/**56 * Clean string s before attempting to tokenize it.57 *58 * @param s String to be cleaned.59 * @return Sanitized string.60 */61 String sanitize(String s);
62 }
6364 @Override
65publicabstractAdjacencyListTextVertexReader createVertexReader(
66 InputSplit split, TaskAttemptContext context);
6768/**69 * Vertex reader associated with {@link AdjacencyListTextVertexInputFormat}.70 */71protectedabstractclassAdjacencyListTextVertexReaderextends72 TextVertexReaderFromEachLineProcessed<String[]> {
73/** Cached delimiter used for split */74private String splitValue = null;
75/** Sanitizer from constructor. */76privatefinalLineSanitizer sanitizer;
7778/**79 * Constructor without line sanitizer.80 */81publicAdjacencyListTextVertexReader() {
82this(null);
83 }
8485/**86 * Constructor with line sanitizer.87 *88 * @param sanitizer Sanitizer to be used.89 */90publicAdjacencyListTextVertexReader(LineSanitizer sanitizer) {
91this.sanitizer = sanitizer;
92 }
9394 @Override
95publicvoid initialize(InputSplit inputSplit, TaskAttemptContext context)
96throws IOException, InterruptedException {
97super.initialize(inputSplit, context);
98 splitValue =
99 getConf().get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
100 }
101102 @Override
103protected String[] preprocessLine(Text line) throws IOException {
104 String sanitizedLine;
105if (sanitizer != null) {
106 sanitizedLine = sanitizer.sanitize(line.toString());
107 } else {
108 sanitizedLine = line.toString();
109 }
110 String [] values = sanitizedLine.split(splitValue);
111if ((values.length < 2) || (values.length % 2 != 0)) {
112thrownew IllegalArgumentException(
113"Line did not split correctly: " + line);
114 }
115return values;
116 }
117118 @Override
119protected I getId(String[] values) throws IOException {
120return decodeId(values[0]);
121 }
122123/**124 * Decode the id for this line into an instance of its correct type.125 *126 * @param s Id of vertex from line127 * @return Vertex id128 */129publicabstract I decodeId(String s);
130131 @Override
132protected V getValue(String[] values) throws IOException {
133return decodeValue(values[1]);
134 }
135136137/**138 * Decode the value for this line into an instance of its correct type.139 *140 * @param s Value from line141 * @return Vertex value142 */143publicabstract V decodeValue(String s);
144145 @Override
146protected Iterable<Edge<I, E>> getEdges(String[] values) throws147 IOException {
148int i = 2;
149 List<Edge<I, E>> edges = Lists.newLinkedList();
150while (i < values.length) {
151 edges.add(decodeEdge(values[i], values[i + 1]));
152 i += 2;
153 }
154return edges;
155 }
156157/**158 * Decode an edge from the line into an instance of a correctly typed Edge159 *160 * @param id The edge's id from the line161 * @param value The edge's value from the line162 * @return Edge with given target id and value163 */164publicabstract Edge<I, E> decodeEdge(String id, String value);
165166 }
167 }