This project has retired. For details please refer to its Attic page.
ConfigurationUtils xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.utils;
19  
20  import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_CLASS;
21  import static org.apache.giraph.conf.GiraphConstants.TYPES_HOLDER_CLASS;
22  
23  import java.io.IOException;
24  import java.util.List;
25  
26  import org.apache.commons.cli.BasicParser;
27  import org.apache.commons.cli.CommandLine;
28  import org.apache.commons.cli.CommandLineParser;
29  import org.apache.commons.cli.HelpFormatter;
30  import org.apache.commons.cli.Options;
31  import org.apache.commons.cli.ParseException;
32  import org.apache.giraph.Algorithm;
33  import org.apache.giraph.aggregators.AggregatorWriter;
34  import org.apache.giraph.combiner.MessageCombiner;
35  import org.apache.giraph.conf.GiraphConfiguration;
36  import org.apache.giraph.conf.GiraphConfigurationSettable;
37  import org.apache.giraph.conf.GiraphConstants;
38  import org.apache.giraph.conf.GiraphTypes;
39  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
40  import org.apache.giraph.conf.TypesHolder;
41  import org.apache.giraph.edge.OutEdges;
42  import org.apache.giraph.factories.VertexValueFactory;
43  import org.apache.giraph.graph.Computation;
44  import org.apache.giraph.graph.Language;
45  import org.apache.giraph.graph.VertexValueCombiner;
46  import org.apache.giraph.io.EdgeInputFormat;
47  import org.apache.giraph.io.EdgeOutputFormat;
48  import org.apache.giraph.io.VertexInputFormat;
49  import org.apache.giraph.io.VertexOutputFormat;
50  import org.apache.giraph.io.formats.GiraphFileInputFormat;
51  import org.apache.giraph.job.GiraphConfigurationValidator;
52  import org.apache.giraph.jython.JythonUtils;
53  import org.apache.giraph.master.MasterCompute;
54  import org.apache.giraph.partition.Partition;
55  import org.apache.giraph.scripting.DeployType;
56  import org.apache.giraph.scripting.ScriptLoader;
57  import org.apache.giraph.worker.WorkerContext;
58  import org.apache.hadoop.conf.Configuration;
59  import org.apache.hadoop.fs.FileSystem;
60  import org.apache.hadoop.fs.Path;
61  import org.apache.log4j.Level;
62  import org.apache.log4j.Logger;
63  import org.apache.zookeeper.ZooKeeper;
64  
65  import com.google.common.base.Splitter;
66  import com.google.common.collect.Iterables;
67  
68  /**
69   * Translate command line args into Configuration Key-Value pairs.
70   */
71  public final class ConfigurationUtils {
72    /** Class logger */
73    private static final Logger LOG =
74      Logger.getLogger(ConfigurationUtils.class);
75  /*if[PURE_YARN]
76    // The base path for output dirs as saved in GiraphConfiguration
77    private static final Path BASE_OUTPUT_PATH;
78    static {
79      // whether local or remote, if there's no *-site.xml's to find, we're done
80      try {
81        BASE_OUTPUT_PATH = FileSystem.get(new Configuration()).getHomeDirectory();
82      } catch (IOException ioe) {
83        throw new IllegalStateException("Error locating default base path!", ioe);
84      }
85    }
86  end[PURE_YARN]*/
87    /** Maintains our accepted options in case the caller wants to add some */
88    private static Options OPTIONS;
89  
90    static {
91      OPTIONS = new Options();
92      OPTIONS.addOption("h", "help", false, "Help");
93      OPTIONS.addOption("la", "listAlgorithms", false, "List supported " +
94          "algorithms");
95      OPTIONS.addOption("q", "quiet", false, "Quiet output");
96      OPTIONS.addOption("yj", "yarnjars", true, "comma-separated list of JAR " +
97        "filenames to distribute to Giraph tasks and ApplicationMaster. " +
98        "YARN only. Search order: CLASSPATH, HADOOP_HOME, user current dir.");
99      OPTIONS.addOption("yh", "yarnheap", true, "Heap size, in MB, for each " +
100       "Giraph task (YARN only.) Defaults to " +
101       GiraphConstants.GIRAPH_YARN_TASK_HEAP_MB + " MB.");
102     OPTIONS.addOption("w", "workers", true, "Number of workers");
103     OPTIONS.addOption("vif", "vertexInputFormat", true, "Vertex input format");
104     OPTIONS.addOption("eif", "edgeInputFormat", true, "Edge input format");
105     OPTIONS.addOption("vof", "vertexOutputFormat", true,
106         "Vertex output format");
107     OPTIONS.addOption("eof", "edgeOutputFormat", true, "Edge output format");
108     OPTIONS.addOption("vip", "vertexInputPath", true, "Vertex input path");
109     OPTIONS.addOption("eip", "edgeInputPath", true, "Edge input path");
110     OPTIONS.addOption("op",  "outputPath", true, "Output path");
111     OPTIONS.addOption("vsd",  "vertexSubDir", true, "subdirectory to be used " +
112         "for the vertex output");
113     OPTIONS.addOption("esd",  "edgeSubDir", true, "subdirectory to be used " +
114         "for the edge output");
115     OPTIONS.addOption("c", "combiner", true, "MessageCombiner class");
116     OPTIONS.addOption("ve", "outEdges", true, "Vertex edges class");
117     OPTIONS.addOption("wc", "workerContext", true, "WorkerContext class");
118     OPTIONS.addOption("aw", "aggregatorWriter", true, "AggregatorWriter class");
119     OPTIONS.addOption("mc", "masterCompute", true, "MasterCompute class");
120     OPTIONS.addOption("cf", "cacheFile", true, "Files for distributed cache");
121     OPTIONS.addOption("pc", "partitionClass", true, "Partition class");
122     OPTIONS.addOption("vvf", "vertexValueFactoryClass", true,
123         "Vertex value factory class");
124     OPTIONS.addOption("th", "typesHolder", true,
125         "Class that holds types. Needed only if Computation is not set");
126     OPTIONS.addOption("jyc", "jythonClass", true,
127         "Jython class name, used if computation passed in is a python script");
128     OPTIONS.addOption("ca", "customArguments", true, "provide custom" +
129         " arguments for the job configuration in the form:" +
130         " -ca <param1>=<value1>,<param2>=<value2> -ca <param3>=<value3> etc." +
131         " It can appear multiple times, and the last one has effect" +
132         " for the same param.");
133   }
134 
135   /**
136    * No constructing this utility class
137    */
138   private ConfigurationUtils() { }
139 
140   /**
141    * Configure an object with an
142    * {@link org.apache.giraph.conf.ImmutableClassesGiraphConfiguration}
143    * if that objects supports it.
144    *
145    * @param object The object to configure
146    * @param configuration The configuration
147    */
148   public static void configureIfPossible(Object object,
149       ImmutableClassesGiraphConfiguration configuration) {
150     if (configuration != null) {
151       configuration.configureIfPossible(object);
152     } else if (object instanceof GiraphConfigurationSettable) {
153       throw new IllegalArgumentException(
154           "Trying to configure configurable object without value, " +
155           object.getClass());
156     }
157   }
158 
159   /**
160    * Get a class which is parameterized by the graph types defined by user.
161    * The types holder is actually an interface that any class which holds all of
162    * Giraph types can implement. It is used with reflection to infer the Giraph
163    * types.
164    *
165    * The current order of type holders we try are:
166    * 1) The {@link TypesHolder} class directly.
167    * 2) The {@link Computation} class, as that holds all the types.
168    *
169    * @param conf Configuration
170    * @return {@link TypesHolder} or null if could not find one.
171    */
172   public static Class<? extends TypesHolder> getTypesHolderClass(
173       Configuration conf) {
174     Class<? extends TypesHolder> klass = TYPES_HOLDER_CLASS.get(conf);
175     if (klass != null) {
176       return klass;
177     }
178     klass = COMPUTATION_CLASS.get(conf);
179     return klass;
180   }
181 
182   /**
183    * Translate CLI arguments to GiraphRunner or 'bin/hadoop jar' into
184    * Configuration Key-Value pairs.
185    * @param giraphConf the current job Configuration.
186    * @param args the raw CLI args to parse
187    * @return a CommandLine object, or null if the job run should exit.
188    */
189   public static CommandLine parseArgs(final GiraphConfiguration giraphConf,
190     final String[] args) throws ClassNotFoundException, ParseException,
191     IOException {
192     // verify we have args at all (can't run without them!)
193     if (args.length == 0) {
194       throw new IllegalArgumentException("No arguments were provided (try -h)");
195     }
196     CommandLineParser parser = new BasicParser();
197     CommandLine cmd = parser.parse(OPTIONS, args);
198 
199     // simply printing help or info, return normally but kill job run
200     if (cmd.hasOption("h")) {
201       printHelp();
202       return null;
203     }
204     if (cmd.hasOption("la")) {
205       printSupportedAlgorithms();
206       return null;
207     }
208 
209     // Be certain that there are no critical args missing, die if so.
210     performSanityCheck(cmd);
211 
212     // Args are OK; attempt to populate the GiraphConfiguration with them.
213     final String computationClassName = args[0];
214     final int workers = Integer.parseInt(cmd.getOptionValue('w'));
215     populateGiraphConfiguration(giraphConf, cmd, computationClassName, workers);
216 
217     // validate generic parameters chosen are correct or
218     // throw IllegalArgumentException, halting execution.
219     @SuppressWarnings("rawtypes")
220     GiraphConfigurationValidator<?, ?, ?, ?, ?> gtv =
221       new GiraphConfigurationValidator(giraphConf);
222     gtv.validateConfiguration();
223 
224     // successfully populated/validated GiraphConfiguration, ready to run job
225     return cmd;
226   }
227 
228   /**
229    * Callers can place additional options to be parsed and stored in our job's
230    * GiraphConfiguration via this utility call. These options will ONLY be
231    * parsed and placed into the CommandLine returned from <code>parseArgs</code>
232    * Calling code must query this CommandLine to take action on these options.
233    * @param opt short options name, i.e. -h
234    * @param longOpt long option name, i.e. --help
235    * @param hasArg should we expect an argument for this option?
236    * @param description English description of this option.
237    */
238   public static void addOption(final String opt, final String longOpt,
239     final boolean hasArg, final String description) {
240     if (OPTIONS.hasOption(opt)) {
241       printHelp();
242       throw new IllegalArgumentException("GiraphConfiguration already " +
243         "provides a '" + opt + "' option, please choose another identifier.");
244     }
245     OPTIONS.addOption(opt, longOpt, hasArg, description);
246   }
247 
248   /**
249    * Utility to check mission-critical args are populated. The validity of
250    * the values provided in these args is checked elsewhere.
251    * @param cmd our parsed CommandLine
252    */
253   private static void performSanityCheck(final CommandLine cmd) {
254     // Verify all the required options have been provided
255     if (!cmd.hasOption("w")) {
256       throw new IllegalArgumentException("Need to choose the " +
257         "number of workers (-w)");
258     }
259     if (!cmd.hasOption("vif") && !cmd.hasOption("eif")) {
260       throw new IllegalArgumentException("Need to set an input " +
261         "format (-vif or -eif)");
262     }
263   }
264 
265   /**
266    * Populate GiraphConfiguration for this job with all cmd line args found.
267    * Any global configuration data that Giraph on any platform might need
268    * should be captured here.
269    *
270    * @param conf config for this job run
271    * @param cmd parsed command line options to store in conf
272    * @param computationClassName the computation class (application) to run in
273    *                             this job.
274    * @param workers the number of worker tasks for this job run.
275    */
276   private static void populateGiraphConfiguration(final GiraphConfiguration
277     conf, final CommandLine cmd,
278     final String computationClassName, final int workers)
279     throws ClassNotFoundException, IOException {
280     conf.setWorkerConfiguration(workers, workers, 100.0f);
281     if (cmd.hasOption("typesHolder")) {
282       Class<? extends TypesHolder> typesHolderClass =
283           (Class<? extends TypesHolder>)
284               Class.forName(cmd.getOptionValue("typesHolder"));
285       TYPES_HOLDER_CLASS.set(conf, typesHolderClass);
286     }
287     if (cmd.hasOption("c")) {
288       conf.setMessageCombinerClass(
289           (Class<? extends MessageCombiner>)
290               Class.forName(cmd.getOptionValue("c")));
291     }
292     if (cmd.hasOption("vc")) {
293       conf.setVertexValueCombinerClass(
294           (Class<? extends VertexValueCombiner>)
295               Class.forName(cmd.getOptionValue("vc")));
296     }
297     if (cmd.hasOption("ve")) {
298       conf.setOutEdgesClass(
299           (Class<? extends OutEdges>) Class.forName(cmd.getOptionValue("ve")));
300     }
301     if (cmd.hasOption("ive")) {
302       conf.setInputOutEdgesClass(
303           (Class<? extends OutEdges>) Class.forName(cmd.getOptionValue("ive")));
304     }
305     if (cmd.hasOption("wc")) {
306       conf.setWorkerContextClass(
307           (Class<? extends WorkerContext>) Class
308               .forName(cmd.getOptionValue("wc")));
309     }
310     if (cmd.hasOption("mc")) {
311       conf.setMasterComputeClass(
312           (Class<? extends MasterCompute>) Class
313               .forName(cmd.getOptionValue("mc")));
314     }
315     if (cmd.hasOption("aw")) {
316       conf.setAggregatorWriterClass(
317           (Class<? extends AggregatorWriter>) Class
318               .forName(cmd.getOptionValue("aw")));
319     }
320     if (cmd.hasOption("vif")) {
321       conf.setVertexInputFormatClass(
322           (Class<? extends VertexInputFormat>) Class
323               .forName(cmd.getOptionValue("vif")));
324     } else {
325       if (LOG.isInfoEnabled()) {
326         LOG.info("No vertex input format specified. Ensure your " +
327           "InputFormat does not require one.");
328       }
329     }
330     if (cmd.hasOption("eif")) {
331       conf.setEdgeInputFormatClass(
332           (Class<? extends EdgeInputFormat>) Class
333               .forName(cmd.getOptionValue("eif")));
334     } else {
335       if (LOG.isInfoEnabled()) {
336         LOG.info("No edge input format specified. Ensure your " +
337           "InputFormat does not require one.");
338       }
339     }
340     if (cmd.hasOption("vof")) {
341       conf.setVertexOutputFormatClass(
342           (Class<? extends VertexOutputFormat>) Class
343               .forName(cmd.getOptionValue("vof")));
344     } else {
345       if (LOG.isInfoEnabled()) {
346         LOG.info("No vertex output format specified. Ensure your " +
347           "OutputFormat does not require one.");
348       }
349     }
350     if (cmd.hasOption("vof")) {
351       if (cmd.hasOption("vsd")) {
352         conf.setVertexOutputFormatSubdir(cmd.getOptionValue("vsd"));
353       }
354     }
355     if (cmd.hasOption("eof")) {
356       conf.setEdgeOutputFormatClass(
357           (Class<? extends EdgeOutputFormat>) Class
358               .forName(cmd.getOptionValue("eof")));
359     } else {
360       if (LOG.isInfoEnabled()) {
361         LOG.info("No edge output format specified. Ensure your " +
362           "OutputFormat does not require one.");
363       }
364     }
365     if (cmd.hasOption("eof")) {
366       if (cmd.hasOption("esd")) {
367         conf.setEdgeOutputFormatSubdir(cmd.getOptionValue("esd"));
368       }
369     }
370     /* check for path clashes */
371     if (cmd.hasOption("vof") && cmd.hasOption("eof") && cmd.hasOption("op")) {
372       if (!cmd.hasOption("vsd") || cmd.hasOption("esd")) {
373         if (!conf.hasEdgeOutputFormatSubdir() ||
374             !conf.hasVertexOutputFormatSubdir()) {
375 
376           throw new IllegalArgumentException("If VertexOutputFormat and " +
377               "EdgeOutputFormat are both set, it is mandatory to provide " +
378               "both vertex subdirectory as well as edge subdirectory");
379         }
380       }
381     }
382     if (cmd.hasOption("pc")) {
383       conf.setPartitionClass(
384           (Class<? extends Partition>) Class.forName(cmd.getOptionValue("pc")));
385     }
386     if (cmd.hasOption("vvf")) {
387       conf.setVertexValueFactoryClass(
388           (Class<? extends VertexValueFactory>) Class
389               .forName(cmd.getOptionValue("vvf")));
390     }
391     if (cmd.hasOption("ca")) {
392       for (String caOptionValue : cmd.getOptionValues("ca")) {
393         for (String paramValue :
394             Splitter.on(',').split(caOptionValue)) {
395           String[] parts = Iterables.toArray(Splitter.on('=').split(paramValue),
396                                               String.class);
397           if (parts.length != 2) {
398             throw new IllegalArgumentException("Unable to parse custom " +
399                 " argument: " + paramValue);
400           }
401           if (LOG.isInfoEnabled()) {
402             LOG.info("Setting custom argument [" + parts[0] + "] to [" +
403                 parts[1] + "] in GiraphConfiguration");
404           }
405           conf.set(parts[0], parts[1]);
406         }
407       }
408     }
409     // Now, we parse options that are specific to Hadoop MR Job
410     if (cmd.hasOption("vif")) {
411       if (cmd.hasOption("vip")) {
412         if (FileSystem.get(new Configuration()).listStatus(
413               new Path(cmd.getOptionValue("vip"))) == null) {
414           throw new IllegalArgumentException(
415               "Invalid vertex input path (-vip): " +
416               cmd.getOptionValue("vip"));
417         }
418         GiraphFileInputFormat.addVertexInputPath(conf,
419           new Path(cmd.getOptionValue("vip")));
420       } else {
421         if (LOG.isInfoEnabled()) {
422           LOG.info("No input path for vertex data was specified. Ensure your " +
423             "InputFormat does not require one.");
424         }
425       }
426     }
427     if (cmd.hasOption("eif")) {
428       if (cmd.hasOption("eip")) {
429         if (FileSystem.get(new Configuration()).listStatus(
430               new Path(cmd.getOptionValue("eip"))) == null) {
431           throw new IllegalArgumentException(
432               "Invalid edge input path (-eip): " +
433               cmd.getOptionValue("eip"));
434         }
435         GiraphFileInputFormat.addEdgeInputPath(conf,
436           new Path(cmd.getOptionValue("eip")));
437       } else {
438         if (LOG.isInfoEnabled()) {
439           LOG.info("No input path for edge data was specified. Ensure your " +
440             "InputFormat does not require one.");
441         }
442       }
443     }
444 
445     // YARN-ONLY OPTIONS
446     if (cmd.hasOption("yj")) {
447       conf.setYarnLibJars(cmd.getOptionValue("yj"));
448     }
449     if (cmd.hasOption("yh")) {
450       conf.setYarnTaskHeapMb(
451           Integer.parseInt(cmd.getOptionValue("yh")));
452     }
453 /*if[PURE_YARN]
454     if (cmd.hasOption("vof") || cmd.hasOption("eof")) {
455       if (cmd.hasOption("op")) {
456         // For YARN conf to get the out dir we need w/o a Job obj
457         Path outputDir =
458             new Path(BASE_OUTPUT_PATH, cmd.getOptionValue("op"));
459         outputDir =
460           outputDir.getFileSystem(conf).makeQualified(outputDir);
461         conf.set(
462             org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR,
463             outputDir.toString());
464 
465       } else {
466         if (LOG.isInfoEnabled()) {
467           LOG.info("No output path specified. Ensure your OutputFormat " +
468             "does not require one.");
469         }
470       }
471     }
472 end[PURE_YARN]*/
473     // END YARN-ONLY OPTIONS
474     handleComputationClass(conf, cmd, computationClassName);
475   }
476 
477   /**
478    * Helper to deal with computation class.
479    *
480    * @param conf Configuration
481    * @param cmd CommandLine
482    * @param computationClassName Name of computation
483    * @throws ClassNotFoundException error finding class
484    */
485   private static void handleComputationClass(GiraphConfiguration conf,
486     CommandLine cmd, String computationClassName)
487     throws ClassNotFoundException {
488     if (computationClassName.endsWith("py")) {
489       handleJythonComputation(conf, cmd, computationClassName);
490     } else {
491       conf.setComputationClass(
492           (Class<? extends Computation>) Class.forName(computationClassName));
493     }
494   }
495 
496   /**
497    * Helper to handle Computations implemented in Python.
498    *
499    * @param conf Configuration
500    * @param cmd CommandLine
501    * @param scriptPath Path to python script
502    */
503   private static void handleJythonComputation(GiraphConfiguration conf,
504     CommandLine cmd, String scriptPath) {
505     String jythonClass = cmd.getOptionValue("jythonClass");
506     if (jythonClass == null) {
507       throw new IllegalArgumentException(
508           "handleJythonComputation: Need to set Jython Computation class " +
509           "name with --jythonClass");
510     }
511     String typesHolderClass = cmd.getOptionValue("typesHolder");
512     if (typesHolderClass == null) {
513       throw new IllegalArgumentException(
514           "handleJythonComputation: Need to set TypesHolder class name " +
515           "with --typesHolder");
516     }
517 
518     Path path = new Path(scriptPath);
519     Path remotePath = DistributedCacheUtils.copyAndAdd(path, conf);
520 
521     ScriptLoader.setScriptsToLoad(conf, remotePath.toString(),
522         DeployType.DISTRIBUTED_CACHE, Language.JYTHON);
523 
524     GiraphTypes.readFrom(conf).writeIfUnset(conf);
525     JythonUtils.init(conf, jythonClass);
526   }
527 
528   /**
529    * Utility to print CLI help messsage for registered options.
530    */
531   private static void printHelp() {
532     HelpFormatter formatter = new HelpFormatter();
533     formatter.printHelp(ConfigurationUtils.class.getName(), OPTIONS, true);
534   }
535 
536   /**
537    * Prints description of algorithms annotated with
538    * {@link org.apache.giraph.Algorithm}
539    */
540   private static void printSupportedAlgorithms() {
541     Logger.getLogger(ZooKeeper.class).setLevel(Level.OFF);
542 
543     List<Class<?>> classes = AnnotationUtils.getAnnotatedClasses(
544       Algorithm.class, "org.apache.giraph");
545     System.out.print("  Supported algorithms:\n");
546     for (Class<?> clazz : classes) {
547       if (Computation.class.isAssignableFrom(clazz)) {
548         Algorithm algorithm = clazz.getAnnotation(Algorithm.class);
549         StringBuilder sb = new StringBuilder();
550         sb.append(algorithm.name()).append(" - ").append(clazz.getName())
551             .append("\n");
552         if (!algorithm.description().equals("")) {
553           sb.append("    ").append(algorithm.description()).append("\n");
554         }
555         System.out.print(sb.toString());
556       }
557     }
558   }
559 }