This project has retired. For details please refer to its
Attic page.
HCatalogEdgeInputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.hcatalog;
20
21 import java.io.IOException;
22 import java.util.List;
23 import org.apache.giraph.edge.Edge;
24 import org.apache.giraph.edge.EdgeFactory;
25 import org.apache.giraph.io.EdgeInputFormat;
26 import org.apache.giraph.io.EdgeReader;
27 import org.apache.hadoop.io.NullWritable;
28 import org.apache.hadoop.io.Writable;
29 import org.apache.hadoop.io.WritableComparable;
30 import org.apache.hadoop.mapreduce.InputSplit;
31 import org.apache.hadoop.mapreduce.JobContext;
32 import org.apache.hadoop.mapreduce.RecordReader;
33 import org.apache.hadoop.mapreduce.TaskAttemptContext;
34 import org.apache.hcatalog.data.HCatRecord;
35
36
37
38
39
40
41
42 public abstract class HCatalogEdgeInputFormat<
43 I extends WritableComparable,
44 E extends Writable>
45 extends EdgeInputFormat<I, E> {
46
47
48
49 private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
50
51 @Override
52 public final List<InputSplit> getSplits(JobContext context,
53 int minSplitCountHint)
54 throws IOException, InterruptedException {
55 return hCatInputFormat.getEdgeSplits(context);
56 }
57
58
59
60
61
62
63 protected GiraphHCatInputFormat getHCatInputFormat() {
64 return hCatInputFormat;
65 }
66
67
68
69
70 protected abstract static class HCatalogEdgeReader<
71 I extends WritableComparable, E extends Writable>
72 extends EdgeReader<I, E> {
73
74 private final GiraphHCatInputFormat hCatInputFormat;
75
76 private RecordReader<WritableComparable, HCatRecord> hCatRecordReader;
77
78 private TaskAttemptContext context;
79
80
81
82
83
84
85 public HCatalogEdgeReader(GiraphHCatInputFormat hCatInputFormat) {
86 this.hCatInputFormat = hCatInputFormat;
87 }
88
89 @Override
90 public final void initialize(InputSplit inputSplit,
91 TaskAttemptContext context)
92 throws IOException, InterruptedException {
93 hCatRecordReader =
94 hCatInputFormat.createEdgeRecordReader(inputSplit, context);
95 hCatRecordReader.initialize(inputSplit, context);
96 this.context = context;
97 }
98
99 @Override
100 public boolean nextEdge() throws IOException, InterruptedException {
101 return hCatRecordReader.nextKeyValue();
102 }
103
104 @Override
105 public final void close() throws IOException {
106 hCatRecordReader.close();
107 }
108
109 @Override
110 public final float getProgress() throws IOException, InterruptedException {
111 return hCatRecordReader.getProgress();
112 }
113
114
115
116
117
118
119 protected final RecordReader<WritableComparable, HCatRecord>
120 getRecordReader() {
121 return hCatRecordReader;
122 }
123
124
125
126
127
128
129 protected final TaskAttemptContext getContext() {
130 return context;
131 }
132 }
133
134
135
136
137
138
139 protected abstract HCatalogEdgeReader<I, E> createEdgeReader();
140
141 @Override
142 public EdgeReader<I, E>
143 createEdgeReader(InputSplit split, TaskAttemptContext context)
144 throws IOException {
145 try {
146 HCatalogEdgeReader reader = createEdgeReader();
147 reader.initialize(split, context);
148 return reader;
149 } catch (InterruptedException e) {
150 throw new IllegalStateException(
151 "createEdgeReader: Interrupted creating reader.", e);
152 }
153 }
154
155
156
157
158
159 protected abstract static class SingleRowHCatalogEdgeReader<
160 I extends WritableComparable, E extends Writable>
161 extends HCatalogEdgeReader<I, E> {
162
163
164
165
166 public SingleRowHCatalogEdgeReader(GiraphHCatInputFormat hCatInputFormat) {
167 super(hCatInputFormat);
168 }
169
170
171
172
173
174
175
176 protected abstract I getSourceVertexId(HCatRecord record);
177
178
179
180
181
182
183
184 protected abstract I getTargetVertexId(HCatRecord record);
185
186
187
188
189
190
191
192 protected abstract E getEdgeValue(HCatRecord record);
193
194 @Override
195 public I getCurrentSourceId() throws IOException, InterruptedException {
196 HCatRecord record = getRecordReader().getCurrentValue();
197 return getSourceVertexId(record);
198 }
199
200 @Override
201 public Edge<I, E> getCurrentEdge() throws IOException,
202 InterruptedException {
203 HCatRecord record = getRecordReader().getCurrentValue();
204 return EdgeFactory.create(getTargetVertexId(record),
205 getEdgeValue(record));
206 }
207 }
208
209
210
211
212
213 protected abstract static class SingleRowHCatalogEdgeNoValueReader<
214 I extends WritableComparable>
215 extends HCatalogEdgeReader<I, NullWritable> {
216
217
218
219
220 public SingleRowHCatalogEdgeNoValueReader(
221 GiraphHCatInputFormat hCatInputFormat) {
222 super(hCatInputFormat);
223 }
224
225
226
227
228
229
230
231 protected abstract I getSourceVertexId(HCatRecord record);
232
233
234
235
236
237
238
239 protected abstract I getTargetVertexId(HCatRecord record);
240
241 @Override
242 public I getCurrentSourceId() throws IOException, InterruptedException {
243 HCatRecord record = getRecordReader().getCurrentValue();
244 return getSourceVertexId(record);
245 }
246
247 @Override
248 public Edge<I, NullWritable> getCurrentEdge() throws IOException,
249 InterruptedException {
250 HCatRecord record = getRecordReader().getCurrentValue();
251 return EdgeFactory.create(getTargetVertexId(record));
252 }
253 }
254 }