View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.conf;
20  
21  import io.netty.buffer.ByteBufAllocator;
22  import io.netty.buffer.PooledByteBufAllocator;
23  import io.netty.buffer.UnpooledByteBufAllocator;
24  
25  import java.net.InetAddress;
26  import java.net.UnknownHostException;
27  
28  import org.apache.giraph.aggregators.AggregatorWriter;
29  import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
30  import org.apache.giraph.combiner.MessageCombiner;
31  import org.apache.giraph.edge.OutEdges;
32  import org.apache.giraph.edge.ReuseObjectsOutEdges;
33  import org.apache.giraph.factories.ComputationFactory;
34  import org.apache.giraph.factories.VertexValueFactory;
35  import org.apache.giraph.graph.Computation;
36  import org.apache.giraph.graph.MapperObserver;
37  import org.apache.giraph.graph.Vertex;
38  import org.apache.giraph.graph.VertexResolver;
39  import org.apache.giraph.graph.VertexValueCombiner;
40  import org.apache.giraph.io.EdgeInputFormat;
41  import org.apache.giraph.io.EdgeOutputFormat;
42  import org.apache.giraph.io.MappingInputFormat;
43  import org.apache.giraph.io.VertexInputFormat;
44  import org.apache.giraph.io.VertexOutputFormat;
45  import org.apache.giraph.io.filters.EdgeInputFilter;
46  import org.apache.giraph.io.filters.VertexInputFilter;
47  import org.apache.giraph.job.GiraphJobObserver;
48  import org.apache.giraph.job.GiraphJobRetryChecker;
49  import org.apache.giraph.master.MasterCompute;
50  import org.apache.giraph.master.MasterObserver;
51  import org.apache.giraph.partition.GraphPartitionerFactory;
52  import org.apache.giraph.partition.Partition;
53  import org.apache.giraph.partition.ReusesObjectsPartition;
54  import org.apache.giraph.utils.GcObserver;
55  import org.apache.giraph.utils.ReflectionUtils;
56  import org.apache.giraph.worker.WorkerContext;
57  import org.apache.giraph.worker.WorkerObserver;
58  import org.apache.hadoop.conf.Configuration;
59  import org.apache.hadoop.mapreduce.Mapper;
60  import org.apache.hadoop.net.DNS;
61  
62  /**
63   * Adds user methods specific to Giraph.  This will be put into an
64   * ImmutableClassesGiraphConfiguration that provides the configuration plus
65   * the immutable classes.
66   *
67   * Keeps track of parameters which were set so it easily set them in another
68   * copy of configuration.
69   */
70  public class GiraphConfiguration extends Configuration
71      implements GiraphConstants {
72    /** ByteBufAllocator to be used by netty */
73    private ByteBufAllocator nettyBufferAllocator = null;
74  
  /**
   * Constructor that creates the configuration.
   * Hadoop security is configured on every construction path.
   */
  public GiraphConfiguration() {
    configureHadoopSecurity();
  }

  /**
   * Constructor that copies an existing Hadoop configuration.
   *
   * @param conf Configuration to copy settings from
   */
  public GiraphConfiguration(Configuration conf) {
    super(conf);
    configureHadoopSecurity();
  }
91  
92    /**
93     * Get name of computation being run. We leave this up to the
94     * {@link ComputationFactory} to decide what to return.
95     *
96     * @return Name of computation being run
97     */
98    public String getComputationName() {
99      ComputationFactory compFactory = ReflectionUtils.newInstance(
100         getComputationFactoryClass());
101     return compFactory.computationName(this);
102   }
103 
  /**
   * Get the user's subclassed {@link ComputationFactory}
   *
   * @return User's computation factory class
   */
  public Class<? extends ComputationFactory> getComputationFactoryClass() {
    return COMPUTATION_FACTORY_CLASS.get(this);
  }

  /**
   * Get the user's subclassed {@link Computation}
   *
   * @return User's computation class
   */
  public Class<? extends Computation> getComputationClass() {
    return COMPUTATION_CLASS.get(this);
  }

  /**
   * Set the computation class (required)
   *
   * @param computationClass Runs vertex computation
   */
  public void setComputationClass(
      Class<? extends Computation> computationClass) {
    COMPUTATION_CLASS.set(this, computationClass);
  }

  /**
   * Set the vertex value factory class
   *
   * @param vertexValueFactoryClass Creates default vertex values
   */
  public final void setVertexValueFactoryClass(
      Class<? extends VertexValueFactory> vertexValueFactoryClass) {
    VERTEX_VALUE_FACTORY_CLASS.set(this, vertexValueFactoryClass);
  }

  /**
   * Set the edge input filter class
   *
   * @param edgeFilterClass class to use
   */
  public void setEdgeInputFilterClass(
      Class<? extends EdgeInputFilter> edgeFilterClass) {
    EDGE_INPUT_FILTER_CLASS.set(this, edgeFilterClass);
  }

  /**
   * Set the vertex input filter class
   *
   * @param vertexFilterClass class to use
   */
  public void setVertexInputFilterClass(
      Class<? extends VertexInputFilter> vertexFilterClass) {
    VERTEX_INPUT_FILTER_CLASS.set(this, vertexFilterClass);
  }

  /**
   * Get the vertex edges class
   *
   * @return vertex edges class
   */
  public Class<? extends OutEdges> getOutEdgesClass() {
    return VERTEX_EDGES_CLASS.get(this);
  }

  /**
   * Set the vertex edges class
   *
   * @param outEdgesClass Determines the way edges are stored
   */
  public final void setOutEdgesClass(
      Class<? extends OutEdges> outEdgesClass) {
    VERTEX_EDGES_CLASS.set(this, outEdgesClass);
  }

  /**
   * Set the vertex implementation class
   *
   * @param vertexClass class of the vertex implementation
   */
  public final void setVertexClass(Class<? extends Vertex> vertexClass) {
    VERTEX_CLASS.set(this, vertexClass);
  }


  /**
   * Set the vertex edges class used during edge-based input (if different
   * from the one used during computation)
   *
   * @param inputOutEdgesClass Determines the way edges are stored
   */
  public final void setInputOutEdgesClass(
      Class<? extends OutEdges> inputOutEdgesClass) {
    INPUT_VERTEX_EDGES_CLASS.set(this, inputOutEdgesClass);
  }

  /**
   * True if the {@link org.apache.giraph.edge.OutEdges} implementation
   * copies the passed edges to its own data structure,
   * i.e. it doesn't keep references to Edge objects, target vertex ids or edge
   * values passed to add() or initialize().
   * This makes it possible to reuse edge objects passed to the data
   * structure, to minimize object instantiation (see for example
   * EdgeStore#addPartitionEdges()).
   *
   * @return True iff we can reuse the edge objects
   */
  public boolean reuseEdgeObjects() {
    // Reusability is signalled by implementing the ReuseObjectsOutEdges
    // marker interface.
    return ReuseObjectsOutEdges.class.isAssignableFrom(
        getOutEdgesClass());
  }

  /**
   * True if the {@link Partition} implementation copies the passed vertices
   * to its own data structure, i.e. it doesn't keep references to Vertex
   * objects passed to it.
   * This makes it possible to reuse vertex objects passed to the data
   * structure, to minimize object instantiation.
   *
   * @return True iff we can reuse the vertex objects
   */
  public boolean reuseVertexObjects() {
    // Reusability is signalled by implementing the ReusesObjectsPartition
    // marker interface.
    return ReusesObjectsPartition.class.isAssignableFrom(getPartitionClass());
  }

  /**
   * Get Partition class used
   * @return Partition class
   */
  public Class<? extends Partition> getPartitionClass() {
    return PARTITION_CLASS.get(this);
  }

  /**
   * Does the job have a {@link VertexInputFormat}?
   *
   * @return True iff a {@link VertexInputFormat} has been specified.
   */
  public boolean hasVertexInputFormat() {
    return VERTEX_INPUT_FORMAT_CLASS.get(this) != null;
  }

  /**
   * Set the vertex input format class (required)
   *
   * @param vertexInputFormatClass Determines how graph is input
   */
  public void setVertexInputFormatClass(
      Class<? extends VertexInputFormat> vertexInputFormatClass) {
    VERTEX_INPUT_FORMAT_CLASS.set(this, vertexInputFormatClass);
  }

  /**
   * Does the job have a {@link EdgeInputFormat}?
   *
   * @return True iff a {@link EdgeInputFormat} has been specified.
   */
  public boolean hasEdgeInputFormat() {
    return EDGE_INPUT_FORMAT_CLASS.get(this) != null;
  }

  /**
   * Set the edge input format class (required)
   *
   * @param edgeInputFormatClass Determines how graph is input
   */
  public void setEdgeInputFormatClass(
      Class<? extends EdgeInputFormat> edgeInputFormatClass) {
    EDGE_INPUT_FORMAT_CLASS.set(this, edgeInputFormatClass);
  }

  /**
   * Set the mapping input format class (optional)
   *
   * @param mappingInputFormatClass Determines how mappings are input
   */
  public void setMappingInputFormatClass(
    Class<? extends MappingInputFormat> mappingInputFormatClass) {
    MAPPING_INPUT_FORMAT_CLASS.set(this, mappingInputFormatClass);
  }

  /**
   * Set the master class (optional)
   *
   * @param masterComputeClass Runs master computation
   */
  public final void setMasterComputeClass(
      Class<? extends MasterCompute> masterComputeClass) {
    MASTER_COMPUTE_CLASS.set(this, masterComputeClass);
  }

  /**
   * Add a MasterObserver class (optional)
   *
   * @param masterObserverClass MasterObserver class to add.
   */
  public final void addMasterObserverClass(
      Class<? extends MasterObserver> masterObserverClass) {
    MASTER_OBSERVER_CLASSES.add(this, masterObserverClass);
  }

  /**
   * Add a WorkerObserver class (optional)
   *
   * @param workerObserverClass WorkerObserver class to add.
   */
  public final void addWorkerObserverClass(
      Class<? extends WorkerObserver> workerObserverClass) {
    WORKER_OBSERVER_CLASSES.add(this, workerObserverClass);
  }

  /**
   * Add a MapperObserver class (optional)
   *
   * @param mapperObserverClass MapperObserver class to add.
   */
  public final void addMapperObserverClass(
      Class<? extends MapperObserver> mapperObserverClass) {
    MAPPER_OBSERVER_CLASSES.add(this, mapperObserverClass);
  }

  /**
   * Add a GcObserver class (optional)
   *
   * @param gcObserverClass GcObserver class to add.
   */
  public final void addGcObserverClass(
      Class<? extends GcObserver> gcObserverClass) {
    GC_OBSERVER_CLASSES.add(this, gcObserverClass);
  }

  /**
   * Get job observer class
   *
   * @return GiraphJobObserver class set.
   */
  public Class<? extends GiraphJobObserver> getJobObserverClass() {
    return JOB_OBSERVER_CLASS.get(this);
  }

  /**
   * Set job observer class
   *
   * @param klass GiraphJobObserver class to set.
   */
  public void setJobObserverClass(Class<? extends GiraphJobObserver> klass) {
    JOB_OBSERVER_CLASS.set(this, klass);
  }

  /**
   * Get job retry checker class
   *
   * @return GiraphJobRetryChecker class set.
   */
  public Class<? extends GiraphJobRetryChecker> getJobRetryCheckerClass() {
    return JOB_RETRY_CHECKER_CLASS.get(this);
  }

  /**
   * Set job retry checker class
   *
   * @param klass GiraphJobRetryChecker class to set.
   */
  public void setJobRetryCheckerClass(
      Class<? extends GiraphJobRetryChecker> klass) {
    JOB_RETRY_CHECKER_CLASS.set(this, klass);
  }

  /**
   * Check whether to enable jmap dumping thread.
   *
   * @return true if jmap dumper is enabled.
   */
  public boolean isJMapHistogramDumpEnabled() {
    return JMAP_ENABLE.get(this);
  }

  /**
   * Check whether to enable heap memory supervisor thread
   *
   * @return true if jmap dumper is reactively enabled
   */
  public boolean isReactiveJmapHistogramDumpEnabled() {
    return REACTIVE_JMAP_ENABLE.get(this);
  }
391 
392   /**
393    * Set mapping from a key name to a list of classes.
394    *
395    * @param name String key name to use.
396    * @param xface interface of the classes being set.
397    * @param klasses Classes to set.
398    */
399   public final void setClasses(String name, Class<?> xface,
400                                Class<?> ... klasses) {
401     String[] klassNames = new String[klasses.length];
402     for (int i = 0; i < klasses.length; ++i) {
403       Class<?> klass = klasses[i];
404       if (!xface.isAssignableFrom(klass)) {
405         throw new RuntimeException(klass + " does not implement " +
406             xface.getName());
407       }
408       klassNames[i] = klasses[i].getName();
409     }
410     setStrings(name, klassNames);
411   }
412 
  /**
   * Does the job have a {@link VertexOutputFormat}?
   *
   * @return True iff a {@link VertexOutputFormat} has been specified.
   */
  public boolean hasVertexOutputFormat() {
    return VERTEX_OUTPUT_FORMAT_CLASS.get(this) != null;
  }

  /**
   * Set the vertex output format class (optional)
   *
   * @param vertexOutputFormatClass Determines how graph is output
   */
  public final void setVertexOutputFormatClass(
      Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
    VERTEX_OUTPUT_FORMAT_CLASS.set(this, vertexOutputFormatClass);
  }


  /**
   * Does the job have a {@link VertexOutputFormat} subdir?
   *
   * @return True iff a {@link VertexOutputFormat} subdir has been specified.
   */
  public boolean hasVertexOutputFormatSubdir() {
    return !VERTEX_OUTPUT_FORMAT_SUBDIR.get(this).isEmpty();
  }

  /**
   * Set the vertex output format path
   *
   * @param path path where the vertices will be written
   */
  public final void setVertexOutputFormatSubdir(String path) {
    VERTEX_OUTPUT_FORMAT_SUBDIR.set(this, path);
  }

  /**
   * Check if output should be done during computation
   *
   * @return True iff output should be done during computation
   */
  public final boolean doOutputDuringComputation() {
    return DO_OUTPUT_DURING_COMPUTATION.get(this);
  }

  /**
   * Set whether or not we should do output during computation
   *
   * @param doOutputDuringComputation True iff we want output to happen
   *                                  during computation
   */
  public final void setDoOutputDuringComputation(
      boolean doOutputDuringComputation) {
    DO_OUTPUT_DURING_COMPUTATION.set(this, doOutputDuringComputation);
  }

  /**
   * Check if VertexOutputFormat is thread-safe
   *
   * @return True iff VertexOutputFormat is thread-safe
   */
  public final boolean vertexOutputFormatThreadSafe() {
    return VERTEX_OUTPUT_FORMAT_THREAD_SAFE.get(this);
  }

  /**
   * Set whether or not selected VertexOutputFormat is thread-safe
   *
   * @param vertexOutputFormatThreadSafe True iff selected VertexOutputFormat
   *                                     is thread-safe
   */
  public final void setVertexOutputFormatThreadSafe(
      boolean vertexOutputFormatThreadSafe) {
    VERTEX_OUTPUT_FORMAT_THREAD_SAFE.set(this, vertexOutputFormatThreadSafe);
  }

  /**
   * Does the job have a {@link EdgeOutputFormat}?
   *
   * @return True iff a {@link EdgeOutputFormat} has been specified.
   */
  public boolean hasEdgeOutputFormat() {
    return EDGE_OUTPUT_FORMAT_CLASS.get(this) != null;
  }

  /**
   * Set the edge output format class (optional)
   *
   * @param edgeOutputFormatClass Determines how graph is output
   */
  public final void setEdgeOutputFormatClass(
      Class<? extends EdgeOutputFormat> edgeOutputFormatClass) {
    EDGE_OUTPUT_FORMAT_CLASS.set(this, edgeOutputFormatClass);
  }

  /**
   * Does the job have a {@link EdgeOutputFormat} subdir?
   *
   * @return True iff a {@link EdgeOutputFormat} subdir has been specified.
   */
  public boolean hasEdgeOutputFormatSubdir() {
    return !EDGE_OUTPUT_FORMAT_SUBDIR.get(this).isEmpty();
  }

  /**
   * Set the edge output format path
   *
   * @param path path where the edges will be written
   */
  public final void setEdgeOutputFormatSubdir(String path) {
    EDGE_OUTPUT_FORMAT_SUBDIR.set(this, path);
  }
527 
528   /**
529    * Get the number of threads to use for writing output in the end of the
530    * application. If output format is not thread safe, returns 1.
531    *
532    * @return Number of output threads
533    */
534   public final int getNumOutputThreads() {
535     if (!vertexOutputFormatThreadSafe()) {
536       return 1;
537     } else {
538       return NUM_OUTPUT_THREADS.get(this);
539     }
540   }
541 
542   /**
543    * Set the number of threads to use for writing output in the end of the
544    * application. Will be used only if {#vertexOutputFormatThreadSafe} is true.
545    *
546    * @param numOutputThreads Number of output threads
547    */
548   public void setNumOutputThreads(int numOutputThreads) {
549     NUM_OUTPUT_THREADS.set(this, numOutputThreads);
550   }
551 
  /**
   * Set the message combiner class (optional)
   *
   * @param messageCombinerClass Determines how vertex messages are combined
   */
  public void setMessageCombinerClass(
      Class<? extends MessageCombiner> messageCombinerClass) {
    MESSAGE_COMBINER_CLASS.set(this, messageCombinerClass);
  }

  /**
   * Set the graph partitioner class (optional)
   *
   * @param graphPartitionerFactoryClass Determines how the graph is partitioned
   */
  public final void setGraphPartitionerFactoryClass(
      Class<? extends GraphPartitionerFactory> graphPartitionerFactoryClass) {
    GRAPH_PARTITIONER_FACTORY_CLASS.set(this, graphPartitionerFactoryClass);
  }

  /**
   * Set the vertex resolver class (optional)
   *
   * @param vertexResolverClass Determines how vertex mutations are resolved
   */
  public final void setVertexResolverClass(
      Class<? extends VertexResolver> vertexResolverClass) {
    VERTEX_RESOLVER_CLASS.set(this, vertexResolverClass);
  }

  /**
   * Whether to create a vertex that doesn't exist when it receives messages.
   * This only affects DefaultVertexResolver.
   *
   * @return true if we should create non existent vertices that get messages.
   */
  public final boolean getResolverCreateVertexOnMessages() {
    return RESOLVER_CREATE_VERTEX_ON_MSGS.get(this);
  }

  /**
   * Set whether to create non existent vertices when they receive messages.
   *
   * @param v true if we should create vertices when they get messages.
   */
  public final void setResolverCreateVertexOnMessages(boolean v) {
    RESOLVER_CREATE_VERTEX_ON_MSGS.set(this, v);
  }

  /**
   * Set the vertex value combiner class (optional)
   *
   * @param vertexValueCombinerClass Determines how vertices are combined
   */
  public final void setVertexValueCombinerClass(
      Class<? extends VertexValueCombiner> vertexValueCombinerClass) {
    VERTEX_VALUE_COMBINER_CLASS.set(this, vertexValueCombinerClass);
  }

  /**
   * Set the worker context class (optional)
   *
   * @param workerContextClass Determines what code is executed on each
   *        worker before and after each superstep and computation
   */
  public final void setWorkerContextClass(
      Class<? extends WorkerContext> workerContextClass) {
    WORKER_CONTEXT_CLASS.set(this, workerContextClass);
  }

  /**
   * Set the aggregator writer class (optional)
   *
   * @param aggregatorWriterClass Determines how the aggregators are
   *        written to file at the end of the job
   */
  public final void setAggregatorWriterClass(
      Class<? extends AggregatorWriter> aggregatorWriterClass) {
    AGGREGATOR_WRITER_CLASS.set(this, aggregatorWriterClass);
  }

  /**
   * Set the partition class (optional)
   *
   * @param partitionClass Determines the way partitions are stored
   */
  public final void setPartitionClass(
      Class<? extends Partition> partitionClass) {
    PARTITION_CLASS.set(this, partitionClass);
  }

  /**
   * Set worker configuration for determining what is required for
   * a superstep.
   *
   * @param minWorkers Minimum workers to do a superstep
   * @param maxWorkers Maximum workers to do a superstep
   *        (max map tasks in job)
   * @param minPercentResponded 0 - 100 % of the workers required to
   *        have responded before continuing the superstep
   */
  public final void setWorkerConfiguration(int minWorkers,
                                           int maxWorkers,
                                           float minPercentResponded) {
    setInt(MIN_WORKERS, minWorkers);
    setInt(MAX_WORKERS, maxWorkers);
    MIN_PERCENT_RESPONDED.set(this, minPercentResponded);
  }

  /**
   * Get the minimum number of workers for a superstep.
   *
   * @return Minimum number of workers, or -1 if not set
   */
  public final int getMinWorkers() {
    return getInt(MIN_WORKERS, -1);
  }

  /**
   * Get the maximum number of workers for a superstep.
   *
   * @return Maximum number of workers, or -1 if not set
   */
  public final int getMaxWorkers() {
    return getInt(MAX_WORKERS, -1);
  }

  /**
   * Get the percentage of workers (0 - 100) that must respond before a
   * superstep continues.
   *
   * @return Minimum percent of workers that must have responded
   */
  public final float getMinPercentResponded() {
    return MIN_PERCENT_RESPONDED.get(this);
  }
672 
  /**
   * How many mappers is job asking for, taking into account whether master
   * is running on the same mapper as worker or not
   *
   * @return How many mappers is job asking for
   */
  public final int getMaxMappers() {
    // One extra mapper is needed when master and worker run separately.
    return getMaxWorkers() + (SPLIT_MASTER_WORKER.get(this) ? 1 : 0);
  }

  /**
   * Utilize an existing ZooKeeper service.  If this is not set, ZooKeeper
   * will be dynamically started by Giraph for this job.
   *
   * @param serverList Comma separated list of servers and ports
   *        (i.e. zk1:2221,zk2:2221)
   */
  public final void setZooKeeperConfiguration(String serverList) {
    ZOOKEEPER_LIST.set(this, serverList);
  }

  /**
   * Getter for SPLIT_MASTER_WORKER flag.
   *
   * @return boolean flag value.
   */
  public final boolean getSplitMasterWorker() {
    return SPLIT_MASTER_WORKER.get(this);
  }

  /**
   * Get array of MasterObserver classes set in the configuration.
   *
   * @return array of MasterObserver classes.
   */
  public Class<? extends MasterObserver>[] getMasterObserverClasses() {
    return MASTER_OBSERVER_CLASSES.getArray(this);
  }

  /**
   * Get array of WorkerObserver classes set in configuration.
   *
   * @return array of WorkerObserver classes.
   */
  public Class<? extends WorkerObserver>[] getWorkerObserverClasses() {
    return WORKER_OBSERVER_CLASSES.getArray(this);
  }

  /**
   * Get array of MapperObserver classes set in configuration.
   *
   * @return array of MapperObserver classes.
   */
  public Class<? extends MapperObserver>[] getMapperObserverClasses() {
    return MAPPER_OBSERVER_CLASSES.getArray(this);
  }

  /**
   * Get array of GcObserver classes set in configuration.
   *
   * @return array of GcObserver classes.
   */
  public Class<? extends GcObserver>[] getGcObserverClasses() {
    return GC_OBSERVER_CLASSES.getArray(this);
  }

  /**
   * Whether to track, print, and aggregate metrics.
   *
   * @return true if metrics are enabled, false otherwise (default)
   */
  public boolean metricsEnabled() {
    return METRICS_ENABLE.isTrue(this);
  }

  /**
   * Get the task partition
   *
   * @return The task partition or -1 if not set
   */
  public int getTaskPartition() {
    // Read directly from the Hadoop-provided property.
    return getInt("mapred.task.partition", -1);
  }
756 
  /**
   * Is this a "pure YARN" Giraph job, or is a MapReduce layer (v1 or v2)
   * actually managing our cluster nodes, i.e. each task is a Mapper.
   *
   * @return TRUE if this is a pure YARN job.
   */
  public boolean isPureYarnJob() {
    return IS_PURE_YARN_JOB.get(this);
  }

  /**
   * Jars required in "Pure YARN" jobs (names only, no paths) should
   * be listed here in full, including Giraph framework jar(s).
   *
   * @return the comma-separated list of jar names for export to cluster.
   */
  public String getYarnLibJars() {
    return GIRAPH_YARN_LIBJARS.get(this);
  }

  /**
   * Populate jar list for Pure YARN jobs.
   *
   * @param jarList a comma-separated list of jar names
   */
  public void setYarnLibJars(String jarList) {
    GIRAPH_YARN_LIBJARS.set(this, jarList);
  }

  /**
   * Get heap size (in MB) for each task in our Giraph job run,
   * assuming this job will run on the "pure YARN" profile.
   *
   * @return the heap size for all tasks, in MB
   */
  public int getYarnTaskHeapMb() {
    return GIRAPH_YARN_TASK_HEAP_MB.get(this);
  }

  /**
   * Set heap size for Giraph tasks in our job run, assuming
   * the job will run on the "pure YARN" profile.
   *
   * @param heapMb the heap size for all tasks
   */
  public void setYarnTaskHeapMb(int heapMb) {
    GIRAPH_YARN_TASK_HEAP_MB.set(this, heapMb);
  }

  /**
   * Get the ZooKeeper list.
   *
   * @return ZooKeeper list of strings, comma separated or null if none set.
   */
  public String getZookeeperList() {
    return ZOOKEEPER_LIST.get(this);
  }

  /**
   * Set the ZooKeeper list to the provided list. This method is used when the
   * ZooKeeper is started internally and will set the zkIsExternal option to
   * false as well.
   *
   * @param zkList list of strings, comma separated of zookeeper servers
   */
  public void setZookeeperList(String zkList) {
    ZOOKEEPER_LIST.set(this, zkList);
    // Internally-started ZooKeeper implies it is not external.
    ZOOKEEPER_IS_EXTERNAL.set(this, false);
  }

  /**
   * Was ZooKeeper provided externally?
   *
   * @return true iff was zookeeper is external
   */
  public boolean isZookeeperExternal() {
    return ZOOKEEPER_IS_EXTERNAL.get(this);
  }
835 
  /**
   * Get the configured log level.
   * NOTE(review): despite the name, this reads the LOG_LEVEL option, not
   * anything "local" — the name appears to be historical.
   *
   * @return Configured log level string
   */
  public String getLocalLevel() {
    return LOG_LEVEL.get(this);
  }

  /**
   * Use the log thread layout option?
   *
   * @return True if use the log thread layout option, false otherwise
   */
  public boolean useLogThreadLayout() {
    return LOG_THREAD_LAYOUT.get(this);
  }

  /**
   * is this job run a local test?
   *
   * @return the test status as recorded in the Configuration
   */
  public boolean getLocalTestMode() {
    return LOCAL_TEST_MODE.get(this);
  }

  /**
   * Flag this job as a local test run.
   *
   * @param flag the test status for this job
   */
  public void setLocalTestMode(boolean flag) {
    LOCAL_TEST_MODE.set(this, flag);
  }

  /**
   * Get the ZooKeeper session timeout.
   *
   * @return ZooKeeper session timeout in milliseconds
   */
  public int getZooKeeperSessionTimeout() {
    return ZOOKEEPER_SESSION_TIMEOUT.get(this);
  }

  /**
   * Get the maximum number of attempts for ZooKeeper operations.
   *
   * @return Maximum number of ZooKeeper operation attempts
   */
  public int getZookeeperOpsMaxAttempts() {
    return ZOOKEEPER_OPS_MAX_ATTEMPTS.get(this);
  }

  /**
   * Get the wait time between retried ZooKeeper operations.
   *
   * @return Retry wait time in milliseconds
   */
  public int getZookeeperOpsRetryWaitMsecs() {
    return ZOOKEEPER_OPS_RETRY_WAIT_MSECS.get(this);
  }

  /**
   * Whether the netty server should use an execution handler.
   *
   * @return True if the netty server execution handler is enabled
   */
  public boolean getNettyServerUseExecutionHandler() {
    return NETTY_SERVER_USE_EXECUTION_HANDLER.get(this);
  }

  /**
   * Get the number of netty server threads.
   *
   * @return Number of netty server threads
   */
  public int getNettyServerThreads() {
    return NETTY_SERVER_THREADS.get(this);
  }

  /**
   * Get the number of netty server execution threads.
   *
   * @return Number of netty server execution threads
   */
  public int getNettyServerExecutionThreads() {
    return NETTY_SERVER_EXECUTION_THREADS.get(this);
  }
890 
891   /**
892    * Get the netty server execution concurrency.  This depends on whether the
893    * netty server execution handler exists.
894    *
895    * @return Server concurrency
896    */
897   public int getNettyServerExecutionConcurrency() {
898     if (getNettyServerUseExecutionHandler()) {
899       return getNettyServerExecutionThreads();
900     } else {
901       return getNettyServerThreads();
902     }
903   }
904 
  /**
   * Used by netty client and server to create ByteBufAllocator.
   * The allocator is created lazily on first call and cached in the
   * {@code nettyBufferAllocator} field thereafter.
   *
   * NOTE(review): the lazy initialization is not synchronized; concurrent
   * first calls could each construct an allocator. Presumably this is only
   * invoked from a single setup thread — confirm before relying on it.
   *
   * @return ByteBufAllocator
   */
  public ByteBufAllocator getNettyAllocator() {
    if (nettyBufferAllocator == null) {
      if (NETTY_USE_POOLED_ALLOCATOR.get(this)) { // Use pooled allocator
        nettyBufferAllocator = new PooledByteBufAllocator(
          NETTY_USE_DIRECT_MEMORY.get(this));
      } else { // Use un-pooled allocator
        // Note: Current default settings create un-pooled heap allocator
        nettyBufferAllocator = new UnpooledByteBufAllocator(
            NETTY_USE_DIRECT_MEMORY.get(this));
      }
    }
    return nettyBufferAllocator;
  }
923 
  /**
   * Get the number of ZooKeeper connection attempts.
   *
   * @return Number of connection attempts before giving up
   */
  public int getZookeeperConnectionAttempts() {
    return ZOOKEEPER_CONNECTION_ATTEMPTS.get(this);
  }

  /**
   * Get the minimum ZooKeeper session timeout.
   *
   * @return Minimum session timeout in milliseconds
   */
  public int getZooKeeperMinSessionTimeout() {
    return ZOOKEEPER_MIN_SESSION_TIMEOUT.get(this);
  }

  /**
   * Get the maximum ZooKeeper session timeout.
   *
   * @return Maximum session timeout in milliseconds
   */
  public int getZooKeeperMaxSessionTimeout() {
    return ZOOKEEPER_MAX_SESSION_TIMEOUT.get(this);
  }
935 
936   /**
937    * Get the number of map tasks in this job
938    *
939    * @return Number of map tasks in this job
940    */
941   public int getMapTasks() {
942     int mapTasks = getInt("mapred.map.tasks", -1);
943     if (mapTasks == -1) {
944       throw new IllegalStateException("getMapTasks: Failed to get the map " +
945           "tasks!");
946     }
947     return mapTasks;
948   }
949 
950   /**
951    * Use authentication? (if supported)
952    *
953    * @return True if should authenticate, false otherwise
954    */
955   public boolean authenticate() {
956     return AUTHENTICATE.get(this);
957   }
958 
959   /**
960    * Set the number of compute threads
961    *
962    * @param numComputeThreads Number of compute threads to use
963    */
964   public void setNumComputeThreads(int numComputeThreads) {
965     NUM_COMPUTE_THREADS.set(this, numComputeThreads);
966   }
967 
  /**
   * Get the number of compute threads to use.
   *
   * @return Number of compute threads
   */
  public int getNumComputeThreads() {
    return NUM_COMPUTE_THREADS.get(this);
  }
971 
972   /**
973    * Set the number of input split threads
974    *
975    * @param numInputSplitsThreads Number of input split threads to use
976    */
977   public void setNumInputSplitsThreads(int numInputSplitsThreads) {
978     NUM_INPUT_THREADS.set(this, numInputSplitsThreads);
979   }
980 
  /**
   * Get the number of input split threads to use.
   *
   * @return Number of input split threads
   */
  public int getNumInputSplitsThreads() {
    return NUM_INPUT_THREADS.get(this);
  }
984 
  /**
   * Get the maximum number of vertices per input split.
   *
   * @return Maximum number of vertices per input split
   */
  public long getInputSplitMaxVertices() {
    return INPUT_SPLIT_MAX_VERTICES.get(this);
  }
988 
  /**
   * Get the maximum number of edges per input split.
   *
   * @return Maximum number of edges per input split
   */
  public long getInputSplitMaxEdges() {
    return INPUT_SPLIT_MAX_EDGES.get(this);
  }
992 
993   /**
994    * Set whether to use unsafe serialization
995    *
996    * @param useUnsafeSerialization If true, use unsafe serialization
997    */
998   public void useUnsafeSerialization(boolean useUnsafeSerialization) {
999     USE_UNSAFE_SERIALIZATION.set(this, useUnsafeSerialization);
1000   }
1001 
1002   /**
1003    * Set the checkpoint frequeuncy of how many supersteps to wait before
1004    * checkpointing
1005    *
1006    * @param checkpointFrequency How often to checkpoint (0 means never)
1007    */
1008   public void setCheckpointFrequency(int checkpointFrequency) {
1009     CHECKPOINT_FREQUENCY.set(this, checkpointFrequency);
1010   }
1011 
1012   /**
1013    * Get the checkpoint frequeuncy of how many supersteps to wait
1014    * before checkpointing
1015    *
1016    * @return Checkpoint frequency (0 means never)
1017    */
1018   public int getCheckpointFrequency() {
1019     return CHECKPOINT_FREQUENCY.get(this);
1020   }
1021 
1022   /**
1023    * Check if checkpointing is used
1024    *
1025    * @return True iff checkpointing is used
1026    */
1027   public boolean useCheckpointing() {
1028     return getCheckpointFrequency() != 0;
1029   }
1030 
1031   /**
1032    * Set runtime checkpoint support checker.
1033    * The instance of this class will have to decide whether
1034    * checkpointing is allowed on current superstep.
1035    *
1036    * @param clazz checkpoint supported checker class
1037    */
1038   public void setCheckpointSupportedChecker(
1039       Class<? extends CheckpointSupportedChecker> clazz) {
1040     GiraphConstants.CHECKPOINT_SUPPORTED_CHECKER.set(this, clazz);
1041   }
1042 
1043   /**
1044    * Set the max task attempts
1045    *
1046    * @param maxTaskAttempts Max task attempts to use
1047    */
1048   public void setMaxTaskAttempts(int maxTaskAttempts) {
1049     MAX_TASK_ATTEMPTS.set(this, maxTaskAttempts);
1050   }
1051 
1052   /**
1053    * Get the max task attempts
1054    *
1055    * @return Max task attempts or -1, if not set
1056    */
1057   public int getMaxTaskAttempts() {
1058     return MAX_TASK_ATTEMPTS.get(this);
1059   }
1060 
1061   /**
1062    * Get the number of milliseconds to wait for an event before continuing on
1063    *
1064    * @return Number of milliseconds to wait for an event before continuing on
1065    */
1066   public int getEventWaitMsecs() {
1067     return EVENT_WAIT_MSECS.get(this);
1068   }
1069 
1070   /**
1071    * Set the number of milliseconds to wait for an event before continuing on
1072    *
1073    * @param eventWaitMsecs Number of milliseconds to wait for an event before
1074    *                       continuing on
1075    */
1076   public void setEventWaitMsecs(int eventWaitMsecs) {
1077     EVENT_WAIT_MSECS.set(this, eventWaitMsecs);
1078   }
1079 
1080   /**
1081    * Get the maximum milliseconds to wait before giving up trying to get the
1082    * minimum number of workers before a superstep.
1083    *
1084    * @return Maximum milliseconds to wait before giving up trying to get the
1085    *         minimum number of workers before a superstep
1086    */
1087   public int getMaxMasterSuperstepWaitMsecs() {
1088     return MAX_MASTER_SUPERSTEP_WAIT_MSECS.get(this);
1089   }
1090 
1091   /**
1092    * Set the maximum milliseconds to wait before giving up trying to get the
1093    * minimum number of workers before a superstep.
1094    *
1095    * @param maxMasterSuperstepWaitMsecs Maximum milliseconds to wait before
1096    *                                    giving up trying to get the minimum
1097    *                                    number of workers before a superstep
1098    */
1099   public void setMaxMasterSuperstepWaitMsecs(int maxMasterSuperstepWaitMsecs) {
1100     MAX_MASTER_SUPERSTEP_WAIT_MSECS.set(this, maxMasterSuperstepWaitMsecs);
1101   }
1102 
1103   /**
1104    * Check environment for Hadoop security token location in case we are
1105    * executing the Giraph job on a MRv1 Hadoop cluster.
1106    */
1107   public void configureHadoopSecurity() {
1108     String hadoopTokenFilePath = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
1109     if (hadoopTokenFilePath != null) {
1110       set("mapreduce.job.credentials.binary", hadoopTokenFilePath);
1111     }
1112   }
1113 
1114   /**
1115    * Check if we want to prioritize input splits which reside on the host.
1116    *
1117    * @return True iff we want to use input split locality
1118    */
1119   public boolean useInputSplitLocality() {
1120     return USE_INPUT_SPLIT_LOCALITY.get(this);
1121   }
1122 
1123   /**
1124    * Get the local hostname on the given interface.
1125    *
1126    * @return The local hostname
1127    * @throws UnknownHostException IP address of a host could not be determined
1128    */
1129   public String getLocalHostname() throws UnknownHostException {
1130     return DNS.getDefaultHost(
1131         GiraphConstants.DNS_INTERFACE.get(this),
1132         GiraphConstants.DNS_NAMESERVER.get(this)).toLowerCase();
1133   }
1134 
1135   /**
1136    * Return local host name by default. Or local host IP if preferIP
1137    * option is set.
1138    * @return local host name or IP
1139    * @throws UnknownHostException IP address of a host could not be determined
1140    */
1141   public String getLocalHostOrIp() throws UnknownHostException {
1142     if (GiraphConstants.PREFER_IP_ADDRESSES.get(this)) {
1143       return InetAddress.getLocalHost().getHostAddress();
1144     }
1145     return getLocalHostname();
1146   }
1147 
1148   /**
1149    * Set the maximum number of supersteps of this application.  After this
1150    * many supersteps are executed, the application will shutdown.
1151    *
1152    * @param maxNumberOfSupersteps Maximum number of supersteps
1153    */
1154   public void setMaxNumberOfSupersteps(int maxNumberOfSupersteps) {
1155     MAX_NUMBER_OF_SUPERSTEPS.set(this, maxNumberOfSupersteps);
1156   }
1157 
1158   /**
1159    * Get the maximum number of supersteps of this application.  After this
1160    * many supersteps are executed, the application will shutdown.
1161    *
1162    * @return Maximum number of supersteps
1163    */
1164   public int getMaxNumberOfSupersteps() {
1165     return MAX_NUMBER_OF_SUPERSTEPS.get(this);
1166   }
1167 
1168   /**
1169    * Get the output directory to write YourKit snapshots to
1170    *
1171    * @param context Map context
1172    * @return output directory
1173    */
1174   public String getYourKitOutputDir(Mapper.Context context) {
1175     final String cacheKey = "giraph.yourkit.outputDirCached";
1176     String outputDir = get(cacheKey);
1177     if (outputDir == null) {
1178       outputDir = getStringVars(YOURKIT_OUTPUT_DIR, YOURKIT_OUTPUT_DIR_DEFAULT,
1179           context);
1180       set(cacheKey, outputDir);
1181     }
1182     return outputDir;
1183   }
1184 
1185   /**
1186    * Get string, replacing variables in the output.
1187    *
1188    * %JOB_ID% =&gt; job id
1189    * %TASK_ID% =&gt; task id
1190    * %USER% =&gt; owning user name
1191    *
1192    * @param key name of key to lookup
1193    * @param context mapper context
1194    * @return value for key, with variables expanded
1195    */
1196   public String getStringVars(String key, Mapper.Context context) {
1197     return getStringVars(key, null, context);
1198   }
1199 
1200   /**
1201    * Get string, replacing variables in the output.
1202    *
1203    * %JOB_ID% =&gt; job id
1204    * %TASK_ID% =&gt; task id
1205    * %USER% =&gt; owning user name
1206    *
1207    * @param key name of key to lookup
1208    * @param defaultValue value to return if no mapping exists. This can also
1209    *                     have variables, which will be substituted.
1210    * @param context mapper context
1211    * @return value for key, with variables expanded
1212    */
1213   public String getStringVars(String key, String defaultValue,
1214                               Mapper.Context context) {
1215     String value = get(key);
1216     if (value == null) {
1217       if (defaultValue == null) {
1218         return null;
1219       }
1220       value = defaultValue;
1221     }
1222     value = value.replace("%JOB_ID%", context.getJobID().toString());
1223     value = value.replace("%TASK_ID%", context.getTaskAttemptID().toString());
1224     value = value.replace("%USER%", get("user.name", "unknown_user"));
1225     return value;
1226   }
1227 
1228   /**
1229    * Get option whether to create a source vertex present only in edge input
1230    *
1231    * @return CREATE_EDGE_SOURCE_VERTICES option
1232    */
1233   public boolean getCreateSourceVertex() {
1234     return CREATE_EDGE_SOURCE_VERTICES.get(this);
1235   }
1236 
1237   /**
1238    * set option whether to create a source vertex present only in edge input
1239    * @param createVertex create source vertex option
1240    */
1241   public void setCreateSourceVertex(boolean createVertex) {
1242     CREATE_EDGE_SOURCE_VERTICES.set(this, createVertex);
1243   }
1244 
1245   /**
1246    * Get the maximum timeout (in milliseconds) for waiting for all tasks
1247    * to complete after the job is done.
1248    *
1249    * @return Wait task done timeout in milliseconds.
1250    */
1251   public int getWaitTaskDoneTimeoutMs() {
1252     return WAIT_TASK_DONE_TIMEOUT_MS.get(this);
1253   }
1254 
1255   /**
1256    * Set the maximum timeout (in milliseconds) for waiting for all tasks
1257    * to complete after the job is done.
1258    *
1259    * @param ms Milliseconds to set
1260    */
1261   public void setWaitTaskDoneTimeoutMs(int ms) {
1262     WAIT_TASK_DONE_TIMEOUT_MS.set(this, ms);
1263   }
1264 
1265   /**
1266    * Check whether to track job progress on client or not
1267    *
1268    * @return True if job progress should be tracked on client
1269    */
1270   public boolean trackJobProgressOnClient() {
1271     return TRACK_JOB_PROGRESS_ON_CLIENT.get(this);
1272   }
1273 
1274   /**
1275    * @return Number of retries when creating an HDFS file before failing.
1276    */
1277   public int getHdfsFileCreationRetries() {
1278     return HDFS_FILE_CREATION_RETRIES.get(this);
1279   }
1280 
1281   /**
1282    * @return Milliseconds to wait before retrying an HDFS file creation
1283    *         operation.
1284    */
1285   public int getHdfsFileCreationRetryWaitMs() {
1286     return HDFS_FILE_CREATION_RETRY_WAIT_MS.get(this);
1287   }
1288 }