This project has been retired. For details, please refer to its Attic page.
      
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.giraph.io.formats;
19  
20  import org.apache.giraph.graph.Vertex;
21  import org.apache.giraph.io.VertexInputFormat;
22  import org.apache.giraph.io.VertexReader;
23  import org.apache.hadoop.conf.Configuration;
24  import org.apache.hadoop.io.Writable;
25  import org.apache.hadoop.io.WritableComparable;
26  import org.apache.hadoop.mapreduce.InputSplit;
27  import org.apache.hadoop.mapreduce.JobContext;
28  import org.apache.hadoop.mapreduce.RecordReader;
29  import org.apache.hadoop.mapreduce.TaskAttemptContext;
30  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
31  
32  import java.io.IOException;
33  import java.util.List;
34  
35  
36  
37  
38  
39  
40  
41  
42  
43  @SuppressWarnings("rawtypes")
44  public class SequenceFileVertexInputFormat<I extends WritableComparable,
45      V extends Writable, E extends Writable, X extends Vertex<I, V, E>>
46      extends VertexInputFormat<I, V, E> {
47    
48    protected SequenceFileInputFormat<I, X> sequenceFileInputFormat =
49      new SequenceFileInputFormat<I, X>();
50  
51    @Override public void checkInputSpecs(Configuration conf) { }
52  
53    @Override
54    public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
55      throws IOException, InterruptedException {
56      return sequenceFileInputFormat.getSplits(context);
57    }
58  
59    @Override
60    public VertexReader<I, V, E> createVertexReader(InputSplit split,
61        TaskAttemptContext context) throws IOException {
62      return new SequenceFileVertexReader<I, V, E, X>(
63          sequenceFileInputFormat.createRecordReader(split, context));
64    }
65  
66    
67  
68  
69  
70  
71  
72  
73  
74    public static class SequenceFileVertexReader<I extends WritableComparable,
75        V extends Writable, E extends Writable, X extends Vertex<I, V, E>>
76        extends VertexReader<I, V, E> {
77      
78      private final RecordReader<I, X> recordReader;
79  
80      
81  
82  
83  
84  
85      public SequenceFileVertexReader(RecordReader<I, X> recordReader) {
86        this.recordReader = recordReader;
87      }
88  
89      @Override public void initialize(InputSplit inputSplit,
90          TaskAttemptContext context) throws IOException, InterruptedException {
91        recordReader.initialize(inputSplit, context);
92      }
93  
94      @Override public boolean nextVertex() throws IOException,
95          InterruptedException {
96        return recordReader.nextKeyValue();
97      }
98  
99      @Override public Vertex<I, V, E> getCurrentVertex()
100       throws IOException, InterruptedException {
101       return recordReader.getCurrentValue();
102     }
103 
104 
105     @Override public void close() throws IOException {
106       recordReader.close();
107     }
108 
109     @Override public float getProgress() throws IOException,
110         InterruptedException {
111       return recordReader.getProgress();
112     }
113   }
114 }