This project has retired. For details please refer to its
Attic page.
LongDoubleNullTextInputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
22 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23 import org.apache.giraph.edge.Edge;
24 import org.apache.giraph.edge.EdgeFactory;
25 import org.apache.giraph.graph.Vertex;
26 import org.apache.giraph.io.formats.TextVertexInputFormat;
27 import org.apache.hadoop.io.DoubleWritable;
28 import org.apache.hadoop.io.LongWritable;
29 import org.apache.hadoop.io.NullWritable;
30 import org.apache.hadoop.mapreduce.InputSplit;
31 import org.apache.hadoop.mapreduce.TaskAttemptContext;
32
33 import com.google.common.collect.Lists;
34
35 import java.io.IOException;
36 import java.util.List;
37 import java.util.regex.Pattern;
38
39
40
41
42 public class LongDoubleNullTextInputFormat
43 extends TextVertexInputFormat<LongWritable, DoubleWritable, NullWritable>
44 implements ImmutableClassesGiraphConfigurable<LongWritable, DoubleWritable,
45 NullWritable> {
46
47 private ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
48 NullWritable> conf;
49
50 @Override
51 public TextVertexReader createVertexReader(InputSplit split,
52 TaskAttemptContext context)
53 throws IOException {
54 return new LongDoubleNullDoubleVertexReader();
55 }
56
57 @Override
58 public void setConf(ImmutableClassesGiraphConfiguration<LongWritable,
59 DoubleWritable, NullWritable> configuration) {
60 this.conf = configuration;
61 }
62
63 @Override
64 public ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
65 NullWritable> getConf() {
66 return conf;
67 }
68
69
70
71
72
73 public class LongDoubleNullDoubleVertexReader extends
74 TextVertexInputFormat<LongWritable, DoubleWritable,
75 NullWritable>.TextVertexReader {
76
77 private final Pattern separator = Pattern.compile("[\t ]");
78
79 @Override
80 public Vertex<LongWritable, DoubleWritable, NullWritable>
81 getCurrentVertex() throws IOException, InterruptedException {
82 Vertex<LongWritable, DoubleWritable, NullWritable>
83 vertex = conf.createVertex();
84
85 String[] tokens =
86 separator.split(getRecordReader().getCurrentValue().toString());
87 List<Edge<LongWritable, NullWritable>> edges =
88 Lists.newArrayListWithCapacity(tokens.length - 1);
89 for (int n = 1; n < tokens.length; n++) {
90 edges.add(EdgeFactory.create(
91 new LongWritable(Long.parseLong(tokens[n])),
92 NullWritable.get()));
93 }
94
95 LongWritable vertexId = new LongWritable(Long.parseLong(tokens[0]));
96 vertex.initialize(vertexId, new DoubleWritable(), edges);
97
98 return vertex;
99 }
100
101 @Override
102 public boolean nextVertex() throws IOException, InterruptedException {
103 return getRecordReader().nextKeyValue();
104 }
105 }
106 }