This project has retired. For details please refer to its
Attic page.
ConfigurationUtils xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.utils;
19
20 import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_CLASS;
21 import static org.apache.giraph.conf.GiraphConstants.TYPES_HOLDER_CLASS;
22
23 import java.io.IOException;
24 import java.util.List;
25
26 import org.apache.commons.cli.BasicParser;
27 import org.apache.commons.cli.CommandLine;
28 import org.apache.commons.cli.CommandLineParser;
29 import org.apache.commons.cli.HelpFormatter;
30 import org.apache.commons.cli.Options;
31 import org.apache.commons.cli.ParseException;
32 import org.apache.giraph.Algorithm;
33 import org.apache.giraph.aggregators.AggregatorWriter;
34 import org.apache.giraph.combiner.MessageCombiner;
35 import org.apache.giraph.conf.GiraphConfiguration;
36 import org.apache.giraph.conf.GiraphConfigurationSettable;
37 import org.apache.giraph.conf.GiraphConstants;
38 import org.apache.giraph.conf.GiraphTypes;
39 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
40 import org.apache.giraph.conf.TypesHolder;
41 import org.apache.giraph.edge.OutEdges;
42 import org.apache.giraph.factories.VertexValueFactory;
43 import org.apache.giraph.graph.Computation;
44 import org.apache.giraph.graph.Language;
45 import org.apache.giraph.graph.VertexValueCombiner;
46 import org.apache.giraph.io.EdgeInputFormat;
47 import org.apache.giraph.io.EdgeOutputFormat;
48 import org.apache.giraph.io.VertexInputFormat;
49 import org.apache.giraph.io.VertexOutputFormat;
50 import org.apache.giraph.io.formats.GiraphFileInputFormat;
51 import org.apache.giraph.job.GiraphConfigurationValidator;
52 import org.apache.giraph.jython.JythonUtils;
53 import org.apache.giraph.master.MasterCompute;
54 import org.apache.giraph.partition.Partition;
55 import org.apache.giraph.scripting.DeployType;
56 import org.apache.giraph.scripting.ScriptLoader;
57 import org.apache.giraph.worker.WorkerContext;
58 import org.apache.hadoop.conf.Configuration;
59 import org.apache.hadoop.fs.FileSystem;
60 import org.apache.hadoop.fs.Path;
61 import org.apache.log4j.Level;
62 import org.apache.log4j.Logger;
63 import org.apache.zookeeper.ZooKeeper;
64
65 import com.google.common.base.Splitter;
66 import com.google.common.collect.Iterables;
67
68
69
70
71 public final class ConfigurationUtils {
72
73 private static final Logger LOG =
74 Logger.getLogger(ConfigurationUtils.class);
75
76
77
78
79
80
81
82
83
84
85
86
87
88 private static Options OPTIONS;
89
90 static {
91 OPTIONS = new Options();
92 OPTIONS.addOption("h", "help", false, "Help");
93 OPTIONS.addOption("la", "listAlgorithms", false, "List supported " +
94 "algorithms");
95 OPTIONS.addOption("q", "quiet", false, "Quiet output");
96 OPTIONS.addOption("yj", "yarnjars", true, "comma-separated list of JAR " +
97 "filenames to distribute to Giraph tasks and ApplicationMaster. " +
98 "YARN only. Search order: CLASSPATH, HADOOP_HOME, user current dir.");
99 OPTIONS.addOption("yh", "yarnheap", true, "Heap size, in MB, for each " +
100 "Giraph task (YARN only.) Defaults to " +
101 GiraphConstants.GIRAPH_YARN_TASK_HEAP_MB + " MB.");
102 OPTIONS.addOption("w", "workers", true, "Number of workers");
103 OPTIONS.addOption("vif", "vertexInputFormat", true, "Vertex input format");
104 OPTIONS.addOption("eif", "edgeInputFormat", true, "Edge input format");
105 OPTIONS.addOption("vof", "vertexOutputFormat", true,
106 "Vertex output format");
107 OPTIONS.addOption("eof", "edgeOutputFormat", true, "Edge output format");
108 OPTIONS.addOption("vip", "vertexInputPath", true, "Vertex input path");
109 OPTIONS.addOption("eip", "edgeInputPath", true, "Edge input path");
110 OPTIONS.addOption("op", "outputPath", true, "Output path");
111 OPTIONS.addOption("vsd", "vertexSubDir", true, "subdirectory to be used " +
112 "for the vertex output");
113 OPTIONS.addOption("esd", "edgeSubDir", true, "subdirectory to be used " +
114 "for the edge output");
115 OPTIONS.addOption("c", "combiner", true, "MessageCombiner class");
116 OPTIONS.addOption("ve", "outEdges", true, "Vertex edges class");
117 OPTIONS.addOption("wc", "workerContext", true, "WorkerContext class");
118 OPTIONS.addOption("aw", "aggregatorWriter", true, "AggregatorWriter class");
119 OPTIONS.addOption("mc", "masterCompute", true, "MasterCompute class");
120 OPTIONS.addOption("cf", "cacheFile", true, "Files for distributed cache");
121 OPTIONS.addOption("pc", "partitionClass", true, "Partition class");
122 OPTIONS.addOption("vvf", "vertexValueFactoryClass", true,
123 "Vertex value factory class");
124 OPTIONS.addOption("th", "typesHolder", true,
125 "Class that holds types. Needed only if Computation is not set");
126 OPTIONS.addOption("jyc", "jythonClass", true,
127 "Jython class name, used if computation passed in is a python script");
128 OPTIONS.addOption("ca", "customArguments", true, "provide custom" +
129 " arguments for the job configuration in the form:" +
130 " -ca <param1>=<value1>,<param2>=<value2> -ca <param3>=<value3> etc." +
131 " It can appear multiple times, and the last one has effect" +
132 " for the same param.");
133 }
134
135
136
137
138 private ConfigurationUtils() { }
139
140
141
142
143
144
145
146
147
148 public static void configureIfPossible(Object object,
149 ImmutableClassesGiraphConfiguration configuration) {
150 if (configuration != null) {
151 configuration.configureIfPossible(object);
152 } else if (object instanceof GiraphConfigurationSettable) {
153 throw new IllegalArgumentException(
154 "Trying to configure configurable object without value, " +
155 object.getClass());
156 }
157 }
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172 public static Class<? extends TypesHolder> getTypesHolderClass(
173 Configuration conf) {
174 Class<? extends TypesHolder> klass = TYPES_HOLDER_CLASS.get(conf);
175 if (klass != null) {
176 return klass;
177 }
178 klass = COMPUTATION_CLASS.get(conf);
179 return klass;
180 }
181
182
183
184
185
186
187
188
189 public static CommandLine parseArgs(final GiraphConfiguration giraphConf,
190 final String[] args) throws ClassNotFoundException, ParseException,
191 IOException {
192
193 if (args.length == 0) {
194 throw new IllegalArgumentException("No arguments were provided (try -h)");
195 }
196 CommandLineParser parser = new BasicParser();
197 CommandLine cmd = parser.parse(OPTIONS, args);
198
199
200 if (cmd.hasOption("h")) {
201 printHelp();
202 return null;
203 }
204 if (cmd.hasOption("la")) {
205 printSupportedAlgorithms();
206 return null;
207 }
208
209
210 performSanityCheck(cmd);
211
212
213 final String computationClassName = args[0];
214 final int workers = Integer.parseInt(cmd.getOptionValue('w'));
215 populateGiraphConfiguration(giraphConf, cmd, computationClassName, workers);
216
217
218
219 @SuppressWarnings("rawtypes")
220 GiraphConfigurationValidator<?, ?, ?, ?, ?> gtv =
221 new GiraphConfigurationValidator(giraphConf);
222 gtv.validateConfiguration();
223
224
225 return cmd;
226 }
227
228
229
230
231
232
233
234
235
236
237
238 public static void addOption(final String opt, final String longOpt,
239 final boolean hasArg, final String description) {
240 if (OPTIONS.hasOption(opt)) {
241 printHelp();
242 throw new IllegalArgumentException("GiraphConfiguration already " +
243 "provides a '" + opt + "' option, please choose another identifier.");
244 }
245 OPTIONS.addOption(opt, longOpt, hasArg, description);
246 }
247
248
249
250
251
252
253 private static void performSanityCheck(final CommandLine cmd) {
254
255 if (!cmd.hasOption("w")) {
256 throw new IllegalArgumentException("Need to choose the " +
257 "number of workers (-w)");
258 }
259 if (!cmd.hasOption("vif") && !cmd.hasOption("eif")) {
260 throw new IllegalArgumentException("Need to set an input " +
261 "format (-vif or -eif)");
262 }
263 }
264
265
266
267
268
269
270
271
272
273
274
275
276 private static void populateGiraphConfiguration(final GiraphConfiguration
277 conf, final CommandLine cmd,
278 final String computationClassName, final int workers)
279 throws ClassNotFoundException, IOException {
280 conf.setWorkerConfiguration(workers, workers, 100.0f);
281 if (cmd.hasOption("typesHolder")) {
282 Class<? extends TypesHolder> typesHolderClass =
283 (Class<? extends TypesHolder>)
284 Class.forName(cmd.getOptionValue("typesHolder"));
285 TYPES_HOLDER_CLASS.set(conf, typesHolderClass);
286 }
287 if (cmd.hasOption("c")) {
288 conf.setMessageCombinerClass(
289 (Class<? extends MessageCombiner>)
290 Class.forName(cmd.getOptionValue("c")));
291 }
292 if (cmd.hasOption("vc")) {
293 conf.setVertexValueCombinerClass(
294 (Class<? extends VertexValueCombiner>)
295 Class.forName(cmd.getOptionValue("vc")));
296 }
297 if (cmd.hasOption("ve")) {
298 conf.setOutEdgesClass(
299 (Class<? extends OutEdges>) Class.forName(cmd.getOptionValue("ve")));
300 }
301 if (cmd.hasOption("ive")) {
302 conf.setInputOutEdgesClass(
303 (Class<? extends OutEdges>) Class.forName(cmd.getOptionValue("ive")));
304 }
305 if (cmd.hasOption("wc")) {
306 conf.setWorkerContextClass(
307 (Class<? extends WorkerContext>) Class
308 .forName(cmd.getOptionValue("wc")));
309 }
310 if (cmd.hasOption("mc")) {
311 conf.setMasterComputeClass(
312 (Class<? extends MasterCompute>) Class
313 .forName(cmd.getOptionValue("mc")));
314 }
315 if (cmd.hasOption("aw")) {
316 conf.setAggregatorWriterClass(
317 (Class<? extends AggregatorWriter>) Class
318 .forName(cmd.getOptionValue("aw")));
319 }
320 if (cmd.hasOption("vif")) {
321 conf.setVertexInputFormatClass(
322 (Class<? extends VertexInputFormat>) Class
323 .forName(cmd.getOptionValue("vif")));
324 } else {
325 if (LOG.isInfoEnabled()) {
326 LOG.info("No vertex input format specified. Ensure your " +
327 "InputFormat does not require one.");
328 }
329 }
330 if (cmd.hasOption("eif")) {
331 conf.setEdgeInputFormatClass(
332 (Class<? extends EdgeInputFormat>) Class
333 .forName(cmd.getOptionValue("eif")));
334 } else {
335 if (LOG.isInfoEnabled()) {
336 LOG.info("No edge input format specified. Ensure your " +
337 "InputFormat does not require one.");
338 }
339 }
340 if (cmd.hasOption("vof")) {
341 conf.setVertexOutputFormatClass(
342 (Class<? extends VertexOutputFormat>) Class
343 .forName(cmd.getOptionValue("vof")));
344 } else {
345 if (LOG.isInfoEnabled()) {
346 LOG.info("No vertex output format specified. Ensure your " +
347 "OutputFormat does not require one.");
348 }
349 }
350 if (cmd.hasOption("vof")) {
351 if (cmd.hasOption("vsd")) {
352 conf.setVertexOutputFormatSubdir(cmd.getOptionValue("vsd"));
353 }
354 }
355 if (cmd.hasOption("eof")) {
356 conf.setEdgeOutputFormatClass(
357 (Class<? extends EdgeOutputFormat>) Class
358 .forName(cmd.getOptionValue("eof")));
359 } else {
360 if (LOG.isInfoEnabled()) {
361 LOG.info("No edge output format specified. Ensure your " +
362 "OutputFormat does not require one.");
363 }
364 }
365 if (cmd.hasOption("eof")) {
366 if (cmd.hasOption("esd")) {
367 conf.setEdgeOutputFormatSubdir(cmd.getOptionValue("esd"));
368 }
369 }
370
371 if (cmd.hasOption("vof") && cmd.hasOption("eof") && cmd.hasOption("op")) {
372 if (!cmd.hasOption("vsd") || cmd.hasOption("esd")) {
373 if (!conf.hasEdgeOutputFormatSubdir() ||
374 !conf.hasVertexOutputFormatSubdir()) {
375
376 throw new IllegalArgumentException("If VertexOutputFormat and " +
377 "EdgeOutputFormat are both set, it is mandatory to provide " +
378 "both vertex subdirectory as well as edge subdirectory");
379 }
380 }
381 }
382 if (cmd.hasOption("pc")) {
383 conf.setPartitionClass(
384 (Class<? extends Partition>) Class.forName(cmd.getOptionValue("pc")));
385 }
386 if (cmd.hasOption("vvf")) {
387 conf.setVertexValueFactoryClass(
388 (Class<? extends VertexValueFactory>) Class
389 .forName(cmd.getOptionValue("vvf")));
390 }
391 if (cmd.hasOption("ca")) {
392 for (String caOptionValue : cmd.getOptionValues("ca")) {
393 for (String paramValue :
394 Splitter.on(',').split(caOptionValue)) {
395 String[] parts = Iterables.toArray(Splitter.on('=').split(paramValue),
396 String.class);
397 if (parts.length != 2) {
398 throw new IllegalArgumentException("Unable to parse custom " +
399 " argument: " + paramValue);
400 }
401 if (LOG.isInfoEnabled()) {
402 LOG.info("Setting custom argument [" + parts[0] + "] to [" +
403 parts[1] + "] in GiraphConfiguration");
404 }
405 conf.set(parts[0], parts[1]);
406 }
407 }
408 }
409
410 if (cmd.hasOption("vif")) {
411 if (cmd.hasOption("vip")) {
412 if (FileSystem.get(new Configuration()).listStatus(
413 new Path(cmd.getOptionValue("vip"))) == null) {
414 throw new IllegalArgumentException(
415 "Invalid vertex input path (-vip): " +
416 cmd.getOptionValue("vip"));
417 }
418 GiraphFileInputFormat.addVertexInputPath(conf,
419 new Path(cmd.getOptionValue("vip")));
420 } else {
421 if (LOG.isInfoEnabled()) {
422 LOG.info("No input path for vertex data was specified. Ensure your " +
423 "InputFormat does not require one.");
424 }
425 }
426 }
427 if (cmd.hasOption("eif")) {
428 if (cmd.hasOption("eip")) {
429 if (FileSystem.get(new Configuration()).listStatus(
430 new Path(cmd.getOptionValue("eip"))) == null) {
431 throw new IllegalArgumentException(
432 "Invalid edge input path (-eip): " +
433 cmd.getOptionValue("eip"));
434 }
435 GiraphFileInputFormat.addEdgeInputPath(conf,
436 new Path(cmd.getOptionValue("eip")));
437 } else {
438 if (LOG.isInfoEnabled()) {
439 LOG.info("No input path for edge data was specified. Ensure your " +
440 "InputFormat does not require one.");
441 }
442 }
443 }
444
445
446 if (cmd.hasOption("yj")) {
447 conf.setYarnLibJars(cmd.getOptionValue("yj"));
448 }
449 if (cmd.hasOption("yh")) {
450 conf.setYarnTaskHeapMb(
451 Integer.parseInt(cmd.getOptionValue("yh")));
452 }
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474 handleComputationClass(conf, cmd, computationClassName);
475 }
476
477
478
479
480
481
482
483
484
485 private static void handleComputationClass(GiraphConfiguration conf,
486 CommandLine cmd, String computationClassName)
487 throws ClassNotFoundException {
488 if (computationClassName.endsWith("py")) {
489 handleJythonComputation(conf, cmd, computationClassName);
490 } else {
491 conf.setComputationClass(
492 (Class<? extends Computation>) Class.forName(computationClassName));
493 }
494 }
495
496
497
498
499
500
501
502
503 private static void handleJythonComputation(GiraphConfiguration conf,
504 CommandLine cmd, String scriptPath) {
505 String jythonClass = cmd.getOptionValue("jythonClass");
506 if (jythonClass == null) {
507 throw new IllegalArgumentException(
508 "handleJythonComputation: Need to set Jython Computation class " +
509 "name with --jythonClass");
510 }
511 String typesHolderClass = cmd.getOptionValue("typesHolder");
512 if (typesHolderClass == null) {
513 throw new IllegalArgumentException(
514 "handleJythonComputation: Need to set TypesHolder class name " +
515 "with --typesHolder");
516 }
517
518 Path path = new Path(scriptPath);
519 Path remotePath = DistributedCacheUtils.copyAndAdd(path, conf);
520
521 ScriptLoader.setScriptsToLoad(conf, remotePath.toString(),
522 DeployType.DISTRIBUTED_CACHE, Language.JYTHON);
523
524 GiraphTypes.readFrom(conf).writeIfUnset(conf);
525 JythonUtils.init(conf, jythonClass);
526 }
527
528
529
530
531 private static void printHelp() {
532 HelpFormatter formatter = new HelpFormatter();
533 formatter.printHelp(ConfigurationUtils.class.getName(), OPTIONS, true);
534 }
535
536
537
538
539
540 private static void printSupportedAlgorithms() {
541 Logger.getLogger(ZooKeeper.class).setLevel(Level.OFF);
542
543 List<Class<?>> classes = AnnotationUtils.getAnnotatedClasses(
544 Algorithm.class, "org.apache.giraph");
545 System.out.print(" Supported algorithms:\n");
546 for (Class<?> clazz : classes) {
547 if (Computation.class.isAssignableFrom(clazz)) {
548 Algorithm algorithm = clazz.getAnnotation(Algorithm.class);
549 StringBuilder sb = new StringBuilder();
550 sb.append(algorithm.name()).append(" - ").append(clazz.getName())
551 .append("\n");
552 if (!algorithm.description().equals("")) {
553 sb.append(" ").append(algorithm.description()).append("\n");
554 }
555 System.out.print(sb.toString());
556 }
557 }
558 }
559 }