View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.conf;
20  
21  import io.netty.handler.codec.ByteToMessageDecoder;
22  import io.netty.handler.codec.MessageToByteEncoder;
23  import io.netty.handler.codec.compression.JdkZlibDecoder;
24  import io.netty.handler.codec.compression.JdkZlibEncoder;
25  import io.netty.handler.codec.compression.SnappyFramedDecoder;
26  import io.netty.handler.codec.compression.SnappyFramedEncoder;
27  
28  import org.apache.giraph.aggregators.AggregatorWriter;
29  import org.apache.giraph.combiner.MessageCombiner;
30  import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
31  import org.apache.giraph.edge.Edge;
32  import org.apache.giraph.edge.EdgeFactory;
33  import org.apache.giraph.edge.EdgeStoreFactory;
34  import org.apache.giraph.edge.OutEdges;
35  import org.apache.giraph.edge.ReusableEdge;
36  import org.apache.giraph.factories.ComputationFactory;
37  import org.apache.giraph.factories.EdgeValueFactory;
38  import org.apache.giraph.factories.MessageValueFactory;
39  import org.apache.giraph.factories.OutEdgesFactory;
40  import org.apache.giraph.factories.ValueFactories;
41  import org.apache.giraph.factories.VertexIdFactory;
42  import org.apache.giraph.factories.VertexValueFactory;
43  import org.apache.giraph.graph.Computation;
44  import org.apache.giraph.graph.Language;
45  import org.apache.giraph.graph.MapperObserver;
46  import org.apache.giraph.graph.Vertex;
47  import org.apache.giraph.graph.VertexResolver;
48  import org.apache.giraph.graph.VertexValueCombiner;
49  import org.apache.giraph.io.EdgeInputFormat;
50  import org.apache.giraph.io.EdgeOutputFormat;
51  import org.apache.giraph.io.MappingInputFormat;
52  import org.apache.giraph.io.VertexInputFormat;
53  import org.apache.giraph.io.VertexOutputFormat;
54  import org.apache.giraph.io.filters.EdgeInputFilter;
55  import org.apache.giraph.io.filters.VertexInputFilter;
56  import org.apache.giraph.io.internal.WrappedEdgeInputFormat;
57  import org.apache.giraph.io.internal.WrappedEdgeOutputFormat;
58  import org.apache.giraph.io.internal.WrappedMappingInputFormat;
59  import org.apache.giraph.io.internal.WrappedVertexInputFormat;
60  import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
61  import org.apache.giraph.io.superstep_output.MultiThreadedSuperstepOutput;
62  import org.apache.giraph.io.superstep_output.NoOpSuperstepOutput;
63  import org.apache.giraph.io.superstep_output.SuperstepOutput;
64  import org.apache.giraph.io.superstep_output.SynchronizedSuperstepOutput;
65  import org.apache.giraph.job.GiraphJobObserver;
66  import org.apache.giraph.job.GiraphJobRetryChecker;
67  import org.apache.giraph.mapping.MappingStore;
68  import org.apache.giraph.mapping.MappingStoreOps;
69  import org.apache.giraph.mapping.translate.TranslateEdge;
70  import org.apache.giraph.master.MasterCompute;
71  import org.apache.giraph.master.MasterObserver;
72  import org.apache.giraph.master.SuperstepClasses;
73  import org.apache.giraph.partition.GraphPartitionerFactory;
74  import org.apache.giraph.partition.Partition;
75  import org.apache.giraph.utils.ExtendedByteArrayDataInput;
76  import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
77  import org.apache.giraph.utils.ExtendedDataInput;
78  import org.apache.giraph.utils.ExtendedDataOutput;
79  import org.apache.giraph.utils.GcObserver;
80  import org.apache.giraph.utils.ReflectionUtils;
81  import org.apache.giraph.utils.UnsafeByteArrayInputStream;
82  import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
83  import org.apache.giraph.utils.WritableUtils;
84  import org.apache.giraph.utils.io.BigDataInputOutput;
85  import org.apache.giraph.utils.io.DataInputOutput;
86  import org.apache.giraph.utils.io.ExtendedDataInputOutput;
87  import org.apache.giraph.worker.WorkerContext;
88  import org.apache.giraph.worker.WorkerObserver;
89  import org.apache.hadoop.conf.Configuration;
90  import org.apache.hadoop.io.NullWritable;
91  import org.apache.hadoop.io.Writable;
92  import org.apache.hadoop.io.WritableComparable;
93  import org.apache.hadoop.mapreduce.Mapper;
94  import org.apache.hadoop.util.Progressable;
95  
96  import com.google.common.base.Preconditions;
97  
98  
99  /**
100  * The classes set here are immutable, the remaining configuration is mutable.
101  * Classes are immutable and final to provide the best performance for
102  * instantiation.  Everything is thread-safe.
103  *
104  * @param <I> Vertex id
105  * @param <V> Vertex data
106  * @param <E> Edge data
107  */
108 @SuppressWarnings("unchecked")
109 public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
110     V extends Writable, E extends Writable> extends GiraphConfiguration {
111   /** Holder for all the classes */
112   private final GiraphClasses classes;
113   /** Mapping target class */
114   private Class<? extends Writable> mappingTargetClass = null;
115   /** Value (IVEMM) Factories */
116   private final ValueFactories<I, V, E> valueFactories;
117   /** Factory to create {@link OutEdges} for computation */
118   private final OutEdgesFactory<I, E> outEdgesFactory;
119   /** Factory to create {@link OutEdges} for input */
120   private final OutEdgesFactory<I, E> inputOutEdgesFactory;
121   /** Language values (IVEMM) are implemented in */
122   private final PerGraphTypeEnum<Language> valueLanguages;
123   /** Whether values (IVEMM) need Jython wrappers */
124   private final PerGraphTypeBoolean valueNeedsWrappers;
125 
126 
127   /**
128    * Use unsafe serialization? Cached for fast access to instantiate the
129    * extended data input/output classes
130    */
131   private final boolean useUnsafeSerialization;
132   /**
133    * Use BigDataIO for messages? Cached for fast access to instantiate the
134    * extended data input/output classes for messages
135    */
136   private final boolean useBigDataIOForMessages;
137   /** Is the graph static (meaning there is no mutation)? */
138   private final boolean isStaticGraph;
139   /** Whether or not to use message size encoding */
140   private final boolean useMessageSizeEncoding;
141 
142   /**
143    * Constructor.  Takes the configuration and then gets the classes out of
144    * them for Giraph
145    *
146    * @param conf Configuration
147    */
148   public ImmutableClassesGiraphConfiguration(Configuration conf) {
149     super(conf);
150     classes = new GiraphClasses<I, V, E>(conf);
151     useUnsafeSerialization = USE_UNSAFE_SERIALIZATION.get(this);
152     useBigDataIOForMessages = USE_BIG_DATA_IO_FOR_MESSAGES.get(this);
153     valueLanguages = PerGraphTypeEnum.readFromConf(
154         GiraphConstants.GRAPH_TYPE_LANGUAGES, conf);
155     valueNeedsWrappers = PerGraphTypeBoolean.readFromConf(
156         GiraphConstants.GRAPH_TYPES_NEEDS_WRAPPERS, conf);
157     isStaticGraph = GiraphConstants.STATIC_GRAPH.get(this);
158     valueFactories = new ValueFactories<I, V, E>(this);
159     outEdgesFactory = VERTEX_EDGES_FACTORY_CLASS.newInstance(this);
160     inputOutEdgesFactory = INPUT_VERTEX_EDGES_FACTORY_CLASS.newInstance(this);
161     useMessageSizeEncoding = USE_MESSAGE_SIZE_ENCODING.get(conf);
162   }
163 
164   /**
165    * Configure an object with this instance if the object is configurable.
166    *
167    * @param obj Object
168    */
169   public void configureIfPossible(Object obj) {
170     if (obj instanceof GiraphConfigurationSettable) {
171       ((GiraphConfigurationSettable) obj).setConf(this);
172     }
173   }
174 
175   public PerGraphTypeBoolean getValueNeedsWrappers() {
176     return valueNeedsWrappers;
177   }
178 
179   public PerGraphTypeEnum<Language> getValueLanguages() {
180     return valueLanguages;
181   }
182 
183   /**
184    * Get the class used for edge translation during vertex input
185    *
186    * @return edge translation class
187    */
188   public Class<? extends TranslateEdge> edgeTranslationClass() {
189     return EDGE_TRANSLATION_CLASS.get(this);
190   }
191 
192   /**
193    * Instance of TranslateEdge that contains helper method for edge translation
194    *
195    * @return instance of TranslateEdge
196    */
197   public TranslateEdge<I, E> edgeTranslationInstance() {
198     if (edgeTranslationClass() != null) {
199       return ReflectionUtils.newInstance(edgeTranslationClass(), this);
200     }
201     return null;
202   }
203 
204   /**
205    * Get the vertex input filter class
206    *
207    * @return VertexInputFilter class
208    */
209   public Class<? extends EdgeInputFilter<I, E>>
210   getEdgeInputFilterClass() {
211     return classes.getEdgeInputFilterClass();
212   }
213 
214   /**
215    * Get the edge input filter to use
216    *
217    * @return EdgeInputFilter
218    */
219   public EdgeInputFilter getEdgeInputFilter() {
220     return ReflectionUtils.newInstance(getEdgeInputFilterClass(), this);
221   }
222 
223   /**
224    * Get the vertex input filter class
225    *
226    * @return VertexInputFilter class
227    */
228   public Class<? extends VertexInputFilter<I, V, E>>
229   getVertexInputFilterClass() {
230     return classes.getVertexInputFilterClass();
231   }
232 
233   /**
234    * Get the vertex input filter to use
235    *
236    * @return VertexInputFilter
237    */
238   public VertexInputFilter getVertexInputFilter() {
239     return ReflectionUtils.newInstance(getVertexInputFilterClass(), this);
240   }
241 
242   /**
243    * Get the user's subclassed
244    * {@link org.apache.giraph.partition.GraphPartitionerFactory}.
245    *
246    * @return User's graph partitioner
247    */
248   public Class<? extends GraphPartitionerFactory<I, V, E>>
249   getGraphPartitionerClass() {
250     return classes.getGraphPartitionerFactoryClass();
251   }
252 
253   /**
254    * Create a user graph partitioner class
255    *
256    * @return Instantiated user graph partitioner class
257    */
258   public GraphPartitionerFactory<I, V, E> createGraphPartitioner() {
259     Class<? extends GraphPartitionerFactory<I, V, E>> klass =
260         classes.getGraphPartitionerFactoryClass();
261     return ReflectionUtils.newInstance(klass, this);
262   }
263 
264   @Override
265   public boolean hasVertexInputFormat() {
266     return classes.hasVertexInputFormat();
267   }
268 
269   /**
270    * Get the user's subclassed
271    * {@link org.apache.giraph.io.VertexInputFormat}.
272    *
273    * @return User's vertex input format class
274    */
275   public Class<? extends VertexInputFormat<I, V, E>>
276   getVertexInputFormatClass() {
277     return classes.getVertexInputFormatClass();
278   }
279 
280   /**
281    * Create a user vertex input format class.
282    * Note: Giraph should only use WrappedVertexInputFormat,
283    * which makes sure that Configuration parameters are set properly.
284    *
285    * @return Instantiated user vertex input format class
286    */
287   private VertexInputFormat<I, V, E> createVertexInputFormat() {
288     Class<? extends VertexInputFormat<I, V, E>> klass =
289         getVertexInputFormatClass();
290     return ReflectionUtils.newInstance(klass, this);
291   }
292 
293   /**
294    * Create a wrapper for user vertex input format,
295    * which makes sure that Configuration parameters are set properly in all
296    * methods related to this format.
297    *
298    * @return Wrapper around user vertex input format
299    */
300   public WrappedVertexInputFormat<I, V, E> createWrappedVertexInputFormat() {
301     WrappedVertexInputFormat<I, V, E> wrappedVertexInputFormat =
302         new WrappedVertexInputFormat<I, V, E>(createVertexInputFormat());
303     configureIfPossible(wrappedVertexInputFormat);
304     return wrappedVertexInputFormat;
305   }
306 
307   @Override
308   public void setVertexInputFormatClass(
309       Class<? extends VertexInputFormat> vertexInputFormatClass) {
310     super.setVertexInputFormatClass(vertexInputFormatClass);
311     classes.setVertexInputFormatClass(vertexInputFormatClass);
312   }
313 
314   @Override
315   public boolean hasVertexOutputFormat() {
316     return classes.hasVertexOutputFormat();
317   }
318 
319   /**
320    * Get the user's subclassed
321    * {@link org.apache.giraph.io.VertexOutputFormat}.
322    *
323    * @return User's vertex output format class
324    */
325   public Class<? extends VertexOutputFormat<I, V, E>>
326   getVertexOutputFormatClass() {
327     return classes.getVertexOutputFormatClass();
328   }
329 
330   /**
331    * Get MappingInputFormatClass
332    *
333    * @return MappingInputFormatClass
334    */
335   public Class<? extends MappingInputFormat<I, V, E, ? extends Writable>>
336   getMappingInputFormatClass() {
337     return classes.getMappingInputFormatClass();
338   }
339 
340   /**
341    * Set MappingInputFormatClass
342    *
343    * @param mappingInputFormatClass Determines how mappings are input
344    */
345   @Override
346   public void setMappingInputFormatClass(
347     Class<? extends MappingInputFormat> mappingInputFormatClass) {
348     super.setMappingInputFormatClass(mappingInputFormatClass);
349     classes.setMappingInputFormatClass(mappingInputFormatClass);
350   }
351 
352   /**
353    * Check if mappingInputFormat is set
354    *
355    * @return true if mappingInputFormat is set
356    */
357   public boolean hasMappingInputFormat() {
358     return classes.hasMappingInputFormat();
359   }
360 
361   /**
362    * Create a user vertex output format class.
363    * Note: Giraph should only use WrappedVertexOutputFormat,
364    * which makes sure that Configuration parameters are set properly.
365    *
366    * @return Instantiated user vertex output format class
367    */
368   private VertexOutputFormat<I, V, E> createVertexOutputFormat() {
369     Class<? extends VertexOutputFormat<I, V, E>> klass =
370         getVertexOutputFormatClass();
371     return ReflectionUtils.newInstance(klass, this);
372   }
373 
374   /**
375    * Create a user mapping input format class.
376    * Note: Giraph should only use WrappedMappingInputFormat,
377    * which makes sure that Configuration parameters are set properly.
378    *
379    * @return Instantiated user mapping input format class
380    */
381   private MappingInputFormat<I, V, E, ? extends Writable>
382   createMappingInputFormat() {
383     Class<? extends MappingInputFormat<I, V, E, ? extends Writable>> klass =
384         getMappingInputFormatClass();
385     return ReflectionUtils.newInstance(klass, this);
386   }
387 
388   /**
389    * Create a wrapper for user vertex output format,
390    * which makes sure that Configuration parameters are set properly in all
391    * methods related to this format.
392    *
393    * @return Wrapper around user vertex output format
394    */
395   public WrappedVertexOutputFormat<I, V, E> createWrappedVertexOutputFormat() {
396     WrappedVertexOutputFormat<I, V, E> wrappedVertexOutputFormat =
397         new WrappedVertexOutputFormat<I, V, E>(createVertexOutputFormat());
398     configureIfPossible(wrappedVertexOutputFormat);
399     return wrappedVertexOutputFormat;
400   }
401 
402   /**
403    * Create a wrapper for user mapping input format,
404    * which makes sure that Configuration parameters are set properly in all
405    * methods related to this format.
406    *
407    * @return Wrapper around user mapping input format
408    */
409   public WrappedMappingInputFormat<I, V, E, ? extends Writable>
410   createWrappedMappingInputFormat() {
411     WrappedMappingInputFormat<I, V, E, ? extends Writable>
412       wrappedMappingInputFormat =
413         new WrappedMappingInputFormat<>(createMappingInputFormat());
414     configureIfPossible(wrappedMappingInputFormat);
415     return wrappedMappingInputFormat;
416   }
417 
418   @Override
419   public boolean hasEdgeOutputFormat() {
420     return classes.hasEdgeOutputFormat();
421   }
422 
423   /**
424    * Get the user's subclassed
425    * {@link org.apache.giraph.io.EdgeOutputFormat}.
426    *
427    * @return User's edge output format class
428    */
429   public Class<? extends EdgeOutputFormat<I, V, E>>
430   getEdgeOutputFormatClass() {
431     return classes.getEdgeOutputFormatClass();
432   }
433 
434   /**
435    * Create a user edge output format class.
436    * Note: Giraph should only use WrappedEdgeOutputFormat,
437    * which makes sure that Configuration parameters are set properly.
438    *
439    * @return Instantiated user edge output format class
440    */
441   private EdgeOutputFormat<I, V, E> createEdgeOutputFormat() {
442     Class<? extends EdgeOutputFormat<I, V, E>> klass =
443         getEdgeOutputFormatClass();
444     return ReflectionUtils.newInstance(klass, this);
445   }
446 
447   /**
448    * Create a wrapper for user edge output format,
449    * which makes sure that Configuration parameters are set properly in all
450    * methods related to this format.
451    *
452    * @return Wrapper around user edge output format
453    */
454   public WrappedEdgeOutputFormat<I, V, E> createWrappedEdgeOutputFormat() {
455     WrappedEdgeOutputFormat<I, V, E> wrappedEdgeOutputFormat =
456         new WrappedEdgeOutputFormat<I, V, E>(createEdgeOutputFormat());
457     configureIfPossible(wrappedEdgeOutputFormat);
458     return wrappedEdgeOutputFormat;
459   }
460 
461   /**
462    * Create the proper superstep output, based on the configuration settings.
463    *
464    * @param context Mapper context
465    * @return SuperstepOutput
466    */
467   public SuperstepOutput<I, V, E> createSuperstepOutput(
468       Mapper<?, ?, ?, ?>.Context context) {
469     if (doOutputDuringComputation()) {
470       if (vertexOutputFormatThreadSafe()) {
471         return new MultiThreadedSuperstepOutput<I, V, E>(this, context);
472       } else {
473         return new SynchronizedSuperstepOutput<I, V, E>(this, context);
474       }
475     } else {
476       return new NoOpSuperstepOutput<I, V, E>();
477     }
478   }
479 
480   @Override
481   public boolean hasEdgeInputFormat() {
482     return classes.hasEdgeInputFormat();
483   }
484 
485   /**
486    * Get the user's subclassed
487    * {@link org.apache.giraph.io.EdgeInputFormat}.
488    *
489    * @return User's edge input format class
490    */
491   public Class<? extends EdgeInputFormat<I, E>> getEdgeInputFormatClass() {
492     return classes.getEdgeInputFormatClass();
493   }
494 
495   /**
496    * Create a user edge input format class.
497    * Note: Giraph should only use WrappedEdgeInputFormat,
498    * which makes sure that Configuration parameters are set properly.
499    *
500    * @return Instantiated user edge input format class
501    */
502   private EdgeInputFormat<I, E> createEdgeInputFormat() {
503     Class<? extends EdgeInputFormat<I, E>> klass = getEdgeInputFormatClass();
504     return ReflectionUtils.newInstance(klass, this);
505   }
506 
507   /**
508    * Create a wrapper for user edge input format,
509    * which makes sure that Configuration parameters are set properly in all
510    * methods related to this format.
511    *
512    * @return Wrapper around user edge input format
513    */
514   public WrappedEdgeInputFormat<I, E> createWrappedEdgeInputFormat() {
515     WrappedEdgeInputFormat<I, E> wrappedEdgeInputFormat =
516         new WrappedEdgeInputFormat<I, E>(createEdgeInputFormat());
517     configureIfPossible(wrappedEdgeInputFormat);
518     return wrappedEdgeInputFormat;
519   }
520 
521   @Override
522   public void setEdgeInputFormatClass(
523       Class<? extends EdgeInputFormat> edgeInputFormatClass) {
524     super.setEdgeInputFormatClass(edgeInputFormatClass);
525     classes.setEdgeInputFormatClass(edgeInputFormatClass);
526   }
527 
528   /**
529    * Get the user's subclassed {@link AggregatorWriter}.
530    *
531    * @return User's aggregator writer class
532    */
533   public Class<? extends AggregatorWriter> getAggregatorWriterClass() {
534     return classes.getAggregatorWriterClass();
535   }
536 
537   /**
538    * Create a user aggregator output format class
539    *
540    * @return Instantiated user aggregator writer class
541    */
542   public AggregatorWriter createAggregatorWriter() {
543     return ReflectionUtils.newInstance(getAggregatorWriterClass(), this);
544   }
545 
546   /**
547    * Get the user's subclassed
548    * {@link org.apache.giraph.graph.VertexValueCombiner} class.
549    *
550    * @return User's vertex value combiner class
551    */
552   public Class<? extends VertexValueCombiner<V>>
553   getVertexValueCombinerClass() {
554     return classes.getVertexValueCombinerClass();
555   }
556 
557   /**
558    * Create a user vertex value combiner class
559    *
560    * @return Instantiated user vertex value combiner class
561    */
562   @SuppressWarnings("rawtypes")
563   public VertexValueCombiner<V> createVertexValueCombiner() {
564     return ReflectionUtils.newInstance(getVertexValueCombinerClass(), this);
565   }
566 
567   /**
568    * Get the user's subclassed VertexResolver.
569    *
570    * @return User's vertex resolver class
571    */
572   public Class<? extends VertexResolver<I, V, E>> getVertexResolverClass() {
573     return classes.getVertexResolverClass();
574   }
575 
576   /**
577    * Create a user vertex revolver
578    *
579    * @return Instantiated user vertex resolver
580    */
581   public VertexResolver<I, V, E> createVertexResolver() {
582     return ReflectionUtils.newInstance(getVertexResolverClass(), this);
583   }
584 
585   /**
586    * Get the user's subclassed WorkerContext.
587    *
588    * @return User's worker context class
589    */
590   public Class<? extends WorkerContext> getWorkerContextClass() {
591     return classes.getWorkerContextClass();
592   }
593 
594   /**
595    * Create a user worker context
596    *
597    * @return Instantiated user worker context
598    */
599   public WorkerContext createWorkerContext() {
600     return ReflectionUtils.newInstance(getWorkerContextClass(), this);
601   }
602 
603   /**
604    * Get the user's subclassed {@link org.apache.giraph.master.MasterCompute}
605    *
606    * @return User's master class
607    */
608   public Class<? extends MasterCompute> getMasterComputeClass() {
609     return classes.getMasterComputeClass();
610   }
611 
612   /**
613    * Create a user master
614    *
615    * @return Instantiated user master
616    */
617   public MasterCompute createMasterCompute() {
618     return ReflectionUtils.newInstance(getMasterComputeClass(), this);
619   }
620 
621   @Override
622   public Class<? extends Computation<I, V, E,
623       ? extends Writable, ? extends Writable>>
624   getComputationClass() {
625     return classes.getComputationClass();
626   }
627 
628   /**
629    * Get computation factory class
630    *
631    * @return computation factory class
632    */
633   @Override
634   public Class<? extends ComputationFactory<I, V, E,
635       ? extends Writable, ? extends Writable>>
636   getComputationFactoryClass() {
637     return classes.getComputationFactoryClass();
638   }
639 
640   /**
641    * Get computation factory
642    *
643    * @return computation factory
644    */
645   public ComputationFactory<I, V, E, ? extends Writable, ? extends Writable>
646   createComputationFactory() {
647     return ReflectionUtils.newInstance(getComputationFactoryClass(), this);
648   }
649 
650   /**
651    * Create a user computation
652    *
653    * @return Instantiated user computation
654    */
655   public Computation<I, V, E, ? extends Writable, ? extends Writable>
656   createComputation() {
657     return createComputationFactory().createComputation(this);
658   }
659 
660   /**
661    * Get user types describing graph (I,V,E,M1,M2)
662    *
663    * @return GiraphTypes
664    */
665   public GiraphTypes<I, V, E> getGiraphTypes() {
666     return classes.getGiraphTypes();
667   }
668 
669   /**
670    * Create a vertex
671    *
672    * @return Instantiated vertex
673    */
674   public Vertex<I, V, E> createVertex() {
675     Class vertexClass = classes.getVertexClass();
676     return (Vertex<I, V, E>) ReflectionUtils.newInstance(vertexClass, this);
677   }
678 
679 
680  /**
681    * Get the user's subclassed vertex index class.
682    *
683    * @return User's vertex index class
684    */
685   public Class<I> getVertexIdClass() {
686     return classes.getVertexIdClass();
687   }
688 
689   /**
690    * Get vertex ID factory
691    *
692    * @return {@link VertexIdFactory}
693    */
694   public VertexIdFactory<I> getVertexIdFactory() {
695     return valueFactories.getVertexIdFactory();
696   }
697 
698   /**
699    * Create a user vertex index
700    *
701    * @return Instantiated user vertex index
702    */
703   public I createVertexId() {
704     return getVertexIdFactory().newInstance();
705   }
706 
707   /**
708    * Get the user's subclassed vertex value class.
709    *
710    * @return User's vertex value class
711    */
712   public Class<V> getVertexValueClass() {
713     return classes.getVertexValueClass();
714   }
715 
716   /**
717    * Get vertex value factory
718    *
719    * @return {@link VertexValueFactory}
720    */
721   public VertexValueFactory<V> getVertexValueFactory() {
722     return valueFactories.getVertexValueFactory();
723   }
724 
725   /**
726    * Create a user vertex value
727    *
728    * @return Instantiated user vertex value
729    */
730   @SuppressWarnings("unchecked")
731   public V createVertexValue() {
732     return getVertexValueFactory().newInstance();
733   }
734 
735   /**
736    * Get the user's subclassed vertex value factory class
737    *
738    * @return User's vertex value factory class
739    */
740   public Class<? extends VertexValueFactory<V>> getVertexValueFactoryClass() {
741     return (Class<? extends VertexValueFactory<V>>)
742         valueFactories.getVertexValueFactory().getClass();
743   }
744 
745   /**
746    * Create array of MasterObservers.
747    *
748    * @param context Mapper context
749    * @return Instantiated array of MasterObservers.
750    */
751   public MasterObserver[] createMasterObservers(
752       Mapper<?, ?, ?, ?>.Context context) {
753     Class<? extends MasterObserver>[] klasses = getMasterObserverClasses();
754     MasterObserver[] objects = new MasterObserver[klasses.length];
755     for (int i = 0; i < klasses.length; ++i) {
756       objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
757     }
758     return objects;
759   }
760 
761   /**
762    * Create array of WorkerObservers.
763    *
764    * @param context Mapper context
765    * @return Instantiated array of WorkerObservers.
766    */
767   public WorkerObserver[] createWorkerObservers(
768       Mapper<?, ?, ?, ?>.Context context) {
769     Class<? extends WorkerObserver>[] klasses = getWorkerObserverClasses();
770     WorkerObserver[] objects = new WorkerObserver[klasses.length];
771     for (int i = 0; i < klasses.length; ++i) {
772       objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
773     }
774     return objects;
775   }
776 
777   /**
778    * Create array of MapperObservers.
779    *
780    * @param context Mapper context
781    * @return Instantiated array of MapperObservers.
782    */
783   public MapperObserver[] createMapperObservers(
784       Mapper<?, ?, ?, ?>.Context context) {
785     Class<? extends MapperObserver>[] klasses = getMapperObserverClasses();
786     MapperObserver[] objects = new MapperObserver[klasses.length];
787     for (int i = 0; i < klasses.length; ++i) {
788       objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
789     }
790     return objects;
791   }
792 
793   /**
794    * Create array of GcObservers.
795    *
796    * @param context Mapper context
797    * @return Instantiated array of GcObservers.
798    */
799   public GcObserver[] createGcObservers(
800       Mapper<?, ?, ?, ?>.Context context) {
801     Class<? extends GcObserver>[] klasses = getGcObserverClasses();
802     GcObserver[] objects = new GcObserver[klasses.length];
803     for (int i = 0; i < klasses.length; ++i) {
804       objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
805     }
806     return objects;
807   }
808 
809   /**
810    * Create job observer
811    *
812    * @return GiraphJobObserver set in configuration.
813    */
814   public GiraphJobObserver getJobObserver() {
815     return ReflectionUtils.newInstance(getJobObserverClass(), this);
816   }
817 
818   /**
819    * Create job retry checker
820    *
821    * @return GiraphJobRetryChecker set in configuration.
822    */
823   public GiraphJobRetryChecker getJobRetryChecker() {
824     return ReflectionUtils.newInstance(getJobRetryCheckerClass(), this);
825   }
826 
827   /**
828    * Get the user's subclassed edge value class.
829    *
830    * @return User's vertex edge value class
831    */
832   public Class<E> getEdgeValueClass() {
833     return classes.getEdgeValueClass();
834   }
835 
836   /**
837    * Tell if we are using NullWritable for Edge value.
838    *
839    * @return true if NullWritable is class for
840    */
841   public boolean isEdgeValueNullWritable() {
842     return getEdgeValueClass() == NullWritable.class;
843   }
844 
845   /**
846    * Get Factory for creating edge values
847    *
848    * @return {@link EdgeValueFactory}
849    */
850   public EdgeValueFactory<E> getEdgeValueFactory() {
851     return valueFactories.getEdgeValueFactory();
852   }
853 
854   /**
855    * Create a user edge value
856    *
857    * @return Instantiated user edge value
858    */
859   public E createEdgeValue() {
860     return getEdgeValueFactory().newInstance();
861   }
862 
863   /**
864    * Create a user edge.
865    *
866    * @return Instantiated user edge.
867    */
868   public Edge<I, E> createEdge() {
869     if (isEdgeValueNullWritable()) {
870       return (Edge<I, E>) EdgeFactory.create(createVertexId());
871     } else {
872       return EdgeFactory.create(createVertexId(), createEdgeValue());
873     }
874   }
875 
876   /**
877    * Create edge based on #createEdge definition
878    *
879    * @param translateEdge instance of TranslateEdge
880    * @param edge edge to be translated
881    * @return translated edge
882    */
883   public Edge<I, E> createEdge(TranslateEdge<I, E>
884     translateEdge, Edge<I, E> edge) {
885     I translatedId = translateEdge.translateId(edge.getTargetVertexId());
886     if (isEdgeValueNullWritable()) {
887       return (Edge<I, E>) EdgeFactory.create(translatedId);
888     } else {
889       return EdgeFactory.create(translatedId,
890         translateEdge.cloneValue(edge.getValue()));
891     }
892   }
893 
894   /**
895    * Create a reusable edge.
896    *
897    * @return Instantiated reusable edge.
898    */
899   public ReusableEdge<I, E> createReusableEdge() {
900     if (isEdgeValueNullWritable()) {
901       return (ReusableEdge<I, E>) EdgeFactory.createReusable(createVertexId());
902     } else {
903       return EdgeFactory.createReusable(createVertexId(), createEdgeValue());
904     }
905   }
906 
907   /**
908    * Create edge store factory
909    *
910    * @return edge store factory
911    */
912   public EdgeStoreFactory<I, V, E> createEdgeStoreFactory() {
913     Class<? extends EdgeStoreFactory> edgeStoreFactoryClass =
914         EDGE_STORE_FACTORY_CLASS.get(this);
915     return ReflectionUtils.newInstance(edgeStoreFactoryClass);
916   }
917 
918   /**
919    * Get the user's subclassed incoming message value class.
920    *
921    * @param <M> Message data
922    * @return User's vertex message value class
923    */
924   public <M extends Writable> Class<M> getIncomingMessageValueClass() {
925     return classes.getIncomingMessageClasses().getMessageClass();
926   }
927 
928   /**
929    * Get the user's subclassed outgoing message value class.
930    *
931    * @param <M> Message type
932    * @return User's vertex message value class
933    */
934   public <M extends Writable> Class<M> getOutgoingMessageValueClass() {
935     return classes.getOutgoingMessageClasses().getMessageClass();
936   }
937 
938   /**
939    * Get incoming message classes
940    * @param <M> message type
941    * @return incoming message classes
942    */
943   public <M extends Writable>
944   MessageClasses<I, M> getIncomingMessageClasses() {
945     return classes.getIncomingMessageClasses();
946   }
947 
948   /**
949    * Get outgoing message classes
950    * @param <M> message type
951    * @return outgoing message classes
952    */
953   public <M extends Writable>
954   MessageClasses<I, M> getOutgoingMessageClasses() {
955     return classes.getOutgoingMessageClasses();
956   }
957 
958   /**
959    * Create new outgoing message value factory
960    * @param <M> message type
961    * @return outgoing message value factory
962    */
963   public <M extends Writable>
964   MessageValueFactory<M> createOutgoingMessageValueFactory() {
965     return classes.getOutgoingMessageClasses().createMessageValueFactory(this);
966   }
967 
968   /**
969    * Create new incoming message value factory
970    * @param <M> message type
971    * @return incoming message value factory
972    */
973   public <M extends Writable>
974   MessageValueFactory<M> createIncomingMessageValueFactory() {
975     return classes.getIncomingMessageClasses().createMessageValueFactory(this);
976   }
977 
978   @Override
979   public void setMessageCombinerClass(
980       Class<? extends MessageCombiner> messageCombinerClass) {
981     throw new IllegalArgumentException(
982         "Cannot set message combiner on ImmutableClassesGiraphConfigurable");
983   }
984 
985   /**
986    * Create a user combiner class
987    *
988    * @param <M> Message data
989    * @return Instantiated user combiner class
990    */
991   public <M extends Writable> MessageCombiner<? super I, M>
992   createOutgoingMessageCombiner() {
993     return classes.getOutgoingMessageClasses().createMessageCombiner(this);
994   }
995 
996   /**
997    * Check if user set a combiner
998    *
999    * @return True iff user set a combiner class
1000    */
1001   public boolean useOutgoingMessageCombiner() {
1002     return classes.getOutgoingMessageClasses().useMessageCombiner();
1003   }
1004 
1005   /**
1006    * Get outgoing message encode and store type
1007    * @return outgoing message encode and store type
1008    */
1009   public MessageEncodeAndStoreType getOutgoingMessageEncodeAndStoreType() {
1010     return classes.getOutgoingMessageClasses().getMessageEncodeAndStoreType();
1011   }
1012 
1013   @Override
1014   public Class<? extends OutEdges<I, E>> getOutEdgesClass() {
1015     return classes.getOutEdgesClass();
1016   }
1017 
1018   /**
1019    * Get the user's subclassed {@link org.apache.giraph.edge.OutEdges} used for
1020    * input
1021    *
1022    * @return User's input vertex edges class
1023    */
1024   public Class<? extends OutEdges<I, E>> getInputOutEdgesClass() {
1025     return classes.getInputOutEdgesClass();
1026   }
1027 
1028   /**
1029    * Check whether the user has specified a different
1030    * {@link org.apache.giraph.edge.OutEdges} class to be used during
1031    * edge-based input.
1032    *
1033    * @return True iff there is a special edges class for input
1034    */
1035   public boolean useInputOutEdges() {
1036     return classes.getInputOutEdgesClass() != classes.getOutEdgesClass();
1037   }
1038 
1039   /**
1040    * Get MappingStore class to be used
1041    *
1042    * @return MappingStore class set by user
1043    */
1044   public Class<? extends MappingStore> getMappingStoreClass() {
1045     return MAPPING_STORE_CLASS.get(this);
1046   }
1047 
1048   /**
1049    * Create a {@link org.apache.giraph.mapping.MappingStore} instance
1050    *
1051    * @return MappingStore Instance
1052    */
1053   public MappingStore<I, ? extends Writable> createMappingStore() {
1054     if (getMappingStoreClass() != null) {
1055       return ReflectionUtils.newInstance(getMappingStoreClass(), this);
1056     } else {
1057       return null;
1058     }
1059   }
1060 
1061   /**
1062    * Get MappingStoreOps class to be used
1063    *
1064    * @return MappingStoreOps class set by user
1065    */
1066   public Class<? extends MappingStoreOps> getMappingStoreOpsClass() {
1067     return MAPPING_STORE_OPS_CLASS.get(this);
1068   }
1069 
1070   /**
1071    * Create a {@link org.apache.giraph.mapping.MappingStoreOps} instance
1072    *
1073    * @return MappingStoreOps Instance
1074    */
1075   public MappingStoreOps<I, ? extends Writable> createMappingStoreOps() {
1076     if (getMappingStoreOpsClass() != null) {
1077       return ReflectionUtils.newInstance(getMappingStoreOpsClass(), this);
1078     } else {
1079       return null;
1080     }
1081   }
1082 
1083   /**
1084    * Get mappingTarget class
1085    *
1086    * @return mappingTarget class
1087    */
1088   public Class<? extends Writable> getMappingTargetClass() {
1089     if (mappingTargetClass == null) {
1090       Class<?>[] classList = ReflectionUtils.getTypeArguments(
1091         MappingStore.class, getMappingStoreClass());
1092       Preconditions.checkArgument(classList.length == 2);
1093       mappingTargetClass = (Class<? extends Writable>) classList[1];
1094     }
1095     return mappingTargetClass;
1096   }
1097 
1098   /**
1099    * Create and return mappingTarget instance
1100    *
1101    * @return mappingTarget instance
1102    */
1103   public Writable createMappingTarget() {
1104     return WritableUtils.createWritable(getMappingTargetClass());
1105   }
1106 
1107   /**
1108    * Create a user {@link org.apache.giraph.edge.OutEdges}
1109    *
1110    * @return Instantiated user OutEdges
1111    */
1112   public OutEdges<I, E> createOutEdges() {
1113     return outEdgesFactory.newInstance();
1114   }
1115 
1116   /**
1117    * Create a {@link org.apache.giraph.edge.OutEdges} instance and initialize
1118    * it with the default capacity.
1119    *
1120    * @return Instantiated OutEdges
1121    */
1122   public OutEdges<I, E> createAndInitializeOutEdges() {
1123     OutEdges<I, E> outEdges = createOutEdges();
1124     outEdges.initialize();
1125     return outEdges;
1126   }
1127 
1128   /**
1129    * Create a {@link org.apache.giraph.edge.OutEdges} instance and initialize
1130    * it with the given capacity (the number of edges that will be added).
1131    *
1132    * @param capacity Number of edges that will be added
1133    * @return Instantiated OutEdges
1134    */
1135   public OutEdges<I, E> createAndInitializeOutEdges(int capacity) {
1136     OutEdges<I, E> outEdges = createOutEdges();
1137     outEdges.initialize(capacity);
1138     return outEdges;
1139   }
1140 
1141   /**
1142    * Create a {@link org.apache.giraph.edge.OutEdges} instance and initialize
1143    * it with the given iterable of edges.
1144    *
1145    * @param edges Iterable of edges to add
1146    * @return Instantiated OutEdges
1147    */
1148   public OutEdges<I, E> createAndInitializeOutEdges(
1149       Iterable<Edge<I, E>> edges) {
1150     OutEdges<I, E> outEdges = createOutEdges();
1151     outEdges.initialize(edges);
1152     return outEdges;
1153   }
1154 
1155   /**
1156    * Create a user {@link org.apache.giraph.edge.OutEdges} used during
1157    * edge-based input
1158    *
1159    * @return Instantiated user input OutEdges
1160    */
1161   public OutEdges<I, E> createInputOutEdges() {
1162     return inputOutEdgesFactory.newInstance();
1163   }
1164 
1165   /**
1166    * Create an input {@link org.apache.giraph.edge.OutEdges} instance and
1167    * initialize it with the default capacity.
1168    *
1169    * @return Instantiated input OutEdges
1170    */
1171   public OutEdges<I, E> createAndInitializeInputOutEdges() {
1172     OutEdges<I, E> outEdges = createInputOutEdges();
1173     outEdges.initialize();
1174     return outEdges;
1175   }
1176 
1177   /**
1178    * Create a partition
1179    *
1180    * @param id Partition id
1181    * @param progressable Progressable for reporting progress
1182    * @return Instantiated partition
1183    */
1184   public Partition<I, V, E> createPartition(
1185       int id, Progressable progressable) {
1186     Class<? extends Partition<I, V, E>> klass = classes.getPartitionClass();
1187     Partition<I, V, E> partition = ReflectionUtils.newInstance(klass, this);
1188     partition.initialize(id, progressable);
1189     return partition;
1190   }
1191 
1192   /**
1193    * Use unsafe serialization?
1194    *
1195    * @return True if using unsafe serialization, false otherwise.
1196    */
1197   public boolean useUnsafeSerialization() {
1198     return useUnsafeSerialization;
1199   }
1200 
1201   /**
1202    * Create DataInputOutput to store messages
1203    *
1204    * @return DataInputOutput object
1205    */
1206   public DataInputOutput createMessagesInputOutput() {
1207     if (useBigDataIOForMessages) {
1208       return new BigDataInputOutput(this);
1209     } else {
1210       return new ExtendedDataInputOutput(this);
1211     }
1212   }
1213 
1214   /**
1215    * Create an extended data output (can be subclassed)
1216    *
1217    * @return ExtendedDataOutput object
1218    */
1219   public ExtendedDataOutput createExtendedDataOutput() {
1220     if (useUnsafeSerialization) {
1221       return new UnsafeByteArrayOutputStream();
1222     } else {
1223       return new ExtendedByteArrayDataOutput();
1224     }
1225   }
1226 
1227   /**
1228    * Create an extended data output (can be subclassed)
1229    *
1230    * @param expectedSize Expected size
1231    * @return ExtendedDataOutput object
1232    */
1233   public ExtendedDataOutput createExtendedDataOutput(int expectedSize) {
1234     if (useUnsafeSerialization) {
1235       return new UnsafeByteArrayOutputStream(expectedSize);
1236     } else {
1237       return new ExtendedByteArrayDataOutput(expectedSize);
1238     }
1239   }
1240 
1241   /**
1242    * Create an extended data output (can be subclassed)
1243    *
1244    * @param buf Buffer to use for the output (reuse perhaps)
1245    * @param pos How much of the buffer is already used
1246    * @return ExtendedDataOutput object
1247    */
1248   public ExtendedDataOutput createExtendedDataOutput(byte[] buf,
1249                                                      int pos) {
1250     if (useUnsafeSerialization) {
1251       return new UnsafeByteArrayOutputStream(buf, pos);
1252     } else {
1253       return new ExtendedByteArrayDataOutput(buf, pos);
1254     }
1255   }
1256 
1257   /**
1258    * Create an extended data input (can be subclassed)
1259    *
1260    * @param buf Buffer to use for the input
1261    * @param off Where to start reading in the buffer
1262    * @param length Maximum length of the buffer
1263    * @return ExtendedDataInput object
1264    */
1265   public ExtendedDataInput createExtendedDataInput(
1266       byte[] buf, int off, int length) {
1267     if (useUnsafeSerialization) {
1268       return new UnsafeByteArrayInputStream(buf, off, length);
1269     } else {
1270       return new ExtendedByteArrayDataInput(buf, off, length);
1271     }
1272   }
1273 
1274   /**
1275    * Create an extended data input (can be subclassed)
1276    *
1277    * @param buf Buffer to use for the input
1278    * @return ExtendedDataInput object
1279    */
1280   public ExtendedDataInput createExtendedDataInput(byte[] buf) {
1281     if (useUnsafeSerialization) {
1282       return new UnsafeByteArrayInputStream(buf);
1283     } else {
1284       return new ExtendedByteArrayDataInput(buf);
1285     }
1286   }
1287 
1288   /**
1289    * Create extendedDataInput based on extendedDataOutput
1290    *
1291    * @param extendedDataOutput extendedDataOutput
1292    * @return extendedDataInput
1293    */
1294   public ExtendedDataInput createExtendedDataInput(
1295     ExtendedDataOutput extendedDataOutput) {
1296     return createExtendedDataInput(extendedDataOutput.getByteArray(), 0,
1297         extendedDataOutput.getPos());
1298   }
1299 
1300   /**
1301    * Whether to use an unsafe serialization
1302    *
1303    * @return whether to use unsafe serialization
1304    */
1305   public boolean getUseUnsafeSerialization() {
1306     return useUnsafeSerialization;
1307   }
1308 
1309   /**
1310    * Update Computation and MessageCombiner class used
1311    *
1312    * @param superstepClasses SuperstepClasses
1313    */
1314   public void updateSuperstepClasses(SuperstepClasses superstepClasses) {
1315     superstepClasses.updateGiraphClasses(classes);
1316   }
1317 
1318   /**
1319    * Has the user enabled compression in netty client &amp; server
1320    *
1321    * @return true if ok to do compression of netty requests
1322    */
1323   public boolean doCompression() {
1324     switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
1325     case "SNAPPY":
1326       return true;
1327     case "INFLATE":
1328       return true;
1329     default:
1330       return false;
1331     }
1332   }
1333 
1334   /**
1335    * Get encoder for message compression in netty
1336    *
1337    * @return message to byte encoder
1338    */
1339   public MessageToByteEncoder getNettyCompressionEncoder() {
1340     switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
1341     case "SNAPPY":
1342       return new SnappyFramedEncoder();
1343     case "INFLATE":
1344       return new JdkZlibEncoder();
1345     default:
1346       return null;
1347     }
1348   }
1349 
1350   /**
1351    * Get decoder for message decompression in netty
1352    *
1353    * @return byte to message decoder
1354    */
1355   public ByteToMessageDecoder getNettyCompressionDecoder() {
1356     switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
1357     case "SNAPPY":
1358       return new SnappyFramedDecoder(true);
1359     case "INFLATE":
1360       return new JdkZlibDecoder();
1361     default:
1362       return null;
1363     }
1364   }
1365 
1366   /**
1367    * Whether the application with change or not the graph topology.
1368    *
1369    * @return true if the graph is static, false otherwise.
1370    */
1371   public boolean isStaticGraph() {
1372     return isStaticGraph;
1373   }
1374 
1375   /**
1376    * @return job id
1377    */
1378   public String getJobId() {
1379     return get("mapred.job.id", "UnknownJob");
1380   }
1381 
1382   /**
1383    * Use message size encoding?  This feature may help with complex message
1384    * objects.
1385    *
1386    * @return Whether to use message size encoding
1387    */
1388   public boolean useMessageSizeEncoding() {
1389     return useMessageSizeEncoding;
1390   }
1391 }