This project has retired. For details please refer to its Attic page.
LongDoubleNullTextInputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.examples;
20  
21  import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
22  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23  import org.apache.giraph.edge.Edge;
24  import org.apache.giraph.edge.EdgeFactory;
25  import org.apache.giraph.graph.Vertex;
26  import org.apache.giraph.io.formats.TextVertexInputFormat;
27  import org.apache.hadoop.io.DoubleWritable;
28  import org.apache.hadoop.io.LongWritable;
29  import org.apache.hadoop.io.NullWritable;
30  import org.apache.hadoop.mapreduce.InputSplit;
31  import org.apache.hadoop.mapreduce.TaskAttemptContext;
32  
33  import com.google.common.collect.Lists;
34  
35  import java.io.IOException;
36  import java.util.List;
37  import java.util.regex.Pattern;
38  
39  /**
40   * Input format for unweighted graphs with long ids.
41   */
42  public class LongDoubleNullTextInputFormat
43      extends TextVertexInputFormat<LongWritable, DoubleWritable, NullWritable>
44      implements ImmutableClassesGiraphConfigurable<LongWritable, DoubleWritable,
45      NullWritable> {
46    /** Configuration. */
47    private ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
48        NullWritable> conf;
49  
50    @Override
51    public TextVertexReader createVertexReader(InputSplit split,
52                                               TaskAttemptContext context)
53      throws IOException {
54      return new LongDoubleNullDoubleVertexReader();
55    }
56  
57    @Override
58    public void setConf(ImmutableClassesGiraphConfiguration<LongWritable,
59        DoubleWritable, NullWritable> configuration) {
60      this.conf = configuration;
61    }
62  
63    @Override
64    public ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
65        NullWritable> getConf() {
66      return conf;
67    }
68  
69    /**
70     * Vertex reader associated with
71     * {@link LongDoubleNullTextInputFormat}.
72     */
73    public class LongDoubleNullDoubleVertexReader extends
74        TextVertexInputFormat<LongWritable, DoubleWritable,
75            NullWritable>.TextVertexReader {
76      /** Separator of the vertex and neighbors */
77      private final Pattern separator = Pattern.compile("[\t ]");
78  
79      @Override
80      public Vertex<LongWritable, DoubleWritable, NullWritable>
81      getCurrentVertex() throws IOException, InterruptedException {
82        Vertex<LongWritable, DoubleWritable, NullWritable>
83            vertex = conf.createVertex();
84  
85        String[] tokens =
86            separator.split(getRecordReader().getCurrentValue().toString());
87        List<Edge<LongWritable, NullWritable>> edges =
88            Lists.newArrayListWithCapacity(tokens.length - 1);
89        for (int n = 1; n < tokens.length; n++) {
90          edges.add(EdgeFactory.create(
91              new LongWritable(Long.parseLong(tokens[n])),
92              NullWritable.get()));
93        }
94  
95        LongWritable vertexId = new LongWritable(Long.parseLong(tokens[0]));
96        vertex.initialize(vertexId, new DoubleWritable(), edges);
97  
98        return vertex;
99      }
100 
101     @Override
102     public boolean nextVertex() throws IOException, InterruptedException {
103       return getRecordReader().nextKeyValue();
104     }
105   }
106 }