This project has retired. For details please refer to its Attic page.
NormalizingLongDoubleDoubleTextInputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.examples;
20  
21  import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
22  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23  import org.apache.giraph.edge.Edge;
24  import org.apache.giraph.edge.EdgeFactory;
25  import org.apache.giraph.graph.Vertex;
26  import org.apache.giraph.io.formats.TextVertexInputFormat;
27  import org.apache.hadoop.io.DoubleWritable;
28  import org.apache.hadoop.io.LongWritable;
29  import org.apache.hadoop.mapreduce.InputSplit;
30  import org.apache.hadoop.mapreduce.TaskAttemptContext;
31  
32  import com.google.common.collect.Lists;
33  
34  import java.io.IOException;
35  import java.util.Collection;
36  import java.util.List;
37  import java.util.regex.Pattern;
38  
39  /**
40   * Simple text-based {@link org.apache.giraph.io.VertexInputFormat} for
41   * unweighted graphs with long ids. Each line consists of: vertex
42   * neighbor1:weight1 neighbor2:weight2 ...
43   */
44  public class NormalizingLongDoubleDoubleTextInputFormat
45      extends
46      TextVertexInputFormat<LongWritable, DoubleWritable, DoubleWritable>
47      implements ImmutableClassesGiraphConfigurable<LongWritable, DoubleWritable,
48      DoubleWritable> {
49    /** Configuration. */
50    private ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
51        DoubleWritable> conf;
52  
53    @Override
54    public TextVertexReader createVertexReader(
55        InputSplit split, TaskAttemptContext context) throws IOException {
56      return new NormalizingLongDoubleDoubleDoubleVertexReader();
57    }
58  
59    @Override
60    public void setConf(ImmutableClassesGiraphConfiguration<LongWritable,
61        DoubleWritable, DoubleWritable> configuration) {
62      conf = configuration;
63    }
64  
65    @Override
66    public ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
67        DoubleWritable> getConf() {
68      return conf;
69    }
70  
71    /**
72     * Vertex reader associated with
73     * {@link LongDoubleDoubleTextInputFormat}.
74     */
75    public class NormalizingLongDoubleDoubleDoubleVertexReader
76        extends NormalizingLongDoubleDoubleTextInputFormat.TextVertexReader {
77      /** Separator of the vertex and neighbors */
78      private final Pattern edgeSeparator = Pattern.compile("\\s+");
79      /** Separator of the edge id and edge weight */
80      private final Pattern weightSeparator = Pattern.compile(":");
81  
82      @Override
83      public Vertex<LongWritable, DoubleWritable, DoubleWritable>
84      getCurrentVertex() throws IOException, InterruptedException {
85        Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex =
86            conf.createVertex();
87  
88        String[] tokens = edgeSeparator.split(getRecordReader()
89            .getCurrentValue().toString());
90        List<Edge<LongWritable, DoubleWritable>> edges = Lists
91            .newArrayListWithCapacity(tokens.length - 1);
92        parse(tokens, edges);
93        normalize(edges);
94  
95        LongWritable vertexId = new LongWritable(Long.parseLong(tokens[0]));
96        vertex.initialize(vertexId, new DoubleWritable(), edges);
97  
98        return vertex;
99      }
100 
101     /**
102      * Parse a set of tokens into a map ID -> weight.
103      * @param tokens The tokens to be parsed.
104      * @param edges The map that will contain the result of the parsing.
105      */
106     void parse(String[] tokens,
107                Collection<Edge<LongWritable, DoubleWritable>> edges) {
108       for (int n = 1; n < tokens.length; n++) {
109         String[] parts = weightSeparator.split(tokens[n]);
110         edges.add(EdgeFactory.create(new LongWritable(Long.parseLong(parts[0])),
111             new DoubleWritable(Double.parseDouble(parts[1]))));
112       }
113     }
114 
115     /**
116      * Normalize the edges with L1 normalization.
117      * @param edges The edges to be normalized.
118      */
119     void normalize(Collection<Edge<LongWritable, DoubleWritable>> edges) {
120       if (edges == null || edges.size() == 0) {
121         throw new IllegalArgumentException(
122             "Cannot normalize an empy set of edges");
123       }
124       float normalizer = 0.0f;
125       for (Edge<LongWritable, DoubleWritable> edge : edges) {
126         normalizer += edge.getValue().get();
127       }
128       for (Edge<LongWritable, DoubleWritable> edge : edges) {
129         edge.getValue().set(edge.getValue().get() / normalizer);
130       }
131     }
132 
133     @Override
134     public boolean nextVertex() throws IOException, InterruptedException {
135       return getRecordReader().nextKeyValue();
136     }
137   }
138 }