This project has retired. For details please refer to its
Attic page.
HCatalogVertexValueInputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.hcatalog;
20
21 import org.apache.giraph.io.VertexValueInputFormat;
22 import org.apache.giraph.io.VertexValueReader;
23 import org.apache.hadoop.io.Writable;
24 import org.apache.hadoop.io.WritableComparable;
25 import org.apache.hadoop.mapreduce.InputSplit;
26 import org.apache.hadoop.mapreduce.JobContext;
27 import org.apache.hadoop.mapreduce.RecordReader;
28 import org.apache.hadoop.mapreduce.TaskAttemptContext;
29 import org.apache.hcatalog.data.HCatRecord;
30
31 import java.io.IOException;
32 import java.util.List;
33
34
35
36
37
38
39
40
41 public abstract class HCatalogVertexValueInputFormat<I extends
42 WritableComparable, V extends Writable>
43 extends VertexValueInputFormat<I, V> {
44
45
46
47 private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
48
49 @Override
50 public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
51 throws IOException, InterruptedException {
52 return hCatInputFormat.getVertexSplits(context);
53 }
54
55
56
57
58 protected abstract class HCatalogVertexValueReader
59 extends VertexValueReader<I, V> {
60
61 private RecordReader<WritableComparable, HCatRecord> hCatRecordReader;
62
63 private TaskAttemptContext context;
64
65 @Override
66 public final void initialize(InputSplit inputSplit,
67 TaskAttemptContext context)
68 throws IOException, InterruptedException {
69 super.initialize(inputSplit, context);
70 hCatRecordReader =
71 hCatInputFormat.createVertexRecordReader(inputSplit, context);
72 hCatRecordReader.initialize(inputSplit, context);
73 this.context = context;
74 }
75
76 @Override
77 public boolean nextVertex() throws IOException, InterruptedException {
78 return hCatRecordReader.nextKeyValue();
79 }
80
81 @Override
82 public final void close() throws IOException {
83 hCatRecordReader.close();
84 }
85
86 @Override
87 public final float getProgress() throws IOException, InterruptedException {
88 return hCatRecordReader.getProgress();
89 }
90
91
92
93
94
95
96 protected final RecordReader<WritableComparable, HCatRecord>
97 getRecordReader() {
98 return hCatRecordReader;
99 }
100
101
102
103
104
105
106 protected final TaskAttemptContext getContext() {
107 return context;
108 }
109 }
110
111
112
113
114
115
116 protected abstract HCatalogVertexValueReader createVertexValueReader();
117
118 @Override
119 public final VertexValueReader<I, V>
120 createVertexValueReader(InputSplit split, TaskAttemptContext context)
121 throws IOException {
122 try {
123 HCatalogVertexValueReader reader = createVertexValueReader();
124 reader.initialize(split, context);
125 return reader;
126 } catch (InterruptedException e) {
127 throw new IllegalStateException(
128 "createVertexValueReader: Interrupted creating reader.", e);
129 }
130 }
131
132
133
134
135
136 protected abstract class SingleRowHCatalogVertexValueReader
137 extends HCatalogVertexValueReader {
138
139
140
141
142
143
144 protected abstract I getVertexId(HCatRecord record);
145
146
147
148
149
150
151
152 protected abstract V getVertexValue(HCatRecord record);
153
154 @Override
155 public final I getCurrentVertexId() throws IOException,
156 InterruptedException {
157 return getVertexId(getRecordReader().getCurrentValue());
158 }
159
160 @Override
161 public final V getCurrentVertexValue() throws IOException,
162 InterruptedException {
163 return getVertexValue(getRecordReader().getCurrentValue());
164 }
165 }
166 }