This project has retired. For details please refer to its Attic page.
AdjacencyListTextVertexInputFormat xref
View Javadoc

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  package org.apache.giraph.io.formats;
19  
20  import com.google.common.collect.Lists;
21  import java.io.IOException;
22  import java.util.List;
23  import org.apache.giraph.edge.Edge;
24  import org.apache.hadoop.io.Text;
25  import org.apache.hadoop.io.Writable;
26  import org.apache.hadoop.io.WritableComparable;
27  import org.apache.hadoop.mapreduce.InputSplit;
28  import org.apache.hadoop.mapreduce.TaskAttemptContext;
29  
30  /**
31   * VertexReader that readers lines of text with vertices encoded as adjacency
32   * lists and converts each token to the correct type.  For example, a graph
33   * with vertices as integers and values as doubles could be encoded as:
34   *   1 0.1 2 0.2 3 0.3
35   * to represent a vertex named 1, with 0.1 as its value and two edges, to
36   * vertices 2 and 3, with edge values of 0.2 and 0.3, respectively.
37   *
38   * @param <I> Vertex index value
39   * @param <V> Vertex value
40   * @param <E> Edge value
41   */
42  @SuppressWarnings("rawtypes")
43  public abstract class AdjacencyListTextVertexInputFormat<I extends
44      WritableComparable, V extends Writable, E extends Writable> extends
45      TextVertexInputFormat<I, V, E> {
46    /** Delimiter for split */
47    public static final String LINE_TOKENIZE_VALUE = "adj.list.input.delimiter";
48    /** Default delimiter for split */
49    public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
50  
51    /**
52     * Utility for doing any cleaning of each line before it is tokenized.
53     */
54    public interface LineSanitizer {
55      /**
56       * Clean string s before attempting to tokenize it.
57       *
58       * @param s String to be cleaned.
59       * @return Sanitized string.
60       */
61      String sanitize(String s);
62    }
63  
64    @Override
65    public abstract AdjacencyListTextVertexReader createVertexReader(
66        InputSplit split, TaskAttemptContext context);
67  
68    /**
69     * Vertex reader associated with {@link AdjacencyListTextVertexInputFormat}.
70     */
71    protected abstract class AdjacencyListTextVertexReader extends
72      TextVertexReaderFromEachLineProcessed<String[]> {
73      /** Cached delimiter used for split */
74      private String splitValue = null;
75      /** Sanitizer from constructor. */
76      private final LineSanitizer sanitizer;
77  
78      /**
79       * Constructor without line sanitizer.
80       */
81      public AdjacencyListTextVertexReader() {
82        this(null);
83      }
84  
85      /**
86       * Constructor with line sanitizer.
87       *
88       * @param sanitizer Sanitizer to be used.
89       */
90      public AdjacencyListTextVertexReader(LineSanitizer sanitizer) {
91        this.sanitizer = sanitizer;
92      }
93  
94      @Override
95      public void initialize(InputSplit inputSplit, TaskAttemptContext context)
96        throws IOException, InterruptedException {
97        super.initialize(inputSplit, context);
98        splitValue =
99            getConf().get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
100     }
101 
102     @Override
103     protected String[] preprocessLine(Text line) throws IOException {
104       String sanitizedLine;
105       if (sanitizer != null) {
106         sanitizedLine = sanitizer.sanitize(line.toString());
107       } else {
108         sanitizedLine = line.toString();
109       }
110       String [] values = sanitizedLine.split(splitValue);
111       if ((values.length < 2) || (values.length % 2 != 0)) {
112         throw new IllegalArgumentException(
113           "Line did not split correctly: " + line);
114       }
115       return values;
116     }
117 
118     @Override
119     protected I getId(String[] values) throws IOException {
120       return decodeId(values[0]);
121     }
122 
123     /**
124      * Decode the id for this line into an instance of its correct type.
125      *
126      * @param s Id of vertex from line
127      * @return Vertex id
128      */
129     public abstract I decodeId(String s);
130 
131     @Override
132     protected V getValue(String[] values) throws IOException {
133       return decodeValue(values[1]);
134     }
135 
136 
137     /**
138      * Decode the value for this line into an instance of its correct type.
139      *
140      * @param s Value from line
141      * @return Vertex value
142      */
143     public abstract V decodeValue(String s);
144 
145     @Override
146     protected Iterable<Edge<I, E>> getEdges(String[] values) throws
147         IOException {
148       int i = 2;
149       List<Edge<I, E>> edges = Lists.newLinkedList();
150       while (i < values.length) {
151         edges.add(decodeEdge(values[i], values[i + 1]));
152         i += 2;
153       }
154       return edges;
155     }
156 
157     /**
158      * Decode an edge from the line into an instance of a correctly typed Edge
159      *
160      * @param id The edge's id from the line
161      * @param value The edge's value from the line
162      * @return Edge with given target id and value
163      */
164     public abstract Edge<I, E> decodeEdge(String id, String value);
165 
166   }
167 }