This project has retired. For details please refer to its
        
        Attic page.
      
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.giraph.conf;
20  
21  import io.netty.handler.codec.ByteToMessageDecoder;
22  import io.netty.handler.codec.MessageToByteEncoder;
23  import io.netty.handler.codec.compression.JdkZlibDecoder;
24  import io.netty.handler.codec.compression.JdkZlibEncoder;
25  import io.netty.handler.codec.compression.SnappyFramedDecoder;
26  import io.netty.handler.codec.compression.SnappyFramedEncoder;
27  
28  import org.apache.giraph.aggregators.AggregatorWriter;
29  import org.apache.giraph.combiner.MessageCombiner;
30  import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
31  import org.apache.giraph.edge.Edge;
32  import org.apache.giraph.edge.EdgeFactory;
33  import org.apache.giraph.edge.EdgeStoreFactory;
34  import org.apache.giraph.edge.OutEdges;
35  import org.apache.giraph.edge.ReusableEdge;
36  import org.apache.giraph.factories.ComputationFactory;
37  import org.apache.giraph.factories.EdgeValueFactory;
38  import org.apache.giraph.factories.MessageValueFactory;
39  import org.apache.giraph.factories.OutEdgesFactory;
40  import org.apache.giraph.factories.ValueFactories;
41  import org.apache.giraph.factories.VertexIdFactory;
42  import org.apache.giraph.factories.VertexValueFactory;
43  import org.apache.giraph.graph.Computation;
44  import org.apache.giraph.graph.Language;
45  import org.apache.giraph.graph.MapperObserver;
46  import org.apache.giraph.graph.Vertex;
47  import org.apache.giraph.graph.VertexResolver;
48  import org.apache.giraph.graph.VertexValueCombiner;
49  import org.apache.giraph.io.EdgeInputFormat;
50  import org.apache.giraph.io.EdgeOutputFormat;
51  import org.apache.giraph.io.MappingInputFormat;
52  import org.apache.giraph.io.VertexInputFormat;
53  import org.apache.giraph.io.VertexOutputFormat;
54  import org.apache.giraph.io.filters.EdgeInputFilter;
55  import org.apache.giraph.io.filters.VertexInputFilter;
56  import org.apache.giraph.io.internal.WrappedEdgeInputFormat;
57  import org.apache.giraph.io.internal.WrappedEdgeOutputFormat;
58  import org.apache.giraph.io.internal.WrappedMappingInputFormat;
59  import org.apache.giraph.io.internal.WrappedVertexInputFormat;
60  import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
61  import org.apache.giraph.io.superstep_output.MultiThreadedSuperstepOutput;
62  import org.apache.giraph.io.superstep_output.NoOpSuperstepOutput;
63  import org.apache.giraph.io.superstep_output.SuperstepOutput;
64  import org.apache.giraph.io.superstep_output.SynchronizedSuperstepOutput;
65  import org.apache.giraph.job.GiraphJobObserver;
66  import org.apache.giraph.job.GiraphJobRetryChecker;
67  import org.apache.giraph.mapping.MappingStore;
68  import org.apache.giraph.mapping.MappingStoreOps;
69  import org.apache.giraph.mapping.translate.TranslateEdge;
70  import org.apache.giraph.master.MasterCompute;
71  import org.apache.giraph.master.MasterObserver;
72  import org.apache.giraph.master.SuperstepClasses;
73  import org.apache.giraph.partition.GraphPartitionerFactory;
74  import org.apache.giraph.partition.Partition;
75  import org.apache.giraph.utils.ExtendedByteArrayDataInput;
76  import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
77  import org.apache.giraph.utils.ExtendedDataInput;
78  import org.apache.giraph.utils.ExtendedDataOutput;
79  import org.apache.giraph.utils.GcObserver;
80  import org.apache.giraph.utils.ReflectionUtils;
81  import org.apache.giraph.utils.UnsafeByteArrayInputStream;
82  import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
83  import org.apache.giraph.utils.WritableUtils;
84  import org.apache.giraph.utils.io.BigDataInputOutput;
85  import org.apache.giraph.utils.io.DataInputOutput;
86  import org.apache.giraph.utils.io.ExtendedDataInputOutput;
87  import org.apache.giraph.worker.WorkerContext;
88  import org.apache.giraph.worker.WorkerObserver;
89  import org.apache.hadoop.conf.Configuration;
90  import org.apache.hadoop.io.NullWritable;
91  import org.apache.hadoop.io.Writable;
92  import org.apache.hadoop.io.WritableComparable;
93  import org.apache.hadoop.mapreduce.Mapper;
94  import org.apache.hadoop.util.Progressable;
95  
96  import com.google.common.base.Preconditions;
97  
98  
99  
100 
101 
102 
103 
104 
105 
106 
107 
108 @SuppressWarnings("unchecked")
109 public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
110     V extends Writable, E extends Writable> extends GiraphConfiguration {
111   
112   private final GiraphClasses classes;
113   
114   private Class<? extends Writable> mappingTargetClass = null;
115   
116   private final ValueFactories<I, V, E> valueFactories;
117   
118   private final OutEdgesFactory<I, E> outEdgesFactory;
119   
120   private final OutEdgesFactory<I, E> inputOutEdgesFactory;
121   
122   private final PerGraphTypeEnum<Language> valueLanguages;
123   
124   private final PerGraphTypeBoolean valueNeedsWrappers;
125 
126 
127   
128 
129 
130 
131   private final boolean useUnsafeSerialization;
132   
133 
134 
135 
136   private final boolean useBigDataIOForMessages;
137   
138   private final boolean isStaticGraph;
139   
140   private final boolean useMessageSizeEncoding;
141 
142   
143 
144 
145 
146 
147 
148   public ImmutableClassesGiraphConfiguration(Configuration conf) {
149     super(conf);
150     classes = new GiraphClasses<I, V, E>(conf);
151     useUnsafeSerialization = USE_UNSAFE_SERIALIZATION.get(this);
152     useBigDataIOForMessages = USE_BIG_DATA_IO_FOR_MESSAGES.get(this);
153     valueLanguages = PerGraphTypeEnum.readFromConf(
154         GiraphConstants.GRAPH_TYPE_LANGUAGES, conf);
155     valueNeedsWrappers = PerGraphTypeBoolean.readFromConf(
156         GiraphConstants.GRAPH_TYPES_NEEDS_WRAPPERS, conf);
157     isStaticGraph = GiraphConstants.STATIC_GRAPH.get(this);
158     valueFactories = new ValueFactories<I, V, E>(this);
159     outEdgesFactory = VERTEX_EDGES_FACTORY_CLASS.newInstance(this);
160     inputOutEdgesFactory = INPUT_VERTEX_EDGES_FACTORY_CLASS.newInstance(this);
161     useMessageSizeEncoding = USE_MESSAGE_SIZE_ENCODING.get(conf);
162   }
163 
164   
165 
166 
167 
168 
169   public void configureIfPossible(Object obj) {
170     if (obj instanceof GiraphConfigurationSettable) {
171       ((GiraphConfigurationSettable) obj).setConf(this);
172     }
173   }
174 
175   public PerGraphTypeBoolean getValueNeedsWrappers() {
176     return valueNeedsWrappers;
177   }
178 
179   public PerGraphTypeEnum<Language> getValueLanguages() {
180     return valueLanguages;
181   }
182 
183   
184 
185 
186 
187 
188   public Class<? extends TranslateEdge> edgeTranslationClass() {
189     return EDGE_TRANSLATION_CLASS.get(this);
190   }
191 
192   
193 
194 
195 
196 
197   public TranslateEdge<I, E> edgeTranslationInstance() {
198     if (edgeTranslationClass() != null) {
199       return ReflectionUtils.newInstance(edgeTranslationClass(), this);
200     }
201     return null;
202   }
203 
204   
205 
206 
207 
208 
209   public Class<? extends EdgeInputFilter<I, E>>
210   getEdgeInputFilterClass() {
211     return classes.getEdgeInputFilterClass();
212   }
213 
214   
215 
216 
217 
218 
219   public EdgeInputFilter getEdgeInputFilter() {
220     return ReflectionUtils.newInstance(getEdgeInputFilterClass(), this);
221   }
222 
223   
224 
225 
226 
227 
228   public Class<? extends VertexInputFilter<I, V, E>>
229   getVertexInputFilterClass() {
230     return classes.getVertexInputFilterClass();
231   }
232 
233   
234 
235 
236 
237 
238   public VertexInputFilter getVertexInputFilter() {
239     return ReflectionUtils.newInstance(getVertexInputFilterClass(), this);
240   }
241 
242   
243 
244 
245 
246 
247 
248   public Class<? extends GraphPartitionerFactory<I, V, E>>
249   getGraphPartitionerClass() {
250     return classes.getGraphPartitionerFactoryClass();
251   }
252 
253   
254 
255 
256 
257 
258   public GraphPartitionerFactory<I, V, E> createGraphPartitioner() {
259     Class<? extends GraphPartitionerFactory<I, V, E>> klass =
260         classes.getGraphPartitionerFactoryClass();
261     return ReflectionUtils.newInstance(klass, this);
262   }
263 
264   @Override
265   public boolean hasVertexInputFormat() {
266     return classes.hasVertexInputFormat();
267   }
268 
269   
270 
271 
272 
273 
274 
275   public Class<? extends VertexInputFormat<I, V, E>>
276   getVertexInputFormatClass() {
277     return classes.getVertexInputFormatClass();
278   }
279 
280   
281 
282 
283 
284 
285 
286 
287   private VertexInputFormat<I, V, E> createVertexInputFormat() {
288     Class<? extends VertexInputFormat<I, V, E>> klass =
289         getVertexInputFormatClass();
290     return ReflectionUtils.newInstance(klass, this);
291   }
292 
293   
294 
295 
296 
297 
298 
299 
300   public WrappedVertexInputFormat<I, V, E> createWrappedVertexInputFormat() {
301     WrappedVertexInputFormat<I, V, E> wrappedVertexInputFormat =
302         new WrappedVertexInputFormat<I, V, E>(createVertexInputFormat());
303     configureIfPossible(wrappedVertexInputFormat);
304     return wrappedVertexInputFormat;
305   }
306 
307   @Override
308   public void setVertexInputFormatClass(
309       Class<? extends VertexInputFormat> vertexInputFormatClass) {
310     super.setVertexInputFormatClass(vertexInputFormatClass);
311     classes.setVertexInputFormatClass(vertexInputFormatClass);
312   }
313 
314   @Override
315   public boolean hasVertexOutputFormat() {
316     return classes.hasVertexOutputFormat();
317   }
318 
319   
320 
321 
322 
323 
324 
325   public Class<? extends VertexOutputFormat<I, V, E>>
326   getVertexOutputFormatClass() {
327     return classes.getVertexOutputFormatClass();
328   }
329 
330   
331 
332 
333 
334 
335   public Class<? extends MappingInputFormat<I, V, E, ? extends Writable>>
336   getMappingInputFormatClass() {
337     return classes.getMappingInputFormatClass();
338   }
339 
340   
341 
342 
343 
344 
345   @Override
346   public void setMappingInputFormatClass(
347     Class<? extends MappingInputFormat> mappingInputFormatClass) {
348     super.setMappingInputFormatClass(mappingInputFormatClass);
349     classes.setMappingInputFormatClass(mappingInputFormatClass);
350   }
351 
352   
353 
354 
355 
356 
357   public boolean hasMappingInputFormat() {
358     return classes.hasMappingInputFormat();
359   }
360 
361   
362 
363 
364 
365 
366 
367 
368   private VertexOutputFormat<I, V, E> createVertexOutputFormat() {
369     Class<? extends VertexOutputFormat<I, V, E>> klass =
370         getVertexOutputFormatClass();
371     return ReflectionUtils.newInstance(klass, this);
372   }
373 
374   
375 
376 
377 
378 
379 
380 
381   private MappingInputFormat<I, V, E, ? extends Writable>
382   createMappingInputFormat() {
383     Class<? extends MappingInputFormat<I, V, E, ? extends Writable>> klass =
384         getMappingInputFormatClass();
385     return ReflectionUtils.newInstance(klass, this);
386   }
387 
388   
389 
390 
391 
392 
393 
394 
395   public WrappedVertexOutputFormat<I, V, E> createWrappedVertexOutputFormat() {
396     WrappedVertexOutputFormat<I, V, E> wrappedVertexOutputFormat =
397         new WrappedVertexOutputFormat<I, V, E>(createVertexOutputFormat());
398     configureIfPossible(wrappedVertexOutputFormat);
399     return wrappedVertexOutputFormat;
400   }
401 
402   
403 
404 
405 
406 
407 
408 
409   public WrappedMappingInputFormat<I, V, E, ? extends Writable>
410   createWrappedMappingInputFormat() {
411     WrappedMappingInputFormat<I, V, E, ? extends Writable>
412       wrappedMappingInputFormat =
413         new WrappedMappingInputFormat<>(createMappingInputFormat());
414     configureIfPossible(wrappedMappingInputFormat);
415     return wrappedMappingInputFormat;
416   }
417 
418   @Override
419   public boolean hasEdgeOutputFormat() {
420     return classes.hasEdgeOutputFormat();
421   }
422 
423   
424 
425 
426 
427 
428 
429   public Class<? extends EdgeOutputFormat<I, V, E>>
430   getEdgeOutputFormatClass() {
431     return classes.getEdgeOutputFormatClass();
432   }
433 
434   
435 
436 
437 
438 
439 
440 
441   private EdgeOutputFormat<I, V, E> createEdgeOutputFormat() {
442     Class<? extends EdgeOutputFormat<I, V, E>> klass =
443         getEdgeOutputFormatClass();
444     return ReflectionUtils.newInstance(klass, this);
445   }
446 
447   
448 
449 
450 
451 
452 
453 
454   public WrappedEdgeOutputFormat<I, V, E> createWrappedEdgeOutputFormat() {
455     WrappedEdgeOutputFormat<I, V, E> wrappedEdgeOutputFormat =
456         new WrappedEdgeOutputFormat<I, V, E>(createEdgeOutputFormat());
457     configureIfPossible(wrappedEdgeOutputFormat);
458     return wrappedEdgeOutputFormat;
459   }
460 
461   
462 
463 
464 
465 
466 
467   public SuperstepOutput<I, V, E> createSuperstepOutput(
468       Mapper<?, ?, ?, ?>.Context context) {
469     if (doOutputDuringComputation()) {
470       if (vertexOutputFormatThreadSafe()) {
471         return new MultiThreadedSuperstepOutput<I, V, E>(this, context);
472       } else {
473         return new SynchronizedSuperstepOutput<I, V, E>(this, context);
474       }
475     } else {
476       return new NoOpSuperstepOutput<I, V, E>();
477     }
478   }
479 
480   @Override
481   public boolean hasEdgeInputFormat() {
482     return classes.hasEdgeInputFormat();
483   }
484 
485   
486 
487 
488 
489 
490 
491   public Class<? extends EdgeInputFormat<I, E>> getEdgeInputFormatClass() {
492     return classes.getEdgeInputFormatClass();
493   }
494 
495   
496 
497 
498 
499 
500 
501 
502   private EdgeInputFormat<I, E> createEdgeInputFormat() {
503     Class<? extends EdgeInputFormat<I, E>> klass = getEdgeInputFormatClass();
504     return ReflectionUtils.newInstance(klass, this);
505   }
506 
507   
508 
509 
510 
511 
512 
513 
514   public WrappedEdgeInputFormat<I, E> createWrappedEdgeInputFormat() {
515     WrappedEdgeInputFormat<I, E> wrappedEdgeInputFormat =
516         new WrappedEdgeInputFormat<I, E>(createEdgeInputFormat());
517     configureIfPossible(wrappedEdgeInputFormat);
518     return wrappedEdgeInputFormat;
519   }
520 
521   @Override
522   public void setEdgeInputFormatClass(
523       Class<? extends EdgeInputFormat> edgeInputFormatClass) {
524     super.setEdgeInputFormatClass(edgeInputFormatClass);
525     classes.setEdgeInputFormatClass(edgeInputFormatClass);
526   }
527 
528   
529 
530 
531 
532 
533   public Class<? extends AggregatorWriter> getAggregatorWriterClass() {
534     return classes.getAggregatorWriterClass();
535   }
536 
537   
538 
539 
540 
541 
542   public AggregatorWriter createAggregatorWriter() {
543     return ReflectionUtils.newInstance(getAggregatorWriterClass(), this);
544   }
545 
546   
547 
548 
549 
550 
551 
552   public Class<? extends VertexValueCombiner<V>>
553   getVertexValueCombinerClass() {
554     return classes.getVertexValueCombinerClass();
555   }
556 
557   
558 
559 
560 
561 
562   @SuppressWarnings("rawtypes")
563   public VertexValueCombiner<V> createVertexValueCombiner() {
564     return ReflectionUtils.newInstance(getVertexValueCombinerClass(), this);
565   }
566 
567   
568 
569 
570 
571 
572   public Class<? extends VertexResolver<I, V, E>> getVertexResolverClass() {
573     return classes.getVertexResolverClass();
574   }
575 
576   
577 
578 
579 
580 
581   public VertexResolver<I, V, E> createVertexResolver() {
582     return ReflectionUtils.newInstance(getVertexResolverClass(), this);
583   }
584 
585   
586 
587 
588 
589 
590   public Class<? extends WorkerContext> getWorkerContextClass() {
591     return classes.getWorkerContextClass();
592   }
593 
594   
595 
596 
597 
598 
599   public WorkerContext createWorkerContext() {
600     return ReflectionUtils.newInstance(getWorkerContextClass(), this);
601   }
602 
603   
604 
605 
606 
607 
608   public Class<? extends MasterCompute> getMasterComputeClass() {
609     return classes.getMasterComputeClass();
610   }
611 
612   
613 
614 
615 
616 
617   public MasterCompute createMasterCompute() {
618     return ReflectionUtils.newInstance(getMasterComputeClass(), this);
619   }
620 
621   @Override
622   public Class<? extends Computation<I, V, E,
623       ? extends Writable, ? extends Writable>>
624   getComputationClass() {
625     return classes.getComputationClass();
626   }
627 
628   
629 
630 
631 
632 
633   @Override
634   public Class<? extends ComputationFactory<I, V, E,
635       ? extends Writable, ? extends Writable>>
636   getComputationFactoryClass() {
637     return classes.getComputationFactoryClass();
638   }
639 
640   
641 
642 
643 
644 
645   public ComputationFactory<I, V, E, ? extends Writable, ? extends Writable>
646   createComputationFactory() {
647     return ReflectionUtils.newInstance(getComputationFactoryClass(), this);
648   }
649 
650   
651 
652 
653 
654 
655   public Computation<I, V, E, ? extends Writable, ? extends Writable>
656   createComputation() {
657     return createComputationFactory().createComputation(this);
658   }
659 
660   
661 
662 
663 
664 
665   public GiraphTypes<I, V, E> getGiraphTypes() {
666     return classes.getGiraphTypes();
667   }
668 
669   
670 
671 
672 
673 
674   public Vertex<I, V, E> createVertex() {
675     Class vertexClass = classes.getVertexClass();
676     return (Vertex<I, V, E>) ReflectionUtils.newInstance(vertexClass, this);
677   }
678 
679 
680  
681 
682 
683 
684 
685   public Class<I> getVertexIdClass() {
686     return classes.getVertexIdClass();
687   }
688 
689   
690 
691 
692 
693 
694   public VertexIdFactory<I> getVertexIdFactory() {
695     return valueFactories.getVertexIdFactory();
696   }
697 
698   
699 
700 
701 
702 
703   public I createVertexId() {
704     return getVertexIdFactory().newInstance();
705   }
706 
707   
708 
709 
710 
711 
712   public Class<V> getVertexValueClass() {
713     return classes.getVertexValueClass();
714   }
715 
716   
717 
718 
719 
720 
721   public VertexValueFactory<V> getVertexValueFactory() {
722     return valueFactories.getVertexValueFactory();
723   }
724 
725   
726 
727 
728 
729 
730   @SuppressWarnings("unchecked")
731   public V createVertexValue() {
732     return getVertexValueFactory().newInstance();
733   }
734 
735   
736 
737 
738 
739 
740   public Class<? extends VertexValueFactory<V>> getVertexValueFactoryClass() {
741     return (Class<? extends VertexValueFactory<V>>)
742         valueFactories.getVertexValueFactory().getClass();
743   }
744 
745   
746 
747 
748 
749 
750 
751   public MasterObserver[] createMasterObservers(
752       Mapper<?, ?, ?, ?>.Context context) {
753     Class<? extends MasterObserver>[] klasses = getMasterObserverClasses();
754     MasterObserver[] objects = new MasterObserver[klasses.length];
755     for (int i = 0; i < klasses.length; ++i) {
756       objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
757     }
758     return objects;
759   }
760 
761   
762 
763 
764 
765 
766 
767   public WorkerObserver[] createWorkerObservers(
768       Mapper<?, ?, ?, ?>.Context context) {
769     Class<? extends WorkerObserver>[] klasses = getWorkerObserverClasses();
770     WorkerObserver[] objects = new WorkerObserver[klasses.length];
771     for (int i = 0; i < klasses.length; ++i) {
772       objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
773     }
774     return objects;
775   }
776 
777   
778 
779 
780 
781 
782 
783   public MapperObserver[] createMapperObservers(
784       Mapper<?, ?, ?, ?>.Context context) {
785     Class<? extends MapperObserver>[] klasses = getMapperObserverClasses();
786     MapperObserver[] objects = new MapperObserver[klasses.length];
787     for (int i = 0; i < klasses.length; ++i) {
788       objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
789     }
790     return objects;
791   }
792 
793   
794 
795 
796 
797 
798 
799   public GcObserver[] createGcObservers(
800       Mapper<?, ?, ?, ?>.Context context) {
801     Class<? extends GcObserver>[] klasses = getGcObserverClasses();
802     GcObserver[] objects = new GcObserver[klasses.length];
803     for (int i = 0; i < klasses.length; ++i) {
804       objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
805     }
806     return objects;
807   }
808 
809   
810 
811 
812 
813 
814   public GiraphJobObserver getJobObserver() {
815     return ReflectionUtils.newInstance(getJobObserverClass(), this);
816   }
817 
818   
819 
820 
821 
822 
823   public GiraphJobRetryChecker getJobRetryChecker() {
824     return ReflectionUtils.newInstance(getJobRetryCheckerClass(), this);
825   }
826 
827   
828 
829 
830 
831 
832   public Class<E> getEdgeValueClass() {
833     return classes.getEdgeValueClass();
834   }
835 
836   
837 
838 
839 
840 
841   public boolean isEdgeValueNullWritable() {
842     return getEdgeValueClass() == NullWritable.class;
843   }
844 
845   
846 
847 
848 
849 
850   public EdgeValueFactory<E> getEdgeValueFactory() {
851     return valueFactories.getEdgeValueFactory();
852   }
853 
854   
855 
856 
857 
858 
859   public E createEdgeValue() {
860     return getEdgeValueFactory().newInstance();
861   }
862 
863   
864 
865 
866 
867 
868   public Edge<I, E> createEdge() {
869     if (isEdgeValueNullWritable()) {
870       return (Edge<I, E>) EdgeFactory.create(createVertexId());
871     } else {
872       return EdgeFactory.create(createVertexId(), createEdgeValue());
873     }
874   }
875 
876   
877 
878 
879 
880 
881 
882 
883   public Edge<I, E> createEdge(TranslateEdge<I, E>
884     translateEdge, Edge<I, E> edge) {
885     I translatedId = translateEdge.translateId(edge.getTargetVertexId());
886     if (isEdgeValueNullWritable()) {
887       return (Edge<I, E>) EdgeFactory.create(translatedId);
888     } else {
889       return EdgeFactory.create(translatedId,
890         translateEdge.cloneValue(edge.getValue()));
891     }
892   }
893 
894   
895 
896 
897 
898 
899   public ReusableEdge<I, E> createReusableEdge() {
900     if (isEdgeValueNullWritable()) {
901       return (ReusableEdge<I, E>) EdgeFactory.createReusable(createVertexId());
902     } else {
903       return EdgeFactory.createReusable(createVertexId(), createEdgeValue());
904     }
905   }
906 
907   
908 
909 
910 
911 
912   public EdgeStoreFactory<I, V, E> createEdgeStoreFactory() {
913     Class<? extends EdgeStoreFactory> edgeStoreFactoryClass =
914         EDGE_STORE_FACTORY_CLASS.get(this);
915     return ReflectionUtils.newInstance(edgeStoreFactoryClass);
916   }
917 
918   
919 
920 
921 
922 
923 
924   public <M extends Writable> Class<M> getIncomingMessageValueClass() {
925     return classes.getIncomingMessageClasses().getMessageClass();
926   }
927 
928   
929 
930 
931 
932 
933 
934   public <M extends Writable> Class<M> getOutgoingMessageValueClass() {
935     return classes.getOutgoingMessageClasses().getMessageClass();
936   }
937 
938   
939 
940 
941 
942 
943   public <M extends Writable>
944   MessageClasses<I, M> getIncomingMessageClasses() {
945     return classes.getIncomingMessageClasses();
946   }
947 
948   
949 
950 
951 
952 
953   public <M extends Writable>
954   MessageClasses<I, M> getOutgoingMessageClasses() {
955     return classes.getOutgoingMessageClasses();
956   }
957 
958   
959 
960 
961 
962 
963   public <M extends Writable>
964   MessageValueFactory<M> createOutgoingMessageValueFactory() {
965     return classes.getOutgoingMessageClasses().createMessageValueFactory(this);
966   }
967 
968   
969 
970 
971 
972 
973   public <M extends Writable>
974   MessageValueFactory<M> createIncomingMessageValueFactory() {
975     return classes.getIncomingMessageClasses().createMessageValueFactory(this);
976   }
977 
978   @Override
979   public void setMessageCombinerClass(
980       Class<? extends MessageCombiner> messageCombinerClass) {
981     throw new IllegalArgumentException(
982         "Cannot set message combiner on ImmutableClassesGiraphConfigurable");
983   }
984 
985   
986 
987 
988 
989 
990 
991   public <M extends Writable> MessageCombiner<? super I, M>
992   createOutgoingMessageCombiner() {
993     return classes.getOutgoingMessageClasses().createMessageCombiner(this);
994   }
995 
996   
997 
998 
999 
1000 
1001   public boolean useOutgoingMessageCombiner() {
1002     return classes.getOutgoingMessageClasses().useMessageCombiner();
1003   }
1004 
1005   
1006 
1007 
1008 
1009   public MessageEncodeAndStoreType getOutgoingMessageEncodeAndStoreType() {
1010     return classes.getOutgoingMessageClasses().getMessageEncodeAndStoreType();
1011   }
1012 
1013   @Override
1014   public Class<? extends OutEdges<I, E>> getOutEdgesClass() {
1015     return classes.getOutEdgesClass();
1016   }
1017 
1018   
1019 
1020 
1021 
1022 
1023 
1024   public Class<? extends OutEdges<I, E>> getInputOutEdgesClass() {
1025     return classes.getInputOutEdgesClass();
1026   }
1027 
1028   
1029 
1030 
1031 
1032 
1033 
1034 
1035   public boolean useInputOutEdges() {
1036     return classes.getInputOutEdgesClass() != classes.getOutEdgesClass();
1037   }
1038 
1039   
1040 
1041 
1042 
1043 
1044   public Class<? extends MappingStore> getMappingStoreClass() {
1045     return MAPPING_STORE_CLASS.get(this);
1046   }
1047 
1048   
1049 
1050 
1051 
1052 
1053   public MappingStore<I, ? extends Writable> createMappingStore() {
1054     if (getMappingStoreClass() != null) {
1055       return ReflectionUtils.newInstance(getMappingStoreClass(), this);
1056     } else {
1057       return null;
1058     }
1059   }
1060 
1061   
1062 
1063 
1064 
1065 
1066   public Class<? extends MappingStoreOps> getMappingStoreOpsClass() {
1067     return MAPPING_STORE_OPS_CLASS.get(this);
1068   }
1069 
1070   
1071 
1072 
1073 
1074 
1075   public MappingStoreOps<I, ? extends Writable> createMappingStoreOps() {
1076     if (getMappingStoreOpsClass() != null) {
1077       return ReflectionUtils.newInstance(getMappingStoreOpsClass(), this);
1078     } else {
1079       return null;
1080     }
1081   }
1082 
1083   
1084 
1085 
1086 
1087 
1088   public Class<? extends Writable> getMappingTargetClass() {
1089     if (mappingTargetClass == null) {
1090       Class<?>[] classList = ReflectionUtils.getTypeArguments(
1091         MappingStore.class, getMappingStoreClass());
1092       Preconditions.checkArgument(classList.length == 2);
1093       mappingTargetClass = (Class<? extends Writable>) classList[1];
1094     }
1095     return mappingTargetClass;
1096   }
1097 
1098   
1099 
1100 
1101 
1102 
1103   public Writable createMappingTarget() {
1104     return WritableUtils.createWritable(getMappingTargetClass());
1105   }
1106 
1107   
1108 
1109 
1110 
1111 
1112   public OutEdges<I, E> createOutEdges() {
1113     return outEdgesFactory.newInstance();
1114   }
1115 
1116   
1117 
1118 
1119 
1120 
1121 
1122   public OutEdges<I, E> createAndInitializeOutEdges() {
1123     OutEdges<I, E> outEdges = createOutEdges();
1124     outEdges.initialize();
1125     return outEdges;
1126   }
1127 
1128   
1129 
1130 
1131 
1132 
1133 
1134 
1135   public OutEdges<I, E> createAndInitializeOutEdges(int capacity) {
1136     OutEdges<I, E> outEdges = createOutEdges();
1137     outEdges.initialize(capacity);
1138     return outEdges;
1139   }
1140 
1141   
1142 
1143 
1144 
1145 
1146 
1147 
1148   public OutEdges<I, E> createAndInitializeOutEdges(
1149       Iterable<Edge<I, E>> edges) {
1150     OutEdges<I, E> outEdges = createOutEdges();
1151     outEdges.initialize(edges);
1152     return outEdges;
1153   }
1154 
1155   
1156 
1157 
1158 
1159 
1160 
1161   public OutEdges<I, E> createInputOutEdges() {
1162     return inputOutEdgesFactory.newInstance();
1163   }
1164 
1165   
1166 
1167 
1168 
1169 
1170 
1171   public OutEdges<I, E> createAndInitializeInputOutEdges() {
1172     OutEdges<I, E> outEdges = createInputOutEdges();
1173     outEdges.initialize();
1174     return outEdges;
1175   }
1176 
1177   
1178 
1179 
1180 
1181 
1182 
1183 
1184   public Partition<I, V, E> createPartition(
1185       int id, Progressable progressable) {
1186     Class<? extends Partition<I, V, E>> klass = classes.getPartitionClass();
1187     Partition<I, V, E> partition = ReflectionUtils.newInstance(klass, this);
1188     partition.initialize(id, progressable);
1189     return partition;
1190   }
1191 
1192   
1193 
1194 
1195 
1196 
1197   public boolean useUnsafeSerialization() {
1198     return useUnsafeSerialization;
1199   }
1200 
1201   
1202 
1203 
1204 
1205 
1206   public DataInputOutput createMessagesInputOutput() {
1207     if (useBigDataIOForMessages) {
1208       return new BigDataInputOutput(this);
1209     } else {
1210       return new ExtendedDataInputOutput(this);
1211     }
1212   }
1213 
1214   
1215 
1216 
1217 
1218 
1219   public ExtendedDataOutput createExtendedDataOutput() {
1220     if (useUnsafeSerialization) {
1221       return new UnsafeByteArrayOutputStream();
1222     } else {
1223       return new ExtendedByteArrayDataOutput();
1224     }
1225   }
1226 
1227   
1228 
1229 
1230 
1231 
1232 
1233   public ExtendedDataOutput createExtendedDataOutput(int expectedSize) {
1234     if (useUnsafeSerialization) {
1235       return new UnsafeByteArrayOutputStream(expectedSize);
1236     } else {
1237       return new ExtendedByteArrayDataOutput(expectedSize);
1238     }
1239   }
1240 
1241   
1242 
1243 
1244 
1245 
1246 
1247 
1248   public ExtendedDataOutput createExtendedDataOutput(byte[] buf,
1249                                                      int pos) {
1250     if (useUnsafeSerialization) {
1251       return new UnsafeByteArrayOutputStream(buf, pos);
1252     } else {
1253       return new ExtendedByteArrayDataOutput(buf, pos);
1254     }
1255   }
1256 
1257   
1258 
1259 
1260 
1261 
1262 
1263 
1264 
1265   public ExtendedDataInput createExtendedDataInput(
1266       byte[] buf, int off, int length) {
1267     if (useUnsafeSerialization) {
1268       return new UnsafeByteArrayInputStream(buf, off, length);
1269     } else {
1270       return new ExtendedByteArrayDataInput(buf, off, length);
1271     }
1272   }
1273 
1274   
1275 
1276 
1277 
1278 
1279 
1280   public ExtendedDataInput createExtendedDataInput(byte[] buf) {
1281     if (useUnsafeSerialization) {
1282       return new UnsafeByteArrayInputStream(buf);
1283     } else {
1284       return new ExtendedByteArrayDataInput(buf);
1285     }
1286   }
1287 
1288   
1289 
1290 
1291 
1292 
1293 
1294   public ExtendedDataInput createExtendedDataInput(
1295     ExtendedDataOutput extendedDataOutput) {
1296     return createExtendedDataInput(extendedDataOutput.getByteArray(), 0,
1297         extendedDataOutput.getPos());
1298   }
1299 
1300   
1301 
1302 
1303 
1304 
1305   public boolean getUseUnsafeSerialization() {
1306     return useUnsafeSerialization;
1307   }
1308 
1309   
1310 
1311 
1312 
1313 
1314   public void updateSuperstepClasses(SuperstepClasses superstepClasses) {
1315     superstepClasses.updateGiraphClasses(classes);
1316   }
1317 
1318   
1319 
1320 
1321 
1322 
1323   public boolean doCompression() {
1324     switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
1325     case "SNAPPY":
1326       return true;
1327     case "INFLATE":
1328       return true;
1329     default:
1330       return false;
1331     }
1332   }
1333 
1334   
1335 
1336 
1337 
1338 
1339   public MessageToByteEncoder getNettyCompressionEncoder() {
1340     switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
1341     case "SNAPPY":
1342       return new SnappyFramedEncoder();
1343     case "INFLATE":
1344       return new JdkZlibEncoder();
1345     default:
1346       return null;
1347     }
1348   }
1349 
1350   
1351 
1352 
1353 
1354 
1355   public ByteToMessageDecoder getNettyCompressionDecoder() {
1356     switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
1357     case "SNAPPY":
1358       return new SnappyFramedDecoder();
1359     case "INFLATE":
1360       return new JdkZlibDecoder();
1361     default:
1362       return null;
1363     }
1364   }
1365 
1366   
1367 
1368 
1369 
1370 
1371   public boolean isStaticGraph() {
1372     return isStaticGraph;
1373   }
1374 
1375   
1376 
1377 
1378   public String getJobId() {
1379     return get("mapred.job.id", "UnknownJob");
1380   }
1381 
1382   
1383 
1384 
1385 
1386 
1387 
1388   public boolean useMessageSizeEncoding() {
1389     return useMessageSizeEncoding;
1390   }
1391 }