This project has retired. For details please refer to its Attic page.
      
 
ConfigurationUtils xref
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.giraph.utils;
19  
20  import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_CLASS;
21  import static org.apache.giraph.conf.GiraphConstants.TYPES_HOLDER_CLASS;
22  
23  import java.io.IOException;
24  import java.util.List;
25  
26  import org.apache.commons.cli.BasicParser;
27  import org.apache.commons.cli.CommandLine;
28  import org.apache.commons.cli.CommandLineParser;
29  import org.apache.commons.cli.HelpFormatter;
30  import org.apache.commons.cli.Options;
31  import org.apache.commons.cli.ParseException;
32  import org.apache.giraph.Algorithm;
33  import org.apache.giraph.aggregators.AggregatorWriter;
34  import org.apache.giraph.combiner.MessageCombiner;
35  import org.apache.giraph.conf.GiraphConfiguration;
36  import org.apache.giraph.conf.GiraphConfigurationSettable;
37  import org.apache.giraph.conf.GiraphConstants;
38  import org.apache.giraph.conf.GiraphTypes;
39  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
40  import org.apache.giraph.conf.TypesHolder;
41  import org.apache.giraph.edge.OutEdges;
42  import org.apache.giraph.factories.VertexValueFactory;
43  import org.apache.giraph.graph.Computation;
44  import org.apache.giraph.graph.Language;
45  import org.apache.giraph.graph.VertexValueCombiner;
46  import org.apache.giraph.io.EdgeInputFormat;
47  import org.apache.giraph.io.EdgeOutputFormat;
48  import org.apache.giraph.io.VertexInputFormat;
49  import org.apache.giraph.io.VertexOutputFormat;
50  import org.apache.giraph.io.formats.GiraphFileInputFormat;
51  import org.apache.giraph.job.GiraphConfigurationValidator;
52  import org.apache.giraph.jython.JythonUtils;
53  import org.apache.giraph.master.MasterCompute;
54  import org.apache.giraph.partition.Partition;
55  import org.apache.giraph.scripting.DeployType;
56  import org.apache.giraph.scripting.ScriptLoader;
57  import org.apache.giraph.worker.WorkerContext;
58  import org.apache.hadoop.conf.Configuration;
59  import org.apache.hadoop.fs.FileSystem;
60  import org.apache.hadoop.fs.Path;
61  import org.apache.log4j.Level;
62  import org.apache.log4j.Logger;
63  import org.apache.zookeeper.ZooKeeper;
64  
65  import com.google.common.base.Splitter;
66  import com.google.common.collect.Iterables;
67  
68  
69  
70  
71  public final class ConfigurationUtils {
72    
73    private static final Logger LOG =
74      Logger.getLogger(ConfigurationUtils.class);
75  
76  
77  
78  
79  
80  
81  
82  
83  
84  
85  
86  
87    
88    private static Options OPTIONS;
89  
90    static {
91      OPTIONS = new Options();
92      OPTIONS.addOption("h", "help", false, "Help");
93      OPTIONS.addOption("la", "listAlgorithms", false, "List supported " +
94          "algorithms");
95      OPTIONS.addOption("q", "quiet", false, "Quiet output");
96      OPTIONS.addOption("yj", "yarnjars", true, "comma-separated list of JAR " +
97        "filenames to distribute to Giraph tasks and ApplicationMaster. " +
98        "YARN only. Search order: CLASSPATH, HADOOP_HOME, user current dir.");
99      OPTIONS.addOption("yh", "yarnheap", true, "Heap size, in MB, for each " +
100       "Giraph task (YARN only.) Defaults to " +
101       GiraphConstants.GIRAPH_YARN_TASK_HEAP_MB + " MB.");
102     OPTIONS.addOption("w", "workers", true, "Number of workers");
103     OPTIONS.addOption("vif", "vertexInputFormat", true, "Vertex input format");
104     OPTIONS.addOption("eif", "edgeInputFormat", true, "Edge input format");
105     OPTIONS.addOption("vof", "vertexOutputFormat", true,
106         "Vertex output format");
107     OPTIONS.addOption("eof", "edgeOutputFormat", true, "Edge output format");
108     OPTIONS.addOption("vip", "vertexInputPath", true, "Vertex input path");
109     OPTIONS.addOption("eip", "edgeInputPath", true, "Edge input path");
110     OPTIONS.addOption("op",  "outputPath", true, "Output path");
111     OPTIONS.addOption("vsd",  "vertexSubDir", true, "subdirectory to be used " +
112         "for the vertex output");
113     OPTIONS.addOption("esd",  "edgeSubDir", true, "subdirectory to be used " +
114         "for the edge output");
115     OPTIONS.addOption("c", "combiner", true, "MessageCombiner class");
116     OPTIONS.addOption("ve", "outEdges", true, "Vertex edges class");
117     OPTIONS.addOption("wc", "workerContext", true, "WorkerContext class");
118     OPTIONS.addOption("aw", "aggregatorWriter", true, "AggregatorWriter class");
119     OPTIONS.addOption("mc", "masterCompute", true, "MasterCompute class");
120     OPTIONS.addOption("cf", "cacheFile", true, "Files for distributed cache");
121     OPTIONS.addOption("pc", "partitionClass", true, "Partition class");
122     OPTIONS.addOption("vvf", "vertexValueFactoryClass", true,
123         "Vertex value factory class");
124     OPTIONS.addOption("th", "typesHolder", true,
125         "Class that holds types. Needed only if Computation is not set");
126     OPTIONS.addOption("jyc", "jythonClass", true,
127         "Jython class name, used if computation passed in is a python script");
128     OPTIONS.addOption("ca", "customArguments", true, "provide custom" +
129         " arguments for the job configuration in the form:" +
130         " -ca <param1>=<value1>,<param2>=<value2> -ca <param3>=<value3> etc." +
131         " It can appear multiple times, and the last one has effect" +
132         " for the same param.");
133   }
134 
135   
136 
137 
138   private ConfigurationUtils() { }
139 
140   
141 
142 
143 
144 
145 
146 
147 
148   public static void configureIfPossible(Object object,
149       ImmutableClassesGiraphConfiguration configuration) {
150     if (configuration != null) {
151       configuration.configureIfPossible(object);
152     } else if (object instanceof GiraphConfigurationSettable) {
153       throw new IllegalArgumentException(
154           "Trying to configure configurable object without value, " +
155           object.getClass());
156     }
157   }
158 
159   
160 
161 
162 
163 
164 
165 
166 
167 
168 
169 
170 
171 
172   public static Class<? extends TypesHolder> getTypesHolderClass(
173       Configuration conf) {
174     Class<? extends TypesHolder> klass = TYPES_HOLDER_CLASS.get(conf);
175     if (klass != null) {
176       return klass;
177     }
178     klass = COMPUTATION_CLASS.get(conf);
179     return klass;
180   }
181 
182   
183 
184 
185 
186 
187 
188 
189   public static CommandLine parseArgs(final GiraphConfiguration giraphConf,
190     final String[] args) throws ClassNotFoundException, ParseException,
191     IOException {
192     
193     if (args.length == 0) {
194       throw new IllegalArgumentException("No arguments were provided (try -h)");
195     }
196     CommandLineParser parser = new BasicParser();
197     CommandLine cmd = parser.parse(OPTIONS, args);
198 
199     
200     if (cmd.hasOption("h")) {
201       printHelp();
202       return null;
203     }
204     if (cmd.hasOption("la")) {
205       printSupportedAlgorithms();
206       return null;
207     }
208 
209     
210     performSanityCheck(cmd);
211 
212     
213     final String computationClassName = args[0];
214     final int workers = Integer.parseInt(cmd.getOptionValue('w'));
215     populateGiraphConfiguration(giraphConf, cmd, computationClassName, workers);
216 
217     
218     
219     @SuppressWarnings("rawtypes")
220     GiraphConfigurationValidator<?, ?, ?, ?, ?> gtv =
221       new GiraphConfigurationValidator(giraphConf);
222     gtv.validateConfiguration();
223 
224     
225     return cmd;
226   }
227 
228   
229 
230 
231 
232 
233 
234 
235 
236 
237 
238   public static void addOption(final String opt, final String longOpt,
239     final boolean hasArg, final String description) {
240     if (OPTIONS.hasOption(opt)) {
241       printHelp();
242       throw new IllegalArgumentException("GiraphConfiguration already " +
243         "provides a '" + opt + "' option, please choose another identifier.");
244     }
245     OPTIONS.addOption(opt, longOpt, hasArg, description);
246   }
247 
248   
249 
250 
251 
252 
253   private static void performSanityCheck(final CommandLine cmd) {
254     
255     if (!cmd.hasOption("w")) {
256       throw new IllegalArgumentException("Need to choose the " +
257         "number of workers (-w)");
258     }
259     if (!cmd.hasOption("vif") && !cmd.hasOption("eif")) {
260       throw new IllegalArgumentException("Need to set an input " +
261         "format (-vif or -eif)");
262     }
263   }
264 
265   
266 
267 
268 
269 
270 
271 
272 
273 
274 
275 
276   private static void populateGiraphConfiguration(final GiraphConfiguration
277     conf, final CommandLine cmd,
278     final String computationClassName, final int workers)
279     throws ClassNotFoundException, IOException {
280     conf.setWorkerConfiguration(workers, workers, 100.0f);
281     if (cmd.hasOption("typesHolder")) {
282       Class<? extends TypesHolder> typesHolderClass =
283           (Class<? extends TypesHolder>)
284               Class.forName(cmd.getOptionValue("typesHolder"));
285       TYPES_HOLDER_CLASS.set(conf, typesHolderClass);
286     }
287     if (cmd.hasOption("c")) {
288       conf.setMessageCombinerClass(
289           (Class<? extends MessageCombiner>)
290               Class.forName(cmd.getOptionValue("c")));
291     }
292     if (cmd.hasOption("vc")) {
293       conf.setVertexValueCombinerClass(
294           (Class<? extends VertexValueCombiner>)
295               Class.forName(cmd.getOptionValue("vc")));
296     }
297     if (cmd.hasOption("ve")) {
298       conf.setOutEdgesClass(
299           (Class<? extends OutEdges>) Class.forName(cmd.getOptionValue("ve")));
300     }
301     if (cmd.hasOption("ive")) {
302       conf.setInputOutEdgesClass(
303           (Class<? extends OutEdges>) Class.forName(cmd.getOptionValue("ive")));
304     }
305     if (cmd.hasOption("wc")) {
306       conf.setWorkerContextClass(
307           (Class<? extends WorkerContext>) Class
308               .forName(cmd.getOptionValue("wc")));
309     }
310     if (cmd.hasOption("mc")) {
311       conf.setMasterComputeClass(
312           (Class<? extends MasterCompute>) Class
313               .forName(cmd.getOptionValue("mc")));
314     }
315     if (cmd.hasOption("aw")) {
316       conf.setAggregatorWriterClass(
317           (Class<? extends AggregatorWriter>) Class
318               .forName(cmd.getOptionValue("aw")));
319     }
320     if (cmd.hasOption("vif")) {
321       conf.setVertexInputFormatClass(
322           (Class<? extends VertexInputFormat>) Class
323               .forName(cmd.getOptionValue("vif")));
324     } else {
325       if (LOG.isInfoEnabled()) {
326         LOG.info("No vertex input format specified. Ensure your " +
327           "InputFormat does not require one.");
328       }
329     }
330     if (cmd.hasOption("eif")) {
331       conf.setEdgeInputFormatClass(
332           (Class<? extends EdgeInputFormat>) Class
333               .forName(cmd.getOptionValue("eif")));
334     } else {
335       if (LOG.isInfoEnabled()) {
336         LOG.info("No edge input format specified. Ensure your " +
337           "InputFormat does not require one.");
338       }
339     }
340     if (cmd.hasOption("vof")) {
341       conf.setVertexOutputFormatClass(
342           (Class<? extends VertexOutputFormat>) Class
343               .forName(cmd.getOptionValue("vof")));
344     } else {
345       if (LOG.isInfoEnabled()) {
346         LOG.info("No vertex output format specified. Ensure your " +
347           "OutputFormat does not require one.");
348       }
349     }
350     if (cmd.hasOption("vof")) {
351       if (cmd.hasOption("vsd")) {
352         conf.setVertexOutputFormatSubdir(cmd.getOptionValue("vsd"));
353       }
354     }
355     if (cmd.hasOption("eof")) {
356       conf.setEdgeOutputFormatClass(
357           (Class<? extends EdgeOutputFormat>) Class
358               .forName(cmd.getOptionValue("eof")));
359     } else {
360       if (LOG.isInfoEnabled()) {
361         LOG.info("No edge output format specified. Ensure your " +
362           "OutputFormat does not require one.");
363       }
364     }
365     if (cmd.hasOption("eof")) {
366       if (cmd.hasOption("esd")) {
367         conf.setEdgeOutputFormatSubdir(cmd.getOptionValue("esd"));
368       }
369     }
370     
371     if (cmd.hasOption("vof") && cmd.hasOption("eof") && cmd.hasOption("op")) {
372       if (!cmd.hasOption("vsd") || cmd.hasOption("esd")) {
373         if (!conf.hasEdgeOutputFormatSubdir() ||
374             !conf.hasVertexOutputFormatSubdir()) {
375 
376           throw new IllegalArgumentException("If VertexOutputFormat and " +
377               "EdgeOutputFormat are both set, it is mandatory to provide " +
378               "both vertex subdirectory as well as edge subdirectory");
379         }
380       }
381     }
382     if (cmd.hasOption("pc")) {
383       conf.setPartitionClass(
384           (Class<? extends Partition>) Class.forName(cmd.getOptionValue("pc")));
385     }
386     if (cmd.hasOption("vvf")) {
387       conf.setVertexValueFactoryClass(
388           (Class<? extends VertexValueFactory>) Class
389               .forName(cmd.getOptionValue("vvf")));
390     }
391     if (cmd.hasOption("ca")) {
392       for (String caOptionValue : cmd.getOptionValues("ca")) {
393         for (String paramValue :
394             Splitter.on(',').split(caOptionValue)) {
395           String[] parts = Iterables.toArray(Splitter.on('=').split(paramValue),
396                                               String.class);
397           if (parts.length != 2) {
398             throw new IllegalArgumentException("Unable to parse custom " +
399                 " argument: " + paramValue);
400           }
401           if (LOG.isInfoEnabled()) {
402             LOG.info("Setting custom argument [" + parts[0] + "] to [" +
403                 parts[1] + "] in GiraphConfiguration");
404           }
405           conf.set(parts[0], parts[1]);
406         }
407       }
408     }
409     
410     if (cmd.hasOption("vif")) {
411       if (cmd.hasOption("vip")) {
412         if (FileSystem.get(new Configuration()).listStatus(
413               new Path(cmd.getOptionValue("vip"))) == null) {
414           throw new IllegalArgumentException(
415               "Invalid vertex input path (-vip): " +
416               cmd.getOptionValue("vip"));
417         }
418         GiraphFileInputFormat.addVertexInputPath(conf,
419           new Path(cmd.getOptionValue("vip")));
420       } else {
421         if (LOG.isInfoEnabled()) {
422           LOG.info("No input path for vertex data was specified. Ensure your " +
423             "InputFormat does not require one.");
424         }
425       }
426     }
427     if (cmd.hasOption("eif")) {
428       if (cmd.hasOption("eip")) {
429         if (FileSystem.get(new Configuration()).listStatus(
430               new Path(cmd.getOptionValue("eip"))) == null) {
431           throw new IllegalArgumentException(
432               "Invalid edge input path (-eip): " +
433               cmd.getOptionValue("eip"));
434         }
435         GiraphFileInputFormat.addEdgeInputPath(conf,
436           new Path(cmd.getOptionValue("eip")));
437       } else {
438         if (LOG.isInfoEnabled()) {
439           LOG.info("No input path for edge data was specified. Ensure your " +
440             "InputFormat does not require one.");
441         }
442       }
443     }
444 
445     
446     if (cmd.hasOption("yj")) {
447       conf.setYarnLibJars(cmd.getOptionValue("yj"));
448     }
449     if (cmd.hasOption("yh")) {
450       conf.setYarnTaskHeapMb(
451           Integer.parseInt(cmd.getOptionValue("yh")));
452     }
453 
454 
455 
456 
457 
458 
459 
460 
461 
462 
463 
464 
465 
466 
467 
468 
469 
470 
471 
472 
473     
474     handleComputationClass(conf, cmd, computationClassName);
475   }
476 
477   
478 
479 
480 
481 
482 
483 
484 
485   private static void handleComputationClass(GiraphConfiguration conf,
486     CommandLine cmd, String computationClassName)
487     throws ClassNotFoundException {
488     if (computationClassName.endsWith("py")) {
489       handleJythonComputation(conf, cmd, computationClassName);
490     } else {
491       conf.setComputationClass(
492           (Class<? extends Computation>) Class.forName(computationClassName));
493     }
494   }
495 
496   
497 
498 
499 
500 
501 
502 
503   private static void handleJythonComputation(GiraphConfiguration conf,
504     CommandLine cmd, String scriptPath) {
505     String jythonClass = cmd.getOptionValue("jythonClass");
506     if (jythonClass == null) {
507       throw new IllegalArgumentException(
508           "handleJythonComputation: Need to set Jython Computation class " +
509           "name with --jythonClass");
510     }
511     String typesHolderClass = cmd.getOptionValue("typesHolder");
512     if (typesHolderClass == null) {
513       throw new IllegalArgumentException(
514           "handleJythonComputation: Need to set TypesHolder class name " +
515           "with --typesHolder");
516     }
517 
518     Path path = new Path(scriptPath);
519     Path remotePath = DistributedCacheUtils.copyAndAdd(path, conf);
520 
521     ScriptLoader.setScriptsToLoad(conf, remotePath.toString(),
522         DeployType.DISTRIBUTED_CACHE, Language.JYTHON);
523 
524     GiraphTypes.readFrom(conf).writeIfUnset(conf);
525     JythonUtils.init(conf, jythonClass);
526   }
527 
528   
529 
530 
531   private static void printHelp() {
532     HelpFormatter formatter = new HelpFormatter();
533     formatter.printHelp(ConfigurationUtils.class.getName(), OPTIONS, true);
534   }
535 
536   
537 
538 
539 
540   private static void printSupportedAlgorithms() {
541     Logger.getLogger(ZooKeeper.class).setLevel(Level.OFF);
542 
543     List<Class<?>> classes = AnnotationUtils.getAnnotatedClasses(
544       Algorithm.class, "org.apache.giraph");
545     System.out.print("  Supported algorithms:\n");
546     for (Class<?> clazz : classes) {
547       if (Computation.class.isAssignableFrom(clazz)) {
548         Algorithm algorithm = clazz.getAnnotation(Algorithm.class);
549         StringBuilder sb = new StringBuilder();
550         sb.append(algorithm.name()).append(" - ").append(clazz.getName())
551             .append("\n");
552         if (!algorithm.description().equals("")) {
553           sb.append("    ").append(algorithm.description()).append("\n");
554         }
555         System.out.print(sb.toString());
556       }
557     }
558   }
559 }