This project has retired. For details please refer to its
Attic page.
SequenceFileVertexInputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.io.formats;
19
20 import org.apache.giraph.graph.Vertex;
21 import org.apache.giraph.io.VertexInputFormat;
22 import org.apache.giraph.io.VertexReader;
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.io.Writable;
25 import org.apache.hadoop.io.WritableComparable;
26 import org.apache.hadoop.mapreduce.InputSplit;
27 import org.apache.hadoop.mapreduce.JobContext;
28 import org.apache.hadoop.mapreduce.RecordReader;
29 import org.apache.hadoop.mapreduce.TaskAttemptContext;
30 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
31
32 import java.io.IOException;
33 import java.util.List;
34
35
36
37
38
39
40
41
42
43 @SuppressWarnings("rawtypes")
44 public class SequenceFileVertexInputFormat<I extends WritableComparable,
45 V extends Writable, E extends Writable, X extends Vertex<I, V, E>>
46 extends VertexInputFormat<I, V, E> {
47
48 protected SequenceFileInputFormat<I, X> sequenceFileInputFormat =
49 new SequenceFileInputFormat<I, X>();
50
51 @Override public void checkInputSpecs(Configuration conf) { }
52
53 @Override
54 public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
55 throws IOException, InterruptedException {
56 return sequenceFileInputFormat.getSplits(context);
57 }
58
59 @Override
60 public VertexReader<I, V, E> createVertexReader(InputSplit split,
61 TaskAttemptContext context) throws IOException {
62 return new SequenceFileVertexReader<I, V, E, X>(
63 sequenceFileInputFormat.createRecordReader(split, context));
64 }
65
66
67
68
69
70
71
72
73
74 public static class SequenceFileVertexReader<I extends WritableComparable,
75 V extends Writable, E extends Writable, X extends Vertex<I, V, E>>
76 extends VertexReader<I, V, E> {
77
78 private final RecordReader<I, X> recordReader;
79
80
81
82
83
84
85 public SequenceFileVertexReader(RecordReader<I, X> recordReader) {
86 this.recordReader = recordReader;
87 }
88
89 @Override public void initialize(InputSplit inputSplit,
90 TaskAttemptContext context) throws IOException, InterruptedException {
91 recordReader.initialize(inputSplit, context);
92 }
93
94 @Override public boolean nextVertex() throws IOException,
95 InterruptedException {
96 return recordReader.nextKeyValue();
97 }
98
99 @Override public Vertex<I, V, E> getCurrentVertex()
100 throws IOException, InterruptedException {
101 return recordReader.getCurrentValue();
102 }
103
104
105 @Override public void close() throws IOException {
106 recordReader.close();
107 }
108
109 @Override public float getProgress() throws IOException,
110 InterruptedException {
111 return recordReader.getProgress();
112 }
113 }
114 }