This project has retired. For details please refer to its Attic page.
HadoopKryo xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.writable.kryo;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.util.Arrays;
23  import java.util.Collections;
24  import java.util.LinkedHashMap;
25  import java.util.Map;
26  import java.util.Map.Entry;
27  import java.util.Random;
28  
29  import com.esotericsoftware.kryo.util.DefaultClassResolver;
30  import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
31  import org.apache.giraph.conf.GiraphConfigurationSettable;
32  import com.esotericsoftware.kryo.ClassResolver;
33  import com.esotericsoftware.kryo.ReferenceResolver;
34  import com.esotericsoftware.kryo.util.MapReferenceResolver;
35  import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
36  import org.apache.giraph.types.ops.collections.BasicSet;
37  import org.apache.giraph.writable.kryo.markers.NonKryoWritable;
38  import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
39  import org.apache.giraph.writable.kryo.serializers.ArraysAsListSerializer;
40  import org.apache.giraph.writable.kryo.serializers.CollectionsNCopiesSerializer;
41  import org.apache.giraph.writable.kryo.serializers.DirectWritableSerializer;
42  import org.apache.giraph.writable.kryo.serializers.FastUtilSerializer;
43  import org.apache.giraph.writable.kryo.serializers.ImmutableBiMapSerializerUtils;
44  import org.apache.giraph.writable.kryo.serializers.ReusableFieldSerializer;
45  import org.apache.hadoop.conf.Configurable;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.io.Writable;
48  import org.apache.log4j.Logger;
49  import org.objenesis.strategy.StdInstantiatorStrategy;
50  
51  import com.esotericsoftware.kryo.Kryo;
52  import com.esotericsoftware.kryo.Serializer;
53  import com.esotericsoftware.kryo.factories.SerializerFactory;
54  import com.esotericsoftware.kryo.io.Input;
55  import com.esotericsoftware.kryo.io.InputChunked;
56  import com.esotericsoftware.kryo.io.Output;
57  import com.esotericsoftware.kryo.io.OutputChunked;
58  import com.esotericsoftware.kryo.pool.KryoCallback;
59  import com.esotericsoftware.kryo.pool.KryoFactory;
60  import com.esotericsoftware.kryo.pool.KryoPool;
61  import com.esotericsoftware.kryo.serializers.ClosureSerializer;
62  import com.esotericsoftware.kryo.serializers.FieldSerializer;
63  import com.esotericsoftware.kryo.util.ObjectMap;
64  import com.google.common.base.Preconditions;
65  
66  import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
67  
68  /**
69   * Kryo instance that provides serialization through DataInput/DataOutput
70   * that org.apache.hadoop.io.Writable uses.
71   *
72   * All public APIs are static.
73   *
74   * It extends Kryo to reuse KryoPool functionality, while also caching the
75   * additional objects it needs. If we move to ThreadLocal or another caching
76   * technique, we can use composition instead of inheritance here.
77   *
78   * TODO: Refactor this class into two separate classes depending on
79   * whether the reference tracking is enabled or disabled.
80   */
81  public class HadoopKryo extends Kryo {
82    /** Pool of reusable Kryo objects, since they are expensive to create */
83    private static final KryoPool KRYO_POOL = new KryoPool.Builder(
84        new KryoFactory() {
85          @Override
86          public Kryo create() {
87            return createKryo(true, true);
88          }
89        }).build();
90    /** Thread local HadoopKryo object */
91    private static final ThreadLocal<HadoopKryo> KRYO =
92      new ThreadLocal<HadoopKryo>() {
93        @Override protected HadoopKryo initialValue() {
94          return  createKryo(false, false);
95        }
96      };
97  
98    /**
99     * List of interfaces/parent classes that will not be allowed to be
100    * serialized, together with explanation of why, that will be shown
101    * when throwing such exception
102    */
103   private static final Map<Class<?>, String> NON_SERIALIZABLE;
104 
105   static {
106     NON_SERIALIZABLE = new LinkedHashMap<>();
107     NON_SERIALIZABLE.put(
108         NonKryoWritable.class,
109         "it is marked to not allow serialization, " +
110         "look at the class for more details");
111     NON_SERIALIZABLE.put(
112         KryoWritableWrapper.class, "recursion is disallowed");
113     NON_SERIALIZABLE.put(
114         Configuration.class,
115         "it cannot be supported since it contains ClassLoader");
116     NON_SERIALIZABLE.put(
117         GiraphConfigurationSettable.class, "configuration cannot be set");
118     NON_SERIALIZABLE.put(
119         Configurable.class, "configuration cannot be set");
120     NON_SERIALIZABLE.put(
121         Random.class,
122         "it should be rarely serialized, since it would create same stream " +
123         "of numbers everywhere, use TransientRandom instead");
124     NON_SERIALIZABLE.put(
125         Logger.class,
126         "Logger must be a static field");
127   }
128 
129   /** Reusable Input object */
130   private InputChunked input;
131   /** Reusable Output object */
132   private OutputChunked output;
133 
134   /** Reusable DataInput wrapper stream */
135   private DataInputWrapperStream dataInputWrapperStream;
136   /** Reusable DataOutput wrapper stream */
137   private DataOutputWrapperStream dataOutputWrapperStream;
138 
139   /**
140    * Map of already initialized serializers used
141    * for readIntoObject/writeOutOfObject pair of methods
142    */
143   private final ObjectMap<Class<?>, ReusableFieldSerializer<Object>>
144   classToIntoSerializer = new ObjectMap<>();
145 
146   /** Hide constructor, so all access go through pool of cached objects */
147   private HadoopKryo() {
148   }
149 
150   /**
151    * Constructor that takes custom class resolver and reference resolver.
152    * @param classResolver Class resolver
153    * @param referenceResolver Reference resolver
154    */
155   private HadoopKryo(ClassResolver classResolver,
156                      ReferenceResolver referenceResolver) {
157     super(classResolver, referenceResolver);
158   }
159 
160   // Public API:
161 
162   /**
163    * Write type of given object and the object itself to the output stream.
164    * Inverse of readClassAndObj.
165    *
166    * @param out Output stream
167    * @param object Object to write
168    */
169   public static void writeClassAndObj(
170           final DataOutput out, final Object object) {
171     writeInternal(out, object, false);
172   }
173 
174   /**
175    * Read object from the input stream, by reading first type of the object,
176    * and then all of its fields.
177    * Inverse of writeClassAndObject.
178    *
179    * @param in Input stream
180    * @return Deserialized object
181    * @param <T> Type of the object being read
182    */
183   public static <T> T readClassAndObj(DataInput in) {
184     return readInternal(in, null, false);
185   }
186 
187   /**
188    * Write an object to output, in a way that can be read by readIntoObject.
189    *
190    * @param out Output stream
191    * @param object Object to be written
192    */
193   public static void writeOutOfObject(
194       final DataOutput out, final Object object) {
195     writeInternal(out, object, true);
196   }
197 
198   /**
199    * Reads an object, from input, into a given object,
200    * allowing object reuse.
201    * Inverse of writeOutOfObject.
202    *
203    * @param in Input stream
204    * @param object Object to fill from input
205    */
206   public static void readIntoObject(DataInput in, Object object) {
207     readInternal(in, object, true);
208   }
209 
210   /**
211    * Writes class and object to specified output stream with specified
212    * Kryo object. It does not use interim buffers.
213    * @param kryo Kryo object
214    * @param out Output stream
215    * @param object Object
216    */
217   public static void writeWithKryo(
218           final HadoopKryo kryo, final Output out,
219           final Object object) {
220     kryo.writeClassAndObject(out, object);
221     out.close();
222   }
223 
224   /**
225    * Write out of object with given kryo
226    * @param kryo Kryo object
227    * @param out Output
228    * @param object Object to write
229    */
230   public static void writeWithKryoOutOfObject(
231           final HadoopKryo kryo, final Output out,
232           final Object object) {
233     kryo.writeOutOfObject(out, object);
234     out.close();
235   }
236 
237   /**
238    * Reads class and object from specified input stream with
239    * specified kryo object.
240    * it does not use interim buffers.
241    * @param kryo Kryo object
242    * @param in Input buffer
243    * @param <T> Object type parameter
244    * @return Object
245    */
246   public static <T> T readWithKryo(
247           final HadoopKryo kryo, final Input in) {
248     T object;
249     object = (T) kryo.readClassAndObject(in);
250     in.close();
251     return object;
252   }
253 
254   /**
255    * Read into object with given kryo.
256    * @param kryo Kryo object
257    * @param in Input
258    * @param object Object to read into
259    */
260   public static void readWithKryoIntoObject(
261           final HadoopKryo kryo, final Input in, Object object) {
262     kryo.readIntoObject(in, object);
263     in.close();
264   }
265 
266   /**
267    * Create copy of the object, by magically recursively copying
268    * all of its fields, keeping reference structures (like cycles)
269    *
270    * @param object Object to be copied
271    * @return Copy of the object.
272    * @param <T> Type of the object
273    */
274   public static <T> T createCopy(final T object) {
275     return KRYO_POOL.run(new KryoCallback<T>() {
276       @Override
277       public T execute(Kryo kryo) {
278         return kryo.copy(object);
279       }
280     });
281   }
282 
283   /**
284    * Returns a kryo which doesn't track objects, hence
285    * serialization of recursive/nested objects is not
286    * supported.
287    *
288    * Reference tracking significantly degrades the performance
289    * since kryo has to store all serialized objects and search
290    * the history to check if an object has been already serialized.
291    *
292    * @return Hadoop kryo which doesn't track objects.
293    */
294   public static HadoopKryo getNontrackingKryo() {
295     return KRYO.get();
296   }
297 
298   // Private implementation:
299 
300   /**
301    * Create new instance of HadoopKryo, properly initialized.
302    * @param trackReferences if true, object references are tracked.
303    * @param hasBuffer if true, an interim buffer is used.
304    * @return new HadoopKryo instance
305    */
306   private static HadoopKryo createKryo(boolean trackReferences,
307                                        boolean hasBuffer) {
308     HadoopKryo kryo;
309     if (trackReferences) {
310       kryo = new HadoopKryo();
311     } else {
312       // Only use GiraphClassResolver if it is properly initialized.
313       // This is to enable test cases which use KryoSimpleWrapper
314       // but don't start ZK.
315       kryo = new HadoopKryo(
316               GiraphClassResolver.isInitialized() ? new GiraphClassResolver() :
317                                                     new DefaultClassResolver(),
318               new MapReferenceResolver());
319     }
320 
321     try {
322       kryo.register(Class.forName("java.lang.invoke.SerializedLambda"));
323       kryo.register(Class.forName(
324         "com.esotericsoftware.kryo.serializers.ClosureSerializer$Closure"),
325         new ClosureSerializer());
326     } catch (ClassNotFoundException e) {
327       throw new IllegalStateException(
328         "Trying to use Kryo on Java version " +
329           System.getProperty("java.version") +
330           ", but unable to find needed classes", e);
331     }
332 
333     kryo.register(Arrays.asList().getClass(), new ArraysAsListSerializer());
334     kryo.register(Collections.nCopies(1, new Object()).getClass(),
335         new CollectionsNCopiesSerializer());
336 
337     ImmutableListSerializer.registerSerializers(kryo);
338     ImmutableMapSerializer.registerSerializers(kryo);
339     ImmutableBiMapSerializerUtils.registerSerializers(kryo);
340 
341     // There are many fastutil classes, register them at the end,
342     // so they don't use up small registration numbers
343     FastUtilSerializer.registerAll(kryo);
344 
345     kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(
346         new StdInstantiatorStrategy()));
347 
348     SerializerFactory customSerializerFactory = new SerializerFactory() {
349       @SuppressWarnings("rawtypes")
350       @Override
351       public Serializer makeSerializer(Kryo kryo, final Class<?> type) {
352         for (final Entry<Class<?>, String> entry :
353             NON_SERIALIZABLE.entrySet()) {
354           if (entry.getKey().isAssignableFrom(type)) {
355             // Allow Class object to be serialized, but not a live instance.
356             return new Serializer() {
357               @Override
358               public Object read(Kryo kryo, Input input, Class type) {
359                 throw new RuntimeException("Cannot serialize " + type +
360                     ". Objects being serialized cannot capture " +
361                     entry.getKey() + " because " + entry.getValue() +
362                     ". Either remove field in question" +
363                     ", or make it transient (so that it isn't serialized)");
364               }
365 
366               @Override
367               public void write(Kryo kryo, Output output, Object object) {
368                 throw new RuntimeException("Cannot serialize " + type +
369                     ". Objects being serialized cannot capture " +
370                     entry.getKey() + " because " + entry.getValue() +
371                     ". Either remove field in question" +
372                     ", or make it transient (so that it isn't serialized)");
373               }
374             };
375           }
376         }
377 
378         if (Writable.class.isAssignableFrom(type) &&
379             !KryoIgnoreWritable.class.isAssignableFrom(type) &&
380             // remove BasicSet, BasicArrayList and Basic2ObjectMap temporarily,
381             // for lack of constructors
382             !BasicSet.class.isAssignableFrom(type) &&
383             !Basic2ObjectMap.class.isAssignableFrom(type)) {
384           // use the Writable method defined by the type
385           DirectWritableSerializer serializer = new DirectWritableSerializer();
386           return serializer;
387         } else {
388           FieldSerializer serializer = new FieldSerializer<>(kryo, type);
389           serializer.setIgnoreSyntheticFields(false);
390           return serializer;
391         }
392       }
393     };
394 
395     kryo.addDefaultSerializer(Writable.class, customSerializerFactory);
396     kryo.setDefaultSerializer(customSerializerFactory);
397 
398     if (hasBuffer) {
399       kryo.input = new InputChunked(4096);
400       kryo.output = new OutputChunked(4096);
401       kryo.dataInputWrapperStream = new DataInputWrapperStream();
402       kryo.dataOutputWrapperStream = new DataOutputWrapperStream();
403     }
404 
405     if (!trackReferences) {
406       kryo.setReferences(false);
407 
408       // Auto reset can only be disabled if the GiraphClassResolver is
409       // properly initialized.
410       if (GiraphClassResolver.isInitialized()) {
411         kryo.setAutoReset(false);
412       }
413     }
414     return kryo;
415   }
416 
417   /**
418    * Register serializer for class with class name
419    *
420    * @param kryo HadoopKryo
421    * @param className Name of the class for which to register serializer
422    * @param serializer Serializer to use
423    */
424   private static void registerSerializer(HadoopKryo kryo, String className,
425       Serializer serializer) {
426     try {
427       kryo.register(Class.forName(className), serializer);
428     } catch (ClassNotFoundException e) {
429       throw new IllegalStateException("Class " + className + " is missing", e);
430     }
431   }
432 
433   /**
434    * Initialize reusable objects for reading from given DataInput.
435    *
436    * @param in Input stream
437    */
438   private void setDataInput(DataInput in) {
439     dataInputWrapperStream.setDataInput(in);
440     input.setInputStream(dataInputWrapperStream);
441   }
442 
443   /**
444    * Initialize reusable objects for writing into given DataOutput.
445    *
446    *  @param out Output stream
447    */
448   private void setDataOutput(DataOutput out) {
449     dataOutputWrapperStream.setDataOutput(out);
450     output.setOutputStream(dataOutputWrapperStream);
451   }
452 
453   /**
454    * Get or create reusable serializer for given class.
455    *
456    * @param type Type of the object
457    * @return Serializer
458    */
459   private ReusableFieldSerializer<Object> getOrCreateReusableSerializer(
460       Class<?> type) {
461     ReusableFieldSerializer<Object> serializer =
462         classToIntoSerializer.get(type);
463     if (serializer == null) {
464       serializer = new ReusableFieldSerializer<>(this, type);
465       classToIntoSerializer.put(type, serializer);
466     }
467     return serializer;
468   }
469 
470   /**
471    * Internal write implementation, that reuses HadoopKryo objects
472    * from the pool.
473    *
474    * @param out Output stream
475    * @param object Object to be written
476    * @param outOf whether we are writing reusable objects,
477    *              or full objects with class name
478    */
479   private static void writeInternal(
480       final DataOutput out, final Object object, final boolean outOf) {
481     KRYO_POOL.run(new KryoCallback<Void>() {
482       @Override
483       public Void execute(Kryo kryo) {
484         HadoopKryo hkryo = (HadoopKryo) kryo;
485         hkryo.setDataOutput(out);
486 
487         if (outOf) {
488           hkryo.writeOutOfObject(hkryo.output, object);
489         } else {
490           hkryo.writeClassAndObject(hkryo.output, object);
491         }
492 
493         hkryo.output.endChunks();
494         hkryo.output.close();
495 
496         return null;
497       }
498     });
499   }
500 
501   /**
502    * Internal read implementation, that reuses HadoopKryo objects
503    * from the pool.
504    *
505    * @param in Input stream
506    * @param outObject Object to fill from input (if not null)
507    * @param into whether we are reading reusable objects,
508    *             or full objects with class name
509    * @return Read object (new one, or same passed in if we use reusable)
510    * @param <T> Type of the object to read
511    */
512   @SuppressWarnings("unchecked")
513   private static <T> T readInternal(
514       final DataInput in, final T outObject, final boolean into) {
515     return KRYO_POOL.run(new KryoCallback<T>() {
516       @Override
517       public T execute(Kryo kryo) {
518         HadoopKryo hkryo = (HadoopKryo) kryo;
519         hkryo.setDataInput(in);
520 
521         T object;
522         if (into) {
523           hkryo.readIntoObject(hkryo.input, outObject);
524           object = outObject;
525         } else {
526           object = (T) hkryo.readClassAndObject(hkryo.input);
527         }
528         hkryo.input.nextChunks();
529 
530         hkryo.input.close();
531         return object;
532       }
533     });
534   }
535 
536   /**
537    * Reads an object, from input, into a given object,
538    * allowing object reuse.
539    *
540    * @param input Input stream
541    * @param object Object to fill from input
542    */
543   private void readIntoObject(Input input, Object object) {
544     Preconditions.checkNotNull(object);
545 
546     Class<?> type = object.getClass();
547     ReusableFieldSerializer<Object> serializer =
548         getOrCreateReusableSerializer(type);
549 
550     serializer.setReadIntoObject(object);
551     Object result = readObject(input, type, serializer);
552 
553     Preconditions.checkState(result == object);
554   }
555 
556   /**
557    * Write an object to output, in a way that can be read
558    * using readIntoObject.
559    * @param output Output stream
560    * @param object Object to be written
561    */
562   private void writeOutOfObject(Output output, Object object) {
563     ReusableFieldSerializer<Object> serializer =
564         getOrCreateReusableSerializer(object.getClass());
565     writeObject(output, object, serializer);
566   }
567 
568 }