This project has retired. For details please refer to its Attic page.
FastUtilSerializer xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.writable.kryo.serializers;
19  
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
26  
27  import com.esotericsoftware.kryo.Kryo;
28  import com.esotericsoftware.kryo.Serializer;
29  import com.esotericsoftware.kryo.io.Input;
30  import com.esotericsoftware.kryo.io.Output;
31  import com.esotericsoftware.kryo.serializers.FieldSerializer;
32  
33  /**
34   * Kryo Serializer for Fastutil collection class.
35   * By default, because they extend boxed collections, are being serialized very
36   * inefficiently through a lot of temporary object creation.
37   *
38   * We are relying that fastutil classes are written to be correctly serialized
39   * with Java serialization, and have put transient on all array fields and are
40   * doing custom efficient serialization in writeObject/readObject methods.
41   * This Serializer then swaps ObjectOutputStream for all default fields with
42   * FieldSerializer, and then calls appropriate writeObject/readObject methods.
43   * We are also relying on defaultWriteObject/defaultReadObject being
44   * effectively called first within those methods
45   *
46   * @param <T> Object type
47   */
48  public class FastUtilSerializer<T> extends Serializer<T> {
49    /** List of all types generated by fastutil */
50    private static final String[] PRIMITIVE_TYPES = new String[] {
51      "Boolean", "Byte", "Short", "Int", "Long", "Float", "Double", "Char",
52      "Object"};
53    /** List of all types used as keys in fastutil */
54    private static final String[] PRIMITIVE_KEY_TYPES = new String[] {
55      "Byte", "Short", "Int", "Long", "Float", "Double", "Char", "Object"};
56  
57    /** Field serializer for this fastutil class */
58    private final FieldSerializer<T> fieldSerializer;
59  
60    /** Handle to writeObject Method on this fastutil class*/
61    private final Method writeMethod;
62    /** Handle to readObject Method on this fastutil class*/
63    private final Method readMethod;
64    /** Reusable output stream wrapper */
65    private final FastUtilSerializer.FastutilKryoObjectOutputStream outputWrapper;
66    /** Reusable input stream wrapper */
67    private final FastUtilSerializer.FastutilKryoObjectInputStream inputWrapper;
68  
69    /**
70     * Creates and initializes new serializer for a given fastutil class.
71     * @param kryo Kryo instance
72     * @param type Fastutil class
73     */
74    public FastUtilSerializer(Kryo kryo, Class<T> type) {
75      fieldSerializer = new FieldSerializer<>(kryo, type);
76      fieldSerializer.setIgnoreSyntheticFields(false);
77  
78      try {
79        writeMethod = type.getDeclaredMethod(
80            "writeObject", ObjectOutputStream.class);
81        writeMethod.setAccessible(true);
82        readMethod = type.getDeclaredMethod(
83            "readObject", ObjectInputStream.class);
84        readMethod.setAccessible(true);
85      } catch (NoSuchMethodException e) {
86        throw new RuntimeException(
87            "Fastutil class " + type +
88            " doesn't have readObject/writeObject methods", e);
89      }
90  
91      try {
92        outputWrapper = new FastutilKryoObjectOutputStream();
93        inputWrapper = new FastutilKryoObjectInputStream();
94      } catch (IOException e) {
95        throw new RuntimeException(e);
96      }
97    }
98  
99    /**
100    * Register serializer for a given fastutil class.
101    * @param kryo Kryo instance
102    * @param fastutilClass Fastutil class
103    */
104   public static void register(Kryo kryo, Class<?> fastutilClass) {
105     kryo.register(fastutilClass, new FastUtilSerializer<>(kryo, fastutilClass));
106   }
107 
108   /**
109    * Registers serializers for all possible fastutil classes.
110    *
111    * There are many fastutil classes, so it is recommended to call this
112    * function at the end, so they fastutil classes don't use up small
113    * registration numbers.
114    *
115    * @param kryo Kryo instance
116    */
117   public static void registerAll(Kryo kryo) {
118     registerArrayLists(kryo);
119     registerArrayBigList(kryo);
120     registerOpenHashSets(kryo);
121     registerArraySets(kryo);
122     registerRBTreeSets(kryo);
123     registerAVLTreeSets(kryo);
124     registerOpenHashMaps(kryo);
125     registerRBTreeMaps(kryo);
126     registerAVLTreeMaps(kryo);
127 
128     // Note - HeapPriorityQueues don't extend boxed collection,
129     // and so they work out of the box correctly
130   }
131 
132   /**
133    * Register all Fastutil ArrayLists.
134    *
135    * @param kryo Kryo instance
136    */
137   public static void registerArrayLists(Kryo kryo) {
138     registerAll(kryo, singleTypes(
139         "it.unimi.dsi.fastutil._t1_s._T1_ArrayList", PRIMITIVE_TYPES));
140   }
141 
142   /**
143    * Register all Fastutil ArrayBigLists.
144    *
145    * @param kryo Kryo instance
146    */
147   public static void registerArrayBigList(Kryo kryo) {
148     registerAll(kryo, singleTypes(
149         "it.unimi.dsi.fastutil._t1_s._T1_BigArrayBigList", PRIMITIVE_TYPES));
150   }
151 
152   /**
153    * Register all Fastutil OpenHashSets.
154    *
155    * @param kryo Kryo instance
156    */
157   public static void registerOpenHashSets(Kryo kryo) {
158     registerAll(kryo, singleTypes(
159         "it.unimi.dsi.fastutil._t1_s._T1_OpenHashSet", PRIMITIVE_TYPES));
160   }
161 
162   /**
163    * Register all Fastutil ArraySets.
164    *
165    * @param kryo Kryo instance
166    */
167   public static void registerArraySets(Kryo kryo) {
168     registerAll(kryo, singleTypes(
169         "it.unimi.dsi.fastutil._t1_s._T1_ArraySet", PRIMITIVE_TYPES));
170   }
171 
172   /**
173    * Register all Fastutil RBTreeSets.
174    *
175    * @param kryo Kryo instance
176    */
177   public static void registerRBTreeSets(Kryo kryo) {
178     registerAll(kryo, singleTypes(
179         "it.unimi.dsi.fastutil._t1_s._T1_RBTreeSet", PRIMITIVE_KEY_TYPES));
180   }
181 
182   /**
183    * Register all Fastutil AVLTreeSets.
184    *
185    * @param kryo Kryo instance
186    */
187   public static void registerAVLTreeSets(Kryo kryo) {
188     registerAll(kryo, singleTypes(
189         "it.unimi.dsi.fastutil._t1_s._T1_AVLTreeSet", PRIMITIVE_KEY_TYPES));
190   }
191 
192   /**
193    * Register all Fastutil OpenHashMaps.
194    *
195    * @param kryo Kryo instance
196    */
197   public static void registerOpenHashMaps(Kryo kryo) {
198     registerAll(kryo, doubleTypes(
199         "it.unimi.dsi.fastutil._t1_s._T1_2_T2_OpenHashMap",
200         PRIMITIVE_KEY_TYPES, PRIMITIVE_TYPES));
201   }
202 
203   /**
204    * Register all Fastutil RBTreeMaps.
205    *
206    * @param kryo Kryo instance
207    */
208   public static void registerRBTreeMaps(Kryo kryo) {
209     registerAll(kryo, doubleTypes(
210         "it.unimi.dsi.fastutil._t1_s._T1_2_T2_RBTreeMap",
211         PRIMITIVE_KEY_TYPES, PRIMITIVE_TYPES));
212   }
213 
214   /**
215    * Register all Fastutil AVLTreeMaps.
216    *
217    * @param kryo Kryo instance
218    */
219   public static void registerAVLTreeMaps(Kryo kryo) {
220     registerAll(kryo, doubleTypes(
221         "it.unimi.dsi.fastutil._t1_s._T1_2_T2_AVLTreeMap",
222         PRIMITIVE_KEY_TYPES, PRIMITIVE_TYPES));
223   }
224 
225   /**
226    * Register all class from the list of classes.
227    *
228    * @param kryo Kryo instance
229    * @param types List of classes
230    */
231   private static void registerAll(Kryo kryo, ArrayList<Class<?>> types) {
232     for (Class<?> type : types) {
233       register(kryo, type);
234     }
235   }
236 
237   /**
238    * Returns list of all classes that are generated by using given
239    * pattern, and replacing it with passed list of types.
240    * Pattern contains _t1_ and _T1_, for lowercase and actual name.
241    *
242    * @param pattern Given pattern
243    * @param types Given list of strings to replace into pattern
244    * @return List of all classes
245    */
246   private static ArrayList<Class<?>> singleTypes(
247       String pattern, String[] types) {
248     ArrayList<Class<?>> result = new ArrayList<>();
249 
250     for (String type : types) {
251       try {
252         result.add(Class.forName(
253             pattern.replaceAll("_T1_", type).replaceAll(
254                 "_t1_", type.toLowerCase())));
255       } catch (ClassNotFoundException e) {
256         throw new RuntimeException(pattern + " " + type, e);
257       }
258     }
259     return result;
260   }
261 
262   /**
263    * Returns list of all classes that are generated by using given
264    * pattern, and replacing it with passed list of types.
265    * Pattern contains two variable pairs: _t1_, _T1_ and _t2_, _T2_,
266    * in each pair one for lowercase and one for actual name.
267    *
268    * @param pattern Given pattern
269    * @param types1 Given list of strings to replace t1 into pattern
270    * @param types2 Given list of strings to replace t2 into pattern
271    * @return List of all classes
272    */
273   private static ArrayList<Class<?>> doubleTypes(
274       String pattern, String[] types1, String[] types2) {
275     ArrayList<Class<?>> result = new ArrayList<>();
276 
277     for (String type1 : types1) {
278       for (String type2 : types2) {
279         try {
280           result.add(Class.forName(
281               pattern.replaceAll("_T1_", type1).replaceAll(
282                   "_t1_", type1.toLowerCase())
283                 .replaceAll("_T2_", type2).replaceAll(
284                     "_t2_", type2.toLowerCase())));
285         } catch (ClassNotFoundException e) {
286           throw new RuntimeException(pattern + " " + type1 + " " + type2, e);
287         }
288       }
289     }
290     return result;
291   }
292 
293   @Override
294   public void write(Kryo kryo, Output output, T object) {
295     fieldSerializer.write(kryo, output, object);
296 
297     outputWrapper.set(output, kryo);
298     try {
299       writeMethod.invoke(object, outputWrapper);
300     } catch (IllegalAccessException | InvocationTargetException e) {
301       throw new RuntimeException("writeObject failed", e);
302     }
303   }
304 
305   @Override
306   public T read(Kryo kryo, Input input, Class<T> type) {
307     T result = fieldSerializer.read(kryo, input, type);
308 
309     if (result != null) {
310       inputWrapper.set(input, kryo);
311       try {
312         readMethod.invoke(result, inputWrapper);
313       } catch (IllegalAccessException | InvocationTargetException e) {
314         throw new RuntimeException("readObject failed", e);
315       }
316     }
317 
318     return result;
319   }
320 
321   /**
322    * Wrapper around ObjectOutputStream that ignores defaultWriteObject (assumes
323    * that needed logic was already executed before), and passes all other calls
324    * to Output
325    */
326   private static class FastutilKryoObjectOutputStream
327       extends ObjectOutputStream {
328     /** Output */
329     private Output output;
330     /** Kryo */
331     private Kryo kryo;
332 
333     /** Constructor */
334     FastutilKryoObjectOutputStream() throws IOException {
335       super();
336     }
337 
338     /**
339      * Setter
340      *
341      * @param output Output
342      * @param kryo kryo
343      */
344     public void set(Output output, Kryo kryo) {
345       this.output = output;
346       this.kryo = kryo;
347     }
348 
349     @Override
350     public void defaultWriteObject() throws IOException {
351     }
352 
353     @Override
354     public void writeBoolean(boolean val) throws IOException {
355       output.writeBoolean(val);
356     }
357 
358     @Override
359     public void writeByte(int val) throws IOException  {
360       output.writeByte(val);
361     }
362 
363     @Override
364     public void writeShort(int val)  throws IOException {
365       output.writeShort(val);
366     }
367 
368     @Override
369     public void writeChar(int val)  throws IOException {
370       output.writeChar((char) val);
371     }
372 
373     @Override
374     public void writeInt(int val)  throws IOException {
375       output.writeInt(val, false);
376     }
377 
378     @Override
379     public void writeLong(long val)  throws IOException {
380       output.writeLong(val, false);
381     }
382 
383     @Override
384     public void writeFloat(float val) throws IOException {
385       output.writeFloat(val);
386     }
387 
388     @Override
389     public void writeDouble(double val) throws IOException {
390       output.writeDouble(val);
391     }
392 
393     @Override
394     protected void writeObjectOverride(Object obj) throws IOException {
395       kryo.writeClassAndObject(output, obj);
396     }
397   }
398 
399   /**
400    * Wrapper around ObjectOutputStream that ignores defaultReadObject
401    * (assumes that needed logic was already executed before), and passes
402    * all other calls to Output
403    */
404   private static class FastutilKryoObjectInputStream extends ObjectInputStream {
405     /** Input */
406     private Input input;
407     /** Kryo */
408     private Kryo kryo;
409 
410     /** Constructor */
411     FastutilKryoObjectInputStream() throws IOException {
412       super();
413     }
414 
415     /**
416      * Setter
417      *
418      * @param input Input
419      * @param kryo Kryo
420      */
421     public void set(Input input, Kryo kryo) {
422       this.input = input;
423       this.kryo = kryo;
424     }
425 
426     @Override
427     public void defaultReadObject() throws IOException, ClassNotFoundException {
428     }
429 
430     @Override
431     public boolean readBoolean() throws IOException {
432       return input.readBoolean();
433     }
434 
435     @Override
436     public byte readByte() throws IOException {
437       return input.readByte();
438     }
439 
440     @Override
441     public char readChar() throws IOException {
442       return input.readChar();
443     }
444 
445     @Override
446     public short readShort() throws IOException {
447       return input.readShort();
448     }
449 
450     @Override
451     public int readInt() throws IOException {
452       return input.readInt(false);
453     }
454 
455     @Override
456     public long readLong() throws IOException {
457       return input.readLong(false);
458     }
459 
460     @Override
461     public float readFloat() throws IOException {
462       return input.readFloat();
463     }
464 
465     @Override
466     public double readDouble() throws IOException {
467       return input.readDouble();
468     }
469 
470     @Override
471     protected Object readObjectOverride()
472         throws IOException, ClassNotFoundException {
473       return kryo.readClassAndObject(input);
474     }
475   }
476 }