View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.conf;
20  
21  import io.netty.buffer.ByteBufAllocator;
22  import io.netty.buffer.PooledByteBufAllocator;
23  import io.netty.buffer.UnpooledByteBufAllocator;
24  
25  import java.net.InetAddress;
26  import java.net.UnknownHostException;
27  
28  import org.apache.giraph.aggregators.AggregatorWriter;
29  import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
30  import org.apache.giraph.combiner.MessageCombiner;
31  import org.apache.giraph.edge.OutEdges;
32  import org.apache.giraph.edge.ReuseObjectsOutEdges;
33  import org.apache.giraph.factories.ComputationFactory;
34  import org.apache.giraph.factories.VertexValueFactory;
35  import org.apache.giraph.graph.Computation;
36  import org.apache.giraph.graph.MapperObserver;
37  import org.apache.giraph.graph.Vertex;
38  import org.apache.giraph.graph.VertexResolver;
39  import org.apache.giraph.graph.VertexValueCombiner;
40  import org.apache.giraph.io.EdgeInputFormat;
41  import org.apache.giraph.io.EdgeOutputFormat;
42  import org.apache.giraph.io.MappingInputFormat;
43  import org.apache.giraph.io.VertexInputFormat;
44  import org.apache.giraph.io.VertexOutputFormat;
45  import org.apache.giraph.io.filters.EdgeInputFilter;
46  import org.apache.giraph.io.filters.VertexInputFilter;
47  import org.apache.giraph.job.GiraphJobObserver;
48  import org.apache.giraph.job.GiraphJobRetryChecker;
49  import org.apache.giraph.master.MasterCompute;
50  import org.apache.giraph.master.MasterObserver;
51  import org.apache.giraph.partition.GraphPartitionerFactory;
52  import org.apache.giraph.partition.Partition;
53  import org.apache.giraph.partition.ReusesObjectsPartition;
54  import org.apache.giraph.utils.GcObserver;
55  import org.apache.giraph.utils.ReflectionUtils;
56  import org.apache.giraph.worker.WorkerContext;
57  import org.apache.giraph.worker.WorkerObserver;
58  import org.apache.hadoop.conf.Configuration;
59  import org.apache.hadoop.mapreduce.Mapper;
60  import org.apache.hadoop.net.DNS;
61  
62  /**
63   * Adds user methods specific to Giraph.  This will be put into an
64   * ImmutableClassesGiraphConfiguration that provides the configuration plus
65   * the immutable classes.
66   *
 * Keeps track of the parameters which were set, so that they can easily be
 * set in another copy of the configuration.
69   */
70  public class GiraphConfiguration extends Configuration
71      implements GiraphConstants {
72    /** ByteBufAllocator to be used by netty */
73    private ByteBufAllocator nettyBufferAllocator = null;
74  
75    /**
76     * Constructor that creates the configuration
77     */
78    public GiraphConfiguration() {
79      configureHadoopSecurity();
80    }
81  
82    /**
83     * Constructor.
84     *
85     * @param conf Configuration
86     */
87    public GiraphConfiguration(Configuration conf) {
88      super(conf);
89      configureHadoopSecurity();
90    }
91  
92    /**
93     * Get name of computation being run. We leave this up to the
94     * {@link ComputationFactory} to decide what to return.
95     *
96     * @return Name of computation being run
97     */
98    public String getComputationName() {
99      ComputationFactory compFactory = ReflectionUtils.newInstance(
100         getComputationFactoryClass());
101     return compFactory.computationName(this);
102   }
103 
104   /**
105    * Get the user's subclassed {@link ComputationFactory}
106    *
107    * @return User's computation factory class
108    */
109   public Class<? extends ComputationFactory> getComputationFactoryClass() {
110     return COMPUTATION_FACTORY_CLASS.get(this);
111   }
112 
113   /**
114    * Get the user's subclassed {@link Computation}
115    *
116    * @return User's computation class
117    */
118   public Class<? extends Computation> getComputationClass() {
119     return COMPUTATION_CLASS.get(this);
120   }
121 
122   /**
123    * Set the computation class (required)
124    *
125    * @param computationClass Runs vertex computation
126    */
127   public void setComputationClass(
128       Class<? extends Computation> computationClass) {
129     COMPUTATION_CLASS.set(this, computationClass);
130   }
131 
132   /**
133    * Set the vertex value factory class
134    *
135    * @param vertexValueFactoryClass Creates default vertex values
136    */
137   public final void setVertexValueFactoryClass(
138       Class<? extends VertexValueFactory> vertexValueFactoryClass) {
139     VERTEX_VALUE_FACTORY_CLASS.set(this, vertexValueFactoryClass);
140   }
141 
142   /**
143    * Set the edge input filter class
144    *
145    * @param edgeFilterClass class to use
146    */
147   public void setEdgeInputFilterClass(
148       Class<? extends EdgeInputFilter> edgeFilterClass) {
149     EDGE_INPUT_FILTER_CLASS.set(this, edgeFilterClass);
150   }
151 
152   /**
153    * Set the vertex input filter class
154    *
155    * @param vertexFilterClass class to use
156    */
157   public void setVertexInputFilterClass(
158       Class<? extends VertexInputFilter> vertexFilterClass) {
159     VERTEX_INPUT_FILTER_CLASS.set(this, vertexFilterClass);
160   }
161 
162   /**
163    * Get the vertex edges class
164    *
165    * @return vertex edges class
166    */
167   public Class<? extends OutEdges> getOutEdgesClass() {
168     return VERTEX_EDGES_CLASS.get(this);
169   }
170 
171   /**
172    * Set the vertex edges class
173    *
174    * @param outEdgesClass Determines the way edges are stored
175    */
176   public final void setOutEdgesClass(
177       Class<? extends OutEdges> outEdgesClass) {
178     VERTEX_EDGES_CLASS.set(this, outEdgesClass);
179   }
180 
181   /**
182    * Set the vertex implementation class
183    *
184    * @param vertexClass class of the vertex implementation
185    */
186   public final void setVertexClass(Class<? extends Vertex> vertexClass) {
187     VERTEX_CLASS.set(this, vertexClass);
188   }
189 
190 
191   /**
192    * Set the vertex edges class used during edge-based input (if different
193    * from the one used during computation)
194    *
195    * @param inputOutEdgesClass Determines the way edges are stored
196    */
197   public final void setInputOutEdgesClass(
198       Class<? extends OutEdges> inputOutEdgesClass) {
199     INPUT_VERTEX_EDGES_CLASS.set(this, inputOutEdgesClass);
200   }
201 
202   /**
203    * True if the {@link org.apache.giraph.edge.OutEdges} implementation
204    * copies the passed edges to its own data structure,
205    * i.e. it doesn't keep references to Edge objects, target vertex ids or edge
206    * values passed to add() or initialize().
207    * This makes it possible to reuse edge objects passed to the data
208    * structure, to minimize object instantiation (see for example
209    * EdgeStore#addPartitionEdges()).
210    *
211    * @return True iff we can reuse the edge objects
212    */
213   public boolean reuseEdgeObjects() {
214     return ReuseObjectsOutEdges.class.isAssignableFrom(
215         getOutEdgesClass());
216   }
217 
218   /**
219    * True if the {@link Partition} implementation copies the passed vertices
220    * to its own data structure, i.e. it doesn't keep references to Vertex
221    * objects passed to it.
222    * This makes it possible to reuse vertex objects passed to the data
223    * structure, to minimize object instantiation.
224    *
225    * @return True iff we can reuse the vertex objects
226    */
227   public boolean reuseVertexObjects() {
228     return ReusesObjectsPartition.class.isAssignableFrom(getPartitionClass());
229   }
230 
231   /**
232    * Get Partition class used
233    * @return Partition class
234    */
235   public Class<? extends Partition> getPartitionClass() {
236     return PARTITION_CLASS.get(this);
237   }
238 
239   /**
240    * Does the job have a {@link VertexInputFormat}?
241    *
242    * @return True iff a {@link VertexInputFormat} has been specified.
243    */
244   public boolean hasVertexInputFormat() {
245     return VERTEX_INPUT_FORMAT_CLASS.get(this) != null;
246   }
247 
248   /**
249    * Set the vertex input format class (required)
250    *
251    * @param vertexInputFormatClass Determines how graph is input
252    */
253   public void setVertexInputFormatClass(
254       Class<? extends VertexInputFormat> vertexInputFormatClass) {
255     VERTEX_INPUT_FORMAT_CLASS.set(this, vertexInputFormatClass);
256   }
257 
258   /**
259    * Does the job have a {@link EdgeInputFormat}?
260    *
261    * @return True iff a {@link EdgeInputFormat} has been specified.
262    */
263   public boolean hasEdgeInputFormat() {
264     return EDGE_INPUT_FORMAT_CLASS.get(this) != null;
265   }
266 
267   /**
268    * Set the edge input format class (required)
269    *
270    * @param edgeInputFormatClass Determines how graph is input
271    */
272   public void setEdgeInputFormatClass(
273       Class<? extends EdgeInputFormat> edgeInputFormatClass) {
274     EDGE_INPUT_FORMAT_CLASS.set(this, edgeInputFormatClass);
275   }
276 
277   /**
278    * Set the mapping input format class (optional)
279    *
280    * @param mappingInputFormatClass Determines how mappings are input
281    */
282   public void setMappingInputFormatClass(
283     Class<? extends MappingInputFormat> mappingInputFormatClass) {
284     MAPPING_INPUT_FORMAT_CLASS.set(this, mappingInputFormatClass);
285   }
286 
287   /**
288    * Set the master class (optional)
289    *
290    * @param masterComputeClass Runs master computation
291    */
292   public final void setMasterComputeClass(
293       Class<? extends MasterCompute> masterComputeClass) {
294     MASTER_COMPUTE_CLASS.set(this, masterComputeClass);
295   }
296 
297   /**
298    * Add a MasterObserver class (optional)
299    *
300    * @param masterObserverClass MasterObserver class to add.
301    */
302   public final void addMasterObserverClass(
303       Class<? extends MasterObserver> masterObserverClass) {
304     MASTER_OBSERVER_CLASSES.add(this, masterObserverClass);
305   }
306 
307   /**
308    * Add a WorkerObserver class (optional)
309    *
310    * @param workerObserverClass WorkerObserver class to add.
311    */
312   public final void addWorkerObserverClass(
313       Class<? extends WorkerObserver> workerObserverClass) {
314     WORKER_OBSERVER_CLASSES.add(this, workerObserverClass);
315   }
316 
317   /**
318    * Add a MapperObserver class (optional)
319    *
320    * @param mapperObserverClass MapperObserver class to add.
321    */
322   public final void addMapperObserverClass(
323       Class<? extends MapperObserver> mapperObserverClass) {
324     MAPPER_OBSERVER_CLASSES.add(this, mapperObserverClass);
325   }
326 
327   /**
328    * Add a GcObserver class (optional)
329    *
330    * @param gcObserverClass GcObserver class to add.
331    */
332   public final void addGcObserverClass(
333       Class<? extends GcObserver> gcObserverClass) {
334     GC_OBSERVER_CLASSES.add(this, gcObserverClass);
335   }
336 
337   /**
338    * Get job observer class
339    *
340    * @return GiraphJobObserver class set.
341    */
342   public Class<? extends GiraphJobObserver> getJobObserverClass() {
343     return JOB_OBSERVER_CLASS.get(this);
344   }
345 
346   /**
347    * Set job observer class
348    *
349    * @param klass GiraphJobObserver class to set.
350    */
351   public void setJobObserverClass(Class<? extends GiraphJobObserver> klass) {
352     JOB_OBSERVER_CLASS.set(this, klass);
353   }
354 
355   /**
356    * Get job retry checker class
357    *
358    * @return GiraphJobRetryChecker class set.
359    */
360   public Class<? extends GiraphJobRetryChecker> getJobRetryCheckerClass() {
361     return JOB_RETRY_CHECKER_CLASS.get(this);
362   }
363 
364   /**
365    * Set job retry checker class
366    *
367    * @param klass GiraphJobRetryChecker class to set.
368    */
369   public void setJobRetryCheckerClass(
370       Class<? extends GiraphJobRetryChecker> klass) {
371     JOB_RETRY_CHECKER_CLASS.set(this, klass);
372   }
373 
374   /**
375    * Check whether to enable jmap dumping thread.
376    *
377    * @return true if jmap dumper is enabled.
378    */
379   public boolean isJMapHistogramDumpEnabled() {
380     return JMAP_ENABLE.get(this);
381   }
382 
383   /**
384    * Check whether to enable heap memory supervisor thread
385    *
386    * @return true if jmap dumper is reactively enabled
387    */
388   public boolean isReactiveJmapHistogramDumpEnabled() {
389     return REACTIVE_JMAP_ENABLE.get(this);
390   }
391 
392   /**
393    * Set mapping from a key name to a list of classes.
394    *
395    * @param name String key name to use.
396    * @param xface interface of the classes being set.
397    * @param klasses Classes to set.
398    */
399   public final void setClasses(String name, Class<?> xface,
400                                Class<?> ... klasses) {
401     String[] klassNames = new String[klasses.length];
402     for (int i = 0; i < klasses.length; ++i) {
403       Class<?> klass = klasses[i];
404       if (!xface.isAssignableFrom(klass)) {
405         throw new RuntimeException(klass + " does not implement " +
406             xface.getName());
407       }
408       klassNames[i] = klasses[i].getName();
409     }
410     setStrings(name, klassNames);
411   }
412 
413   /**
414    * Does the job have a {@link VertexOutputFormat}?
415    *
416    * @return True iff a {@link VertexOutputFormat} has been specified.
417    */
418   public boolean hasVertexOutputFormat() {
419     return VERTEX_OUTPUT_FORMAT_CLASS.get(this) != null;
420   }
421 
422   /**
423    * Set the vertex output format class (optional)
424    *
425    * @param vertexOutputFormatClass Determines how graph is output
426    */
427   public final void setVertexOutputFormatClass(
428       Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
429     VERTEX_OUTPUT_FORMAT_CLASS.set(this, vertexOutputFormatClass);
430   }
431 
432 
433   /**
434    * Does the job have a {@link EdgeOutputFormat} subdir?
435    *
436    * @return True iff a {@link EdgeOutputFormat} subdir has been specified.
437    */
438   public boolean hasVertexOutputFormatSubdir() {
439     return !VERTEX_OUTPUT_FORMAT_SUBDIR.get(this).isEmpty();
440   }
441 
442   /**
443    * Set the vertex output format path
444    *
445    * @param path path where the verteces will be written
446    */
447   public final void setVertexOutputFormatSubdir(String path) {
448     VERTEX_OUTPUT_FORMAT_SUBDIR.set(this, path);
449   }
450 
451   /**
452    * Check if output should be done during computation
453    *
454    * @return True iff output should be done during computation
455    */
456   public final boolean doOutputDuringComputation() {
457     return DO_OUTPUT_DURING_COMPUTATION.get(this);
458   }
459 
460   /**
461    * Set whether or not we should do output during computation
462    *
463    * @param doOutputDuringComputation True iff we want output to happen
464    *                                  during computation
465    */
466   public final void setDoOutputDuringComputation(
467       boolean doOutputDuringComputation) {
468     DO_OUTPUT_DURING_COMPUTATION.set(this, doOutputDuringComputation);
469   }
470 
471   /**
472    * Check if VertexOutputFormat is thread-safe
473    *
474    * @return True iff VertexOutputFormat is thread-safe
475    */
476   public final boolean vertexOutputFormatThreadSafe() {
477     return VERTEX_OUTPUT_FORMAT_THREAD_SAFE.get(this);
478   }
479 
480   /**
481    * Set whether or not selected VertexOutputFormat is thread-safe
482    *
483    * @param vertexOutputFormatThreadSafe True iff selected VertexOutputFormat
484    *                                     is thread-safe
485    */
486   public final void setVertexOutputFormatThreadSafe(
487       boolean vertexOutputFormatThreadSafe) {
488     VERTEX_OUTPUT_FORMAT_THREAD_SAFE.set(this, vertexOutputFormatThreadSafe);
489   }
490 
491   /**
492    * Does the job have a {@link EdgeOutputFormat}?
493    *
494    * @return True iff a {@link EdgeOutputFormat} has been specified.
495    */
496   public boolean hasEdgeOutputFormat() {
497     return EDGE_OUTPUT_FORMAT_CLASS.get(this) != null;
498   }
499 
500   /**
501    * Set the edge output format class (optional)
502    *
503    * @param edgeOutputFormatClass Determines how graph is output
504    */
505   public final void setEdgeOutputFormatClass(
506       Class<? extends EdgeOutputFormat> edgeOutputFormatClass) {
507     EDGE_OUTPUT_FORMAT_CLASS.set(this, edgeOutputFormatClass);
508   }
509 
510   /**
511    * Does the job have a {@link EdgeOutputFormat} subdir?
512    *
513    * @return True iff a {@link EdgeOutputFormat} subdir has been specified.
514    */
515   public boolean hasEdgeOutputFormatSubdir() {
516     return !EDGE_OUTPUT_FORMAT_SUBDIR.get(this).isEmpty();
517   }
518 
519   /**
520    * Set the edge output format path
521    *
522    * @param path path where the edges will be written
523    */
524   public final void setEdgeOutputFormatSubdir(String path) {
525     EDGE_OUTPUT_FORMAT_SUBDIR.set(this, path);
526   }
527 
528   /**
529    * Get the number of threads to use for writing output in the end of the
530    * application. If output format is not thread safe, returns 1.
531    *
532    * @return Number of output threads
533    */
534   public final int getNumOutputThreads() {
535     if (!vertexOutputFormatThreadSafe()) {
536       return 1;
537     } else {
538       return NUM_OUTPUT_THREADS.get(this);
539     }
540   }
541 
542   /**
543    * Set the number of threads to use for writing output in the end of the
544    * application. Will be used only if {#vertexOutputFormatThreadSafe} is true.
545    *
546    * @param numOutputThreads Number of output threads
547    */
548   public void setNumOutputThreads(int numOutputThreads) {
549     NUM_OUTPUT_THREADS.set(this, numOutputThreads);
550   }
551 
552   /**
553    * Set the message combiner class (optional)
554    *
555    * @param messageCombinerClass Determines how vertex messages are combined
556    */
557   public void setMessageCombinerClass(
558       Class<? extends MessageCombiner> messageCombinerClass) {
559     MESSAGE_COMBINER_CLASS.set(this, messageCombinerClass);
560   }
561 
562   /**
563    * Set the graph partitioner class (optional)
564    *
565    * @param graphPartitionerFactoryClass Determines how the graph is partitioned
566    */
567   public final void setGraphPartitionerFactoryClass(
568       Class<? extends GraphPartitionerFactory> graphPartitionerFactoryClass) {
569     GRAPH_PARTITIONER_FACTORY_CLASS.set(this, graphPartitionerFactoryClass);
570   }
571 
572   /**
573    * Set the vertex resolver class (optional)
574    *
575    * @param vertexResolverClass Determines how vertex mutations are resolved
576    */
577   public final void setVertexResolverClass(
578       Class<? extends VertexResolver> vertexResolverClass) {
579     VERTEX_RESOLVER_CLASS.set(this, vertexResolverClass);
580   }
581 
582   /**
583    * Whether to create a vertex that doesn't exist when it receives messages.
584    * This only affects DefaultVertexResolver.
585    *
586    * @return true if we should create non existent vertices that get messages.
587    */
588   public final boolean getResolverCreateVertexOnMessages() {
589     return RESOLVER_CREATE_VERTEX_ON_MSGS.get(this);
590   }
591 
592   /**
593    * Set whether to create non existent vertices when they receive messages.
594    *
595    * @param v true if we should create vertices when they get messages.
596    */
597   public final void setResolverCreateVertexOnMessages(boolean v) {
598     RESOLVER_CREATE_VERTEX_ON_MSGS.set(this, v);
599   }
600 
601   /**
602    * Set the vertex value combiner class (optional)
603    *
604    * @param vertexValueCombinerClass Determines how vertices are combined
605    */
606   public final void setVertexValueCombinerClass(
607       Class<? extends VertexValueCombiner> vertexValueCombinerClass) {
608     VERTEX_VALUE_COMBINER_CLASS.set(this, vertexValueCombinerClass);
609   }
610 
611   /**
612    * Set the worker context class (optional)
613    *
614    * @param workerContextClass Determines what code is executed on a each
615    *        worker before and after each superstep and computation
616    */
617   public final void setWorkerContextClass(
618       Class<? extends WorkerContext> workerContextClass) {
619     WORKER_CONTEXT_CLASS.set(this, workerContextClass);
620   }
621 
622   /**
623    * Set the aggregator writer class (optional)
624    *
625    * @param aggregatorWriterClass Determines how the aggregators are
626    *        written to file at the end of the job
627    */
628   public final void setAggregatorWriterClass(
629       Class<? extends AggregatorWriter> aggregatorWriterClass) {
630     AGGREGATOR_WRITER_CLASS.set(this, aggregatorWriterClass);
631   }
632 
633   /**
634    * Set the partition class (optional)
635    *
636    * @param partitionClass Determines the why partitions are stored
637    */
638   public final void setPartitionClass(
639       Class<? extends Partition> partitionClass) {
640     PARTITION_CLASS.set(this, partitionClass);
641   }
642 
643   /**
644    * Set worker configuration for determining what is required for
645    * a superstep.
646    *
647    * @param minWorkers Minimum workers to do a superstep
648    * @param maxWorkers Maximum workers to do a superstep
649    *        (max map tasks in job)
650    * @param minPercentResponded 0 - 100 % of the workers required to
651    *        have responded before continuing the superstep
652    */
653   public final void setWorkerConfiguration(int minWorkers,
654                                            int maxWorkers,
655                                            float minPercentResponded) {
656     setInt(MIN_WORKERS, minWorkers);
657     setInt(MAX_WORKERS, maxWorkers);
658     MIN_PERCENT_RESPONDED.set(this, minPercentResponded);
659   }
660 
661   public final int getMinWorkers() {
662     return getInt(MIN_WORKERS, -1);
663   }
664 
665   public final int getMaxWorkers() {
666     return getInt(MAX_WORKERS, -1);
667   }
668 
669   public final float getMinPercentResponded() {
670     return MIN_PERCENT_RESPONDED.get(this);
671   }
672 
673   /**
674    * Utilize an existing ZooKeeper service.  If this is not set, ZooKeeper
675    * will be dynamically started by Giraph for this job.
676    *
677    * @param serverList Comma separated list of servers and ports
678    *        (i.e. zk1:2221,zk2:2221)
679    */
680   public final void setZooKeeperConfiguration(String serverList) {
681     ZOOKEEPER_LIST.set(this, serverList);
682   }
683 
684   /**
685    * Getter for SPLIT_MASTER_WORKER flag.
686    *
687    * @return boolean flag value.
688    */
689   public final boolean getSplitMasterWorker() {
690     return SPLIT_MASTER_WORKER.get(this);
691   }
692 
693   /**
694    * Get array of MasterObserver classes set in the configuration.
695    *
696    * @return array of MasterObserver classes.
697    */
698   public Class<? extends MasterObserver>[] getMasterObserverClasses() {
699     return MASTER_OBSERVER_CLASSES.getArray(this);
700   }
701 
702   /**
703    * Get array of WorkerObserver classes set in configuration.
704    *
705    * @return array of WorkerObserver classes.
706    */
707   public Class<? extends WorkerObserver>[] getWorkerObserverClasses() {
708     return WORKER_OBSERVER_CLASSES.getArray(this);
709   }
710 
711   /**
712    * Get array of MapperObserver classes set in configuration.
713    *
714    * @return array of MapperObserver classes.
715    */
716   public Class<? extends MapperObserver>[] getMapperObserverClasses() {
717     return MAPPER_OBSERVER_CLASSES.getArray(this);
718   }
719 
720   /**
721    * Get array of GcObserver classes set in configuration.
722    *
723    * @return array of GcObserver classes.
724    */
725   public Class<? extends GcObserver>[] getGcObserverClasses() {
726     return GC_OBSERVER_CLASSES.getArray(this);
727   }
728 
729   /**
730    * Whether to track, print, and aggregate metrics.
731    *
732    * @return true if metrics are enabled, false otherwise (default)
733    */
734   public boolean metricsEnabled() {
735     return METRICS_ENABLE.isTrue(this);
736   }
737 
738   /**
739    * Get the task partition
740    *
741    * @return The task partition or -1 if not set
742    */
743   public int getTaskPartition() {
744     return getInt("mapred.task.partition", -1);
745   }
746 
747   /**
748    * Is this a "pure YARN" Giraph job, or is a MapReduce layer (v1 or v2)
749    * actually managing our cluster nodes, i.e. each task is a Mapper.
750    *
751    * @return TRUE if this is a pure YARN job.
752    */
753   public boolean isPureYarnJob() {
754     return IS_PURE_YARN_JOB.get(this);
755   }
756 
757   /**
758    * Jars required in "Pure YARN" jobs (names only, no paths) should
759    * be listed here in full, including Giraph framework jar(s).
760    *
761    * @return the comma-separated list of jar names for export to cluster.
762    */
763   public String getYarnLibJars() {
764     return GIRAPH_YARN_LIBJARS.get(this);
765   }
766 
767   /**
768    * Populate jar list for Pure YARN jobs.
769    *
770    * @param jarList a comma-separated list of jar names
771    */
772   public void setYarnLibJars(String jarList) {
773     GIRAPH_YARN_LIBJARS.set(this, jarList);
774   }
775 
776   /**
777    * Get heap size (in MB) for each task in our Giraph job run,
778    * assuming this job will run on the "pure YARN" profile.
779    *
780    * @return the heap size for all tasks, in MB
781    */
782   public int getYarnTaskHeapMb() {
783     return GIRAPH_YARN_TASK_HEAP_MB.get(this);
784   }
785 
786   /**
787    * Set heap size for Giraph tasks in our job run, assuming
788    * the job will run on the "pure YARN" profile.
789    *
790    * @param heapMb the heap size for all tasks
791    */
792   public void setYarnTaskHeapMb(int heapMb) {
793     GIRAPH_YARN_TASK_HEAP_MB.set(this, heapMb);
794   }
795 
796   /**
797    * Get the ZooKeeper list.
798    *
799    * @return ZooKeeper list of strings, comma separated or null if none set.
800    */
801   public String getZookeeperList() {
802     return ZOOKEEPER_LIST.get(this);
803   }
804 
805   /**
806    * Set the ZooKeeper list to the provided list. This method is used when the
807    * ZooKeeper is started internally and will set the zkIsExternal option to
808    * false as well.
809    *
810    * @param zkList list of strings, comma separated of zookeeper servers
811    */
812   public void setZookeeperList(String zkList) {
813     ZOOKEEPER_LIST.set(this, zkList);
814     ZOOKEEPER_IS_EXTERNAL.set(this, false);
815   }
816 
817   /**
818    * Was ZooKeeper provided externally?
819    *
820    * @return true iff was zookeeper is external
821    */
822   public boolean isZookeeperExternal() {
823     return ZOOKEEPER_IS_EXTERNAL.get(this);
824   }
825 
826   public String getLocalLevel() {
827     return LOG_LEVEL.get(this);
828   }
829 
830   /**
831    * Use the log thread layout option?
832    *
833    * @return True if use the log thread layout option, false otherwise
834    */
835   public boolean useLogThreadLayout() {
836     return LOG_THREAD_LAYOUT.get(this);
837   }
838 
839   /**
840    * is this job run a local test?
841    *
842    * @return the test status as recorded in the Configuration
843    */
844   public boolean getLocalTestMode() {
845     return LOCAL_TEST_MODE.get(this);
846   }
847 
848   /**
849    * Flag this job as a local test run.
850    *
851    * @param flag the test status for this job
852    */
853   public void setLocalTestMode(boolean flag) {
854     LOCAL_TEST_MODE.set(this, flag);
855   }
856 
857   public int getZooKeeperSessionTimeout() {
858     return ZOOKEEPER_SESSION_TIMEOUT.get(this);
859   }
860 
861   public int getZookeeperOpsMaxAttempts() {
862     return ZOOKEEPER_OPS_MAX_ATTEMPTS.get(this);
863   }
864 
865   public int getZookeeperOpsRetryWaitMsecs() {
866     return ZOOKEEPER_OPS_RETRY_WAIT_MSECS.get(this);
867   }
868 
869   public boolean getNettyServerUseExecutionHandler() {
870     return NETTY_SERVER_USE_EXECUTION_HANDLER.get(this);
871   }
872 
873   public int getNettyServerThreads() {
874     return NETTY_SERVER_THREADS.get(this);
875   }
876 
877   public int getNettyServerExecutionThreads() {
878     return NETTY_SERVER_EXECUTION_THREADS.get(this);
879   }
880 
881   /**
882    * Get the netty server execution concurrency.  This depends on whether the
883    * netty server execution handler exists.
884    *
885    * @return Server concurrency
886    */
887   public int getNettyServerExecutionConcurrency() {
888     if (getNettyServerUseExecutionHandler()) {
889       return getNettyServerExecutionThreads();
890     } else {
891       return getNettyServerThreads();
892     }
893   }
894 
895   /**
896    * Used by netty client and server to create ByteBufAllocator
897    *
898    * @return ByteBufAllocator
899    */
900   public ByteBufAllocator getNettyAllocator() {
901     if (nettyBufferAllocator == null) {
902       if (NETTY_USE_POOLED_ALLOCATOR.get(this)) { // Use pooled allocator
903         nettyBufferAllocator = new PooledByteBufAllocator(
904           NETTY_USE_DIRECT_MEMORY.get(this));
905       } else { // Use un-pooled allocator
906         // Note: Current default settings create un-pooled heap allocator
907         nettyBufferAllocator = new UnpooledByteBufAllocator(
908             NETTY_USE_DIRECT_MEMORY.get(this));
909       }
910     }
911     return nettyBufferAllocator;
912   }
913 
914   public int getZookeeperConnectionAttempts() {
915     return ZOOKEEPER_CONNECTION_ATTEMPTS.get(this);
916   }
917 
918   public int getZooKeeperMinSessionTimeout() {
919     return ZOOKEEPER_MIN_SESSION_TIMEOUT.get(this);
920   }
921 
922   public int getZooKeeperMaxSessionTimeout() {
923     return ZOOKEEPER_MAX_SESSION_TIMEOUT.get(this);
924   }
925 
926   /**
927    * Get the number of map tasks in this job
928    *
929    * @return Number of map tasks in this job
930    */
931   public int getMapTasks() {
932     int mapTasks = getInt("mapred.map.tasks", -1);
933     if (mapTasks == -1) {
934       throw new IllegalStateException("getMapTasks: Failed to get the map " +
935           "tasks!");
936     }
937     return mapTasks;
938   }
939 
940   /**
941    * Use authentication? (if supported)
942    *
943    * @return True if should authenticate, false otherwise
944    */
945   public boolean authenticate() {
946     return AUTHENTICATE.get(this);
947   }
948 
949   /**
950    * Set the number of compute threads
951    *
952    * @param numComputeThreads Number of compute threads to use
953    */
954   public void setNumComputeThreads(int numComputeThreads) {
955     NUM_COMPUTE_THREADS.set(this, numComputeThreads);
956   }
957 
958   public int getNumComputeThreads() {
959     return NUM_COMPUTE_THREADS.get(this);
960   }
961 
962   /**
963    * Set the number of input split threads
964    *
965    * @param numInputSplitsThreads Number of input split threads to use
966    */
967   public void setNumInputSplitsThreads(int numInputSplitsThreads) {
968     NUM_INPUT_THREADS.set(this, numInputSplitsThreads);
969   }
970 
971   public int getNumInputSplitsThreads() {
972     return NUM_INPUT_THREADS.get(this);
973   }
974 
975   public long getInputSplitMaxVertices() {
976     return INPUT_SPLIT_MAX_VERTICES.get(this);
977   }
978 
979   public long getInputSplitMaxEdges() {
980     return INPUT_SPLIT_MAX_EDGES.get(this);
981   }
982 
983   /**
984    * Set whether to use unsafe serialization
985    *
986    * @param useUnsafeSerialization If true, use unsafe serialization
987    */
988   public void useUnsafeSerialization(boolean useUnsafeSerialization) {
989     USE_UNSAFE_SERIALIZATION.set(this, useUnsafeSerialization);
990   }
991 
992   /**
993    * Set the checkpoint frequeuncy of how many supersteps to wait before
994    * checkpointing
995    *
996    * @param checkpointFrequency How often to checkpoint (0 means never)
997    */
998   public void setCheckpointFrequency(int checkpointFrequency) {
999     CHECKPOINT_FREQUENCY.set(this, checkpointFrequency);
1000   }
1001 
1002   /**
1003    * Get the checkpoint frequeuncy of how many supersteps to wait
1004    * before checkpointing
1005    *
1006    * @return Checkpoint frequency (0 means never)
1007    */
1008   public int getCheckpointFrequency() {
1009     return CHECKPOINT_FREQUENCY.get(this);
1010   }
1011 
1012   /**
1013    * Check if checkpointing is used
1014    *
1015    * @return True iff checkpointing is used
1016    */
1017   public boolean useCheckpointing() {
1018     return getCheckpointFrequency() != 0;
1019   }
1020 
1021   /**
1022    * Set runtime checkpoint support checker.
1023    * The instance of this class will have to decide whether
1024    * checkpointing is allowed on current superstep.
1025    *
1026    * @param clazz checkpoint supported checker class
1027    */
1028   public void setCheckpointSupportedChecker(
1029       Class<? extends CheckpointSupportedChecker> clazz) {
1030     GiraphConstants.CHECKPOINT_SUPPORTED_CHECKER.set(this, clazz);
1031   }
1032 
1033   /**
1034    * Set the max task attempts
1035    *
1036    * @param maxTaskAttempts Max task attempts to use
1037    */
1038   public void setMaxTaskAttempts(int maxTaskAttempts) {
1039     MAX_TASK_ATTEMPTS.set(this, maxTaskAttempts);
1040   }
1041 
1042   /**
1043    * Get the max task attempts
1044    *
1045    * @return Max task attempts or -1, if not set
1046    */
1047   public int getMaxTaskAttempts() {
1048     return MAX_TASK_ATTEMPTS.get(this);
1049   }
1050 
1051   /**
1052    * Get the number of milliseconds to wait for an event before continuing on
1053    *
1054    * @return Number of milliseconds to wait for an event before continuing on
1055    */
1056   public int getEventWaitMsecs() {
1057     return EVENT_WAIT_MSECS.get(this);
1058   }
1059 
1060   /**
1061    * Set the number of milliseconds to wait for an event before continuing on
1062    *
1063    * @param eventWaitMsecs Number of milliseconds to wait for an event before
1064    *                       continuing on
1065    */
1066   public void setEventWaitMsecs(int eventWaitMsecs) {
1067     EVENT_WAIT_MSECS.set(this, eventWaitMsecs);
1068   }
1069 
1070   /**
1071    * Get the maximum milliseconds to wait before giving up trying to get the
1072    * minimum number of workers before a superstep.
1073    *
1074    * @return Maximum milliseconds to wait before giving up trying to get the
1075    *         minimum number of workers before a superstep
1076    */
1077   public int getMaxMasterSuperstepWaitMsecs() {
1078     return MAX_MASTER_SUPERSTEP_WAIT_MSECS.get(this);
1079   }
1080 
1081   /**
1082    * Set the maximum milliseconds to wait before giving up trying to get the
1083    * minimum number of workers before a superstep.
1084    *
1085    * @param maxMasterSuperstepWaitMsecs Maximum milliseconds to wait before
1086    *                                    giving up trying to get the minimum
1087    *                                    number of workers before a superstep
1088    */
1089   public void setMaxMasterSuperstepWaitMsecs(int maxMasterSuperstepWaitMsecs) {
1090     MAX_MASTER_SUPERSTEP_WAIT_MSECS.set(this, maxMasterSuperstepWaitMsecs);
1091   }
1092 
1093   /**
1094    * Check environment for Hadoop security token location in case we are
1095    * executing the Giraph job on a MRv1 Hadoop cluster.
1096    */
1097   public void configureHadoopSecurity() {
1098     String hadoopTokenFilePath = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
1099     if (hadoopTokenFilePath != null) {
1100       set("mapreduce.job.credentials.binary", hadoopTokenFilePath);
1101     }
1102   }
1103 
1104   /**
1105    * Check if we want to prioritize input splits which reside on the host.
1106    *
1107    * @return True iff we want to use input split locality
1108    */
1109   public boolean useInputSplitLocality() {
1110     return USE_INPUT_SPLIT_LOCALITY.get(this);
1111   }
1112 
1113   /**
1114    * Get the local hostname on the given interface.
1115    *
1116    * @return The local hostname
1117    * @throws UnknownHostException IP address of a host could not be determined
1118    */
1119   public String getLocalHostname() throws UnknownHostException {
1120     return DNS.getDefaultHost(
1121         GiraphConstants.DNS_INTERFACE.get(this),
1122         GiraphConstants.DNS_NAMESERVER.get(this)).toLowerCase();
1123   }
1124 
1125   /**
1126    * Return local host name by default. Or local host IP if preferIP
1127    * option is set.
1128    * @return local host name or IP
1129    * @throws UnknownHostException IP address of a host could not be determined
1130    */
1131   public String getLocalHostOrIp() throws UnknownHostException {
1132     if (GiraphConstants.PREFER_IP_ADDRESSES.get(this)) {
1133       return InetAddress.getLocalHost().getHostAddress();
1134     }
1135     return getLocalHostname();
1136   }
1137 
1138   /**
1139    * Set the maximum number of supersteps of this application.  After this
1140    * many supersteps are executed, the application will shutdown.
1141    *
1142    * @param maxNumberOfSupersteps Maximum number of supersteps
1143    */
1144   public void setMaxNumberOfSupersteps(int maxNumberOfSupersteps) {
1145     MAX_NUMBER_OF_SUPERSTEPS.set(this, maxNumberOfSupersteps);
1146   }
1147 
1148   /**
1149    * Get the maximum number of supersteps of this application.  After this
1150    * many supersteps are executed, the application will shutdown.
1151    *
1152    * @return Maximum number of supersteps
1153    */
1154   public int getMaxNumberOfSupersteps() {
1155     return MAX_NUMBER_OF_SUPERSTEPS.get(this);
1156   }
1157 
1158   /**
1159    * Get the output directory to write YourKit snapshots to
1160    *
1161    * @param context Map context
1162    * @return output directory
1163    */
1164   public String getYourKitOutputDir(Mapper.Context context) {
1165     final String cacheKey = "giraph.yourkit.outputDirCached";
1166     String outputDir = get(cacheKey);
1167     if (outputDir == null) {
1168       outputDir = getStringVars(YOURKIT_OUTPUT_DIR, YOURKIT_OUTPUT_DIR_DEFAULT,
1169           context);
1170       set(cacheKey, outputDir);
1171     }
1172     return outputDir;
1173   }
1174 
1175   /**
1176    * Get string, replacing variables in the output.
1177    *
1178    * %JOB_ID% =&gt; job id
1179    * %TASK_ID% =&gt; task id
1180    * %USER% =&gt; owning user name
1181    *
1182    * @param key name of key to lookup
1183    * @param context mapper context
1184    * @return value for key, with variables expanded
1185    */
1186   public String getStringVars(String key, Mapper.Context context) {
1187     return getStringVars(key, null, context);
1188   }
1189 
1190   /**
1191    * Get string, replacing variables in the output.
1192    *
1193    * %JOB_ID% =&gt; job id
1194    * %TASK_ID% =&gt; task id
1195    * %USER% =&gt; owning user name
1196    *
1197    * @param key name of key to lookup
1198    * @param defaultValue value to return if no mapping exists. This can also
1199    *                     have variables, which will be substituted.
1200    * @param context mapper context
1201    * @return value for key, with variables expanded
1202    */
1203   public String getStringVars(String key, String defaultValue,
1204                               Mapper.Context context) {
1205     String value = get(key);
1206     if (value == null) {
1207       if (defaultValue == null) {
1208         return null;
1209       }
1210       value = defaultValue;
1211     }
1212     value = value.replace("%JOB_ID%", context.getJobID().toString());
1213     value = value.replace("%TASK_ID%", context.getTaskAttemptID().toString());
1214     value = value.replace("%USER%", get("user.name", "unknown_user"));
1215     return value;
1216   }
1217 
1218   /**
1219    * Get option whether to create a source vertex present only in edge input
1220    *
1221    * @return CREATE_EDGE_SOURCE_VERTICES option
1222    */
1223   public boolean getCreateSourceVertex() {
1224     return CREATE_EDGE_SOURCE_VERTICES.get(this);
1225   }
1226 
1227   /**
1228    * set option whether to create a source vertex present only in edge input
1229    * @param createVertex create source vertex option
1230    */
1231   public void setCreateSourceVertex(boolean createVertex) {
1232     CREATE_EDGE_SOURCE_VERTICES.set(this, createVertex);
1233   }
1234 
1235   /**
1236    * Get the maximum timeout (in milliseconds) for waiting for all tasks
1237    * to complete after the job is done.
1238    *
1239    * @return Wait task done timeout in milliseconds.
1240    */
1241   public int getWaitTaskDoneTimeoutMs() {
1242     return WAIT_TASK_DONE_TIMEOUT_MS.get(this);
1243   }
1244 
1245   /**
1246    * Set the maximum timeout (in milliseconds) for waiting for all tasks
1247    * to complete after the job is done.
1248    *
1249    * @param ms Milliseconds to set
1250    */
1251   public void setWaitTaskDoneTimeoutMs(int ms) {
1252     WAIT_TASK_DONE_TIMEOUT_MS.set(this, ms);
1253   }
1254 
1255   /**
1256    * Check whether to track job progress on client or not
1257    *
1258    * @return True if job progress should be tracked on client
1259    */
1260   public boolean trackJobProgressOnClient() {
1261     return TRACK_JOB_PROGRESS_ON_CLIENT.get(this);
1262   }
1263 
1264   /**
1265    * @return Number of retries when creating an HDFS file before failing.
1266    */
1267   public int getHdfsFileCreationRetries() {
1268     return HDFS_FILE_CREATION_RETRIES.get(this);
1269   }
1270 
1271   /**
1272    * @return Milliseconds to wait before retrying an HDFS file creation
1273    *         operation.
1274    */
1275   public int getHdfsFileCreationRetryWaitMs() {
1276     return HDFS_FILE_CREATION_RETRY_WAIT_MS.get(this);
1277   }
1278 }