View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.writable.kryo;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.util.Arrays;
23  import java.util.Collections;
24  import java.util.LinkedHashMap;
25  import java.util.Map;
26  import java.util.Map.Entry;
27  import java.util.Random;
28  
29  import com.esotericsoftware.kryo.util.DefaultClassResolver;
30  import org.apache.giraph.conf.GiraphConfigurationSettable;
31  import com.esotericsoftware.kryo.ClassResolver;
32  import com.esotericsoftware.kryo.ReferenceResolver;
33  import com.esotericsoftware.kryo.util.MapReferenceResolver;
34  import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
35  import org.apache.giraph.types.ops.collections.BasicSet;
36  import org.apache.giraph.writable.kryo.markers.NonKryoWritable;
37  import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
38  import org.apache.giraph.writable.kryo.serializers.ArraysAsListSerializer;
39  import org.apache.giraph.writable.kryo.serializers.CollectionsNCopiesSerializer;
40  import org.apache.giraph.writable.kryo.serializers.DirectWritableSerializer;
41  import org.apache.giraph.writable.kryo.serializers.FastUtilSerializer;
42  import org.apache.giraph.writable.kryo.serializers.ImmutableMapSerializer;
43  import org.apache.giraph.writable.kryo.serializers.ReusableFieldSerializer;
44  import org.apache.hadoop.conf.Configurable;
45  import org.apache.hadoop.conf.Configuration;
46  import org.apache.hadoop.io.Writable;
47  import org.apache.log4j.Logger;
48  import org.objenesis.strategy.StdInstantiatorStrategy;
49  
50  import com.esotericsoftware.kryo.Kryo;
51  import com.esotericsoftware.kryo.Serializer;
52  import com.esotericsoftware.kryo.factories.SerializerFactory;
53  import com.esotericsoftware.kryo.io.Input;
54  import com.esotericsoftware.kryo.io.InputChunked;
55  import com.esotericsoftware.kryo.io.Output;
56  import com.esotericsoftware.kryo.io.OutputChunked;
57  import com.esotericsoftware.kryo.pool.KryoCallback;
58  import com.esotericsoftware.kryo.pool.KryoFactory;
59  import com.esotericsoftware.kryo.pool.KryoPool;
60  import com.esotericsoftware.kryo.serializers.ClosureSerializer;
61  import com.esotericsoftware.kryo.serializers.FieldSerializer;
62  import com.esotericsoftware.kryo.util.ObjectMap;
63  import com.google.common.base.Preconditions;
64  
65  import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
66  
67  /**
68   * Kryo instance that provides serialization through DataInput/DataOutput
69   * that org.apache.hadoop.io.Writable uses.
70   *
71   * All public APIs are static.
72   *
73   * It extends Kryo to reuse KryoPool functionality, but have additional needed
74   * objects cached as well. If we move to ThreadLocal or other caching
75   * technique, we can use composition, instead of inheritance here.
76   *
77   * TODO: Refactor this class into two separate classes depending on
78   * whether the reference tracking is enabled or disabled.
79   */
80  public class HadoopKryo extends Kryo {
81    /** Pool of reusable Kryo objects, since they are expensive to create */
82    private static final KryoPool KRYO_POOL = new KryoPool.Builder(
83        new KryoFactory() {
84          @Override
85          public Kryo create() {
86            return createKryo(true, true);
87          }
88        }).build();
89    /** Thread local HadoopKryo object */
90    private static final ThreadLocal<HadoopKryo> KRYO =
91      new ThreadLocal<HadoopKryo>() {
92        @Override protected HadoopKryo initialValue() {
93          return  createKryo(false, false);
94        }
95      };
96  
97    /**
98     * List of interfaces/parent classes that will not be allowed to be
99     * serialized, together with explanation of why, that will be shown
100    * when throwing such exception
101    */
102   private static final Map<Class<?>, String> NON_SERIALIZABLE;
103 
104   static {
105     NON_SERIALIZABLE = new LinkedHashMap<>();
106     NON_SERIALIZABLE.put(
107         NonKryoWritable.class,
108         "it is marked to not allow serialization, " +
109         "look at the class for more details");
110     NON_SERIALIZABLE.put(
111         KryoWritableWrapper.class, "recursion is disallowed");
112     NON_SERIALIZABLE.put(
113         Configuration.class,
114         "it cannot be supported since it contains ClassLoader");
115     NON_SERIALIZABLE.put(
116         GiraphConfigurationSettable.class, "configuration cannot be set");
117     NON_SERIALIZABLE.put(
118         Configurable.class, "configuration cannot be set");
119     NON_SERIALIZABLE.put(
120         Random.class,
121         "it should be rarely serialized, since it would create same stream " +
122         "of numbers everywhere, use TransientRandom instead");
123     NON_SERIALIZABLE.put(
124         Logger.class,
125         "Logger must be a static field");
126   }
127 
128   /** Reusable Input object */
129   private InputChunked input;
130   /** Reusable Output object */
131   private OutputChunked output;
132 
133   /** Reusable DataInput wrapper stream */
134   private DataInputWrapperStream dataInputWrapperStream;
135   /** Reusable DataOutput wrapper stream */
136   private DataOutputWrapperStream dataOutputWrapperStream;
137 
138   /**
139    * Map of already initialized serializers used
140    * for readIntoObject/writeOutOfObject pair of methods
141    */
142   private final ObjectMap<Class<?>, ReusableFieldSerializer<Object>>
143   classToIntoSerializer = new ObjectMap<>();
144 
145   /** Hide constructor, so all access go through pool of cached objects */
146   private HadoopKryo() {
147   }
148 
149   /**
150    * Constructor that takes custom class resolver and reference resolver.
151    * @param classResolver Class resolver
152    * @param referenceResolver Reference resolver
153    */
154   private HadoopKryo(ClassResolver classResolver,
155                      ReferenceResolver referenceResolver) {
156     super(classResolver, referenceResolver);
157   }
158 
159   // Public API:
160 
161   /**
162    * Write type of given object and the object itself to the output stream.
163    * Inverse of readClassAndObj.
164    *
165    * @param out Output stream
166    * @param object Object to write
167    */
168   public static void writeClassAndObj(
169           final DataOutput out, final Object object) {
170     writeInternal(out, object, false);
171   }
172 
173   /**
174    * Read object from the input stream, by reading first type of the object,
175    * and then all of its fields.
176    * Inverse of writeClassAndObject.
177    *
178    * @param in Input stream
179    * @return Deserialized object
180    * @param <T> Type of the object being read
181    */
182   public static <T> T readClassAndObj(DataInput in) {
183     return readInternal(in, null, false);
184   }
185 
186   /**
187    * Write an object to output, in a way that can be read by readIntoObject.
188    *
189    * @param out Output stream
190    * @param object Object to be written
191    */
192   public static void writeOutOfObject(
193       final DataOutput out, final Object object) {
194     writeInternal(out, object, true);
195   }
196 
197   /**
198    * Reads an object, from input, into a given object,
199    * allowing object reuse.
200    * Inverse of writeOutOfObject.
201    *
202    * @param in Input stream
203    * @param object Object to fill from input
204    */
205   public static void readIntoObject(DataInput in, Object object) {
206     readInternal(in, object, true);
207   }
208 
209   /**
210    * Writes class and object to specified output stream with specified
211    * Kryo object. It does not use interim buffers.
212    * @param kryo Kryo object
213    * @param out Output stream
214    * @param object Object
215    */
216   public static void writeWithKryo(
217           final HadoopKryo kryo, final Output out,
218           final Object object) {
219     kryo.writeClassAndObject(out, object);
220     out.close();
221   }
222 
223   /**
224    * Write out of object with given kryo
225    * @param kryo Kryo object
226    * @param out Output
227    * @param object Object to write
228    */
229   public static void writeWithKryoOutOfObject(
230           final HadoopKryo kryo, final Output out,
231           final Object object) {
232     kryo.writeOutOfObject(out, object);
233     out.close();
234   }
235 
236   /**
237    * Reads class and object from specified input stream with
238    * specified kryo object.
239    * it does not use interim buffers.
240    * @param kryo Kryo object
241    * @param in Input buffer
242    * @param <T> Object type parameter
243    * @return Object
244    */
245   public static <T> T readWithKryo(
246           final HadoopKryo kryo, final Input in) {
247     T object;
248     object = (T) kryo.readClassAndObject(in);
249     in.close();
250     return object;
251   }
252 
253   /**
254    * Read into object with given kryo.
255    * @param kryo Kryo object
256    * @param in Input
257    * @param object Object to read into
258    */
259   public static void readWithKryoIntoObject(
260           final HadoopKryo kryo, final Input in, Object object) {
261     kryo.readIntoObject(in, object);
262     in.close();
263   }
264 
265   /**
266    * Create copy of the object, by magically recursively copying
267    * all of its fields, keeping reference structures (like cycles)
268    *
269    * @param object Object to be copied
270    * @return Copy of the object.
271    * @param <T> Type of the object
272    */
273   public static <T> T createCopy(final T object) {
274     return KRYO_POOL.run(new KryoCallback<T>() {
275       @Override
276       public T execute(Kryo kryo) {
277         return kryo.copy(object);
278       }
279     });
280   }
281 
282   /**
283    * Returns a kryo which doesn't track objects, hence
284    * serialization of recursive/nested objects is not
285    * supported.
286    *
287    * Reference tracking significantly degrades the performance
288    * since kryo has to store all serialized objects and search
289    * the history to check if an object has been already serialized.
290    *
291    * @return Hadoop kryo which doesn't track objects.
292    */
293   public static HadoopKryo getNontrackingKryo() {
294     return KRYO.get();
295   }
296 
297   // Private implementation:
298 
299   /**
300    * Create new instance of HadoopKryo, properly initialized.
301    * @param trackReferences if true, object references are tracked.
302    * @param hasBuffer if true, an interim buffer is used.
303    * @return new HadoopKryo instance
304    */
305   private static HadoopKryo createKryo(boolean trackReferences,
306                                        boolean hasBuffer) {
307     HadoopKryo kryo;
308     if (trackReferences) {
309       kryo = new HadoopKryo();
310     } else {
311       // Only use GiraphClassResolver if it is properly initialized.
312       // This is to enable test cases which use KryoSimpleWrapper
313       // but don't start ZK.
314       kryo = new HadoopKryo(
315               GiraphClassResolver.isInitialized() ? new GiraphClassResolver() :
316                                                     new DefaultClassResolver(),
317               new MapReferenceResolver());
318     }
319 
320     String version = System.getProperty("java.version");
321     char minor = version.charAt(2);
322     if (minor >= '8') {
323       try {
324         kryo.register(Class.forName("java.lang.invoke.SerializedLambda"));
325         kryo.register(Class.forName("com.esotericsoftware.kryo.Kryo$Closure"),
326             new ClosureSerializer());
327       } catch (ClassNotFoundException e) {
328         throw new IllegalStateException(
329             "Trying to use Kryo on >= Java 8 (" + version +
330             "), but unable to find needed classes", e);
331       }
332     }
333 
334     kryo.register(Arrays.asList().getClass(), new ArraysAsListSerializer());
335     kryo.register(Collections.nCopies(1, new Object()).getClass(),
336         new CollectionsNCopiesSerializer());
337 
338     ImmutableListSerializer.registerSerializers(kryo);
339 
340     registerSerializer(kryo, "com.google.common.collect.RegularImmutableMap",
341         new ImmutableMapSerializer());
342     registerSerializer(kryo,
343         "com.google.common.collect.SingletonImmutableBiMap",
344         new ImmutableMapSerializer());
345 
346     // There are many fastutil classes, register them at the end,
347     // so they don't use up small registration numbers
348     FastUtilSerializer.registerAll(kryo);
349 
350     kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(
351         new StdInstantiatorStrategy()));
352 
353     SerializerFactory customSerializerFactory = new SerializerFactory() {
354       @SuppressWarnings("rawtypes")
355       @Override
356       public Serializer makeSerializer(Kryo kryo, final Class<?> type) {
357         for (final Entry<Class<?>, String> entry :
358             NON_SERIALIZABLE.entrySet()) {
359           if (entry.getKey().isAssignableFrom(type)) {
360             // Allow Class object to be serialized, but not a live instance.
361             return new Serializer() {
362               @Override
363               public Object read(Kryo kryo, Input input, Class type) {
364                 throw new RuntimeException("Cannot serialize " + type +
365                     ". Objects being serialized cannot capture " +
366                     entry.getKey() + " because " + entry.getValue() +
367                     ". Either remove field in question" +
368                     ", or make it transient (so that it isn't serialized)");
369               }
370 
371               @Override
372               public void write(Kryo kryo, Output output, Object object) {
373                 throw new RuntimeException("Cannot serialize " + type +
374                     ". Objects being serialized cannot capture " +
375                     entry.getKey() + " because " + entry.getValue() +
376                     ". Either remove field in question" +
377                     ", or make it transient (so that it isn't serialized)");
378               }
379             };
380           }
381         }
382 
383         if (Writable.class.isAssignableFrom(type) &&
384             !KryoIgnoreWritable.class.isAssignableFrom(type) &&
385             // remove BasicSet, BasicArrayList and Basic2ObjectMap temporarily,
386             // for lack of constructors
387             !BasicSet.class.isAssignableFrom(type) &&
388             !Basic2ObjectMap.class.isAssignableFrom(type)) {
389           // use the Writable method defined by the type
390           DirectWritableSerializer serializer = new DirectWritableSerializer();
391           return serializer;
392         } else {
393           FieldSerializer serializer = new FieldSerializer<>(kryo, type);
394           serializer.setIgnoreSyntheticFields(false);
395           return serializer;
396         }
397       }
398     };
399 
400     kryo.addDefaultSerializer(Writable.class, customSerializerFactory);
401     kryo.setDefaultSerializer(customSerializerFactory);
402 
403     if (hasBuffer) {
404       kryo.input = new InputChunked(4096);
405       kryo.output = new OutputChunked(4096);
406       kryo.dataInputWrapperStream = new DataInputWrapperStream();
407       kryo.dataOutputWrapperStream = new DataOutputWrapperStream();
408     }
409 
410     if (!trackReferences) {
411       kryo.setReferences(false);
412 
413       // Auto reset can only be disabled if the GiraphClassResolver is
414       // properly initialized.
415       if (GiraphClassResolver.isInitialized()) {
416         kryo.setAutoReset(false);
417       }
418     }
419     return kryo;
420   }
421 
422   /**
423    * Register serializer for class with class name
424    *
425    * @param kryo HadoopKryo
426    * @param className Name of the class for which to register serializer
427    * @param serializer Serializer to use
428    */
429   private static void registerSerializer(HadoopKryo kryo, String className,
430       Serializer serializer) {
431     try {
432       kryo.register(Class.forName(className), serializer);
433     } catch (ClassNotFoundException e) {
434       throw new IllegalStateException("Class " + className + " is missing", e);
435     }
436   }
437 
438   /**
439    * Initialize reusable objects for reading from given DataInput.
440    *
441    * @param in Input stream
442    */
443   private void setDataInput(DataInput in) {
444     dataInputWrapperStream.setDataInput(in);
445     input.setInputStream(dataInputWrapperStream);
446   }
447 
448   /**
449    * Initialize reusable objects for writing into given DataOutput.
450    *
451    *  @param out Output stream
452    */
453   private void setDataOutput(DataOutput out) {
454     dataOutputWrapperStream.setDataOutput(out);
455     output.setOutputStream(dataOutputWrapperStream);
456   }
457 
458   /**
459    * Get or create reusable serializer for given class.
460    *
461    * @param type Type of the object
462    * @return Serializer
463    */
464   private ReusableFieldSerializer<Object> getOrCreateReusableSerializer(
465       Class<?> type) {
466     ReusableFieldSerializer<Object> serializer =
467         classToIntoSerializer.get(type);
468     if (serializer == null) {
469       serializer = new ReusableFieldSerializer<>(this, type);
470       classToIntoSerializer.put(type, serializer);
471     }
472     return serializer;
473   }
474 
475   /**
476    * Internal write implementation, that reuses HadoopKryo objects
477    * from the pool.
478    *
479    * @param out Output stream
480    * @param object Object to be written
481    * @param outOf whether we are writing reusable objects,
482    *              or full objects with class name
483    */
484   private static void writeInternal(
485       final DataOutput out, final Object object, final boolean outOf) {
486     KRYO_POOL.run(new KryoCallback<Void>() {
487       @Override
488       public Void execute(Kryo kryo) {
489         HadoopKryo hkryo = (HadoopKryo) kryo;
490         hkryo.setDataOutput(out);
491 
492         if (outOf) {
493           hkryo.writeOutOfObject(hkryo.output, object);
494         } else {
495           hkryo.writeClassAndObject(hkryo.output, object);
496         }
497 
498         hkryo.output.endChunks();
499         hkryo.output.close();
500 
501         return null;
502       }
503     });
504   }
505 
506   /**
507    * Internal read implementation, that reuses HadoopKryo objects
508    * from the pool.
509    *
510    * @param in Input stream
511    * @param outObject Object to fill from input (if not null)
512    * @param into whether we are reading reusable objects,
513    *             or full objects with class name
514    * @return Read object (new one, or same passed in if we use reusable)
515    * @param <T> Type of the object to read
516    */
517   @SuppressWarnings("unchecked")
518   private static <T> T readInternal(
519       final DataInput in, final T outObject, final boolean into) {
520     return KRYO_POOL.run(new KryoCallback<T>() {
521       @Override
522       public T execute(Kryo kryo) {
523         HadoopKryo hkryo = (HadoopKryo) kryo;
524         hkryo.setDataInput(in);
525 
526         T object;
527         if (into) {
528           hkryo.readIntoObject(hkryo.input, outObject);
529           object = outObject;
530         } else {
531           object = (T) hkryo.readClassAndObject(hkryo.input);
532         }
533         hkryo.input.nextChunks();
534 
535         hkryo.input.close();
536         return object;
537       }
538     });
539   }
540 
541   /**
542    * Reads an object, from input, into a given object,
543    * allowing object reuse.
544    *
545    * @param input Input stream
546    * @param object Object to fill from input
547    */
548   private void readIntoObject(Input input, Object object) {
549     Preconditions.checkNotNull(object);
550 
551     Class<?> type = object.getClass();
552     ReusableFieldSerializer<Object> serializer =
553         getOrCreateReusableSerializer(type);
554 
555     serializer.setReadIntoObject(object);
556     Object result = readObject(input, type, serializer);
557 
558     Preconditions.checkState(result == object);
559   }
560 
561   /**
562    * Write an object to output, in a way that can be read
563    * using readIntoObject.
564    * @param output Output stream
565    * @param object Object to be written
566    */
567   private void writeOutOfObject(Output output, Object object) {
568     ReusableFieldSerializer<Object> serializer =
569         getOrCreateReusableSerializer(object.getClass());
570     writeObject(output, object, serializer);
571   }
572 
573 }