This project has retired. For details, please refer to its
Attic page.
HCatalogVertexInputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.hcatalog;
20
21 import org.apache.giraph.edge.Edge;
22 import org.apache.giraph.edge.EdgeFactory;
23 import org.apache.giraph.graph.Vertex;
24 import org.apache.giraph.io.VertexInputFormat;
25 import org.apache.giraph.io.VertexReader;
26 import org.apache.giraph.utils.TimedLogger;
27 import org.apache.hadoop.io.Writable;
28 import org.apache.hadoop.io.WritableComparable;
29 import org.apache.hadoop.mapreduce.InputSplit;
30 import org.apache.hadoop.mapreduce.JobContext;
31 import org.apache.hadoop.mapreduce.RecordReader;
32 import org.apache.hadoop.mapreduce.TaskAttemptContext;
33 import org.apache.hcatalog.data.HCatRecord;
34 import org.apache.log4j.Logger;
35
36 import com.google.common.collect.Lists;
37
38 import java.io.IOException;
39 import java.util.List;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 @SuppressWarnings("rawtypes")
61 public abstract class HCatalogVertexInputFormat<
62 I extends WritableComparable,
63 V extends Writable,
64 E extends Writable>
65 extends VertexInputFormat<I, V, E> {
66
67
68
69 private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
70
71 @Override
72 public final List<InputSplit> getSplits(
73 final JobContext context, final int minSplitCountHint)
74 throws IOException, InterruptedException {
75 return hCatInputFormat.getVertexSplits(context);
76 }
77
78
79
80
81
82
83
84
85
86
87 protected abstract class HCatalogVertexReader
88 extends VertexReader<I, V, E> {
89
90 private RecordReader<WritableComparable,
91 HCatRecord> hCatRecordReader;
92
93 private TaskAttemptContext context;
94
95
96
97
98
99
100 private void initialize(
101 final RecordReader<
102 WritableComparable, HCatRecord>
103 recordReader) {
104 this.hCatRecordReader = recordReader;
105 }
106
107 @Override
108 public final void initialize(
109 final InputSplit inputSplit,
110 final TaskAttemptContext ctxt)
111 throws IOException, InterruptedException {
112 hCatRecordReader.initialize(inputSplit, ctxt);
113 this.context = ctxt;
114 }
115
116 @Override
117 public boolean nextVertex() throws IOException, InterruptedException {
118
119
120
121 return hCatRecordReader.nextKeyValue();
122 }
123
124 @Override
125 public final void close() throws IOException {
126 hCatRecordReader.close();
127 }
128
129 @Override
130 public final float getProgress() throws IOException, InterruptedException {
131 return hCatRecordReader.getProgress();
132 }
133
134
135
136
137
138 protected final RecordReader<WritableComparable, HCatRecord>
139 getRecordReader() {
140 return hCatRecordReader;
141 }
142
143
144
145
146
147
148
149
150 protected final TaskAttemptContext getContext() {
151 return context;
152 }
153 }
154
155
156
157
158
159 protected abstract HCatalogVertexReader createVertexReader();
160
161 @Override
162 public final VertexReader<I, V, E>
163 createVertexReader(final InputSplit split,
164 final TaskAttemptContext context)
165 throws IOException {
166 try {
167 HCatalogVertexReader reader = createVertexReader();
168 reader.initialize(hCatInputFormat.
169 createVertexRecordReader(split, context));
170 return reader;
171 } catch (InterruptedException e) {
172 throw new IllegalStateException(
173 "createVertexReader: " +
174 "Interrupted creating reader.", e);
175 }
176 }
177
178
179
180
181
182
183 protected abstract class SingleRowHCatalogVertexReader
184 extends HCatalogVertexReader {
185
186
187
188 private static final int BYTE_CONST = 1024;
189
190
191
192 private final Logger log =
193 Logger.getLogger(SingleRowHCatalogVertexReader.class);
194
195
196
197 private int recordCount = 0;
198
199
200
201 private final int recordModLimit = 1000;
202
203
204
205 private final TimedLogger timedLogger = new TimedLogger(30 * 1000,
206 log);
207
208
209
210
211
212
213 protected abstract I getVertexId(HCatRecord record);
214
215
216
217
218
219
220 protected abstract V getVertexValue(HCatRecord record);
221
222
223
224
225
226
227 protected abstract Iterable<Edge<I, E>> getEdges(HCatRecord record);
228
229 @Override
230 public final Vertex<I, V, E> getCurrentVertex()
231 throws IOException, InterruptedException {
232 HCatRecord record = getRecordReader().getCurrentValue();
233 Vertex<I, V, E> vertex = getConf().createVertex();
234 vertex.initialize(getVertexId(record), getVertexValue(record),
235 getEdges(record));
236 ++recordCount;
237 if (log.isInfoEnabled() &&
238 ((recordCount % recordModLimit) == 0)) {
239
240 Runtime runtime = Runtime.getRuntime();
241 double gb = BYTE_CONST *
242 BYTE_CONST *
243 BYTE_CONST;
244 timedLogger.info(
245 "read " + recordCount + " records. Memory: " +
246 (runtime.totalMemory() / gb) +
247 "GB total = " +
248 ((runtime.totalMemory() - runtime.freeMemory()) / gb) +
249 "GB used + " + (runtime.freeMemory() / gb) +
250 "GB free, " + (runtime.maxMemory() / gb) + "GB max");
251 }
252 return vertex;
253 }
254 }
255
256
257
258
259
260
261
262 protected abstract class MultiRowHCatalogVertexReader extends
263 HCatalogVertexReader {
264
265
266
267 private static final int RECORD_MOD_LIMIT = 1000;
268
269
270
271 private final Logger log =
272 Logger.getLogger(MultiRowHCatalogVertexReader.class);
273
274
275
276 private I currentVertexId = null;
277
278
279
280 private List<Edge<I, E>> currentEdges = Lists.newLinkedList();
281
282
283
284 private List<HCatRecord> recordsForVertex = Lists.newArrayList();
285
286
287
288 private int recordCount = 0;
289
290
291
292 private Vertex<I, V, E> vertex = null;
293
294
295
296 private final TimedLogger timedLogger = new TimedLogger(30 * 1000,
297 log);
298
299
300
301
302
303
304
305
306 protected abstract I getVertexId(HCatRecord record);
307
308
309
310
311
312
313 protected abstract V getVertexValue(
314 Iterable<HCatRecord> records);
315
316
317
318
319
320
321
322 protected abstract I getTargetVertexId(HCatRecord record);
323
324
325
326
327
328
329
330 protected abstract E getEdgeValue(HCatRecord record);
331
332 @Override
333 public final Vertex<I, V, E>
334 getCurrentVertex() throws IOException, InterruptedException {
335 return vertex;
336 }
337
338 @Override
339 public boolean nextVertex() throws IOException, InterruptedException {
340 while (getRecordReader().nextKeyValue()) {
341 HCatRecord record = getRecordReader().getCurrentValue();
342 if (currentVertexId == null) {
343 currentVertexId = getVertexId(record);
344 }
345 if (currentVertexId.equals(getVertexId(record))) {
346 currentEdges.add(EdgeFactory.create(getTargetVertexId(record),
347 getEdgeValue(record)));
348 recordsForVertex.add(record);
349 } else {
350 createCurrentVertex();
351 if (log.isInfoEnabled() && (recordCount % RECORD_MOD_LIMIT) == 0) {
352 timedLogger.info("read " + recordCount);
353 }
354 currentVertexId = getVertexId(record);
355 recordsForVertex.add(record);
356 return true;
357 }
358 }
359
360 if (currentEdges.isEmpty()) {
361 return false;
362 } else {
363 createCurrentVertex();
364 return true;
365 }
366 }
367
368
369
370
371 private void createCurrentVertex() {
372 vertex = getConf().createVertex();
373 vertex.initialize(currentVertexId, getVertexValue(recordsForVertex),
374 currentEdges);
375 currentEdges.clear();
376 recordsForVertex.clear();
377 ++recordCount;
378 }
379 }
380 }