View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.hcatalog;
20  
21  import org.apache.commons.cli.CommandLine;
22  import org.apache.commons.cli.CommandLineParser;
23  import org.apache.commons.cli.GnuParser;
24  import org.apache.commons.cli.HelpFormatter;
25  import org.apache.commons.cli.Options;
26  import org.apache.commons.cli.ParseException;
27  import org.apache.giraph.graph.Computation;
28  import org.apache.giraph.io.EdgeInputFormat;
29  import org.apache.giraph.io.VertexInputFormat;
30  import org.apache.giraph.io.VertexOutputFormat;
31  import org.apache.giraph.job.GiraphJob;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hive.conf.HiveConf;
34  import org.apache.hadoop.util.Tool;
35  import org.apache.hadoop.util.ToolRunner;
36  import org.apache.hcatalog.mapreduce.HCatOutputFormat;
37  import org.apache.hcatalog.mapreduce.InputJobInfo;
38  import org.apache.hcatalog.mapreduce.OutputJobInfo;
39  import org.apache.log4j.Logger;
40  
41  import com.google.common.collect.Lists;
42  
43  import java.io.File;
44  import java.util.Arrays;
45  import java.util.Collection;
46  import java.util.List;
47  import java.util.Map;
48  
49  /**
50   * Hive Giraph Runner
51   */
52  public class HCatGiraphRunner implements Tool {
53    /**
54     * logger
55     */
56    private static final Logger LOG = Logger.getLogger(HCatGiraphRunner.class);
57    /**
58     * workers
59     */
60    protected int workers;
61    /**
62     * is verbose
63     */
64    protected boolean isVerbose;
65    /**
66     * output table partitions
67     */
68    protected Map<String, String> outputTablePartitionValues;
69    /**
70     * dbName
71     */
72    protected String dbName;
73    /**
74     * vertex input table name
75     */
76    protected String vertexInputTableName;
77    /**
78     * vertex input table filter
79     */
80    protected String vertexInputTableFilterExpr;
81    /**
82     * edge input table name
83     */
84    protected String edgeInputTableName;
85    /**
86     * edge input table filter
87     */
88    protected String edgeInputTableFilterExpr;
89    /**
90     * output table name
91     */
92    protected String outputTableName;
93    /** Configuration */
94    private Configuration conf;
95    /** Skip output? (Useful for testing without writing) */
96    private boolean skipOutput = false;
97  
98    /**
99    * computation class.
100   */
101   private Class<? extends Computation> computationClass;
102   /**
103    * vertex input format internal.
104    */
105   private Class<? extends VertexInputFormat> vertexInputFormatClass;
106   /**
107    * edge input format internal.
108    */
109   private Class<? extends EdgeInputFormat> edgeInputFormatClass;
110   /**
111   * vertex output format internal.
112   */
113   private Class<? extends VertexOutputFormat> vertexOutputFormatClass;
114 
115   /**
116   * Giraph runner class.
117    *
118   * @param computationClass Computation class
119   * @param vertexInputFormatClass Vertex input format
120   * @param edgeInputFormatClass Edge input format
121   * @param vertexOutputFormatClass Output format
122   */
123   protected HCatGiraphRunner(
124       Class<? extends Computation> computationClass,
125       Class<? extends VertexInputFormat> vertexInputFormatClass,
126       Class<? extends EdgeInputFormat> edgeInputFormatClass,
127       Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
128     this.computationClass = computationClass;
129     this.vertexInputFormatClass = vertexInputFormatClass;
130     this.edgeInputFormatClass = edgeInputFormatClass;
131     this.vertexOutputFormatClass = vertexOutputFormatClass;
132     this.conf = new HiveConf(getClass());
133   }
134 
135   /**
136   * main method
137   * @param args system arguments
138   * @throws Exception any errors from Hive Giraph Runner
139   */
140   public static void main(String[] args) throws Exception {
141     System.exit(ToolRunner.run(
142         new HCatGiraphRunner(null, null, null, null), args));
143   }
144 
145   @Override
146   public final int run(String[] args) throws Exception {
147     // process args
148     try {
149       processArguments(args);
150     } catch (InterruptedException e) {
151       return 0;
152     } catch (IllegalArgumentException e) {
153       System.err.println(e.getMessage());
154       return -1;
155     }
156 
157     // additional configuration for Hive
158     adjustConfigurationForHive(getConf());
159 
160     // setup GiraphJob
161     GiraphJob job = new GiraphJob(getConf(), getClass().getName());
162     job.getConfiguration().setComputationClass(computationClass);
163 
164     // setup input from Hive
165     if (vertexInputFormatClass != null) {
166       InputJobInfo vertexInputJobInfo = InputJobInfo.create(dbName,
167           vertexInputTableName, vertexInputTableFilterExpr);
168       GiraphHCatInputFormat.setVertexInput(job.getInternalJob(),
169           vertexInputJobInfo);
170       job.getConfiguration().setVertexInputFormatClass(vertexInputFormatClass);
171     }
172     if (edgeInputFormatClass != null) {
173       InputJobInfo edgeInputJobInfo = InputJobInfo.create(dbName,
174           edgeInputTableName, edgeInputTableFilterExpr);
175       GiraphHCatInputFormat.setEdgeInput(job.getInternalJob(),
176           edgeInputJobInfo);
177       job.getConfiguration().setEdgeInputFormatClass(edgeInputFormatClass);
178     }
179 
180     // setup output to Hive
181     HCatOutputFormat.setOutput(job.getInternalJob(), OutputJobInfo.create(
182         dbName, outputTableName, outputTablePartitionValues));
183     HCatOutputFormat.setSchema(job.getInternalJob(),
184         HCatOutputFormat.getTableSchema(job.getInternalJob()));
185     if (skipOutput) {
186       LOG.warn("run: Warning - Output will be skipped!");
187     } else {
188       job.getConfiguration().setVertexOutputFormatClass(
189           vertexOutputFormatClass);
190     }
191 
192     job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
193     initGiraphJob(job);
194 
195     return job.run(isVerbose) ? 0 : -1;
196   }
197 
198   /**
199   * set hive configuration
200   * @param conf Configuration argument
201   */
202   private static void adjustConfigurationForHive(Configuration conf) {
203     // when output partitions are used, workers register them to the
204     // metastore at cleanup stage, and on HiveConf's initialization, it
205     // looks for hive-site.xml from.
206     addToStringCollection(conf, "tmpfiles", conf.getClassLoader()
207         .getResource("hive-site.xml").toString());
208 
209     // Also, you need hive.aux.jars as well
210     // addToStringCollection(conf, "tmpjars",
211     // conf.getStringCollection("hive.aux.jars.path"));
212 
213     // Or, more effectively, we can provide all the jars client needed to
214     // the workers as well
215     String[] hadoopJars = System.getenv("HADOOP_CLASSPATH").split(
216         File.pathSeparator);
217     List<String> hadoopJarURLs = Lists.newArrayList();
218     for (String jarPath : hadoopJars) {
219       File file = new File(jarPath);
220       if (file.exists() && file.isFile()) {
221         String jarURL = file.toURI().toString();
222         hadoopJarURLs.add(jarURL);
223       }
224     }
225     addToStringCollection(conf, "tmpjars", hadoopJarURLs);
226   }
227 
228   /**
229   * process arguments
230   * @param args to process
231   * @return CommandLine instance
232   * @throws ParseException error parsing arguments
233   * @throws InterruptedException interrupted
234   */
235   private CommandLine processArguments(String[] args) throws ParseException,
236             InterruptedException {
237     Options options = new Options();
238     options.addOption("h", "help", false, "Help");
239     options.addOption("v", "verbose", false, "Verbose");
240     options.addOption("D", "hiveconf", true,
241                 "property=value for Hive/Hadoop configuration");
242     options.addOption("w", "workers", true, "Number of workers");
243     if (computationClass == null) {
244       options.addOption(null, "computationClass", true,
245           "Giraph Computation class to use");
246     }
247     if (vertexInputFormatClass == null) {
248       options.addOption(null, "vertexInputFormatClass", true,
249           "Giraph HCatalogVertexInputFormat class to use");
250     }
251     if (edgeInputFormatClass == null) {
252       options.addOption(null, "edgeInputFormatClass", true,
253           "Giraph HCatalogEdgeInputFormat class to use");
254     }
255 
256     if (vertexOutputFormatClass == null) {
257       options.addOption(null, "vertexOutputFormatClass", true,
258           "Giraph HCatalogVertexOutputFormat class to use");
259     }
260 
261     options.addOption("db", "dbName", true, "Hive database name");
262     options.addOption("vi", "vertexInputTable", true,
263         "Vertex input table name");
264     options.addOption("VI", "vertexInputFilter", true,
265         "Vertex input table filter expression (e.g., \"a<2 AND b='two'\"");
266     options.addOption("ei", "edgeInputTable", true,
267         "Edge input table name");
268     options.addOption("EI", "edgeInputFilter", true,
269         "Edge input table filter expression (e.g., \"a<2 AND b='two'\"");
270     options.addOption("o", "outputTable", true, "Output table name");
271     options.addOption("O", "outputPartition", true,
272         "Output table partition values (e.g., \"a=1,b=two\")");
273     options.addOption("s", "skipOutput", false, "Skip output?");
274 
275     addMoreOptions(options);
276 
277     CommandLineParser parser = new GnuParser();
278     final CommandLine cmdln = parser.parse(options, args);
279     if (args.length == 0 || cmdln.hasOption("help")) {
280       new HelpFormatter().printHelp(getClass().getName(), options, true);
281       throw new InterruptedException();
282     }
283 
284     // Giraph classes
285     if (cmdln.hasOption("computationClass")) {
286       computationClass = findClass(cmdln.getOptionValue("computationClass"),
287           Computation.class);
288     }
289     if (cmdln.hasOption("vertexInputFormatClass")) {
290       vertexInputFormatClass = findClass(
291           cmdln.getOptionValue("vertexInputFormatClass"),
292           HCatalogVertexInputFormat.class);
293     }
294     if (cmdln.hasOption("edgeInputFormatClass")) {
295       edgeInputFormatClass = findClass(
296           cmdln.getOptionValue("edgeInputFormatClass"),
297           HCatalogEdgeInputFormat.class);
298     }
299 
300     if (cmdln.hasOption("vertexOutputFormatClass")) {
301       vertexOutputFormatClass = findClass(
302           cmdln.getOptionValue("vertexOutputFormatClass"),
303           HCatalogVertexOutputFormat.class);
304     }
305 
306     if (cmdln.hasOption("skipOutput")) {
307       skipOutput = true;
308     }
309 
310     if (computationClass == null) {
311       throw new IllegalArgumentException(
312           "Need the Giraph Computation class name (-computationClass) to use");
313     }
314     if (vertexInputFormatClass == null && edgeInputFormatClass == null) {
315       throw new IllegalArgumentException(
316           "Need at least one of Giraph VertexInputFormat " +
317               "class name (-vertexInputFormatClass) and " +
318               "EdgeInputFormat class name (-edgeInputFormatClass)");
319     }
320     if (vertexOutputFormatClass == null) {
321       throw new IllegalArgumentException(
322           "Need the Giraph VertexOutputFormat " +
323               "class name (-vertexOutputFormatClass) to use");
324     }
325     if (!cmdln.hasOption("workers")) {
326       throw new IllegalArgumentException(
327           "Need to choose the number of workers (-w)");
328     }
329     if (!cmdln.hasOption("vertexInputTable") &&
330         vertexInputFormatClass != null) {
331       throw new IllegalArgumentException(
332           "Need to set the vertex input table name (-vi)");
333     }
334     if (!cmdln.hasOption("edgeInputTable") &&
335         edgeInputFormatClass != null) {
336       throw new IllegalArgumentException(
337           "Need to set the edge input table name (-ei)");
338     }
339     if (!cmdln.hasOption("outputTable")) {
340       throw new IllegalArgumentException(
341           "Need to set the output table name (-o)");
342     }
343     dbName = cmdln.getOptionValue("dbName", "default");
344     vertexInputTableName = cmdln.getOptionValue("vertexInputTable");
345     vertexInputTableFilterExpr = cmdln.getOptionValue("vertexInputFilter");
346     edgeInputTableName = cmdln.getOptionValue("edgeInputTable");
347     edgeInputTableFilterExpr = cmdln.getOptionValue("edgeInputFilter");
348     outputTableName = cmdln.getOptionValue("outputTable");
349     outputTablePartitionValues = HiveUtils.parsePartitionValues(cmdln
350                 .getOptionValue("outputPartition"));
351     workers = Integer.parseInt(cmdln.getOptionValue("workers"));
352     isVerbose = cmdln.hasOption("verbose");
353 
354     // pick up -hiveconf arguments
355     for (String hiveconf : cmdln.getOptionValues("hiveconf")) {
356       String[] keyval = hiveconf.split("=", 2);
357       if (keyval.length == 2) {
358         String name = keyval[0];
359         String value = keyval[1];
360         if (name.equals("tmpjars") || name.equals("tmpfiles")) {
361           addToStringCollection(
362                   conf, name, value);
363         } else {
364           conf.set(name, value);
365         }
366       }
367     }
368 
369     processMoreArguments(cmdln);
370 
371     return cmdln;
372   }
373 
374   /**
375   * add string to collection
376   * @param conf Configuration
377   * @param name name to add
378   * @param values values for collection
379   */
380   private static void addToStringCollection(Configuration conf, String name,
381                                               String... values) {
382     addToStringCollection(conf, name, Arrays.asList(values));
383   }
384 
385   /**
386   * add string to collection
387   * @param conf Configuration
388   * @param name to add
389   * @param values values for collection
390   */
391   private static void addToStringCollection(
392           Configuration conf, String name, Collection
393           <? extends String> values) {
394     Collection<String> tmpfiles = conf.getStringCollection(name);
395     tmpfiles.addAll(values);
396     conf.setStrings(name, tmpfiles.toArray(new String[tmpfiles.size()]));
397   }
398 
399   /**
400   *
401   * @param className to find
402   * @param base  base class
403   * @param <T> class type found
404   * @return type found
405   */
406   private <T> Class<? extends T> findClass(String className, Class<T> base) {
407     try {
408       Class<?> cls = Class.forName(className);
409       if (base.isAssignableFrom(cls)) {
410         return cls.asSubclass(base);
411       }
412       return null;
413     } catch (ClassNotFoundException e) {
414       throw new IllegalArgumentException(className + ": Invalid class name");
415     }
416   }
417 
418   @Override
419   public final Configuration getConf() {
420     return conf;
421   }
422 
423   @Override
424   public final void setConf(Configuration conf) {
425     this.conf = conf;
426   }
427 
428   /**
429   * Override this method to add more command-line options. You can process
430   * them by also overriding {@link #processMoreArguments(CommandLine)}.
431   *
432   * @param options Options
433   */
434   protected void addMoreOptions(Options options) {
435   }
436 
437   /**
438   * Override this method to process additional command-line arguments. You
439   * may want to declare additional options by also overriding
440   * {@link #addMoreOptions(Options)}.
441   *
442   * @param cmd Command
443   */
444   protected void processMoreArguments(CommandLine cmd) {
445   }
446 
447   /**
448   * Override this method to do additional setup with the GiraphJob that will
449   * run.
450   *
451   * @param job
452   *            GiraphJob that is going to run
453   */
454   protected void initGiraphJob(GiraphJob job) {
455     LOG.info(getClass().getSimpleName() + " with");
456     String prefix = "\t";
457     LOG.info(prefix + "-computationClass=" +
458          computationClass.getCanonicalName());
459     if (vertexInputFormatClass != null) {
460       LOG.info(prefix + "-vertexInputFormatClass=" +
461           vertexInputFormatClass.getCanonicalName());
462     }
463     if (edgeInputFormatClass != null) {
464       LOG.info(prefix + "-edgeInputFormatClass=" +
465           edgeInputFormatClass.getCanonicalName());
466     }
467     LOG.info(prefix + "-vertexOutputFormatClass=" +
468         vertexOutputFormatClass.getCanonicalName());
469     if (vertexInputTableName != null) {
470       LOG.info(prefix + "-vertexInputTable=" + vertexInputTableName);
471     }
472     if (vertexInputTableFilterExpr != null) {
473       LOG.info(prefix + "-vertexInputFilter=\"" +
474           vertexInputTableFilterExpr + "\"");
475     }
476     if (edgeInputTableName != null) {
477       LOG.info(prefix + "-edgeInputTable=" + edgeInputTableName);
478     }
479     if (edgeInputTableFilterExpr != null) {
480       LOG.info(prefix + "-edgeInputFilter=\"" +
481           edgeInputTableFilterExpr + "\"");
482     }
483     LOG.info(prefix + "-outputTable=" + outputTableName);
484     if (outputTablePartitionValues != null) {
485       LOG.info(prefix + "-outputPartition=\"" +
486           outputTablePartitionValues + "\"");
487     }
488     LOG.info(prefix + "-workers=" + workers);
489   }
490 }