This project has retired. For details please refer to its Attic page.
GiraphHCatInputFormat xref
package org.apache.giraph.io.hcatalog;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatBaseInputFormat;
import org.apache.hcatalog.mapreduce.HCatSplit;
import org.apache.hcatalog.mapreduce.HCatStorageHandler;
import org.apache.hcatalog.mapreduce.HCatUtils;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.PartInfo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
 * HCatalog input format that keeps two independent input configurations,
 * so a Giraph job can read its vertices and its edges from two different
 * HCatalog tables.
 */
public class GiraphHCatInputFormat extends HCatBaseInputFormat {
  /** Configuration key for the serialized vertex input job info. */
  public static final String VERTEX_INPUT_JOB_INFO =
      "giraph.hcat.vertex.input.job.info";
  /** Configuration key for the serialized edge input job info. */
  public static final String EDGE_INPUT_JOB_INFO =
      "giraph.hcat.edge.input.job.info";

  /**
   * Set vertex input information for an HCatalog table.
   *
   * @param job The job
   * @param inputJobInfo Vertex input job parameters
   * @throws IOException
   */
  public static void setVertexInput(Job job,
                                    InputJobInfo inputJobInfo)
    throws IOException {
    InputJobInfo vertexInputJobInfo = InputJobInfo.create(
        inputJobInfo.getDatabaseName(),
        inputJobInfo.getTableName(),
        inputJobInfo.getFilter());
    vertexInputJobInfo.getProperties().putAll(inputJobInfo.getProperties());
    Configuration conf = job.getConfiguration();
    conf.set(VERTEX_INPUT_JOB_INFO, HCatUtil.serialize(
        HCatUtils.getInputJobInfo(conf, vertexInputJobInfo)));
  }

  /**
   * Set edge input information for an HCatalog table.
   *
   * @param job The job
   * @param inputJobInfo Edge input job parameters
   * @throws IOException
   */
  public static void setEdgeInput(Job job,
                                  InputJobInfo inputJobInfo)
    throws IOException {
    InputJobInfo edgeInputJobInfo = InputJobInfo.create(
        inputJobInfo.getDatabaseName(),
        inputJobInfo.getTableName(),
        inputJobInfo.getFilter());
    edgeInputJobInfo.getProperties().putAll(inputJobInfo.getProperties());
    Configuration conf = job.getConfiguration();
    conf.set(EDGE_INPUT_JOB_INFO, HCatUtil.serialize(
        HCatUtils.getInputJobInfo(conf, edgeInputJobInfo)));
  }

  /**
   * Build the full table schema for an input: the table's data columns
   * followed by its partition columns.
   *
   * @param inputJobInfo Input job parameters
   * @return Full table schema
   * @throws IOException
   */
  private static HCatSchema getTableSchema(InputJobInfo inputJobInfo)
    throws IOException {
    HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
    for (HCatFieldSchema field :
        inputJobInfo.getTableInfo().getDataColumns().getFields()) {
      allCols.append(field);
    }
    for (HCatFieldSchema field :
        inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
      allCols.append(field);
    }
    return allCols;
  }

  /**
   * Get the table schema for the vertex input.
   *
   * @param conf Configuration
   * @return Vertex table schema
   * @throws IOException
   */
  public static HCatSchema getVertexTableSchema(Configuration conf)
    throws IOException {
    return getTableSchema(getVertexJobInfo(conf));
  }

  /**
   * Get the table schema for the edge input.
   *
   * @param conf Configuration
   * @return Edge table schema
   * @throws IOException
   */
  public static HCatSchema getEdgeTableSchema(Configuration conf)
    throws IOException {
    return getTableSchema(getEdgeJobInfo(conf));
  }

  /**
   * Set the input path on the job configuration. The location string may
   * name several comma-separated paths; commas inside curly-brace glob
   * patterns are kept as part of the pattern rather than treated as path
   * separators.
   *
   * @param jobConf Job configuration
   * @param location Location of input files
   * @throws IOException
   */
  private void setInputPath(JobConf jobConf, String location)
    throws IOException {
    int length = location.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    // Split the location string on top-level commas only, tracking
    // curly-brace nesting so globs such as {a,b} stay intact.
    for (int i = 0; i < length; i++) {
      char ch = location.charAt(i);
      switch (ch) {
      case '{':
        curlyOpen++;
        if (!globPattern) {
          globPattern = true;
        }
        break;
      case '}':
        curlyOpen--;
        if (curlyOpen == 0 && globPattern) {
          globPattern = false;
        }
        break;
      case ',':
        if (!globPattern) {
          pathStrings.add(location.substring(pathStart, i));
          pathStart = i + 1;
        }
        break;
      default:
      }
    }
    pathStrings.add(location.substring(pathStart, length));

    Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));

    // Qualify each path against the default file system and join them back
    // into an escaped, comma-separated list for the MapReduce input setting.
    FileSystem fs = FileSystem.get(jobConf);
    Path path = paths[0].makeQualified(fs);
    StringBuilder str = new StringBuilder(StringUtils.escapeString(
        path.toString()));
    for (int i = 1; i < paths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = paths[i].makeQualified(fs);
      str.append(StringUtils.escapeString(path.toString()));
    }

    jobConf.set("mapred.input.dir", str.toString());
  }
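
  // For example, a (hypothetical) location string of
  //   hdfs://nn/a/{b,c},hdfs://nn/d
  // is split only on the top-level comma, yielding the two input paths
  //   hdfs://nn/a/{b,c}   and   hdfs://nn/d
  // because the comma inside the curly-brace glob is not a separator.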

  /**
   * Get input splits for the partitions described by the given input
   * job info.
   *
   * @param jobContext Job context
   * @param inputJobInfo Input job info
   * @return List of input splits, one per underlying split of every
   *         matching partition
   * @throws IOException
   * @throws InterruptedException
   */
  private List<InputSplit> getSplits(JobContext jobContext,
                                     InputJobInfo inputJobInfo)
    throws IOException, InterruptedException {
    Configuration conf = jobContext.getConfiguration();

    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
    if (partitionInfoList == null) {
      // No partitions match the specified partition filter
      return splits;
    }

    HCatStorageHandler storageHandler;
    JobConf jobConf;
    // For each matching partition, get splits from the underlying
    // input format.
    for (PartInfo partitionInfo : partitionInfoList) {
      jobConf = HCatUtil.getJobConfFromContext(jobContext);
      setInputPath(jobConf, partitionInfo.getLocation());
      Map<String, String> jobProperties = partitionInfo.getJobProperties();

      HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
      for (HCatFieldSchema field :
          inputJobInfo.getTableInfo().getDataColumns().getFields()) {
        allCols.append(field);
      }
      for (HCatFieldSchema field :
          inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
        allCols.append(field);
      }

      HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);

      storageHandler = HCatUtil.getStorageHandler(
          jobConf, partitionInfo);

      // Get the underlying (mapred) input format for this partition's
      // storage handler.
      Class inputFormatClass = storageHandler.getInputFormatClass();
      org.apache.hadoop.mapred.InputFormat inputFormat =
          getMapRedInputFormat(jobConf, inputFormatClass);

      // Ask the underlying input format for its splits and wrap each one
      // in an HCatSplit; a desired number of splits can be hinted through
      // HCAT_DESIRED_PARTITION_NUM_SPLITS (0 means no hint).
      int desiredNumSplits =
          conf.getInt(HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, 0);
      org.apache.hadoop.mapred.InputSplit[] baseSplits =
          inputFormat.getSplits(jobConf, desiredNumSplits);

      for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
        splits.add(new HCatSplit(partitionInfo, split, allCols));
      }
    }

    return splits;
  }

  /**
   * Get the vertex input job info from the configuration.
   *
   * @param conf Configuration
   * @return Vertex input job info
   * @throws IOException if setVertexInput() was never called
   */
  private static InputJobInfo getVertexJobInfo(Configuration conf)
    throws IOException {
    String jobString = conf.get(VERTEX_INPUT_JOB_INFO);
    if (jobString == null) {
      throw new IOException("Vertex job information not found in JobContext." +
          " GiraphHCatInputFormat.setVertexInput() not called?");
    }
    return (InputJobInfo) HCatUtil.deserialize(jobString);
  }

  /**
   * Get the edge input job info from the configuration.
   *
   * @param conf Configuration
   * @return Edge input job info
   * @throws IOException if setEdgeInput() was never called
   */
  private static InputJobInfo getEdgeJobInfo(Configuration conf)
    throws IOException {
    String jobString = conf.get(EDGE_INPUT_JOB_INFO);
    if (jobString == null) {
      throw new IOException("Edge job information not found in JobContext." +
          " GiraphHCatInputFormat.setEdgeInput() not called?");
    }
    return (InputJobInfo) HCatUtil.deserialize(jobString);
  }

  /**
   * Get vertex input splits.
   *
   * @param jobContext Job context
   * @return List of vertex input splits
   * @throws IOException
   * @throws InterruptedException
   */
  public List<InputSplit> getVertexSplits(JobContext jobContext)
    throws IOException, InterruptedException {
    return getSplits(jobContext,
        getVertexJobInfo(jobContext.getConfiguration()));
  }

  /**
   * Get edge input splits.
   *
   * @param jobContext Job context
   * @return List of edge input splits
   * @throws IOException
   * @throws InterruptedException
   */
  public List<InputSplit> getEdgeSplits(JobContext jobContext)
    throws IOException, InterruptedException {
    return getSplits(jobContext,
        getEdgeJobInfo(jobContext.getConfiguration()));
  }

  /**
   * Create a record reader for the given split and table schema.
   *
   * @param split Input split
   * @param schema Table schema
   * @param taskContext Task attempt context
   * @return Record reader
   * @throws IOException
   * @throws InterruptedException
   */
  private RecordReader<WritableComparable, HCatRecord>
  createRecordReader(InputSplit split,
                     HCatSchema schema,
                     TaskAttemptContext taskContext)
    throws IOException, InterruptedException {
    HCatSplit hcatSplit = HCatUtils.castToHCatSplit(split);
    PartInfo partitionInfo = hcatSplit.getPartitionInfo();
    JobContext jobContext = taskContext;
    Configuration conf = jobContext.getConfiguration();

    HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
        conf, partitionInfo);

    JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
    Map<String, String> jobProperties = partitionInfo.getJobProperties();
    HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);

    // Values for partition/virtual columns that are absent from the
    // partition's data files, to be injected by the reader.
    Map<String, String> valuesNotInDataCols = getColValsNotInDataColumns(
        schema, partitionInfo);

    return HCatUtils.newHCatReader(storageHandler, valuesNotInDataCols);
  }

  /**
   * Create a vertex record reader.
   *
   * @param split Input split
   * @param taskContext Task attempt context
   * @return Record reader for vertex data
   * @throws IOException
   * @throws InterruptedException
   */
  public RecordReader<WritableComparable, HCatRecord>
  createVertexRecordReader(InputSplit split, TaskAttemptContext taskContext)
    throws IOException, InterruptedException {
    return createRecordReader(split, getVertexTableSchema(
        taskContext.getConfiguration()), taskContext);
  }

  /**
   * Create an edge record reader.
   *
   * @param split Input split
   * @param taskContext Task attempt context
   * @return Record reader for edge data
   * @throws IOException
   * @throws InterruptedException
   */
  public RecordReader<WritableComparable, HCatRecord>
  createEdgeRecordReader(InputSplit split, TaskAttemptContext taskContext)
    throws IOException, InterruptedException {
    return createRecordReader(split, getEdgeTableSchema(
        taskContext.getConfiguration()), taskContext);
  }

  /**
   * Get values for fields requested by the output schema which will not
   * be in the data (i.e. partition columns and virtual columns).
   *
   * @param outputSchema Output schema
   * @param partInfo Partition info
   * @return Values not in data columns
   */
  private static Map<String, String> getColValsNotInDataColumns(
      HCatSchema outputSchema,
      PartInfo partInfo) {
    HCatSchema dataSchema = partInfo.getPartitionSchema();
    Map<String, String> vals = new HashMap<String, String>();
    for (String fieldName : outputSchema.getFieldNames()) {
      if (dataSchema.getPosition(fieldName) == null) {
        // This field is not among the partition's data columns: it is
        // either a partition column (with a known value) or a virtual
        // column (value unknown here, so null).
        if (partInfo.getPartitionValues().containsKey(fieldName)) {
          vals.put(fieldName, partInfo.getPartitionValues().get(fieldName));
        } else {
          vals.put(fieldName, null);
        }
      }
    }
    return vals;
  }
}
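
For context, a minimal sketch of how a driver could wire up these two inputs. The driver class and the database/table names (my_db, vertices, edges) are hypothetical, used only for illustration; the setVertexInput()/setEdgeInput() and getVertexSplits()/getEdgeSplits() calls are the class's own API as listed above.

import java.util.List;
import org.apache.giraph.io.hcatalog.GiraphHCatInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.mapreduce.InputJobInfo;

public class GiraphHCatExample {
  public static void main(String[] args) throws Exception {
    // Assumes a reachable Hive metastore; names below are hypothetical.
    Job job = new Job(new Configuration(), "giraph-hcat-example");

    // Register two independent HCatalog inputs: one table for vertices,
    // one for edges (a null filter selects all partitions).
    GiraphHCatInputFormat.setVertexInput(job,
        InputJobInfo.create("my_db", "vertices", null));
    GiraphHCatInputFormat.setEdgeInput(job,
        InputJobInfo.create("my_db", "edges", null));

    // The framework would then ask the same input format instance for
    // each side's splits (and, per task, its record readers).
    GiraphHCatInputFormat inputFormat = new GiraphHCatInputFormat();
    List<InputSplit> vertexSplits = inputFormat.getVertexSplits(job);
    List<InputSplit> edgeSplits = inputFormat.getEdgeSplits(job);
    System.out.println("vertex splits: " + vertexSplits.size() +
        ", edge splits: " + edgeSplits.size());
  }
}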