View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.conf;
20  
21  import io.netty.handler.codec.ByteToMessageDecoder;
22  import io.netty.handler.codec.MessageToByteEncoder;
23  import io.netty.handler.codec.compression.JdkZlibDecoder;
24  import io.netty.handler.codec.compression.JdkZlibEncoder;
25  import io.netty.handler.codec.compression.SnappyFramedDecoder;
26  import io.netty.handler.codec.compression.SnappyFramedEncoder;
27  
28  import org.apache.giraph.aggregators.AggregatorWriter;
29  import org.apache.giraph.combiner.MessageCombiner;
30  import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
31  import org.apache.giraph.edge.Edge;
32  import org.apache.giraph.edge.EdgeFactory;
33  import org.apache.giraph.edge.EdgeStoreFactory;
34  import org.apache.giraph.edge.OutEdges;
35  import org.apache.giraph.edge.ReusableEdge;
36  import org.apache.giraph.factories.ComputationFactory;
37  import org.apache.giraph.factories.EdgeValueFactory;
38  import org.apache.giraph.factories.MessageValueFactory;
39  import org.apache.giraph.factories.ValueFactories;
40  import org.apache.giraph.factories.VertexIdFactory;
41  import org.apache.giraph.factories.VertexValueFactory;
42  import org.apache.giraph.graph.Computation;
43  import org.apache.giraph.graph.Language;
44  import org.apache.giraph.graph.MapperObserver;
45  import org.apache.giraph.graph.Vertex;
46  import org.apache.giraph.graph.VertexResolver;
47  import org.apache.giraph.graph.VertexValueCombiner;
48  import org.apache.giraph.io.EdgeInputFormat;
49  import org.apache.giraph.io.EdgeOutputFormat;
50  import org.apache.giraph.io.MappingInputFormat;
51  import org.apache.giraph.io.VertexInputFormat;
52  import org.apache.giraph.io.VertexOutputFormat;
53  import org.apache.giraph.io.filters.EdgeInputFilter;
54  import org.apache.giraph.io.filters.VertexInputFilter;
55  import org.apache.giraph.io.internal.WrappedEdgeInputFormat;
56  import org.apache.giraph.io.internal.WrappedEdgeOutputFormat;
57  import org.apache.giraph.io.internal.WrappedMappingInputFormat;
58  import org.apache.giraph.io.internal.WrappedVertexInputFormat;
59  import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
60  import org.apache.giraph.io.superstep_output.MultiThreadedSuperstepOutput;
61  import org.apache.giraph.io.superstep_output.NoOpSuperstepOutput;
62  import org.apache.giraph.io.superstep_output.SuperstepOutput;
63  import org.apache.giraph.io.superstep_output.SynchronizedSuperstepOutput;
64  import org.apache.giraph.job.GiraphJobObserver;
65  import org.apache.giraph.job.GiraphJobRetryChecker;
66  import org.apache.giraph.mapping.MappingStore;
67  import org.apache.giraph.mapping.MappingStoreOps;
68  import org.apache.giraph.mapping.translate.TranslateEdge;
69  import org.apache.giraph.master.MasterCompute;
70  import org.apache.giraph.master.MasterObserver;
71  import org.apache.giraph.master.SuperstepClasses;
72  import org.apache.giraph.partition.GraphPartitionerFactory;
73  import org.apache.giraph.partition.Partition;
74  import org.apache.giraph.utils.ExtendedByteArrayDataInput;
75  import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
76  import org.apache.giraph.utils.ExtendedDataInput;
77  import org.apache.giraph.utils.ExtendedDataOutput;
78  import org.apache.giraph.utils.ReflectionUtils;
79  import org.apache.giraph.utils.UnsafeByteArrayInputStream;
80  import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
81  import org.apache.giraph.utils.WritableUtils;
82  import org.apache.giraph.utils.io.BigDataInputOutput;
83  import org.apache.giraph.utils.io.DataInputOutput;
84  import org.apache.giraph.utils.io.ExtendedDataInputOutput;
85  import org.apache.giraph.worker.WorkerContext;
86  import org.apache.giraph.worker.WorkerObserver;
87  import org.apache.hadoop.conf.Configuration;
88  import org.apache.hadoop.io.NullWritable;
89  import org.apache.hadoop.io.Writable;
90  import org.apache.hadoop.io.WritableComparable;
91  import org.apache.hadoop.mapreduce.Mapper;
92  import org.apache.hadoop.util.Progressable;
93  
94  import com.google.common.base.Preconditions;
95  
96  
97  /**
98   * The classes set here are immutable, the remaining configuration is mutable.
99   * Classes are immutable and final to provide the best performance for
100  * instantiation.  Everything is thread-safe.
101  *
102  * @param <I> Vertex id
103  * @param <V> Vertex data
104  * @param <E> Edge data
105  */
106 @SuppressWarnings("unchecked")
107 public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
108     V extends Writable, E extends Writable> extends GiraphConfiguration {
109   /** Holder for all the classes */
110   private final GiraphClasses classes;
111   /** Mapping target class */
112   private Class<? extends Writable> mappingTargetClass = null;
113   /** Value (IVEMM) Factories */
114   private final ValueFactories<I, V, E> valueFactories;
115   /** Language values (IVEMM) are implemented in */
116   private final PerGraphTypeEnum<Language> valueLanguages;
117   /** Whether values (IVEMM) need Jython wrappers */
118   private final PerGraphTypeBoolean valueNeedsWrappers;
119 
120 
121   /**
122    * Use unsafe serialization? Cached for fast access to instantiate the
123    * extended data input/output classes
124    */
125   private final boolean useUnsafeSerialization;
126   /**
127    * Use BigDataIO for messages? Cached for fast access to instantiate the
128    * extended data input/output classes for messages
129    */
130   private final boolean useBigDataIOForMessages;
131   /** Is the graph static (meaning there is no mutation)? */
132   private final boolean isStaticGraph;
133 
134   /**
135    * Constructor.  Takes the configuration and then gets the classes out of
136    * them for Giraph
137    *
138    * @param conf Configuration
139    */
140   public ImmutableClassesGiraphConfiguration(Configuration conf) {
141     super(conf);
142     classes = new GiraphClasses<I, V, E>(conf);
143     useUnsafeSerialization = USE_UNSAFE_SERIALIZATION.get(this);
144     useBigDataIOForMessages = USE_BIG_DATA_IO_FOR_MESSAGES.get(this);
145     valueLanguages = PerGraphTypeEnum.readFromConf(
146         GiraphConstants.GRAPH_TYPE_LANGUAGES, conf);
147     valueNeedsWrappers = PerGraphTypeBoolean.readFromConf(
148         GiraphConstants.GRAPH_TYPES_NEEDS_WRAPPERS, conf);
149     isStaticGraph = GiraphConstants.STATIC_GRAPH.get(this);
150     valueFactories = new ValueFactories<I, V, E>(this);
151   }
152 
153   /**
154    * Configure an object with this instance if the object is configurable.
155    *
156    * @param obj Object
157    */
158   public void configureIfPossible(Object obj) {
159     if (obj instanceof GiraphConfigurationSettable) {
160       ((GiraphConfigurationSettable) obj).setConf(this);
161     }
162   }
163 
164   public PerGraphTypeBoolean getValueNeedsWrappers() {
165     return valueNeedsWrappers;
166   }
167 
168   public PerGraphTypeEnum<Language> getValueLanguages() {
169     return valueLanguages;
170   }
171 
172   /**
173    * Get the class used for edge translation during vertex input
174    *
175    * @return edge translation class
176    */
177   public Class<? extends TranslateEdge> edgeTranslationClass() {
178     return EDGE_TRANSLATION_CLASS.get(this);
179   }
180 
181   /**
182    * Instance of TranslateEdge that contains helper method for edge translation
183    *
184    * @return instance of TranslateEdge
185    */
186   public TranslateEdge<I, E> edgeTranslationInstance() {
187     if (edgeTranslationClass() != null) {
188       return ReflectionUtils.newInstance(edgeTranslationClass(), this);
189     }
190     return null;
191   }
192 
193   /**
194    * Get the vertex input filter class
195    *
196    * @return VertexInputFilter class
197    */
198   public Class<? extends EdgeInputFilter<I, E>>
199   getEdgeInputFilterClass() {
200     return classes.getEdgeInputFilterClass();
201   }
202 
203   /**
204    * Get the edge input filter to use
205    *
206    * @return EdgeInputFilter
207    */
208   public EdgeInputFilter getEdgeInputFilter() {
209     return ReflectionUtils.newInstance(getEdgeInputFilterClass(), this);
210   }
211 
212   /**
213    * Get the vertex input filter class
214    *
215    * @return VertexInputFilter class
216    */
217   public Class<? extends VertexInputFilter<I, V, E>>
218   getVertexInputFilterClass() {
219     return classes.getVertexInputFilterClass();
220   }
221 
222   /**
223    * Get the vertex input filter to use
224    *
225    * @return VertexInputFilter
226    */
227   public VertexInputFilter getVertexInputFilter() {
228     return ReflectionUtils.newInstance(getVertexInputFilterClass(), this);
229   }
230 
231   /**
232    * Get the user's subclassed
233    * {@link org.apache.giraph.partition.GraphPartitionerFactory}.
234    *
235    * @return User's graph partitioner
236    */
237   public Class<? extends GraphPartitionerFactory<I, V, E>>
238   getGraphPartitionerClass() {
239     return classes.getGraphPartitionerFactoryClass();
240   }
241 
242   /**
243    * Create a user graph partitioner class
244    *
245    * @return Instantiated user graph partitioner class
246    */
247   public GraphPartitionerFactory<I, V, E> createGraphPartitioner() {
248     Class<? extends GraphPartitionerFactory<I, V, E>> klass =
249         classes.getGraphPartitionerFactoryClass();
250     return ReflectionUtils.newInstance(klass, this);
251   }
252 
253   @Override
254   public boolean hasVertexInputFormat() {
255     return classes.hasVertexInputFormat();
256   }
257 
258   /**
259    * Get the user's subclassed
260    * {@link org.apache.giraph.io.VertexInputFormat}.
261    *
262    * @return User's vertex input format class
263    */
264   public Class<? extends VertexInputFormat<I, V, E>>
265   getVertexInputFormatClass() {
266     return classes.getVertexInputFormatClass();
267   }
268 
269   /**
270    * Create a user vertex input format class.
271    * Note: Giraph should only use WrappedVertexInputFormat,
272    * which makes sure that Configuration parameters are set properly.
273    *
274    * @return Instantiated user vertex input format class
275    */
276   private VertexInputFormat<I, V, E> createVertexInputFormat() {
277     Class<? extends VertexInputFormat<I, V, E>> klass =
278         getVertexInputFormatClass();
279     return ReflectionUtils.newInstance(klass, this);
280   }
281 
282   /**
283    * Create a wrapper for user vertex input format,
284    * which makes sure that Configuration parameters are set properly in all
285    * methods related to this format.
286    *
287    * @return Wrapper around user vertex input format
288    */
289   public WrappedVertexInputFormat<I, V, E> createWrappedVertexInputFormat() {
290     WrappedVertexInputFormat<I, V, E> wrappedVertexInputFormat =
291         new WrappedVertexInputFormat<I, V, E>(createVertexInputFormat());
292     configureIfPossible(wrappedVertexInputFormat);
293     return wrappedVertexInputFormat;
294   }
295 
296   @Override
297   public void setVertexInputFormatClass(
298       Class<? extends VertexInputFormat> vertexInputFormatClass) {
299     super.setVertexInputFormatClass(vertexInputFormatClass);
300     classes.setVertexInputFormatClass(vertexInputFormatClass);
301   }
302 
303   @Override
304   public boolean hasVertexOutputFormat() {
305     return classes.hasVertexOutputFormat();
306   }
307 
308   /**
309    * Get the user's subclassed
310    * {@link org.apache.giraph.io.VertexOutputFormat}.
311    *
312    * @return User's vertex output format class
313    */
314   public Class<? extends VertexOutputFormat<I, V, E>>
315   getVertexOutputFormatClass() {
316     return classes.getVertexOutputFormatClass();
317   }
318 
319   /**
320    * Get MappingInputFormatClass
321    *
322    * @return MappingInputFormatClass
323    */
324   public Class<? extends MappingInputFormat<I, V, E, ? extends Writable>>
325   getMappingInputFormatClass() {
326     return classes.getMappingInputFormatClass();
327   }
328 
329   /**
330    * Set MappingInputFormatClass
331    *
332    * @param mappingInputFormatClass Determines how mappings are input
333    */
334   @Override
335   public void setMappingInputFormatClass(
336     Class<? extends MappingInputFormat> mappingInputFormatClass) {
337     super.setMappingInputFormatClass(mappingInputFormatClass);
338     classes.setMappingInputFormatClass(mappingInputFormatClass);
339   }
340 
341   /**
342    * Check if mappingInputFormat is set
343    *
344    * @return true if mappingInputFormat is set
345    */
346   public boolean hasMappingInputFormat() {
347     return classes.hasMappingInputFormat();
348   }
349 
350   /**
351    * Create a user vertex output format class.
352    * Note: Giraph should only use WrappedVertexOutputFormat,
353    * which makes sure that Configuration parameters are set properly.
354    *
355    * @return Instantiated user vertex output format class
356    */
357   private VertexOutputFormat<I, V, E> createVertexOutputFormat() {
358     Class<? extends VertexOutputFormat<I, V, E>> klass =
359         getVertexOutputFormatClass();
360     return ReflectionUtils.newInstance(klass, this);
361   }
362 
363   /**
364    * Create a user mapping input format class.
365    * Note: Giraph should only use WrappedMappingInputFormat,
366    * which makes sure that Configuration parameters are set properly.
367    *
368    * @return Instantiated user mapping input format class
369    */
370   private MappingInputFormat<I, V, E, ? extends Writable>
371   createMappingInputFormat() {
372     Class<? extends MappingInputFormat<I, V, E, ? extends Writable>> klass =
373         getMappingInputFormatClass();
374     return ReflectionUtils.newInstance(klass, this);
375   }
376 
377   /**
378    * Create a wrapper for user vertex output format,
379    * which makes sure that Configuration parameters are set properly in all
380    * methods related to this format.
381    *
382    * @return Wrapper around user vertex output format
383    */
384   public WrappedVertexOutputFormat<I, V, E> createWrappedVertexOutputFormat() {
385     WrappedVertexOutputFormat<I, V, E> wrappedVertexOutputFormat =
386         new WrappedVertexOutputFormat<I, V, E>(createVertexOutputFormat());
387     configureIfPossible(wrappedVertexOutputFormat);
388     return wrappedVertexOutputFormat;
389   }
390 
391   /**
392    * Create a wrapper for user mapping input format,
393    * which makes sure that Configuration parameters are set properly in all
394    * methods related to this format.
395    *
396    * @return Wrapper around user mapping input format
397    */
398   public WrappedMappingInputFormat<I, V, E, ? extends Writable>
399   createWrappedMappingInputFormat() {
400     WrappedMappingInputFormat<I, V, E, ? extends Writable>
401       wrappedMappingInputFormat =
402         new WrappedMappingInputFormat<>(createMappingInputFormat());
403     configureIfPossible(wrappedMappingInputFormat);
404     return wrappedMappingInputFormat;
405   }
406 
407   @Override
408   public boolean hasEdgeOutputFormat() {
409     return classes.hasEdgeOutputFormat();
410   }
411 
412   /**
413    * Get the user's subclassed
414    * {@link org.apache.giraph.io.EdgeOutputFormat}.
415    *
416    * @return User's edge output format class
417    */
418   public Class<? extends EdgeOutputFormat<I, V, E>>
419   getEdgeOutputFormatClass() {
420     return classes.getEdgeOutputFormatClass();
421   }
422 
423   /**
424    * Create a user edge output format class.
425    * Note: Giraph should only use WrappedEdgeOutputFormat,
426    * which makes sure that Configuration parameters are set properly.
427    *
428    * @return Instantiated user edge output format class
429    */
430   private EdgeOutputFormat<I, V, E> createEdgeOutputFormat() {
431     Class<? extends EdgeOutputFormat<I, V, E>> klass =
432         getEdgeOutputFormatClass();
433     return ReflectionUtils.newInstance(klass, this);
434   }
435 
436   /**
437    * Create a wrapper for user edge output format,
438    * which makes sure that Configuration parameters are set properly in all
439    * methods related to this format.
440    *
441    * @return Wrapper around user edge output format
442    */
443   public WrappedEdgeOutputFormat<I, V, E> createWrappedEdgeOutputFormat() {
444     WrappedEdgeOutputFormat<I, V, E> wrappedEdgeOutputFormat =
445         new WrappedEdgeOutputFormat<I, V, E>(createEdgeOutputFormat());
446     configureIfPossible(wrappedEdgeOutputFormat);
447     return wrappedEdgeOutputFormat;
448   }
449 
450   /**
451    * Create the proper superstep output, based on the configuration settings.
452    *
453    * @param context Mapper context
454    * @return SuperstepOutput
455    */
456   public SuperstepOutput<I, V, E> createSuperstepOutput(
457       Mapper<?, ?, ?, ?>.Context context) {
458     if (doOutputDuringComputation()) {
459       if (vertexOutputFormatThreadSafe()) {
460         return new MultiThreadedSuperstepOutput<I, V, E>(this, context);
461       } else {
462         return new SynchronizedSuperstepOutput<I, V, E>(this, context);
463       }
464     } else {
465       return new NoOpSuperstepOutput<I, V, E>();
466     }
467   }
468 
469   @Override
470   public boolean hasEdgeInputFormat() {
471     return classes.hasEdgeInputFormat();
472   }
473 
474   /**
475    * Get the user's subclassed
476    * {@link org.apache.giraph.io.EdgeInputFormat}.
477    *
478    * @return User's edge input format class
479    */
480   public Class<? extends EdgeInputFormat<I, E>> getEdgeInputFormatClass() {
481     return classes.getEdgeInputFormatClass();
482   }
483 
484   /**
485    * Create a user edge input format class.
486    * Note: Giraph should only use WrappedEdgeInputFormat,
487    * which makes sure that Configuration parameters are set properly.
488    *
489    * @return Instantiated user edge input format class
490    */
491   private EdgeInputFormat<I, E> createEdgeInputFormat() {
492     Class<? extends EdgeInputFormat<I, E>> klass = getEdgeInputFormatClass();
493     return ReflectionUtils.newInstance(klass, this);
494   }
495 
496   /**
497    * Create a wrapper for user edge input format,
498    * which makes sure that Configuration parameters are set properly in all
499    * methods related to this format.
500    *
501    * @return Wrapper around user edge input format
502    */
503   public WrappedEdgeInputFormat<I, E> createWrappedEdgeInputFormat() {
504     WrappedEdgeInputFormat<I, E> wrappedEdgeInputFormat =
505         new WrappedEdgeInputFormat<I, E>(createEdgeInputFormat());
506     configureIfPossible(wrappedEdgeInputFormat);
507     return wrappedEdgeInputFormat;
508   }
509 
510   @Override
511   public void setEdgeInputFormatClass(
512       Class<? extends EdgeInputFormat> edgeInputFormatClass) {
513     super.setEdgeInputFormatClass(edgeInputFormatClass);
514     classes.setEdgeInputFormatClass(edgeInputFormatClass);
515   }
516 
517   /**
518    * Get the user's subclassed {@link AggregatorWriter}.
519    *
520    * @return User's aggregator writer class
521    */
522   public Class<? extends AggregatorWriter> getAggregatorWriterClass() {
523     return classes.getAggregatorWriterClass();
524   }
525 
526   /**
527    * Create a user aggregator output format class
528    *
529    * @return Instantiated user aggregator writer class
530    */
531   public AggregatorWriter createAggregatorWriter() {
532     return ReflectionUtils.newInstance(getAggregatorWriterClass(), this);
533   }
534 
535   /**
536    * Get the user's subclassed
537    * {@link org.apache.giraph.graph.VertexValueCombiner} class.
538    *
539    * @return User's vertex value combiner class
540    */
541   public Class<? extends VertexValueCombiner<V>>
542   getVertexValueCombinerClass() {
543     return classes.getVertexValueCombinerClass();
544   }
545 
546   /**
547    * Create a user vertex value combiner class
548    *
549    * @return Instantiated user vertex value combiner class
550    */
551   @SuppressWarnings("rawtypes")
552   public VertexValueCombiner<V> createVertexValueCombiner() {
553     return ReflectionUtils.newInstance(getVertexValueCombinerClass(), this);
554   }
555 
556   /**
557    * Get the user's subclassed VertexResolver.
558    *
559    * @return User's vertex resolver class
560    */
561   public Class<? extends VertexResolver<I, V, E>> getVertexResolverClass() {
562     return classes.getVertexResolverClass();
563   }
564 
565   /**
566    * Create a user vertex revolver
567    *
568    * @return Instantiated user vertex resolver
569    */
570   public VertexResolver<I, V, E> createVertexResolver() {
571     return ReflectionUtils.newInstance(getVertexResolverClass(), this);
572   }
573 
574   /**
575    * Get the user's subclassed WorkerContext.
576    *
577    * @return User's worker context class
578    */
579   public Class<? extends WorkerContext> getWorkerContextClass() {
580     return classes.getWorkerContextClass();
581   }
582 
583   /**
584    * Create a user worker context
585    *
586    * @return Instantiated user worker context
587    */
588   public WorkerContext createWorkerContext() {
589     return ReflectionUtils.newInstance(getWorkerContextClass(), this);
590   }
591 
592   /**
593    * Get the user's subclassed {@link org.apache.giraph.master.MasterCompute}
594    *
595    * @return User's master class
596    */
597   public Class<? extends MasterCompute> getMasterComputeClass() {
598     return classes.getMasterComputeClass();
599   }
600 
601   /**
602    * Create a user master
603    *
604    * @return Instantiated user master
605    */
606   public MasterCompute createMasterCompute() {
607     return ReflectionUtils.newInstance(getMasterComputeClass(), this);
608   }
609 
610   @Override
611   public Class<? extends Computation<I, V, E,
612       ? extends Writable, ? extends Writable>>
613   getComputationClass() {
614     return classes.getComputationClass();
615   }
616 
617   /**
618    * Get computation factory class
619    *
620    * @return computation factory class
621    */
622   @Override
623   public Class<? extends ComputationFactory<I, V, E,
624       ? extends Writable, ? extends Writable>>
625   getComputationFactoryClass() {
626     return classes.getComputationFactoryClass();
627   }
628 
629   /**
630    * Get computation factory
631    *
632    * @return computation factory
633    */
634   public ComputationFactory<I, V, E, ? extends Writable, ? extends Writable>
635   createComputationFactory() {
636     return ReflectionUtils.newInstance(getComputationFactoryClass(), this);
637   }
638 
639   /**
640    * Create a user computation
641    *
642    * @return Instantiated user computation
643    */
644   public Computation<I, V, E, ? extends Writable, ? extends Writable>
645   createComputation() {
646     return createComputationFactory().createComputation(this);
647   }
648 
649   /**
650    * Get user types describing graph (I,V,E,M1,M2)
651    *
652    * @return GiraphTypes
653    */
654   public GiraphTypes<I, V, E> getGiraphTypes() {
655     return classes.getGiraphTypes();
656   }
657 
658   /**
659    * Create a vertex
660    *
661    * @return Instantiated vertex
662    */
663   public Vertex<I, V, E> createVertex() {
664     Class vertexClass = classes.getVertexClass();
665     return (Vertex<I, V, E>) ReflectionUtils.newInstance(vertexClass, this);
666   }
667 
668 
669  /**
670    * Get the user's subclassed vertex index class.
671    *
672    * @return User's vertex index class
673    */
674   public Class<I> getVertexIdClass() {
675     return classes.getVertexIdClass();
676   }
677 
678   /**
679    * Get vertex ID factory
680    *
681    * @return {@link VertexIdFactory}
682    */
683   public VertexIdFactory<I> getVertexIdFactory() {
684     return valueFactories.getVertexIdFactory();
685   }
686 
687   /**
688    * Create a user vertex index
689    *
690    * @return Instantiated user vertex index
691    */
692   public I createVertexId() {
693     return getVertexIdFactory().newInstance();
694   }
695 
696   /**
697    * Get the user's subclassed vertex value class.
698    *
699    * @return User's vertex value class
700    */
701   public Class<V> getVertexValueClass() {
702     return classes.getVertexValueClass();
703   }
704 
705   /**
706    * Get vertex value factory
707    *
708    * @return {@link VertexValueFactory}
709    */
710   public VertexValueFactory<V> getVertexValueFactory() {
711     return valueFactories.getVertexValueFactory();
712   }
713 
714   /**
715    * Create a user vertex value
716    *
717    * @return Instantiated user vertex value
718    */
719   @SuppressWarnings("unchecked")
720   public V createVertexValue() {
721     return getVertexValueFactory().newInstance();
722   }
723 
724   /**
725    * Get the user's subclassed vertex value factory class
726    *
727    * @return User's vertex value factory class
728    */
729   public Class<? extends VertexValueFactory<V>> getVertexValueFactoryClass() {
730     return (Class<? extends VertexValueFactory<V>>)
731         valueFactories.getVertexValueFactory().getClass();
732   }
733 
734   /**
735    * Create array of MasterObservers.
736    *
737    * @return Instantiated array of MasterObservers.
738    */
739   public MasterObserver[] createMasterObservers() {
740     Class<? extends MasterObserver>[] klasses = getMasterObserverClasses();
741     MasterObserver[] objects = new MasterObserver[klasses.length];
742     for (int i = 0; i < klasses.length; ++i) {
743       objects[i] = ReflectionUtils.newInstance(klasses[i], this);
744     }
745     return objects;
746   }
747 
748   /**
749    * Create array of WorkerObservers.
750    *
751    * @return Instantiated array of WorkerObservers.
752    */
753   public WorkerObserver[] createWorkerObservers() {
754     Class<? extends WorkerObserver>[] klasses = getWorkerObserverClasses();
755     WorkerObserver[] objects = new WorkerObserver[klasses.length];
756     for (int i = 0; i < klasses.length; ++i) {
757       objects[i] = ReflectionUtils.newInstance(klasses[i], this);
758     }
759     return objects;
760   }
761 
762   /**
763    * Create array of MapperObservers.
764    *
765    * @return Instantiated array of MapperObservers.
766    */
767   public MapperObserver[] createMapperObservers() {
768     Class<? extends MapperObserver>[] klasses = getMapperObserverClasses();
769     MapperObserver[] objects = new MapperObserver[klasses.length];
770     for (int i = 0; i < klasses.length; ++i) {
771       objects[i] = ReflectionUtils.newInstance(klasses[i], this);
772     }
773     return objects;
774   }
775 
776   /**
777    * Create job observer
778    *
779    * @return GiraphJobObserver set in configuration.
780    */
781   public GiraphJobObserver getJobObserver() {
782     return ReflectionUtils.newInstance(getJobObserverClass(), this);
783   }
784 
785   /**
786    * Create job retry checker
787    *
788    * @return GiraphJobRetryChecker set in configuration.
789    */
790   public GiraphJobRetryChecker getJobRetryChecker() {
791     return ReflectionUtils.newInstance(getJobRetryCheckerClass(), this);
792   }
793 
794   /**
795    * Get the user's subclassed edge value class.
796    *
797    * @return User's vertex edge value class
798    */
799   public Class<E> getEdgeValueClass() {
800     return classes.getEdgeValueClass();
801   }
802 
803   /**
804    * Tell if we are using NullWritable for Edge value.
805    *
806    * @return true if NullWritable is class for
807    */
808   public boolean isEdgeValueNullWritable() {
809     return getEdgeValueClass() == NullWritable.class;
810   }
811 
812   /**
813    * Get Factory for creating edge values
814    *
815    * @return {@link EdgeValueFactory}
816    */
817   public EdgeValueFactory<E> getEdgeValueFactory() {
818     return valueFactories.getEdgeValueFactory();
819   }
820 
821   /**
822    * Create a user edge value
823    *
824    * @return Instantiated user edge value
825    */
826   public E createEdgeValue() {
827     return getEdgeValueFactory().newInstance();
828   }
829 
830   /**
831    * Create a user edge.
832    *
833    * @return Instantiated user edge.
834    */
835   public Edge<I, E> createEdge() {
836     if (isEdgeValueNullWritable()) {
837       return (Edge<I, E>) EdgeFactory.create(createVertexId());
838     } else {
839       return EdgeFactory.create(createVertexId(), createEdgeValue());
840     }
841   }
842 
843   /**
844    * Create edge based on #createEdge definition
845    *
846    * @param translateEdge instance of TranslateEdge
847    * @param edge edge to be translated
848    * @return translated edge
849    */
850   public Edge<I, E> createEdge(TranslateEdge<I, E>
851     translateEdge, Edge<I, E> edge) {
852     I translatedId = translateEdge.translateId(edge.getTargetVertexId());
853     if (isEdgeValueNullWritable()) {
854       return (Edge<I, E>) EdgeFactory.create(translatedId);
855     } else {
856       return EdgeFactory.create(translatedId,
857         translateEdge.cloneValue(edge.getValue()));
858     }
859   }
860 
861   /**
862    * Create a reusable edge.
863    *
864    * @return Instantiated reusable edge.
865    */
866   public ReusableEdge<I, E> createReusableEdge() {
867     if (isEdgeValueNullWritable()) {
868       return (ReusableEdge<I, E>) EdgeFactory.createReusable(createVertexId());
869     } else {
870       return EdgeFactory.createReusable(createVertexId(), createEdgeValue());
871     }
872   }
873 
874   /**
875    * Create edge store factory
876    *
877    * @return edge store factory
878    */
879   public EdgeStoreFactory<I, V, E> createEdgeStoreFactory() {
880     Class<? extends EdgeStoreFactory> edgeStoreFactoryClass =
881         EDGE_STORE_FACTORY_CLASS.get(this);
882     return ReflectionUtils.newInstance(edgeStoreFactoryClass);
883   }
884 
885   /**
886    * Get the user's subclassed incoming message value class.
887    *
888    * @param <M> Message data
889    * @return User's vertex message value class
890    */
891   public <M extends Writable> Class<M> getIncomingMessageValueClass() {
892     return classes.getIncomingMessageClasses().getMessageClass();
893   }
894 
895   /**
896    * Get the user's subclassed outgoing message value class.
897    *
898    * @param <M> Message type
899    * @return User's vertex message value class
900    */
901   public <M extends Writable> Class<M> getOutgoingMessageValueClass() {
902     return classes.getOutgoingMessageClasses().getMessageClass();
903   }
904 
905   /**
906    * Get incoming message classes
907    * @param <M> message type
908    * @return incoming message classes
909    */
910   public <M extends Writable>
911   MessageClasses<I, M> getIncomingMessageClasses() {
912     return classes.getIncomingMessageClasses();
913   }
914 
915   /**
916    * Get outgoing message classes
917    * @param <M> message type
918    * @return outgoing message classes
919    */
920   public <M extends Writable>
921   MessageClasses<I, M> getOutgoingMessageClasses() {
922     return classes.getOutgoingMessageClasses();
923   }
924 
925   /**
926    * Create new outgoing message value factory
927    * @param <M> message type
928    * @return outgoing message value factory
929    */
930   public <M extends Writable>
931   MessageValueFactory<M> createOutgoingMessageValueFactory() {
932     return classes.getOutgoingMessageClasses().createMessageValueFactory(this);
933   }
934 
935   /**
936    * Create new incoming message value factory
937    * @param <M> message type
938    * @return incoming message value factory
939    */
940   public <M extends Writable>
941   MessageValueFactory<M> createIncomingMessageValueFactory() {
942     return classes.getIncomingMessageClasses().createMessageValueFactory(this);
943   }
944 
945   @Override
946   public void setMessageCombinerClass(
947       Class<? extends MessageCombiner> messageCombinerClass) {
948     throw new IllegalArgumentException(
949         "Cannot set message combiner on ImmutableClassesGiraphConfigurable");
950   }
951 
952   /**
953    * Create a user combiner class
954    *
955    * @param <M> Message data
956    * @return Instantiated user combiner class
957    */
958   public <M extends Writable> MessageCombiner<? super I, M>
959   createOutgoingMessageCombiner() {
960     return classes.getOutgoingMessageClasses().createMessageCombiner(this);
961   }
962 
963   /**
964    * Check if user set a combiner
965    *
966    * @return True iff user set a combiner class
967    */
968   public boolean useOutgoingMessageCombiner() {
969     return classes.getOutgoingMessageClasses().useMessageCombiner();
970   }
971 
972   /**
973    * Get outgoing message encode and store type
974    * @return outgoing message encode and store type
975    */
976   public MessageEncodeAndStoreType getOutgoingMessageEncodeAndStoreType() {
977     return classes.getOutgoingMessageClasses().getMessageEncodeAndStoreType();
978   }
979 
980   @Override
981   public Class<? extends OutEdges<I, E>> getOutEdgesClass() {
982     return classes.getOutEdgesClass();
983   }
984 
985   /**
986    * Get the user's subclassed {@link org.apache.giraph.edge.OutEdges} used for
987    * input
988    *
989    * @return User's input vertex edges class
990    */
991   public Class<? extends OutEdges<I, E>> getInputOutEdgesClass() {
992     return classes.getInputOutEdgesClass();
993   }
994 
995   /**
996    * Check whether the user has specified a different
997    * {@link org.apache.giraph.edge.OutEdges} class to be used during
998    * edge-based input.
999    *
1000    * @return True iff there is a special edges class for input
1001    */
1002   public boolean useInputOutEdges() {
1003     return classes.getInputOutEdgesClass() != classes.getOutEdgesClass();
1004   }
1005 
1006   /**
1007    * Get MappingStore class to be used
1008    *
1009    * @return MappingStore class set by user
1010    */
1011   public Class<? extends MappingStore> getMappingStoreClass() {
1012     return MAPPING_STORE_CLASS.get(this);
1013   }
1014 
1015   /**
1016    * Create a {@link org.apache.giraph.mapping.MappingStore} instance
1017    *
1018    * @return MappingStore Instance
1019    */
1020   public MappingStore<I, ? extends Writable> createMappingStore() {
1021     if (getMappingStoreClass() != null) {
1022       return ReflectionUtils.newInstance(getMappingStoreClass(), this);
1023     } else {
1024       return null;
1025     }
1026   }
1027 
1028   /**
1029    * Get MappingStoreOps class to be used
1030    *
1031    * @return MappingStoreOps class set by user
1032    */
1033   public Class<? extends MappingStoreOps> getMappingStoreOpsClass() {
1034     return MAPPING_STORE_OPS_CLASS.get(this);
1035   }
1036 
1037   /**
1038    * Create a {@link org.apache.giraph.mapping.MappingStoreOps} instance
1039    *
1040    * @return MappingStoreOps Instance
1041    */
1042   public MappingStoreOps<I, ? extends Writable> createMappingStoreOps() {
1043     if (getMappingStoreOpsClass() != null) {
1044       return ReflectionUtils.newInstance(getMappingStoreOpsClass(), this);
1045     } else {
1046       return null;
1047     }
1048   }
1049 
1050   /**
1051    * Get mappingTarget class
1052    *
1053    * @return mappingTarget class
1054    */
1055   public Class<? extends Writable> getMappingTargetClass() {
1056     if (mappingTargetClass == null) {
1057       Class<?>[] classList = ReflectionUtils.getTypeArguments(
1058         MappingStore.class, getMappingStoreClass());
1059       Preconditions.checkArgument(classList.length == 2);
1060       mappingTargetClass = (Class<? extends Writable>) classList[1];
1061     }
1062     return mappingTargetClass;
1063   }
1064 
1065   /**
1066    * Create and return mappingTarget instance
1067    *
1068    * @return mappingTarget instance
1069    */
1070   public Writable createMappingTarget() {
1071     return WritableUtils.createWritable(getMappingTargetClass());
1072   }
1073 
1074   /**
1075    * Create a user {@link org.apache.giraph.edge.OutEdges}
1076    *
1077    * @return Instantiated user OutEdges
1078    */
1079   public OutEdges<I, E> createOutEdges() {
1080     return ReflectionUtils.newInstance(getOutEdgesClass(), this);
1081   }
1082 
1083   /**
1084    * Create a {@link org.apache.giraph.edge.OutEdges} instance and initialize
1085    * it with the default capacity.
1086    *
1087    * @return Instantiated OutEdges
1088    */
1089   public OutEdges<I, E> createAndInitializeOutEdges() {
1090     OutEdges<I, E> outEdges = createOutEdges();
1091     outEdges.initialize();
1092     return outEdges;
1093   }
1094 
1095   /**
1096    * Create a {@link org.apache.giraph.edge.OutEdges} instance and initialize
1097    * it with the given capacity (the number of edges that will be added).
1098    *
1099    * @param capacity Number of edges that will be added
1100    * @return Instantiated OutEdges
1101    */
1102   public OutEdges<I, E> createAndInitializeOutEdges(int capacity) {
1103     OutEdges<I, E> outEdges = createOutEdges();
1104     outEdges.initialize(capacity);
1105     return outEdges;
1106   }
1107 
1108   /**
1109    * Create a {@link org.apache.giraph.edge.OutEdges} instance and initialize
1110    * it with the given iterable of edges.
1111    *
1112    * @param edges Iterable of edges to add
1113    * @return Instantiated OutEdges
1114    */
1115   public OutEdges<I, E> createAndInitializeOutEdges(
1116       Iterable<Edge<I, E>> edges) {
1117     OutEdges<I, E> outEdges = createOutEdges();
1118     outEdges.initialize(edges);
1119     return outEdges;
1120   }
1121 
1122   /**
1123    * Create a user {@link org.apache.giraph.edge.OutEdges} used during
1124    * edge-based input
1125    *
1126    * @return Instantiated user input OutEdges
1127    */
1128   public OutEdges<I, E> createInputOutEdges() {
1129     return ReflectionUtils.newInstance(getInputOutEdgesClass(), this);
1130   }
1131 
1132   /**
1133    * Create an input {@link org.apache.giraph.edge.OutEdges} instance and
1134    * initialize it with the default capacity.
1135    *
1136    * @return Instantiated input OutEdges
1137    */
1138   public OutEdges<I, E> createAndInitializeInputOutEdges() {
1139     OutEdges<I, E> outEdges = createInputOutEdges();
1140     outEdges.initialize();
1141     return outEdges;
1142   }
1143 
1144   /**
1145    * Create a partition
1146    *
1147    * @param id Partition id
1148    * @param progressable Progressable for reporting progress
1149    * @return Instantiated partition
1150    */
1151   public Partition<I, V, E> createPartition(
1152       int id, Progressable progressable) {
1153     Class<? extends Partition<I, V, E>> klass = classes.getPartitionClass();
1154     Partition<I, V, E> partition = ReflectionUtils.newInstance(klass, this);
1155     partition.initialize(id, progressable);
1156     return partition;
1157   }
1158 
1159   /**
1160    * Use unsafe serialization?
1161    *
1162    * @return True if using unsafe serialization, false otherwise.
1163    */
1164   public boolean useUnsafeSerialization() {
1165     return useUnsafeSerialization;
1166   }
1167 
1168   /**
1169    * Create DataInputOutput to store messages
1170    *
1171    * @return DataInputOutput object
1172    */
1173   public DataInputOutput createMessagesInputOutput() {
1174     if (useBigDataIOForMessages) {
1175       return new BigDataInputOutput(this);
1176     } else {
1177       return new ExtendedDataInputOutput(this);
1178     }
1179   }
1180 
1181   /**
1182    * Create an extended data output (can be subclassed)
1183    *
1184    * @return ExtendedDataOutput object
1185    */
1186   public ExtendedDataOutput createExtendedDataOutput() {
1187     if (useUnsafeSerialization) {
1188       return new UnsafeByteArrayOutputStream();
1189     } else {
1190       return new ExtendedByteArrayDataOutput();
1191     }
1192   }
1193 
1194   /**
1195    * Create an extended data output (can be subclassed)
1196    *
1197    * @param expectedSize Expected size
1198    * @return ExtendedDataOutput object
1199    */
1200   public ExtendedDataOutput createExtendedDataOutput(int expectedSize) {
1201     if (useUnsafeSerialization) {
1202       return new UnsafeByteArrayOutputStream(expectedSize);
1203     } else {
1204       return new ExtendedByteArrayDataOutput(expectedSize);
1205     }
1206   }
1207 
1208   /**
1209    * Create an extended data output (can be subclassed)
1210    *
1211    * @param buf Buffer to use for the output (reuse perhaps)
1212    * @param pos How much of the buffer is already used
1213    * @return ExtendedDataOutput object
1214    */
1215   public ExtendedDataOutput createExtendedDataOutput(byte[] buf,
1216                                                      int pos) {
1217     if (useUnsafeSerialization) {
1218       return new UnsafeByteArrayOutputStream(buf, pos);
1219     } else {
1220       return new ExtendedByteArrayDataOutput(buf, pos);
1221     }
1222   }
1223 
1224   /**
1225    * Create an extended data input (can be subclassed)
1226    *
1227    * @param buf Buffer to use for the input
1228    * @param off Where to start reading in the buffer
1229    * @param length Maximum length of the buffer
1230    * @return ExtendedDataInput object
1231    */
1232   public ExtendedDataInput createExtendedDataInput(
1233       byte[] buf, int off, int length) {
1234     if (useUnsafeSerialization) {
1235       return new UnsafeByteArrayInputStream(buf, off, length);
1236     } else {
1237       return new ExtendedByteArrayDataInput(buf, off, length);
1238     }
1239   }
1240 
1241   /**
1242    * Create an extended data input (can be subclassed)
1243    *
1244    * @param buf Buffer to use for the input
1245    * @return ExtendedDataInput object
1246    */
1247   public ExtendedDataInput createExtendedDataInput(byte[] buf) {
1248     if (useUnsafeSerialization) {
1249       return new UnsafeByteArrayInputStream(buf);
1250     } else {
1251       return new ExtendedByteArrayDataInput(buf);
1252     }
1253   }
1254 
1255   /**
1256    * Create extendedDataInput based on extendedDataOutput
1257    *
1258    * @param extendedDataOutput extendedDataOutput
1259    * @return extendedDataInput
1260    */
1261   public ExtendedDataInput createExtendedDataInput(
1262     ExtendedDataOutput extendedDataOutput) {
1263     return createExtendedDataInput(extendedDataOutput.getByteArray(), 0,
1264         extendedDataOutput.getPos());
1265   }
1266 
1267   /**
1268    * Whether to use an unsafe serialization
1269    *
1270    * @return whether to use unsafe serialization
1271    */
1272   public boolean getUseUnsafeSerialization() {
1273     return useUnsafeSerialization;
1274   }
1275 
1276   /**
1277    * Update Computation and MessageCombiner class used
1278    *
1279    * @param superstepClasses SuperstepClasses
1280    */
1281   public void updateSuperstepClasses(SuperstepClasses superstepClasses) {
1282     superstepClasses.updateGiraphClasses(classes);
1283   }
1284 
1285   /**
1286    * Has the user enabled compression in netty client &amp; server
1287    *
1288    * @return true if ok to do compression of netty requests
1289    */
1290   public boolean doCompression() {
1291     switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
1292     case "SNAPPY":
1293       return true;
1294     case "INFLATE":
1295       return true;
1296     default:
1297       return false;
1298     }
1299   }
1300 
1301   /**
1302    * Get encoder for message compression in netty
1303    *
1304    * @return message to byte encoder
1305    */
1306   public MessageToByteEncoder getNettyCompressionEncoder() {
1307     switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
1308     case "SNAPPY":
1309       return new SnappyFramedEncoder();
1310     case "INFLATE":
1311       return new JdkZlibEncoder();
1312     default:
1313       return null;
1314     }
1315   }
1316 
1317   /**
1318    * Get decoder for message decompression in netty
1319    *
1320    * @return byte to message decoder
1321    */
1322   public ByteToMessageDecoder getNettyCompressionDecoder() {
1323     switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
1324     case "SNAPPY":
1325       return new SnappyFramedDecoder(true);
1326     case "INFLATE":
1327       return new JdkZlibDecoder();
1328     default:
1329       return null;
1330     }
1331   }
1332 
1333   /**
1334    * Whether the application with change or not the graph topology.
1335    *
1336    * @return true if the graph is static, false otherwise.
1337    */
1338   public boolean isStaticGraph() {
1339     return isStaticGraph;
1340   }
1341 
1342   /**
1343    * @return job id
1344    */
1345   public String getJobId() {
1346     return get("mapred.job.id", "UnknownJob");
1347   }
1348 }