This project has retired. For details please refer to its
Attic page.
LongDoubleFloatTextInputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import com.google.common.collect.Lists;
22 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
23 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24 import org.apache.giraph.edge.Edge;
25 import org.apache.giraph.edge.EdgeFactory;
26 import org.apache.giraph.graph.Vertex;
27 import org.apache.giraph.io.formats.TextVertexInputFormat;
28 import org.apache.hadoop.io.DoubleWritable;
29 import org.apache.hadoop.io.FloatWritable;
30 import org.apache.hadoop.io.LongWritable;
31 import org.apache.hadoop.mapreduce.InputSplit;
32 import org.apache.hadoop.mapreduce.TaskAttemptContext;
33
34 import java.io.IOException;
35 import java.util.List;
36 import java.util.regex.Pattern;
37
38
39
40
41
42
43 public class LongDoubleFloatTextInputFormat
44 extends TextVertexInputFormat<LongWritable, DoubleWritable, FloatWritable>
45 implements ImmutableClassesGiraphConfigurable<LongWritable, DoubleWritable,
46 FloatWritable> {
47
48 private ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
49 FloatWritable> conf;
50
51 @Override
52 public TextVertexReader createVertexReader(InputSplit split,
53 TaskAttemptContext context) throws IOException {
54 return new LongDoubleFloatVertexReader();
55 }
56
57 @Override
58 public void setConf(ImmutableClassesGiraphConfiguration<LongWritable,
59 DoubleWritable, FloatWritable> configuration) {
60 this.conf = configuration;
61 }
62
63 @Override
64 public ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
65 FloatWritable> getConf() {
66 return conf;
67 }
68
69
70
71
72
73 public class LongDoubleFloatVertexReader extends
74 TextVertexInputFormat<LongWritable, DoubleWritable,
75 FloatWritable>.TextVertexReader {
76
77 private final Pattern neighborSeparator = Pattern.compile("[\t ]");
78
79 private final Pattern weightSeparator = Pattern.compile("[:]");
80
81 @Override
82 public Vertex<LongWritable, DoubleWritable, FloatWritable>
83 getCurrentVertex() throws IOException, InterruptedException {
84 Vertex<LongWritable, DoubleWritable, FloatWritable>
85 vertex = conf.createVertex();
86
87 String[] tokens = neighborSeparator.split(getRecordReader()
88 .getCurrentValue().toString());
89 List<Edge<LongWritable, FloatWritable>> edges =
90 Lists.newArrayListWithCapacity(tokens.length - 1);
91
92 for (int n = 1; n < tokens.length; n++) {
93 String[] parts = weightSeparator.split(tokens[n]);
94 edges.add(EdgeFactory.create(
95 new LongWritable(Long.parseLong(parts[0])),
96 new FloatWritable(Float.parseFloat(parts[1]))));
97 }
98
99 LongWritable vertexId = new LongWritable(Long.parseLong(tokens[0]));
100 vertex.initialize(vertexId, new DoubleWritable(), edges);
101
102 return vertex;
103 }
104
105 @Override
106 public boolean nextVertex() throws IOException, InterruptedException {
107 return getRecordReader().nextKeyValue();
108 }
109 }
110 }