View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.writable.kryo;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.util.Arrays;
23  import java.util.Collections;
24  import java.util.LinkedHashMap;
25  import java.util.Map;
26  import java.util.Map.Entry;
27  import java.util.Random;
28  
29  import org.apache.giraph.conf.GiraphConfigurationSettable;
30  import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
31  import org.apache.giraph.types.ops.collections.BasicSet;
32  import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
33  import org.apache.giraph.writable.kryo.markers.NonKryoWritable;
34  import org.apache.giraph.writable.kryo.serializers.ArraysAsListSerializer;
35  import org.apache.giraph.writable.kryo.serializers.CollectionsNCopiesSerializer;
36  import org.apache.giraph.writable.kryo.serializers.DirectWritableSerializer;
37  import org.apache.giraph.writable.kryo.serializers.FastUtilSerializer;
38  import org.apache.giraph.writable.kryo.serializers.ImmutableMapSerializer;
39  import org.apache.giraph.writable.kryo.serializers.ReusableFieldSerializer;
40  import org.apache.hadoop.conf.Configurable;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.io.Writable;
43  import org.apache.log4j.Logger;
44  import org.objenesis.strategy.StdInstantiatorStrategy;
45  
46  import com.esotericsoftware.kryo.Kryo;
47  import com.esotericsoftware.kryo.Serializer;
48  import com.esotericsoftware.kryo.factories.SerializerFactory;
49  import com.esotericsoftware.kryo.io.Input;
50  import com.esotericsoftware.kryo.io.InputChunked;
51  import com.esotericsoftware.kryo.io.Output;
52  import com.esotericsoftware.kryo.io.OutputChunked;
53  import com.esotericsoftware.kryo.pool.KryoCallback;
54  import com.esotericsoftware.kryo.pool.KryoFactory;
55  import com.esotericsoftware.kryo.pool.KryoPool;
56  import com.esotericsoftware.kryo.serializers.ClosureSerializer;
57  import com.esotericsoftware.kryo.serializers.FieldSerializer;
58  import com.esotericsoftware.kryo.util.ObjectMap;
59  import com.google.common.base.Preconditions;
60  
61  import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
62  
63  /**
64   * Kryo instance that provides serialization through DataInput/DataOutput
65   * that org.apache.hadoop.io.Writable uses.
66   *
67   * All public APIs are static.
68   *
 * It extends Kryo so that KryoPool functionality can be reused, while also
 * caching the additional objects it needs. If we move to ThreadLocal or
 * another caching technique, we can use composition instead of inheritance.
72   */
73  public class HadoopKryo extends Kryo {
74    /** Pool of reusable Kryo objects, since they are expensive to create */
75    private static final KryoPool KRYO_POOL = new KryoPool.Builder(
76        new KryoFactory() {
77          @Override
78          public Kryo create() {
79            return createKryo();
80          }
81        }).build();
82  
83    /**
84     * List of interfaces/parent classes that will not be allowed to be
85     * serialized, together with explanation of why, that will be shown
86     * when throwing such exception
87     */
88    private static final Map<Class<?>, String> NON_SERIALIZABLE;
89  
90    static {
91      NON_SERIALIZABLE = new LinkedHashMap<>();
92      NON_SERIALIZABLE.put(
93          NonKryoWritable.class,
94          "it is marked to not allow serialization, " +
95          "look at the class for more details");
96      NON_SERIALIZABLE.put(
97          KryoWritableWrapper.class, "recursion is dissallowed");
98      NON_SERIALIZABLE.put(
99          Configuration.class,
100         "it cannot be supported since it contains ClassLoader");
101     NON_SERIALIZABLE.put(
102         GiraphConfigurationSettable.class, "configuration cannot be set");
103     NON_SERIALIZABLE.put(
104         Configurable.class, "configuration cannot be set");
105     NON_SERIALIZABLE.put(
106         Random.class,
107         "it should be rarely serialized, since it would create same stream " +
108         "of numbers everywhere, use TransientRandom instead");
109     NON_SERIALIZABLE.put(
110         Logger.class,
111         "Logger must be a static field");
112   }
113 
114   // Use chunked streams, so within same stream we can use both kryo and
115   // non-kryo serialization.
116   /** Reusable Input object */
117   private final InputChunked input = new InputChunked(4096);
118   /** Reusable Output object */
119   private final OutputChunked output = new OutputChunked(4096);
120 
121   /** Reusable DataInput wrapper stream */
122   private final DataInputWrapperStream dataInputWrapperStream =
123       new DataInputWrapperStream();
124   /** Reusable DataOutput wrapper stream */
125   private final DataOutputWrapperStream dataOutputWrapperStream =
126       new DataOutputWrapperStream();
127 
128   /**
129    * Map of already initialized serializers used
130    * for readIntoObject/writeOutOfObject pair of methods
131    */
132   private final ObjectMap<Class<?>, ReusableFieldSerializer<Object>>
133   classToIntoSerializer = new ObjectMap<>();
134 
135   /** Hide constructor, so all access go through pool of cached objects */
136   private HadoopKryo() {
137   }
138 
139   // Public API:
140 
141   /**
142    * Write type of given object and the object itself to the output stream.
143    * Inverse of readClassAndObject.
144    *
145    * @param out Output stream
146    * @param object Object to write
147    */
148   public static void writeClassAndObject(
149       final DataOutput out, final Object object) {
150     writeInternal(out, object, false);
151   }
152 
153   /**
154    * Read object from the input stream, by reading first type of the object,
155    * and then all of it's fields.
156    * Inverse of writeClassAndObject.
157    *
158    * @param in Input stream
159    * @return Deserialized object
160    * @param <T> Type of the object being read
161    */
162   public static <T> T readClassAndObject(DataInput in) {
163     return readInternal(in, null, false);
164   }
165 
166   /**
167    * Write an object to output, in a way that can be read by readIntoObject.
168    *
169    * @param out Output stream
170    * @param object Object to be written
171    */
172   public static void writeOutOfObject(
173       final DataOutput out, final Object object) {
174     writeInternal(out, object, true);
175   }
176 
177   /**
178    * Reads an object, from input, into a given object,
179    * allowing object reuse.
180    * Inverse of writeOutOfObject.
181    *
182    * @param in Input stream
183    * @param object Object to fill from input
184    */
185   public static void readIntoObject(DataInput in, Object object) {
186     readInternal(in, object, true);
187   }
188 
189   /**
190    * Create copy of the object, by magically recursively copying
191    * all of it's fields, keeping reference structures (like cycles)
192    *
193    * @param object Object to be copied
194    * @return Copy of the object.
195    * @param <T> Type of the object
196    */
197   public static <T> T createCopy(final T object) {
198     return KRYO_POOL.run(new KryoCallback<T>() {
199       @Override
200       public T execute(Kryo kryo) {
201         return kryo.copy(object);
202       }
203     });
204   }
205 
206   // Private implementation:
207 
208   /**
209    * Create new instance of HadoopKryo, properly initialized.
210    *
211    * @return New HadoopKryo instnace
212    */
213   private static HadoopKryo createKryo() {
214     HadoopKryo kryo = new HadoopKryo();
215 
216     String version = System.getProperty("java.version");
217     char minor = version.charAt(2);
218     if (minor >= '8') {
219       try {
220         kryo.register(Class.forName("java.lang.invoke.SerializedLambda"));
221         kryo.register(Class.forName("com.esotericsoftware.kryo.Kryo$Closure"),
222             new ClosureSerializer());
223       } catch (ClassNotFoundException e) {
224         throw new IllegalStateException(
225             "Trying to use Kryo on >= Java 8 (" + version +
226             "), but unable to find needed classes", e);
227       }
228     }
229 
230     kryo.register(Arrays.asList().getClass(), new ArraysAsListSerializer());
231     kryo.register(Collections.nCopies(1, new Object()).getClass(),
232         new CollectionsNCopiesSerializer());
233 
234     ImmutableListSerializer.registerSerializers(kryo);
235 
236     registerSerializer(kryo, "com.google.common.collect.RegularImmutableMap",
237         new ImmutableMapSerializer());
238     registerSerializer(kryo,
239         "com.google.common.collect.SingletonImmutableBiMap",
240         new ImmutableMapSerializer());
241 
242     // There are many fastutil classes, register them at the end,
243     // so they don't use up small registration numbers
244     FastUtilSerializer.registerAll(kryo);
245 
246     kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(
247         new StdInstantiatorStrategy()));
248 
249     SerializerFactory customSerializerFactory = new SerializerFactory() {
250       @SuppressWarnings("rawtypes")
251       @Override
252       public Serializer makeSerializer(Kryo kryo, final Class<?> type) {
253         for (final Entry<Class<?>, String> entry :
254             NON_SERIALIZABLE.entrySet()) {
255           if (entry.getKey().isAssignableFrom(type)) {
256             // Allow Class object to be serialized, but not a live instance.
257             return new Serializer() {
258               @Override
259               public Object read(Kryo kryo, Input input, Class type) {
260                 throw new RuntimeException("Cannot serialize " + type +
261                     ". Objects being serialized cannot capture " +
262                     entry.getKey() + " because " + entry.getValue() +
263                     ". Either remove field in question" +
264                     ", or make it transient (so that it isn't serialized)");
265               }
266 
267               @Override
268               public void write(Kryo kryo, Output output, Object object) {
269                 throw new RuntimeException("Cannot serialize " + type +
270                     ". Objects being serialized cannot capture " +
271                     entry.getKey() + " because " + entry.getValue() +
272                     ". Either remove field in question" +
273                     ", or make it transient (so that it isn't serialized)");
274               }
275             };
276           }
277         }
278 
279         if (Writable.class.isAssignableFrom(type) &&
280             !KryoIgnoreWritable.class.isAssignableFrom(type) &&
281             // remove BasicSet, BasicArrayList and Basic2ObjectMap temporarily,
282             // for lack of constructors
283             !BasicSet.class.isAssignableFrom(type) &&
284             !Basic2ObjectMap.class.isAssignableFrom(type)) {
285           // use the Writable method defined by the type
286           DirectWritableSerializer serializer = new DirectWritableSerializer();
287           return serializer;
288         } else {
289           FieldSerializer serializer = new FieldSerializer<>(kryo, type);
290           serializer.setIgnoreSyntheticFields(false);
291           return serializer;
292         }
293       }
294     };
295 
296     kryo.addDefaultSerializer(Writable.class, customSerializerFactory);
297     kryo.setDefaultSerializer(customSerializerFactory);
298 
299     return kryo;
300   }
301 
302   /**
303    * Register serializer for class with class name
304    *
305    * @param kryo HadoopKryo
306    * @param className Name of the class for which to register serializer
307    * @param serializer Serializer to use
308    */
309   private static void registerSerializer(HadoopKryo kryo, String className,
310       Serializer serializer) {
311     try {
312       kryo.register(Class.forName(className), serializer);
313     } catch (ClassNotFoundException e) {
314       throw new IllegalStateException("Class " + className + " is missing", e);
315     }
316   }
317 
318   /**
319    * Initialize reusable objects for reading from given DataInput.
320    *
321    * @param in Input stream
322    */
323   private void setDataInput(DataInput in) {
324     dataInputWrapperStream.setDataInput(in);
325     input.setInputStream(dataInputWrapperStream);
326   }
327 
328   /**
329    * Initialize reusable objects for writing into given DataOutput.
330    *
331    *  @param out Output stream
332    */
333   private void setDataOutput(DataOutput out) {
334     dataOutputWrapperStream.setDataOutput(out);
335     output.setOutputStream(dataOutputWrapperStream);
336   }
337 
338   /**
339    * Get or create reusable serializer for given class.
340    *
341    * @param type Type of the object
342    * @return Serializer
343    */
344   private ReusableFieldSerializer<Object> getOrCreateReusableSerializer(
345       Class<?> type) {
346     ReusableFieldSerializer<Object> serializer =
347         classToIntoSerializer.get(type);
348     if (serializer == null) {
349       serializer = new ReusableFieldSerializer<>(this, type);
350       classToIntoSerializer.put(type, serializer);
351     }
352     return serializer;
353   }
354 
355   /**
356    * Internal write implementation, that reuses HadoopKryo objects
357    * from the pool.
358    *
359    * @param out Output stream
360    * @param object Object to be written
361    * @param outOf whether we are writing reusable objects,
362    *              or full objects with class name
363    */
364   private static void writeInternal(
365       final DataOutput out, final Object object, final boolean outOf) {
366     KRYO_POOL.run(new KryoCallback<Void>() {
367       @Override
368       public Void execute(Kryo kryo) {
369         HadoopKryo hkryo = (HadoopKryo) kryo;
370         hkryo.setDataOutput(out);
371 
372         if (outOf) {
373           hkryo.writeOutOfObject(hkryo.output, object);
374         } else {
375           hkryo.writeClassAndObject(hkryo.output, object);
376         }
377 
378         hkryo.output.endChunks();
379         hkryo.output.close();
380 
381         return null;
382       }
383     });
384   }
385 
386   /**
387    * Internal read implementation, that reuses HadoopKryo objects
388    * from the pool.
389    *
390    * @param in Input stream
391    * @param outObject Object to fill from input (if not null)
392    * @param into whether we are reading reusable objects,
393    *             or full objects with class name
394    * @return Read object (new one, or same passed in if we use reusable)
395    * @param <T> Type of the object to read
396    */
397   @SuppressWarnings("unchecked")
398   private static <T> T readInternal(
399       final DataInput in, final T outObject, final boolean into) {
400     return KRYO_POOL.run(new KryoCallback<T>() {
401       @Override
402       public T execute(Kryo kryo) {
403         HadoopKryo hkryo = (HadoopKryo) kryo;
404         hkryo.setDataInput(in);
405 
406         T object;
407         if (into) {
408           hkryo.readIntoObject(hkryo.input, outObject);
409           object = outObject;
410         } else {
411           object = (T) hkryo.readClassAndObject(hkryo.input);
412         }
413         hkryo.input.nextChunks();
414 
415         hkryo.input.close();
416         return object;
417       }
418     });
419   }
420 
421   /**
422    * Reads an object, from input, into a given object,
423    * allowing object reuse.
424    *
425    * @param input Input stream
426    * @param object Object to fill from input
427    */
428   private void readIntoObject(Input input, Object object) {
429     Preconditions.checkNotNull(object);
430 
431     Class<?> type = object.getClass();
432     ReusableFieldSerializer<Object> serializer =
433         getOrCreateReusableSerializer(type);
434 
435     serializer.setReadIntoObject(object);
436     Object result = readObject(input, type, serializer);
437 
438     Preconditions.checkState(result == object);
439   }
440 
441   /**
442    * Write an object to output, in a way that can be read
443    * using readIntoObject.
444    * @param output Output stream
445    * @param object Object to be written
446    */
447   private void writeOutOfObject(Output output, Object object) {
448     ReusableFieldSerializer<Object> serializer =
449         getOrCreateReusableSerializer(object.getClass());
450     writeObject(output, object, serializer);
451   }
452 
453 }