View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.conf;
20  
21  import io.netty.buffer.ByteBufAllocator;
22  import io.netty.buffer.PooledByteBufAllocator;
23  import io.netty.buffer.UnpooledByteBufAllocator;
24  
25  import java.net.InetAddress;
26  import java.net.UnknownHostException;
27  
28  import org.apache.giraph.aggregators.AggregatorWriter;
29  import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
30  import org.apache.giraph.combiner.MessageCombiner;
31  import org.apache.giraph.edge.OutEdges;
32  import org.apache.giraph.edge.ReuseObjectsOutEdges;
33  import org.apache.giraph.factories.ComputationFactory;
34  import org.apache.giraph.factories.VertexValueFactory;
35  import org.apache.giraph.graph.Computation;
36  import org.apache.giraph.graph.MapperObserver;
37  import org.apache.giraph.graph.Vertex;
38  import org.apache.giraph.graph.VertexResolver;
39  import org.apache.giraph.graph.VertexValueCombiner;
40  import org.apache.giraph.io.EdgeInputFormat;
41  import org.apache.giraph.io.EdgeOutputFormat;
42  import org.apache.giraph.io.MappingInputFormat;
43  import org.apache.giraph.io.VertexInputFormat;
44  import org.apache.giraph.io.VertexOutputFormat;
45  import org.apache.giraph.io.filters.EdgeInputFilter;
46  import org.apache.giraph.io.filters.VertexInputFilter;
47  import org.apache.giraph.job.GiraphJobObserver;
48  import org.apache.giraph.job.GiraphJobRetryChecker;
49  import org.apache.giraph.master.MasterCompute;
50  import org.apache.giraph.master.MasterObserver;
51  import org.apache.giraph.partition.GraphPartitionerFactory;
52  import org.apache.giraph.partition.Partition;
53  import org.apache.giraph.partition.ReusesObjectsPartition;
54  import org.apache.giraph.utils.ReflectionUtils;
55  import org.apache.giraph.worker.WorkerContext;
56  import org.apache.giraph.worker.WorkerObserver;
57  import org.apache.hadoop.conf.Configuration;
58  import org.apache.hadoop.mapreduce.Mapper;
59  import org.apache.hadoop.net.DNS;
60  
/**
 * Adds user methods specific to Giraph.  This will be put into an
 * ImmutableClassesGiraphConfiguration that provides the configuration plus
 * the immutable classes.
 *
 * Keeps track of parameters which were set so that it can easily set them
 * in another copy of the configuration.
 */
69  public class GiraphConfiguration extends Configuration
70      implements GiraphConstants {
71    /** ByteBufAllocator to be used by netty */
72    private ByteBufAllocator nettyBufferAllocator = null;
73  
74    /**
75     * Constructor that creates the configuration
76     */
77    public GiraphConfiguration() {
78      configureHadoopSecurity();
79    }
80  
81    /**
82     * Constructor.
83     *
84     * @param conf Configuration
85     */
86    public GiraphConfiguration(Configuration conf) {
87      super(conf);
88      configureHadoopSecurity();
89    }
90  
91    /**
92     * Get name of computation being run. We leave this up to the
93     * {@link ComputationFactory} to decide what to return.
94     *
95     * @return Name of computation being run
96     */
97    public String getComputationName() {
98      ComputationFactory compFactory = ReflectionUtils.newInstance(
99          getComputationFactoryClass());
100     return compFactory.computationName(this);
101   }
102 
103   /**
104    * Get the user's subclassed {@link ComputationFactory}
105    *
106    * @return User's computation factory class
107    */
108   public Class<? extends ComputationFactory> getComputationFactoryClass() {
109     return COMPUTATION_FACTORY_CLASS.get(this);
110   }
111 
112   /**
113    * Get the user's subclassed {@link Computation}
114    *
115    * @return User's computation class
116    */
117   public Class<? extends Computation> getComputationClass() {
118     return COMPUTATION_CLASS.get(this);
119   }
120 
121   /**
122    * Set the computation class (required)
123    *
124    * @param computationClass Runs vertex computation
125    */
126   public void setComputationClass(
127       Class<? extends Computation> computationClass) {
128     COMPUTATION_CLASS.set(this, computationClass);
129   }
130 
131   /**
132    * Set the vertex value factory class
133    *
134    * @param vertexValueFactoryClass Creates default vertex values
135    */
136   public final void setVertexValueFactoryClass(
137       Class<? extends VertexValueFactory> vertexValueFactoryClass) {
138     VERTEX_VALUE_FACTORY_CLASS.set(this, vertexValueFactoryClass);
139   }
140 
141   /**
142    * Set the edge input filter class
143    *
144    * @param edgeFilterClass class to use
145    */
146   public void setEdgeInputFilterClass(
147       Class<? extends EdgeInputFilter> edgeFilterClass) {
148     EDGE_INPUT_FILTER_CLASS.set(this, edgeFilterClass);
149   }
150 
151   /**
152    * Set the vertex input filter class
153    *
154    * @param vertexFilterClass class to use
155    */
156   public void setVertexInputFilterClass(
157       Class<? extends VertexInputFilter> vertexFilterClass) {
158     VERTEX_INPUT_FILTER_CLASS.set(this, vertexFilterClass);
159   }
160 
161   /**
162    * Get the vertex edges class
163    *
164    * @return vertex edges class
165    */
166   public Class<? extends OutEdges> getOutEdgesClass() {
167     return VERTEX_EDGES_CLASS.get(this);
168   }
169 
170   /**
171    * Set the vertex edges class
172    *
173    * @param outEdgesClass Determines the way edges are stored
174    */
175   public final void setOutEdgesClass(
176       Class<? extends OutEdges> outEdgesClass) {
177     VERTEX_EDGES_CLASS.set(this, outEdgesClass);
178   }
179 
180   /**
181    * Set the vertex implementation class
182    *
183    * @param vertexClass class of the vertex implementation
184    */
185   public final void setVertexClass(Class<? extends Vertex> vertexClass) {
186     VERTEX_CLASS.set(this, vertexClass);
187   }
188 
189 
190   /**
191    * Set the vertex edges class used during edge-based input (if different
192    * from the one used during computation)
193    *
194    * @param inputOutEdgesClass Determines the way edges are stored
195    */
196   public final void setInputOutEdgesClass(
197       Class<? extends OutEdges> inputOutEdgesClass) {
198     INPUT_VERTEX_EDGES_CLASS.set(this, inputOutEdgesClass);
199   }
200 
201   /**
202    * True if the {@link org.apache.giraph.edge.OutEdges} implementation
203    * copies the passed edges to its own data structure,
204    * i.e. it doesn't keep references to Edge objects, target vertex ids or edge
205    * values passed to add() or initialize().
206    * This makes it possible to reuse edge objects passed to the data
207    * structure, to minimize object instantiation (see for example
208    * EdgeStore#addPartitionEdges()).
209    *
210    * @return True iff we can reuse the edge objects
211    */
212   public boolean reuseEdgeObjects() {
213     return ReuseObjectsOutEdges.class.isAssignableFrom(
214         getOutEdgesClass());
215   }
216 
217   /**
218    * True if the {@link Partition} implementation copies the passed vertices
219    * to its own data structure, i.e. it doesn't keep references to Vertex
220    * objects passed to it.
221    * This makes it possible to reuse vertex objects passed to the data
222    * structure, to minimize object instantiation.
223    *
224    * @return True iff we can reuse the vertex objects
225    */
226   public boolean reuseVertexObjects() {
227     return ReusesObjectsPartition.class.isAssignableFrom(getPartitionClass());
228   }
229 
230   /**
231    * Get Partition class used
232    * @return Partition class
233    */
234   public Class<? extends Partition> getPartitionClass() {
235     return PARTITION_CLASS.get(this);
236   }
237 
238   /**
239    * Does the job have a {@link VertexInputFormat}?
240    *
241    * @return True iff a {@link VertexInputFormat} has been specified.
242    */
243   public boolean hasVertexInputFormat() {
244     return VERTEX_INPUT_FORMAT_CLASS.get(this) != null;
245   }
246 
247   /**
248    * Set the vertex input format class (required)
249    *
250    * @param vertexInputFormatClass Determines how graph is input
251    */
252   public void setVertexInputFormatClass(
253       Class<? extends VertexInputFormat> vertexInputFormatClass) {
254     VERTEX_INPUT_FORMAT_CLASS.set(this, vertexInputFormatClass);
255   }
256 
257   /**
258    * Does the job have a {@link EdgeInputFormat}?
259    *
260    * @return True iff a {@link EdgeInputFormat} has been specified.
261    */
262   public boolean hasEdgeInputFormat() {
263     return EDGE_INPUT_FORMAT_CLASS.get(this) != null;
264   }
265 
266   /**
267    * Set the edge input format class (required)
268    *
269    * @param edgeInputFormatClass Determines how graph is input
270    */
271   public void setEdgeInputFormatClass(
272       Class<? extends EdgeInputFormat> edgeInputFormatClass) {
273     EDGE_INPUT_FORMAT_CLASS.set(this, edgeInputFormatClass);
274   }
275 
276   /**
277    * Set the mapping input format class (optional)
278    *
279    * @param mappingInputFormatClass Determines how mappings are input
280    */
281   public void setMappingInputFormatClass(
282     Class<? extends MappingInputFormat> mappingInputFormatClass) {
283     MAPPING_INPUT_FORMAT_CLASS.set(this, mappingInputFormatClass);
284   }
285 
286   /**
287    * Set the master class (optional)
288    *
289    * @param masterComputeClass Runs master computation
290    */
291   public final void setMasterComputeClass(
292       Class<? extends MasterCompute> masterComputeClass) {
293     MASTER_COMPUTE_CLASS.set(this, masterComputeClass);
294   }
295 
296   /**
297    * Add a MasterObserver class (optional)
298    *
299    * @param masterObserverClass MasterObserver class to add.
300    */
301   public final void addMasterObserverClass(
302       Class<? extends MasterObserver> masterObserverClass) {
303     MASTER_OBSERVER_CLASSES.add(this, masterObserverClass);
304   }
305 
306   /**
307    * Add a WorkerObserver class (optional)
308    *
309    * @param workerObserverClass WorkerObserver class to add.
310    */
311   public final void addWorkerObserverClass(
312       Class<? extends WorkerObserver> workerObserverClass) {
313     WORKER_OBSERVER_CLASSES.add(this, workerObserverClass);
314   }
315 
316   /**
317    * Add a MapperObserver class (optional)
318    *
319    * @param mapperObserverClass MapperObserver class to add.
320    */
321   public final void addMapperObserverClass(
322       Class<? extends MapperObserver> mapperObserverClass) {
323     MAPPER_OBSERVER_CLASSES.add(this, mapperObserverClass);
324   }
325 
326   /**
327    * Get job observer class
328    *
329    * @return GiraphJobObserver class set.
330    */
331   public Class<? extends GiraphJobObserver> getJobObserverClass() {
332     return JOB_OBSERVER_CLASS.get(this);
333   }
334 
335   /**
336    * Set job observer class
337    *
338    * @param klass GiraphJobObserver class to set.
339    */
340   public void setJobObserverClass(Class<? extends GiraphJobObserver> klass) {
341     JOB_OBSERVER_CLASS.set(this, klass);
342   }
343 
344   /**
345    * Get job retry checker class
346    *
347    * @return GiraphJobRetryChecker class set.
348    */
349   public Class<? extends GiraphJobRetryChecker> getJobRetryCheckerClass() {
350     return JOB_RETRY_CHECKER_CLASS.get(this);
351   }
352 
353   /**
354    * Set job retry checker class
355    *
356    * @param klass GiraphJobRetryChecker class to set.
357    */
358   public void setJobRetryCheckerClass(
359       Class<? extends GiraphJobRetryChecker> klass) {
360     JOB_RETRY_CHECKER_CLASS.set(this, klass);
361   }
362 
363   /**
364    * Check whether to enable jmap dumping thread.
365    *
366    * @return true if jmap dumper is enabled.
367    */
368   public boolean isJMapHistogramDumpEnabled() {
369     return JMAP_ENABLE.get(this);
370   }
371 
372   /**
373    * Check whether to enable heap memory supervisor thread
374    *
375    * @return true if jmap dumper is reactively enabled
376    */
377   public boolean isReactiveJmapHistogramDumpEnabled() {
378     return REACTIVE_JMAP_ENABLE.get(this);
379   }
380 
381   /**
382    * Set mapping from a key name to a list of classes.
383    *
384    * @param name String key name to use.
385    * @param xface interface of the classes being set.
386    * @param klasses Classes to set.
387    */
388   public final void setClasses(String name, Class<?> xface,
389                                Class<?> ... klasses) {
390     String[] klassNames = new String[klasses.length];
391     for (int i = 0; i < klasses.length; ++i) {
392       Class<?> klass = klasses[i];
393       if (!xface.isAssignableFrom(klass)) {
394         throw new RuntimeException(klass + " does not implement " +
395             xface.getName());
396       }
397       klassNames[i] = klasses[i].getName();
398     }
399     setStrings(name, klassNames);
400   }
401 
402   /**
403    * Does the job have a {@link VertexOutputFormat}?
404    *
405    * @return True iff a {@link VertexOutputFormat} has been specified.
406    */
407   public boolean hasVertexOutputFormat() {
408     return VERTEX_OUTPUT_FORMAT_CLASS.get(this) != null;
409   }
410 
411   /**
412    * Set the vertex output format class (optional)
413    *
414    * @param vertexOutputFormatClass Determines how graph is output
415    */
416   public final void setVertexOutputFormatClass(
417       Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
418     VERTEX_OUTPUT_FORMAT_CLASS.set(this, vertexOutputFormatClass);
419   }
420 
421 
422   /**
423    * Does the job have a {@link EdgeOutputFormat} subdir?
424    *
425    * @return True iff a {@link EdgeOutputFormat} subdir has been specified.
426    */
427   public boolean hasVertexOutputFormatSubdir() {
428     return !VERTEX_OUTPUT_FORMAT_SUBDIR.get(this).isEmpty();
429   }
430 
431   /**
432    * Set the vertex output format path
433    *
434    * @param path path where the verteces will be written
435    */
436   public final void setVertexOutputFormatSubdir(String path) {
437     VERTEX_OUTPUT_FORMAT_SUBDIR.set(this, path);
438   }
439 
440   /**
441    * Check if output should be done during computation
442    *
443    * @return True iff output should be done during computation
444    */
445   public final boolean doOutputDuringComputation() {
446     return DO_OUTPUT_DURING_COMPUTATION.get(this);
447   }
448 
449   /**
450    * Set whether or not we should do output during computation
451    *
452    * @param doOutputDuringComputation True iff we want output to happen
453    *                                  during computation
454    */
455   public final void setDoOutputDuringComputation(
456       boolean doOutputDuringComputation) {
457     DO_OUTPUT_DURING_COMPUTATION.set(this, doOutputDuringComputation);
458   }
459 
460   /**
461    * Check if VertexOutputFormat is thread-safe
462    *
463    * @return True iff VertexOutputFormat is thread-safe
464    */
465   public final boolean vertexOutputFormatThreadSafe() {
466     return VERTEX_OUTPUT_FORMAT_THREAD_SAFE.get(this);
467   }
468 
469   /**
470    * Set whether or not selected VertexOutputFormat is thread-safe
471    *
472    * @param vertexOutputFormatThreadSafe True iff selected VertexOutputFormat
473    *                                     is thread-safe
474    */
475   public final void setVertexOutputFormatThreadSafe(
476       boolean vertexOutputFormatThreadSafe) {
477     VERTEX_OUTPUT_FORMAT_THREAD_SAFE.set(this, vertexOutputFormatThreadSafe);
478   }
479 
480   /**
481    * Does the job have a {@link EdgeOutputFormat}?
482    *
483    * @return True iff a {@link EdgeOutputFormat} has been specified.
484    */
485   public boolean hasEdgeOutputFormat() {
486     return EDGE_OUTPUT_FORMAT_CLASS.get(this) != null;
487   }
488 
489   /**
490    * Set the edge output format class (optional)
491    *
492    * @param edgeOutputFormatClass Determines how graph is output
493    */
494   public final void setEdgeOutputFormatClass(
495       Class<? extends EdgeOutputFormat> edgeOutputFormatClass) {
496     EDGE_OUTPUT_FORMAT_CLASS.set(this, edgeOutputFormatClass);
497   }
498 
499   /**
500    * Does the job have a {@link EdgeOutputFormat} subdir?
501    *
502    * @return True iff a {@link EdgeOutputFormat} subdir has been specified.
503    */
504   public boolean hasEdgeOutputFormatSubdir() {
505     return !EDGE_OUTPUT_FORMAT_SUBDIR.get(this).isEmpty();
506   }
507 
508   /**
509    * Set the edge output format path
510    *
511    * @param path path where the edges will be written
512    */
513   public final void setEdgeOutputFormatSubdir(String path) {
514     EDGE_OUTPUT_FORMAT_SUBDIR.set(this, path);
515   }
516 
517   /**
518    * Get the number of threads to use for writing output in the end of the
519    * application. If output format is not thread safe, returns 1.
520    *
521    * @return Number of output threads
522    */
523   public final int getNumOutputThreads() {
524     if (!vertexOutputFormatThreadSafe()) {
525       return 1;
526     } else {
527       return NUM_OUTPUT_THREADS.get(this);
528     }
529   }
530 
531   /**
532    * Set the number of threads to use for writing output in the end of the
533    * application. Will be used only if {#vertexOutputFormatThreadSafe} is true.
534    *
535    * @param numOutputThreads Number of output threads
536    */
537   public void setNumOutputThreads(int numOutputThreads) {
538     NUM_OUTPUT_THREADS.set(this, numOutputThreads);
539   }
540 
541   /**
542    * Set the message combiner class (optional)
543    *
544    * @param messageCombinerClass Determines how vertex messages are combined
545    */
546   public void setMessageCombinerClass(
547       Class<? extends MessageCombiner> messageCombinerClass) {
548     MESSAGE_COMBINER_CLASS.set(this, messageCombinerClass);
549   }
550 
551   /**
552    * Set the graph partitioner class (optional)
553    *
554    * @param graphPartitionerFactoryClass Determines how the graph is partitioned
555    */
556   public final void setGraphPartitionerFactoryClass(
557       Class<? extends GraphPartitionerFactory> graphPartitionerFactoryClass) {
558     GRAPH_PARTITIONER_FACTORY_CLASS.set(this, graphPartitionerFactoryClass);
559   }
560 
561   /**
562    * Set the vertex resolver class (optional)
563    *
564    * @param vertexResolverClass Determines how vertex mutations are resolved
565    */
566   public final void setVertexResolverClass(
567       Class<? extends VertexResolver> vertexResolverClass) {
568     VERTEX_RESOLVER_CLASS.set(this, vertexResolverClass);
569   }
570 
571   /**
572    * Whether to create a vertex that doesn't exist when it receives messages.
573    * This only affects DefaultVertexResolver.
574    *
575    * @return true if we should create non existent vertices that get messages.
576    */
577   public final boolean getResolverCreateVertexOnMessages() {
578     return RESOLVER_CREATE_VERTEX_ON_MSGS.get(this);
579   }
580 
581   /**
582    * Set whether to create non existent vertices when they receive messages.
583    *
584    * @param v true if we should create vertices when they get messages.
585    */
586   public final void setResolverCreateVertexOnMessages(boolean v) {
587     RESOLVER_CREATE_VERTEX_ON_MSGS.set(this, v);
588   }
589 
590   /**
591    * Set the vertex value combiner class (optional)
592    *
593    * @param vertexValueCombinerClass Determines how vertices are combined
594    */
595   public final void setVertexValueCombinerClass(
596       Class<? extends VertexValueCombiner> vertexValueCombinerClass) {
597     VERTEX_VALUE_COMBINER_CLASS.set(this, vertexValueCombinerClass);
598   }
599 
600   /**
601    * Set the worker context class (optional)
602    *
603    * @param workerContextClass Determines what code is executed on a each
604    *        worker before and after each superstep and computation
605    */
606   public final void setWorkerContextClass(
607       Class<? extends WorkerContext> workerContextClass) {
608     WORKER_CONTEXT_CLASS.set(this, workerContextClass);
609   }
610 
611   /**
612    * Set the aggregator writer class (optional)
613    *
614    * @param aggregatorWriterClass Determines how the aggregators are
615    *        written to file at the end of the job
616    */
617   public final void setAggregatorWriterClass(
618       Class<? extends AggregatorWriter> aggregatorWriterClass) {
619     AGGREGATOR_WRITER_CLASS.set(this, aggregatorWriterClass);
620   }
621 
622   /**
623    * Set the partition class (optional)
624    *
625    * @param partitionClass Determines the why partitions are stored
626    */
627   public final void setPartitionClass(
628       Class<? extends Partition> partitionClass) {
629     PARTITION_CLASS.set(this, partitionClass);
630   }
631 
632   /**
633    * Set worker configuration for determining what is required for
634    * a superstep.
635    *
636    * @param minWorkers Minimum workers to do a superstep
637    * @param maxWorkers Maximum workers to do a superstep
638    *        (max map tasks in job)
639    * @param minPercentResponded 0 - 100 % of the workers required to
640    *        have responded before continuing the superstep
641    */
642   public final void setWorkerConfiguration(int minWorkers,
643                                            int maxWorkers,
644                                            float minPercentResponded) {
645     setInt(MIN_WORKERS, minWorkers);
646     setInt(MAX_WORKERS, maxWorkers);
647     MIN_PERCENT_RESPONDED.set(this, minPercentResponded);
648   }
649 
650   public final int getMinWorkers() {
651     return getInt(MIN_WORKERS, -1);
652   }
653 
654   public final int getMaxWorkers() {
655     return getInt(MAX_WORKERS, -1);
656   }
657 
658   public final float getMinPercentResponded() {
659     return MIN_PERCENT_RESPONDED.get(this);
660   }
661 
662   /**
663    * Utilize an existing ZooKeeper service.  If this is not set, ZooKeeper
664    * will be dynamically started by Giraph for this job.
665    *
666    * @param serverList Comma separated list of servers and ports
667    *        (i.e. zk1:2221,zk2:2221)
668    */
669   public final void setZooKeeperConfiguration(String serverList) {
670     ZOOKEEPER_LIST.set(this, serverList);
671   }
672 
673   /**
674    * Getter for SPLIT_MASTER_WORKER flag.
675    *
676    * @return boolean flag value.
677    */
678   public final boolean getSplitMasterWorker() {
679     return SPLIT_MASTER_WORKER.get(this);
680   }
681 
682   /**
683    * Get array of MasterObserver classes set in the configuration.
684    *
685    * @return array of MasterObserver classes.
686    */
687   public Class<? extends MasterObserver>[] getMasterObserverClasses() {
688     return MASTER_OBSERVER_CLASSES.getArray(this);
689   }
690 
691   /**
692    * Get array of WorkerObserver classes set in configuration.
693    *
694    * @return array of WorkerObserver classes.
695    */
696   public Class<? extends WorkerObserver>[] getWorkerObserverClasses() {
697     return WORKER_OBSERVER_CLASSES.getArray(this);
698   }
699 
700   /**
701    * Get array of MapperObserver classes set in configuration.
702    *
703    * @return array of MapperObserver classes.
704    */
705   public Class<? extends MapperObserver>[] getMapperObserverClasses() {
706     return MAPPER_OBSERVER_CLASSES.getArray(this);
707   }
708 
709   /**
710    * Whether to track, print, and aggregate metrics.
711    *
712    * @return true if metrics are enabled, false otherwise (default)
713    */
714   public boolean metricsEnabled() {
715     return METRICS_ENABLE.isTrue(this);
716   }
717 
718   /**
719    * Get the task partition
720    *
721    * @return The task partition or -1 if not set
722    */
723   public int getTaskPartition() {
724     return getInt("mapred.task.partition", -1);
725   }
726 
727   /**
728    * Is this a "pure YARN" Giraph job, or is a MapReduce layer (v1 or v2)
729    * actually managing our cluster nodes, i.e. each task is a Mapper.
730    *
731    * @return TRUE if this is a pure YARN job.
732    */
733   public boolean isPureYarnJob() {
734     return IS_PURE_YARN_JOB.get(this);
735   }
736 
737   /**
738    * Jars required in "Pure YARN" jobs (names only, no paths) should
739    * be listed here in full, including Giraph framework jar(s).
740    *
741    * @return the comma-separated list of jar names for export to cluster.
742    */
743   public String getYarnLibJars() {
744     return GIRAPH_YARN_LIBJARS.get(this);
745   }
746 
747   /**
748    * Populate jar list for Pure YARN jobs.
749    *
750    * @param jarList a comma-separated list of jar names
751    */
752   public void setYarnLibJars(String jarList) {
753     GIRAPH_YARN_LIBJARS.set(this, jarList);
754   }
755 
756   /**
757    * Get heap size (in MB) for each task in our Giraph job run,
758    * assuming this job will run on the "pure YARN" profile.
759    *
760    * @return the heap size for all tasks, in MB
761    */
762   public int getYarnTaskHeapMb() {
763     return GIRAPH_YARN_TASK_HEAP_MB.get(this);
764   }
765 
766   /**
767    * Set heap size for Giraph tasks in our job run, assuming
768    * the job will run on the "pure YARN" profile.
769    *
770    * @param heapMb the heap size for all tasks
771    */
772   public void setYarnTaskHeapMb(int heapMb) {
773     GIRAPH_YARN_TASK_HEAP_MB.set(this, heapMb);
774   }
775 
776   /**
777    * Get the ZooKeeper list.
778    *
779    * @return ZooKeeper list of strings, comma separated or null if none set.
780    */
781   public String getZookeeperList() {
782     return ZOOKEEPER_LIST.get(this);
783   }
784 
785   /**
786    * Set the ZooKeeper list to the provided list. This method is used when the
787    * ZooKeeper is started internally and will set the zkIsExternal option to
788    * false as well.
789    *
790    * @param zkList list of strings, comma separated of zookeeper servers
791    */
792   public void setZookeeperList(String zkList) {
793     ZOOKEEPER_LIST.set(this, zkList);
794     ZOOKEEPER_IS_EXTERNAL.set(this, false);
795   }
796 
797   /**
798    * Was ZooKeeper provided externally?
799    *
800    * @return true iff was zookeeper is external
801    */
802   public boolean isZookeeperExternal() {
803     return ZOOKEEPER_IS_EXTERNAL.get(this);
804   }
805 
806   public String getLocalLevel() {
807     return LOG_LEVEL.get(this);
808   }
809 
810   /**
811    * Use the log thread layout option?
812    *
813    * @return True if use the log thread layout option, false otherwise
814    */
815   public boolean useLogThreadLayout() {
816     return LOG_THREAD_LAYOUT.get(this);
817   }
818 
819   /**
820    * is this job run a local test?
821    *
822    * @return the test status as recorded in the Configuration
823    */
824   public boolean getLocalTestMode() {
825     return LOCAL_TEST_MODE.get(this);
826   }
827 
828   /**
829    * Flag this job as a local test run.
830    *
831    * @param flag the test status for this job
832    */
833   public void setLocalTestMode(boolean flag) {
834     LOCAL_TEST_MODE.set(this, flag);
835   }
836 
837   public int getZooKeeperSessionTimeout() {
838     return ZOOKEEPER_SESSION_TIMEOUT.get(this);
839   }
840 
841   public int getZookeeperOpsMaxAttempts() {
842     return ZOOKEEPER_OPS_MAX_ATTEMPTS.get(this);
843   }
844 
845   public int getZookeeperOpsRetryWaitMsecs() {
846     return ZOOKEEPER_OPS_RETRY_WAIT_MSECS.get(this);
847   }
848 
849   public boolean getNettyServerUseExecutionHandler() {
850     return NETTY_SERVER_USE_EXECUTION_HANDLER.get(this);
851   }
852 
853   public int getNettyServerThreads() {
854     return NETTY_SERVER_THREADS.get(this);
855   }
856 
857   public int getNettyServerExecutionThreads() {
858     return NETTY_SERVER_EXECUTION_THREADS.get(this);
859   }
860 
861   /**
862    * Get the netty server execution concurrency.  This depends on whether the
863    * netty server execution handler exists.
864    *
865    * @return Server concurrency
866    */
867   public int getNettyServerExecutionConcurrency() {
868     if (getNettyServerUseExecutionHandler()) {
869       return getNettyServerExecutionThreads();
870     } else {
871       return getNettyServerThreads();
872     }
873   }
874 
875   /**
876    * Used by netty client and server to create ByteBufAllocator
877    *
878    * @return ByteBufAllocator
879    */
880   public ByteBufAllocator getNettyAllocator() {
881     if (nettyBufferAllocator == null) {
882       if (NETTY_USE_POOLED_ALLOCATOR.get(this)) { // Use pooled allocator
883         nettyBufferAllocator = new PooledByteBufAllocator(
884           NETTY_USE_DIRECT_MEMORY.get(this));
885       } else { // Use un-pooled allocator
886         // Note: Current default settings create un-pooled heap allocator
887         nettyBufferAllocator = new UnpooledByteBufAllocator(
888             NETTY_USE_DIRECT_MEMORY.get(this));
889       }
890     }
891     return nettyBufferAllocator;
892   }
893 
894   public int getZookeeperConnectionAttempts() {
895     return ZOOKEEPER_CONNECTION_ATTEMPTS.get(this);
896   }
897 
898   public int getZooKeeperMinSessionTimeout() {
899     return ZOOKEEPER_MIN_SESSION_TIMEOUT.get(this);
900   }
901 
902   public int getZooKeeperMaxSessionTimeout() {
903     return ZOOKEEPER_MAX_SESSION_TIMEOUT.get(this);
904   }
905 
906   /**
907    * Get the number of map tasks in this job
908    *
909    * @return Number of map tasks in this job
910    */
911   public int getMapTasks() {
912     int mapTasks = getInt("mapred.map.tasks", -1);
913     if (mapTasks == -1) {
914       throw new IllegalStateException("getMapTasks: Failed to get the map " +
915           "tasks!");
916     }
917     return mapTasks;
918   }
919 
920   /**
921    * Use authentication? (if supported)
922    *
923    * @return True if should authenticate, false otherwise
924    */
925   public boolean authenticate() {
926     return AUTHENTICATE.get(this);
927   }
928 
929   /**
930    * Set the number of compute threads
931    *
932    * @param numComputeThreads Number of compute threads to use
933    */
934   public void setNumComputeThreads(int numComputeThreads) {
935     NUM_COMPUTE_THREADS.set(this, numComputeThreads);
936   }
937 
938   public int getNumComputeThreads() {
939     return NUM_COMPUTE_THREADS.get(this);
940   }
941 
942   /**
943    * Set the number of input split threads
944    *
945    * @param numInputSplitsThreads Number of input split threads to use
946    */
947   public void setNumInputSplitsThreads(int numInputSplitsThreads) {
948     NUM_INPUT_THREADS.set(this, numInputSplitsThreads);
949   }
950 
951   public int getNumInputSplitsThreads() {
952     return NUM_INPUT_THREADS.get(this);
953   }
954 
955   public long getInputSplitMaxVertices() {
956     return INPUT_SPLIT_MAX_VERTICES.get(this);
957   }
958 
959   public long getInputSplitMaxEdges() {
960     return INPUT_SPLIT_MAX_EDGES.get(this);
961   }
962 
963   /**
964    * Set whether to use unsafe serialization
965    *
966    * @param useUnsafeSerialization If true, use unsafe serialization
967    */
968   public void useUnsafeSerialization(boolean useUnsafeSerialization) {
969     USE_UNSAFE_SERIALIZATION.set(this, useUnsafeSerialization);
970   }
971 
972   /**
973    * Use message size encoding?  This feature may help with complex message
974    * objects.
975    *
976    * @return Whether to use message size encoding
977    */
978   public boolean useMessageSizeEncoding() {
979     return USE_MESSAGE_SIZE_ENCODING.get(this);
980   }
981 
982   /**
983    * Set the checkpoint frequeuncy of how many supersteps to wait before
984    * checkpointing
985    *
986    * @param checkpointFrequency How often to checkpoint (0 means never)
987    */
988   public void setCheckpointFrequency(int checkpointFrequency) {
989     CHECKPOINT_FREQUENCY.set(this, checkpointFrequency);
990   }
991 
992   /**
993    * Get the checkpoint frequeuncy of how many supersteps to wait
994    * before checkpointing
995    *
996    * @return Checkpoint frequency (0 means never)
997    */
998   public int getCheckpointFrequency() {
999     return CHECKPOINT_FREQUENCY.get(this);
1000   }
1001 
1002   /**
1003    * Check if checkpointing is used
1004    *
1005    * @return True iff checkpointing is used
1006    */
1007   public boolean useCheckpointing() {
1008     return getCheckpointFrequency() != 0;
1009   }
1010 
1011   /**
1012    * Set runtime checkpoint support checker.
1013    * The instance of this class will have to decide whether
1014    * checkpointing is allowed on current superstep.
1015    *
1016    * @param clazz checkpoint supported checker class
1017    */
1018   public void setCheckpointSupportedChecker(
1019       Class<? extends CheckpointSupportedChecker> clazz) {
1020     GiraphConstants.CHECKPOINT_SUPPORTED_CHECKER.set(this, clazz);
1021   }
1022 
1023   /**
1024    * Set the max task attempts
1025    *
1026    * @param maxTaskAttempts Max task attempts to use
1027    */
1028   public void setMaxTaskAttempts(int maxTaskAttempts) {
1029     MAX_TASK_ATTEMPTS.set(this, maxTaskAttempts);
1030   }
1031 
1032   /**
1033    * Get the max task attempts
1034    *
1035    * @return Max task attempts or -1, if not set
1036    */
1037   public int getMaxTaskAttempts() {
1038     return MAX_TASK_ATTEMPTS.get(this);
1039   }
1040 
1041   /**
1042    * Get the number of milliseconds to wait for an event before continuing on
1043    *
1044    * @return Number of milliseconds to wait for an event before continuing on
1045    */
1046   public int getEventWaitMsecs() {
1047     return EVENT_WAIT_MSECS.get(this);
1048   }
1049 
1050   /**
1051    * Set the number of milliseconds to wait for an event before continuing on
1052    *
1053    * @param eventWaitMsecs Number of milliseconds to wait for an event before
1054    *                       continuing on
1055    */
1056   public void setEventWaitMsecs(int eventWaitMsecs) {
1057     EVENT_WAIT_MSECS.set(this, eventWaitMsecs);
1058   }
1059 
1060   /**
1061    * Get the maximum milliseconds to wait before giving up trying to get the
1062    * minimum number of workers before a superstep.
1063    *
1064    * @return Maximum milliseconds to wait before giving up trying to get the
1065    *         minimum number of workers before a superstep
1066    */
1067   public int getMaxMasterSuperstepWaitMsecs() {
1068     return MAX_MASTER_SUPERSTEP_WAIT_MSECS.get(this);
1069   }
1070 
1071   /**
1072    * Set the maximum milliseconds to wait before giving up trying to get the
1073    * minimum number of workers before a superstep.
1074    *
1075    * @param maxMasterSuperstepWaitMsecs Maximum milliseconds to wait before
1076    *                                    giving up trying to get the minimum
1077    *                                    number of workers before a superstep
1078    */
1079   public void setMaxMasterSuperstepWaitMsecs(int maxMasterSuperstepWaitMsecs) {
1080     MAX_MASTER_SUPERSTEP_WAIT_MSECS.set(this, maxMasterSuperstepWaitMsecs);
1081   }
1082 
1083   /**
1084    * Check environment for Hadoop security token location in case we are
1085    * executing the Giraph job on a MRv1 Hadoop cluster.
1086    */
1087   public void configureHadoopSecurity() {
1088     String hadoopTokenFilePath = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
1089     if (hadoopTokenFilePath != null) {
1090       set("mapreduce.job.credentials.binary", hadoopTokenFilePath);
1091     }
1092   }
1093 
1094   /**
1095    * Check if we want to prioritize input splits which reside on the host.
1096    *
1097    * @return True iff we want to use input split locality
1098    */
1099   public boolean useInputSplitLocality() {
1100     return USE_INPUT_SPLIT_LOCALITY.get(this);
1101   }
1102 
1103   /**
1104    * Get the local hostname on the given interface.
1105    *
1106    * @return The local hostname
1107    * @throws UnknownHostException IP address of a host could not be determined
1108    */
1109   public String getLocalHostname() throws UnknownHostException {
1110     return DNS.getDefaultHost(
1111         GiraphConstants.DNS_INTERFACE.get(this),
1112         GiraphConstants.DNS_NAMESERVER.get(this)).toLowerCase();
1113   }
1114 
1115   /**
1116    * Return local host name by default. Or local host IP if preferIP
1117    * option is set.
1118    * @return local host name or IP
1119    * @throws UnknownHostException IP address of a host could not be determined
1120    */
1121   public String getLocalHostOrIp() throws UnknownHostException {
1122     if (GiraphConstants.PREFER_IP_ADDRESSES.get(this)) {
1123       return InetAddress.getLocalHost().getHostAddress();
1124     }
1125     return getLocalHostname();
1126   }
1127 
1128   /**
1129    * Set the maximum number of supersteps of this application.  After this
1130    * many supersteps are executed, the application will shutdown.
1131    *
1132    * @param maxNumberOfSupersteps Maximum number of supersteps
1133    */
1134   public void setMaxNumberOfSupersteps(int maxNumberOfSupersteps) {
1135     MAX_NUMBER_OF_SUPERSTEPS.set(this, maxNumberOfSupersteps);
1136   }
1137 
1138   /**
1139    * Get the maximum number of supersteps of this application.  After this
1140    * many supersteps are executed, the application will shutdown.
1141    *
1142    * @return Maximum number of supersteps
1143    */
1144   public int getMaxNumberOfSupersteps() {
1145     return MAX_NUMBER_OF_SUPERSTEPS.get(this);
1146   }
1147 
1148   /**
1149    * Get the output directory to write YourKit snapshots to
1150    *
1151    * @param context Map context
1152    * @return output directory
1153    */
1154   public String getYourKitOutputDir(Mapper.Context context) {
1155     final String cacheKey = "giraph.yourkit.outputDirCached";
1156     String outputDir = get(cacheKey);
1157     if (outputDir == null) {
1158       outputDir = getStringVars(YOURKIT_OUTPUT_DIR, YOURKIT_OUTPUT_DIR_DEFAULT,
1159           context);
1160       set(cacheKey, outputDir);
1161     }
1162     return outputDir;
1163   }
1164 
1165   /**
1166    * Get string, replacing variables in the output.
1167    *
1168    * %JOB_ID% =&gt; job id
1169    * %TASK_ID% =&gt; task id
1170    * %USER% =&gt; owning user name
1171    *
1172    * @param key name of key to lookup
1173    * @param context mapper context
1174    * @return value for key, with variables expanded
1175    */
1176   public String getStringVars(String key, Mapper.Context context) {
1177     return getStringVars(key, null, context);
1178   }
1179 
1180   /**
1181    * Get string, replacing variables in the output.
1182    *
1183    * %JOB_ID% =&gt; job id
1184    * %TASK_ID% =&gt; task id
1185    * %USER% =&gt; owning user name
1186    *
1187    * @param key name of key to lookup
1188    * @param defaultValue value to return if no mapping exists. This can also
1189    *                     have variables, which will be substituted.
1190    * @param context mapper context
1191    * @return value for key, with variables expanded
1192    */
1193   public String getStringVars(String key, String defaultValue,
1194                               Mapper.Context context) {
1195     String value = get(key);
1196     if (value == null) {
1197       if (defaultValue == null) {
1198         return null;
1199       }
1200       value = defaultValue;
1201     }
1202     value = value.replace("%JOB_ID%", context.getJobID().toString());
1203     value = value.replace("%TASK_ID%", context.getTaskAttemptID().toString());
1204     value = value.replace("%USER%", get("user.name", "unknown_user"));
1205     return value;
1206   }
1207 
1208   /**
1209    * Get option whether to create a source vertex present only in edge input
1210    *
1211    * @return CREATE_EDGE_SOURCE_VERTICES option
1212    */
1213   public boolean getCreateSourceVertex() {
1214     return CREATE_EDGE_SOURCE_VERTICES.get(this);
1215   }
1216 
1217   /**
1218    * set option whether to create a source vertex present only in edge input
1219    * @param createVertex create source vertex option
1220    */
1221   public void setCreateSourceVertex(boolean createVertex) {
1222     CREATE_EDGE_SOURCE_VERTICES.set(this, createVertex);
1223   }
1224 
1225   /**
1226    * Get the maximum timeout (in milliseconds) for waiting for all tasks
1227    * to complete after the job is done.
1228    *
1229    * @return Wait task done timeout in milliseconds.
1230    */
1231   public int getWaitTaskDoneTimeoutMs() {
1232     return WAIT_TASK_DONE_TIMEOUT_MS.get(this);
1233   }
1234 
1235   /**
1236    * Set the maximum timeout (in milliseconds) for waiting for all tasks
1237    * to complete after the job is done.
1238    *
1239    * @param ms Milliseconds to set
1240    */
1241   public void setWaitTaskDoneTimeoutMs(int ms) {
1242     WAIT_TASK_DONE_TIMEOUT_MS.set(this, ms);
1243   }
1244 
1245   /**
1246    * Check whether to track job progress on client or not
1247    *
1248    * @return True if job progress should be tracked on client
1249    */
1250   public boolean trackJobProgressOnClient() {
1251     return TRACK_JOB_PROGRESS_ON_CLIENT.get(this);
1252   }
1253 
1254   /**
1255    * @return Number of retries when creating an HDFS file before failing.
1256    */
1257   public int getHdfsFileCreationRetries() {
1258     return HDFS_FILE_CREATION_RETRIES.get(this);
1259   }
1260 
1261   /**
1262    * @return Milliseconds to wait before retrying an HDFS file creation
1263    *         operation.
1264    */
1265   public int getHdfsFileCreationRetryWaitMs() {
1266     return HDFS_FILE_CREATION_RETRY_WAIT_MS.get(this);
1267   }
1268 }