This project has retired. For details please refer to its
Attic page.
NormalizingLongDoubleDoubleTextInputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
22 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23 import org.apache.giraph.edge.Edge;
24 import org.apache.giraph.edge.EdgeFactory;
25 import org.apache.giraph.graph.Vertex;
26 import org.apache.giraph.io.formats.TextVertexInputFormat;
27 import org.apache.hadoop.io.DoubleWritable;
28 import org.apache.hadoop.io.LongWritable;
29 import org.apache.hadoop.mapreduce.InputSplit;
30 import org.apache.hadoop.mapreduce.TaskAttemptContext;
31
32 import com.google.common.collect.Lists;
33
34 import java.io.IOException;
35 import java.util.Collection;
36 import java.util.List;
37 import java.util.regex.Pattern;
38
39
40
41
42
43
44 public class NormalizingLongDoubleDoubleTextInputFormat
45 extends
46 TextVertexInputFormat<LongWritable, DoubleWritable, DoubleWritable>
47 implements ImmutableClassesGiraphConfigurable<LongWritable, DoubleWritable,
48 DoubleWritable> {
49
50 private ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
51 DoubleWritable> conf;
52
53 @Override
54 public TextVertexReader createVertexReader(
55 InputSplit split, TaskAttemptContext context) throws IOException {
56 return new NormalizingLongDoubleDoubleDoubleVertexReader();
57 }
58
59 @Override
60 public void setConf(ImmutableClassesGiraphConfiguration<LongWritable,
61 DoubleWritable, DoubleWritable> configuration) {
62 conf = configuration;
63 }
64
65 @Override
66 public ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
67 DoubleWritable> getConf() {
68 return conf;
69 }
70
71
72
73
74
75 public class NormalizingLongDoubleDoubleDoubleVertexReader
76 extends NormalizingLongDoubleDoubleTextInputFormat.TextVertexReader {
77
78 private final Pattern edgeSeparator = Pattern.compile("\\s+");
79
80 private final Pattern weightSeparator = Pattern.compile(":");
81
82 @Override
83 public Vertex<LongWritable, DoubleWritable, DoubleWritable>
84 getCurrentVertex() throws IOException, InterruptedException {
85 Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex =
86 conf.createVertex();
87
88 String[] tokens = edgeSeparator.split(getRecordReader()
89 .getCurrentValue().toString());
90 List<Edge<LongWritable, DoubleWritable>> edges = Lists
91 .newArrayListWithCapacity(tokens.length - 1);
92 parse(tokens, edges);
93 normalize(edges);
94
95 LongWritable vertexId = new LongWritable(Long.parseLong(tokens[0]));
96 vertex.initialize(vertexId, new DoubleWritable(), edges);
97
98 return vertex;
99 }
100
101
102
103
104
105
106 void parse(String[] tokens,
107 Collection<Edge<LongWritable, DoubleWritable>> edges) {
108 for (int n = 1; n < tokens.length; n++) {
109 String[] parts = weightSeparator.split(tokens[n]);
110 edges.add(EdgeFactory.create(new LongWritable(Long.parseLong(parts[0])),
111 new DoubleWritable(Double.parseDouble(parts[1]))));
112 }
113 }
114
115
116
117
118
119 void normalize(Collection<Edge<LongWritable, DoubleWritable>> edges) {
120 if (edges == null || edges.size() == 0) {
121 throw new IllegalArgumentException(
122 "Cannot normalize an empy set of edges");
123 }
124 float normalizer = 0.0f;
125 for (Edge<LongWritable, DoubleWritable> edge : edges) {
126 normalizer += edge.getValue().get();
127 }
128 for (Edge<LongWritable, DoubleWritable> edge : edges) {
129 edge.getValue().set(edge.getValue().get() / normalizer);
130 }
131 }
132
133 @Override
134 public boolean nextVertex() throws IOException, InterruptedException {
135 return getRecordReader().nextKeyValue();
136 }
137 }
138 }