This project has retired. For details please refer to its
Attic page.
LongDoubleDoubleTextInputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
22 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23 import org.apache.giraph.edge.Edge;
24 import org.apache.giraph.edge.EdgeFactory;
25 import org.apache.giraph.graph.Vertex;
26 import org.apache.giraph.io.formats.TextVertexInputFormat;
27 import org.apache.hadoop.io.DoubleWritable;
28 import org.apache.hadoop.io.LongWritable;
29 import org.apache.hadoop.mapreduce.InputSplit;
30 import org.apache.hadoop.mapreduce.TaskAttemptContext;
31
32 import com.google.common.collect.Lists;
33
34 import java.io.IOException;
35 import java.util.List;
36 import java.util.regex.Pattern;
37
38
39
40
41
42
43 public class LongDoubleDoubleTextInputFormat
44 extends TextVertexInputFormat<LongWritable, DoubleWritable,
45 DoubleWritable>
46 implements ImmutableClassesGiraphConfigurable<LongWritable, DoubleWritable,
47 DoubleWritable> {
48
49
50 private ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
51 DoubleWritable> conf;
52
53 @Override
54 public TextVertexReader createVertexReader(InputSplit split,
55 TaskAttemptContext context)
56 throws IOException {
57 return new LongDoubleDoubleDoubleVertexReader();
58 }
59
60 @Override
61 public void setConf(ImmutableClassesGiraphConfiguration<LongWritable,
62 DoubleWritable, DoubleWritable> configuration) {
63 this.conf = configuration;
64 }
65
66 @Override
67 public ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
68 DoubleWritable> getConf() {
69 return conf;
70 }
71
72
73
74
75
76 public class LongDoubleDoubleDoubleVertexReader extends
77 TextVertexInputFormat<LongWritable, DoubleWritable,
78 DoubleWritable>.TextVertexReader {
79
80 private final Pattern separator = Pattern.compile("[\t ]");
81
82 @Override
83 public Vertex<LongWritable, DoubleWritable, DoubleWritable>
84 getCurrentVertex() throws IOException, InterruptedException {
85 Vertex<LongWritable, DoubleWritable, DoubleWritable>
86 vertex = conf.createVertex();
87
88 String[] tokens =
89 separator.split(getRecordReader().getCurrentValue().toString());
90 List<Edge<LongWritable, DoubleWritable>> edges =
91 Lists.newArrayListWithCapacity(tokens.length - 1);
92 float weight = 1.0f / (tokens.length - 1);
93 for (int n = 1; n < tokens.length; n++) {
94 edges.add(EdgeFactory.create(
95 new LongWritable(Long.parseLong(tokens[n])),
96 new DoubleWritable(weight)));
97 }
98
99 LongWritable vertexId = new LongWritable(Long.parseLong(tokens[0]));
100 vertex.initialize(vertexId, new DoubleWritable(), edges);
101
102 return vertex;
103 }
104
105 @Override
106 public boolean nextVertex() throws IOException, InterruptedException {
107 return getRecordReader().nextKeyValue();
108 }
109 }
110 }