This project has been retired. For details, please refer to its
Attic page.
HCatGiraphRunner xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.hcatalog;
20
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.giraph.graph.Computation;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.job.GiraphJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
import org.apache.log4j.Logger;

import com.google.common.collect.Lists;

import java.io.File;
import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
48
49
50
51
52 public class HCatGiraphRunner implements Tool {
53
54
55
56 private static final Logger LOG = Logger.getLogger(HCatGiraphRunner.class);
57
58
59
60 protected int workers;
61
62
63
64 protected boolean isVerbose;
65
66
67
68 protected Map<String, String> outputTablePartitionValues;
69
70
71
72 protected String dbName;
73
74
75
76 protected String vertexInputTableName;
77
78
79
80 protected String vertexInputTableFilterExpr;
81
82
83
84 protected String edgeInputTableName;
85
86
87
88 protected String edgeInputTableFilterExpr;
89
90
91
92 protected String outputTableName;
93
94 private Configuration conf;
95
96 private boolean skipOutput = false;
97
98
99
100
101 private Class<? extends Computation> computationClass;
102
103
104
105 private Class<? extends VertexInputFormat> vertexInputFormatClass;
106
107
108
109 private Class<? extends EdgeInputFormat> edgeInputFormatClass;
110
111
112
113 private Class<? extends VertexOutputFormat> vertexOutputFormatClass;
114
115
116
117
118
119
120
121
122
123 protected HCatGiraphRunner(
124 Class<? extends Computation> computationClass,
125 Class<? extends VertexInputFormat> vertexInputFormatClass,
126 Class<? extends EdgeInputFormat> edgeInputFormatClass,
127 Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
128 this.computationClass = computationClass;
129 this.vertexInputFormatClass = vertexInputFormatClass;
130 this.edgeInputFormatClass = edgeInputFormatClass;
131 this.vertexOutputFormatClass = vertexOutputFormatClass;
132 this.conf = new HiveConf(getClass());
133 }
134
135
136
137
138
139
140 public static void main(String[] args) throws Exception {
141 System.exit(ToolRunner.run(
142 new HCatGiraphRunner(null, null, null, null), args));
143 }
144
145 @Override
146 public final int run(String[] args) throws Exception {
147
148 try {
149 processArguments(args);
150 } catch (InterruptedException e) {
151 return 0;
152 } catch (IllegalArgumentException e) {
153 System.err.println(e.getMessage());
154 return -1;
155 }
156
157
158 adjustConfigurationForHive(getConf());
159
160
161 GiraphJob job = new GiraphJob(getConf(), getClass().getName());
162 job.getConfiguration().setComputationClass(computationClass);
163
164
165 if (vertexInputFormatClass != null) {
166 InputJobInfo vertexInputJobInfo = InputJobInfo.create(dbName,
167 vertexInputTableName, vertexInputTableFilterExpr);
168 GiraphHCatInputFormat.setVertexInput(job.getInternalJob(),
169 vertexInputJobInfo);
170 job.getConfiguration().setVertexInputFormatClass(vertexInputFormatClass);
171 }
172 if (edgeInputFormatClass != null) {
173 InputJobInfo edgeInputJobInfo = InputJobInfo.create(dbName,
174 edgeInputTableName, edgeInputTableFilterExpr);
175 GiraphHCatInputFormat.setEdgeInput(job.getInternalJob(),
176 edgeInputJobInfo);
177 job.getConfiguration().setEdgeInputFormatClass(edgeInputFormatClass);
178 }
179
180
181 HCatOutputFormat.setOutput(job.getInternalJob(), OutputJobInfo.create(
182 dbName, outputTableName, outputTablePartitionValues));
183 HCatOutputFormat.setSchema(job.getInternalJob(),
184 HCatOutputFormat.getTableSchema(job.getInternalJob()));
185 if (skipOutput) {
186 LOG.warn("run: Warning - Output will be skipped!");
187 } else {
188 job.getConfiguration().setVertexOutputFormatClass(
189 vertexOutputFormatClass);
190 }
191
192 job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
193 initGiraphJob(job);
194
195 return job.run(isVerbose) ? 0 : -1;
196 }
197
198
199
200
201
202 private static void adjustConfigurationForHive(Configuration conf) {
203
204
205
206 addToStringCollection(conf, "tmpfiles", conf.getClassLoader()
207 .getResource("hive-site.xml").toString());
208
209
210
211
212
213
214
215 String[] hadoopJars = System.getenv("HADOOP_CLASSPATH").split(
216 File.pathSeparator);
217 List<String> hadoopJarURLs = Lists.newArrayList();
218 for (String jarPath : hadoopJars) {
219 File file = new File(jarPath);
220 if (file.exists() && file.isFile()) {
221 String jarURL = file.toURI().toString();
222 hadoopJarURLs.add(jarURL);
223 }
224 }
225 addToStringCollection(conf, "tmpjars", hadoopJarURLs);
226 }
227
228
229
230
231
232
233
234
235 private CommandLine processArguments(String[] args) throws ParseException,
236 InterruptedException {
237 Options options = new Options();
238 options.addOption("h", "help", false, "Help");
239 options.addOption("v", "verbose", false, "Verbose");
240 options.addOption("D", "hiveconf", true,
241 "property=value for Hive/Hadoop configuration");
242 options.addOption("w", "workers", true, "Number of workers");
243 if (computationClass == null) {
244 options.addOption(null, "computationClass", true,
245 "Giraph Computation class to use");
246 }
247 if (vertexInputFormatClass == null) {
248 options.addOption(null, "vertexInputFormatClass", true,
249 "Giraph HCatalogVertexInputFormat class to use");
250 }
251 if (edgeInputFormatClass == null) {
252 options.addOption(null, "edgeInputFormatClass", true,
253 "Giraph HCatalogEdgeInputFormat class to use");
254 }
255
256 if (vertexOutputFormatClass == null) {
257 options.addOption(null, "vertexOutputFormatClass", true,
258 "Giraph HCatalogVertexOutputFormat class to use");
259 }
260
261 options.addOption("db", "dbName", true, "Hive database name");
262 options.addOption("vi", "vertexInputTable", true,
263 "Vertex input table name");
264 options.addOption("VI", "vertexInputFilter", true,
265 "Vertex input table filter expression (e.g., \"a<2 AND b='two'\"");
266 options.addOption("ei", "edgeInputTable", true,
267 "Edge input table name");
268 options.addOption("EI", "edgeInputFilter", true,
269 "Edge input table filter expression (e.g., \"a<2 AND b='two'\"");
270 options.addOption("o", "outputTable", true, "Output table name");
271 options.addOption("O", "outputPartition", true,
272 "Output table partition values (e.g., \"a=1,b=two\")");
273 options.addOption("s", "skipOutput", false, "Skip output?");
274
275 addMoreOptions(options);
276
277 CommandLineParser parser = new GnuParser();
278 final CommandLine cmdln = parser.parse(options, args);
279 if (args.length == 0 || cmdln.hasOption("help")) {
280 new HelpFormatter().printHelp(getClass().getName(), options, true);
281 throw new InterruptedException();
282 }
283
284
285 if (cmdln.hasOption("computationClass")) {
286 computationClass = findClass(cmdln.getOptionValue("computationClass"),
287 Computation.class);
288 }
289 if (cmdln.hasOption("vertexInputFormatClass")) {
290 vertexInputFormatClass = findClass(
291 cmdln.getOptionValue("vertexInputFormatClass"),
292 HCatalogVertexInputFormat.class);
293 }
294 if (cmdln.hasOption("edgeInputFormatClass")) {
295 edgeInputFormatClass = findClass(
296 cmdln.getOptionValue("edgeInputFormatClass"),
297 HCatalogEdgeInputFormat.class);
298 }
299
300 if (cmdln.hasOption("vertexOutputFormatClass")) {
301 vertexOutputFormatClass = findClass(
302 cmdln.getOptionValue("vertexOutputFormatClass"),
303 HCatalogVertexOutputFormat.class);
304 }
305
306 if (cmdln.hasOption("skipOutput")) {
307 skipOutput = true;
308 }
309
310 if (computationClass == null) {
311 throw new IllegalArgumentException(
312 "Need the Giraph Computation class name (-computationClass) to use");
313 }
314 if (vertexInputFormatClass == null && edgeInputFormatClass == null) {
315 throw new IllegalArgumentException(
316 "Need at least one of Giraph VertexInputFormat " +
317 "class name (-vertexInputFormatClass) and " +
318 "EdgeInputFormat class name (-edgeInputFormatClass)");
319 }
320 if (vertexOutputFormatClass == null) {
321 throw new IllegalArgumentException(
322 "Need the Giraph VertexOutputFormat " +
323 "class name (-vertexOutputFormatClass) to use");
324 }
325 if (!cmdln.hasOption("workers")) {
326 throw new IllegalArgumentException(
327 "Need to choose the number of workers (-w)");
328 }
329 if (!cmdln.hasOption("vertexInputTable") &&
330 vertexInputFormatClass != null) {
331 throw new IllegalArgumentException(
332 "Need to set the vertex input table name (-vi)");
333 }
334 if (!cmdln.hasOption("edgeInputTable") &&
335 edgeInputFormatClass != null) {
336 throw new IllegalArgumentException(
337 "Need to set the edge input table name (-ei)");
338 }
339 if (!cmdln.hasOption("outputTable")) {
340 throw new IllegalArgumentException(
341 "Need to set the output table name (-o)");
342 }
343 dbName = cmdln.getOptionValue("dbName", "default");
344 vertexInputTableName = cmdln.getOptionValue("vertexInputTable");
345 vertexInputTableFilterExpr = cmdln.getOptionValue("vertexInputFilter");
346 edgeInputTableName = cmdln.getOptionValue("edgeInputTable");
347 edgeInputTableFilterExpr = cmdln.getOptionValue("edgeInputFilter");
348 outputTableName = cmdln.getOptionValue("outputTable");
349 outputTablePartitionValues = HiveUtils.parsePartitionValues(cmdln
350 .getOptionValue("outputPartition"));
351 workers = Integer.parseInt(cmdln.getOptionValue("workers"));
352 isVerbose = cmdln.hasOption("verbose");
353
354
355 for (String hiveconf : cmdln.getOptionValues("hiveconf")) {
356 String[] keyval = hiveconf.split("=", 2);
357 if (keyval.length == 2) {
358 String name = keyval[0];
359 String value = keyval[1];
360 if (name.equals("tmpjars") || name.equals("tmpfiles")) {
361 addToStringCollection(
362 conf, name, value);
363 } else {
364 conf.set(name, value);
365 }
366 }
367 }
368
369 processMoreArguments(cmdln);
370
371 return cmdln;
372 }
373
374
375
376
377
378
379
380 private static void addToStringCollection(Configuration conf, String name,
381 String... values) {
382 addToStringCollection(conf, name, Arrays.asList(values));
383 }
384
385
386
387
388
389
390
391 private static void addToStringCollection(
392 Configuration conf, String name, Collection
393 <? extends String> values) {
394 Collection<String> tmpfiles = conf.getStringCollection(name);
395 tmpfiles.addAll(values);
396 conf.setStrings(name, tmpfiles.toArray(new String[tmpfiles.size()]));
397 }
398
399
400
401
402
403
404
405
406 private <T> Class<? extends T> findClass(String className, Class<T> base) {
407 try {
408 Class<?> cls = Class.forName(className);
409 if (base.isAssignableFrom(cls)) {
410 return cls.asSubclass(base);
411 }
412 return null;
413 } catch (ClassNotFoundException e) {
414 throw new IllegalArgumentException(className + ": Invalid class name");
415 }
416 }
417
418 @Override
419 public final Configuration getConf() {
420 return conf;
421 }
422
423 @Override
424 public final void setConf(Configuration conf) {
425 this.conf = conf;
426 }
427
428
429
430
431
432
433
434 protected void addMoreOptions(Options options) {
435 }
436
437
438
439
440
441
442
443
444 protected void processMoreArguments(CommandLine cmd) {
445 }
446
447
448
449
450
451
452
453
454 protected void initGiraphJob(GiraphJob job) {
455 LOG.info(getClass().getSimpleName() + " with");
456 String prefix = "\t";
457 LOG.info(prefix + "-computationClass=" +
458 computationClass.getCanonicalName());
459 if (vertexInputFormatClass != null) {
460 LOG.info(prefix + "-vertexInputFormatClass=" +
461 vertexInputFormatClass.getCanonicalName());
462 }
463 if (edgeInputFormatClass != null) {
464 LOG.info(prefix + "-edgeInputFormatClass=" +
465 edgeInputFormatClass.getCanonicalName());
466 }
467 LOG.info(prefix + "-vertexOutputFormatClass=" +
468 vertexOutputFormatClass.getCanonicalName());
469 if (vertexInputTableName != null) {
470 LOG.info(prefix + "-vertexInputTable=" + vertexInputTableName);
471 }
472 if (vertexInputTableFilterExpr != null) {
473 LOG.info(prefix + "-vertexInputFilter=\"" +
474 vertexInputTableFilterExpr + "\"");
475 }
476 if (edgeInputTableName != null) {
477 LOG.info(prefix + "-edgeInputTable=" + edgeInputTableName);
478 }
479 if (edgeInputTableFilterExpr != null) {
480 LOG.info(prefix + "-edgeInputFilter=\"" +
481 edgeInputTableFilterExpr + "\"");
482 }
483 LOG.info(prefix + "-outputTable=" + outputTableName);
484 if (outputTablePartitionValues != null) {
485 LOG.info(prefix + "-outputPartition=\"" +
486 outputTablePartitionValues + "\"");
487 }
488 LOG.info(prefix + "-workers=" + workers);
489 }
490 }