View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.utils;
20  
21  import static org.apache.hadoop.util.ReflectionUtils.newInstance;
22  
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.DataInput;
26  import java.io.DataInputStream;
27  import java.io.DataOutput;
28  import java.io.DataOutputStream;
29  import java.io.IOException;
30  import java.lang.reflect.InvocationTargetException;
31  import java.util.ArrayList;
32  import java.util.List;
33  
34  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
35  import org.apache.giraph.edge.Edge;
36  import org.apache.giraph.edge.OutEdges;
37  import org.apache.giraph.factories.ValueFactory;
38  import org.apache.giraph.graph.Vertex;
39  import org.apache.giraph.zk.ZooKeeperExt;
40  import org.apache.giraph.zk.ZooKeeperExt.PathStat;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.io.NullWritable;
43  import org.apache.hadoop.io.Writable;
44  import org.apache.hadoop.io.WritableComparable;
45  import org.apache.zookeeper.CreateMode;
46  import org.apache.zookeeper.KeeperException;
47  import org.apache.zookeeper.ZooDefs.Ids;
48  import org.apache.zookeeper.data.Stat;
49  
50  /**
51   * Helper static methods for working with Writable objects.
52   */
53  public class WritableUtils {
54    /**
55     * Don't construct.
56     */
57    private WritableUtils() { }
58  
59    /**
60     * Instantiate a new Writable, checking for NullWritable along the way.
61     *
62     * @param klass Class
63     * @param <W> type
64     * @return new instance of class
65     */
66    public static <W extends Writable> W createWritable(Class<W> klass) {
67      return createWritable(klass, null);
68    }
69  
70    /**
71     * Instantiate a new Writable, checking for NullWritable along the way.
72     *
73     * @param klass Class
74     * @param configuration Configuration
75     * @param <W> type
76     * @return new instance of class
77     */
78    public static <W extends Writable> W createWritable(
79        Class<W> klass,
80        ImmutableClassesGiraphConfiguration configuration) {
81      W result;
82      if (NullWritable.class.equals(klass)) {
83        result = (W) NullWritable.get();
84      } else {
85        result = ReflectionUtils.newInstance(klass);
86      }
87      ConfigurationUtils.configureIfPossible(result, configuration);
88      return result;
89    }
90  
91  
92    /**
93     * Read fields from byteArray to a Writeable object.
94     *
95     * @param byteArray Byte array to find the fields in.
96     * @param writableObjects Objects to fill in the fields.
97     */
98    public static void readFieldsFromByteArray(
99        byte[] byteArray, Writable... writableObjects) {
100     DataInputStream inputStream =
101       new DataInputStream(new ByteArrayInputStream(byteArray));
102     try {
103       for (Writable writableObject : writableObjects) {
104         writableObject.readFields(inputStream);
105       }
106     } catch (IOException e) {
107       throw new IllegalStateException(
108           "readFieldsFromByteArray: IOException", e);
109     }
110   }
111 
112   /**
113    * Read fields from a ZooKeeper znode.
114    *
115    * @param zkExt ZooKeeper instance.
116    * @param zkPath Path of znode.
117    * @param watch Add a watch?
118    * @param stat Stat of znode if desired.
119    * @param writableObjects Objects to read into.
120    */
121   public static void readFieldsFromZnode(ZooKeeperExt zkExt,
122                                          String zkPath,
123                                          boolean watch,
124                                          Stat stat,
125                                          Writable... writableObjects) {
126     try {
127       byte[] zkData = zkExt.getData(zkPath, false, stat);
128       readFieldsFromByteArray(zkData, writableObjects);
129     } catch (KeeperException e) {
130       throw new IllegalStateException(
131         "readFieldsFromZnode: KeeperException on " + zkPath, e);
132     } catch (InterruptedException e) {
133       throw new IllegalStateException(
134         "readFieldsFromZnode: InterrruptedStateException on " + zkPath, e);
135     }
136   }
137 
138   /**
139    * Write object to a byte array.
140    *
141    * @param writableObjects Objects to write from.
142    * @return Byte array with serialized object.
143    */
144   public static byte[] writeToByteArray(Writable... writableObjects) {
145     ByteArrayOutputStream outputStream =
146         new ByteArrayOutputStream();
147     DataOutput output = new DataOutputStream(outputStream);
148     try {
149       for (Writable writableObject : writableObjects) {
150         writableObject.write(output);
151       }
152     } catch (IOException e) {
153       throw new IllegalStateException(
154           "writeToByteArray: IOStateException", e);
155     }
156     return outputStream.toByteArray();
157   }
158 
159   /**
160    * Read fields from byteArray to a Writeable object, skipping the size.
161    * Serialization method is choosable
162    *
163    * @param byteArray Byte array to find the fields in.
164    * @param writableObject Object to fill in the fields.
165    * @param unsafe Use unsafe deserialization
166    */
167   public static void readFieldsFromByteArrayWithSize(
168       byte[] byteArray, Writable writableObject, boolean unsafe) {
169     ExtendedDataInput extendedDataInput;
170     if (unsafe) {
171       extendedDataInput = new UnsafeByteArrayInputStream(byteArray);
172     } else {
173       extendedDataInput = new ExtendedByteArrayDataInput(byteArray);
174     }
175     try {
176       extendedDataInput.readInt();
177       writableObject.readFields(extendedDataInput);
178     } catch (IOException e) {
179       throw new IllegalStateException(
180           "readFieldsFromByteArrayWithSize: IOException", e);
181     }
182   }
183 
184   /**
185    * Write object to a byte array with the first 4 bytes as the size of the
186    * entire buffer (including the size).
187    *
188    * @param writableObject Object to write from.
189    * @param unsafe Use unsafe serialization?
190    * @return Byte array with serialized object.
191    */
192   public static byte[] writeToByteArrayWithSize(Writable writableObject,
193                                                 boolean unsafe) {
194     return writeToByteArrayWithSize(writableObject, null, unsafe);
195   }
196 
197   /**
198    * Write object to a byte array with the first 4 bytes as the size of the
199    * entire buffer (including the size).
200    *
201    * @param writableObject Object to write from.
202    * @param buffer Use this buffer instead
203    * @param unsafe Use unsafe serialization?
204    * @return Byte array with serialized object.
205    */
206   public static byte[] writeToByteArrayWithSize(Writable writableObject,
207                                                 byte[] buffer,
208                                                 boolean unsafe) {
209     ExtendedDataOutput extendedDataOutput;
210     if (unsafe) {
211       extendedDataOutput = new UnsafeByteArrayOutputStream(buffer);
212     } else {
213       extendedDataOutput = new ExtendedByteArrayDataOutput(buffer);
214     }
215     try {
216       extendedDataOutput.writeInt(-1);
217       writableObject.write(extendedDataOutput);
218       extendedDataOutput.writeInt(0, extendedDataOutput.getPos());
219     } catch (IOException e) {
220       throw new IllegalStateException("writeToByteArrayWithSize: " +
221           "IOException", e);
222     }
223 
224     return extendedDataOutput.getByteArray();
225   }
226 
227   /**
228    * Write object to a ZooKeeper znode.
229    *
230    * @param zkExt ZooKeeper instance.
231    * @param zkPath Path of znode.
232    * @param version Version of the write.
233    * @param writableObjects Objects to write from.
234    * @return Path and stat information of the znode.
235    */
236   public static PathStat writeToZnode(ZooKeeperExt zkExt,
237                                       String zkPath,
238                                       int version,
239                                       Writable... writableObjects) {
240     try {
241       byte[] byteArray = writeToByteArray(writableObjects);
242       return zkExt.createOrSetExt(zkPath,
243           byteArray,
244           Ids.OPEN_ACL_UNSAFE,
245           CreateMode.PERSISTENT,
246           true,
247           version);
248     } catch (KeeperException e) {
249       throw new IllegalStateException(
250           "writeToZnode: KeeperException on " + zkPath, e);
251     } catch (InterruptedException e) {
252       throw new IllegalStateException(
253           "writeToZnode: InterruptedException on " + zkPath, e);
254     }
255   }
256 
257   /**
258    * Write list of object to a byte array.
259    *
260    * @param writableList List of object to write from.
261    * @return Byte array with serialized objects.
262    */
263   public static byte[] writeListToByteArray(
264       List<? extends Writable> writableList) {
265     ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
266     DataOutput output = new DataOutputStream(outputStream);
267     try {
268       output.writeInt(writableList.size());
269       for (Writable writable : writableList) {
270         writable.write(output);
271       }
272     } catch (IOException e) {
273       throw new IllegalStateException(
274           "writeListToByteArray: IOException", e);
275     }
276     return outputStream.toByteArray();
277   }
278 
279   /**
280    * Write list of objects to a ZooKeeper znode.
281    *
282    * @param zkExt ZooKeeper instance.
283    * @param zkPath Path of znode.
284    * @param version Version of the write.
285    * @param writableList List of objects to write from.
286    * @return Path and stat information of the znode.
287    */
288   public static PathStat writeListToZnode(
289       ZooKeeperExt zkExt,
290       String zkPath,
291       int version,
292       List<? extends Writable> writableList) {
293     try {
294       return zkExt.createOrSetExt(
295           zkPath,
296           writeListToByteArray(writableList),
297           Ids.OPEN_ACL_UNSAFE,
298           CreateMode.PERSISTENT,
299           true,
300           version);
301     } catch (KeeperException e) {
302       throw new IllegalStateException(
303           "writeListToZnode: KeeperException on " + zkPath, e);
304     } catch (InterruptedException e) {
305       throw new IllegalStateException(
306           "writeListToZnode: InterruptedException on " + zkPath, e);
307     }
308   }
309 
310   /**
311    * Read fields from byteArray to a list of objects.
312    *
313    * @param byteArray Byte array to find the fields in.
314    * @param writableClass Class of the objects to instantiate.
315    * @param conf Configuration used for instantiation (i.e Configurable)
316    * @param <T> Object type
317    * @return List of objects.
318    */
319   public static <T extends Writable> List<T> readListFieldsFromByteArray(
320       byte[] byteArray,
321       Class<? extends T> writableClass,
322       Configuration conf) {
323     try {
324       DataInputStream inputStream =
325           new DataInputStream(new ByteArrayInputStream(byteArray));
326       int size = inputStream.readInt();
327       List<T> writableList = new ArrayList<T>(size);
328       for (int i = 0; i < size; ++i) {
329         T writable = newInstance(writableClass, conf);
330         writable.readFields(inputStream);
331         writableList.add(writable);
332       }
333       return writableList;
334     } catch (IOException e) {
335       throw new IllegalStateException(
336           "readListFieldsFromZnode: IOException", e);
337     }
338   }
339 
340   /**
341    * Read fields from a ZooKeeper znode into a list of objects.
342    *
343    * @param zkExt ZooKeeper instance.
344    * @param zkPath Path of znode.
345    * @param watch Add a watch?
346    * @param stat Stat of znode if desired.
347    * @param writableClass Class of the objects to instantiate.
348    * @param conf Configuration used for instantiation (i.e Configurable)
349    * @param <T> Object type
350    * @return List of objects.
351    */
352   public static <T extends Writable> List<T> readListFieldsFromZnode(
353       ZooKeeperExt zkExt,
354       String zkPath,
355       boolean watch,
356       Stat stat,
357       Class<? extends T> writableClass,
358       Configuration conf) {
359     try {
360       byte[] zkData = zkExt.getData(zkPath, false, stat);
361       return WritableUtils.<T>readListFieldsFromByteArray(zkData,
362           writableClass, conf);
363     } catch (KeeperException e) {
364       throw new IllegalStateException(
365           "readListFieldsFromZnode: KeeperException on " + zkPath, e);
366     } catch (InterruptedException e) {
367       throw new IllegalStateException(
368           "readListFieldsFromZnode: InterruptedException on " + zkPath,
369           e);
370     }
371   }
372 
373   /**
374    * Write ExtendedDataOutput to DataOutput
375    *
376    * @param extendedDataOutput ExtendedDataOutput to write
377    * @param out DataOutput to write to
378    */
379   public static void writeExtendedDataOutput(
380       ExtendedDataOutput extendedDataOutput, DataOutput out)
381     throws IOException {
382     out.writeInt(extendedDataOutput.getPos());
383     out.write(
384         extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
385   }
386 
387   /**
388    * Read ExtendedDataOutput from DataInput
389    *
390    * @param in DataInput to read from
391    * @param conf Configuration
392    * @return ExtendedDataOutput read
393    */
394   public static ExtendedDataOutput readExtendedDataOutput(DataInput in,
395       ImmutableClassesGiraphConfiguration conf) throws IOException {
396     int size = in.readInt();
397     byte[] buf = new byte[size];
398     in.readFully(buf);
399     return conf.createExtendedDataOutput(buf, size);
400   }
401 
402   /**
403    * Write vertex data to byte array with the first 4 bytes as the size of the
404    * entire buffer (including the size).
405    *
406    * @param vertex Vertex to write from.
407    * @param buffer Use this buffer instead
408    * @param unsafe Use unsafe serialization?
409    * @param conf Configuration
410    * @param <I> Vertex id
411    * @param <V> Vertex value
412    * @param <E> Edge value
413    * @return Byte array with serialized object.
414    */
415   public static <I extends WritableComparable, V extends Writable,
416       E extends Writable> byte[] writeVertexToByteArray(
417       Vertex<I, V, E> vertex,
418       byte[] buffer,
419       boolean unsafe,
420       ImmutableClassesGiraphConfiguration<I, V, E> conf) {
421     ExtendedDataOutput extendedDataOutput;
422     if (unsafe) {
423       extendedDataOutput = new UnsafeByteArrayOutputStream(buffer);
424     } else {
425       extendedDataOutput = new ExtendedByteArrayDataOutput(buffer);
426     }
427     try {
428       extendedDataOutput.writeInt(-1);
429       writeVertexToDataOutput(extendedDataOutput, vertex, conf);
430       extendedDataOutput.writeInt(0, extendedDataOutput.getPos());
431     } catch (IOException e) {
432       throw new IllegalStateException("writeVertexToByteArray: " +
433           "IOException", e);
434     }
435 
436     return extendedDataOutput.getByteArray();
437   }
438 
439   /**
440    * Write vertex data to byte array with the first 4 bytes as the size of the
441    * entire buffer (including the size).
442    *
443    * @param vertex Vertex to write from.
444    * @param unsafe Use unsafe serialization?
445    * @param conf Configuration
446    * @param <I> Vertex id
447    * @param <V> Vertex value
448    * @param <E> Edge value
449    * @return Byte array with serialized object.
450    */
451   public static <I extends WritableComparable, V extends Writable,
452       E extends Writable> byte[] writeVertexToByteArray(
453       Vertex<I, V, E> vertex,
454       boolean unsafe,
455       ImmutableClassesGiraphConfiguration<I, V, E> conf) {
456     return writeVertexToByteArray(vertex, null, unsafe, conf);
457   }
458 
459   /**
460   * Read vertex data from byteArray to a Writeable object, skipping the size.
461   * Serialization method is choosable. Assumes the vertex has already been
462   * initialized and contains values for Id, value, and edges.
463   *
464   * @param byteArray Byte array to find the fields in.
465   * @param vertex Vertex to fill in the fields.
466   * @param unsafe Use unsafe deserialization
467   * @param <I> Vertex id
468   * @param <V> Vertex value
469   * @param <E> Edge value
470   * @param conf Configuration
471   */
472   public static <I extends WritableComparable, V extends Writable,
473   E extends Writable> void reinitializeVertexFromByteArray(
474       byte[] byteArray,
475       Vertex<I, V, E> vertex,
476       boolean unsafe,
477       ImmutableClassesGiraphConfiguration<I, V, E> conf) {
478     ExtendedDataInput extendedDataInput;
479     if (unsafe) {
480       extendedDataInput = new UnsafeByteArrayInputStream(byteArray);
481     } else {
482       extendedDataInput = new ExtendedByteArrayDataInput(byteArray);
483     }
484     try {
485       extendedDataInput.readInt();
486       reinitializeVertexFromDataInput(extendedDataInput, vertex, conf);
487     } catch (IOException e) {
488       throw new IllegalStateException(
489           "readFieldsFromByteArrayWithSize: IOException", e);
490     }
491   }
492 
493   /**
494    * Write an edge to an output stream.
495    *
496    * @param out Data output
497    * @param edge Edge to write
498    * @param <I> Vertex id
499    * @param <E> Edge value
500    * @throws IOException
501    */
502   public static <I extends WritableComparable, E extends Writable>
503   void writeEdge(DataOutput out, Edge<I, E> edge) throws IOException {
504     edge.getTargetVertexId().write(out);
505     edge.getValue().write(out);
506   }
507 
508   /**
509    * Read an edge from an input stream.
510    *
511    * @param in Data input
512    * @param edge Edge to fill in-place
513    * @param <I> Vertex id
514    * @param <E> Edge value
515    * @throws IOException
516    */
517   public static <I extends WritableComparable, E extends Writable>
518   void readEdge(DataInput in, Edge<I, E> edge) throws IOException {
519     edge.getTargetVertexId().readFields(in);
520     edge.getValue().readFields(in);
521   }
522 
523   /**
524    * Reads data from input stream to inizialize Vertex. Assumes the vertex has
525    * already been initialized and contains values for Id, value, and edges.
526    *
527    * @param input The input stream
528    * @param vertex The vertex to initialize
529    * @param conf Configuration
530    * @param <I> Vertex id
531    * @param <V> Vertex value
532    * @param <E> Edge value
533    * @throws IOException
534    */
535   @SuppressWarnings("unchecked")
536   public static <I extends WritableComparable, V extends Writable,
537   E extends Writable> void reinitializeVertexFromDataInput(
538       DataInput input,
539       Vertex<I, V, E> vertex,
540       ImmutableClassesGiraphConfiguration<I, V, E> conf)
541     throws IOException {
542     vertex.getId().readFields(input);
543     vertex.getValue().readFields(input);
544     ((OutEdges<I, E>) vertex.getEdges()).readFields(input);
545     if (input.readBoolean()) {
546       vertex.voteToHalt();
547     } else {
548       vertex.wakeUp();
549     }
550   }
551 
552   /**
553    * Reads data from input stream to initialize Vertex.
554    *
555    * @param input The input stream
556    * @param conf Configuration
557    * @param <I> Vertex id
558    * @param <V> Vertex value
559    * @param <E> Edge value
560    * @return The vertex
561    * @throws IOException
562    */
563   public static <I extends WritableComparable, V extends Writable,
564   E extends Writable> Vertex<I, V, E>
565   readVertexFromDataInput(
566       DataInput input,
567       ImmutableClassesGiraphConfiguration<I, V, E> conf)
568     throws IOException {
569     Vertex<I, V, E> vertex = conf.createVertex();
570     I id = conf.createVertexId();
571     V value = conf.createVertexValue();
572     OutEdges<I, E> edges = conf.createOutEdges();
573     vertex.initialize(id, value, edges);
574     reinitializeVertexFromDataInput(input, vertex, conf);
575     return vertex;
576   }
577 
578   /**
579    * Writes Vertex data to output stream.
580    *
581    * @param output the output stream
582    * @param vertex The vertex to serialize
583    * @param conf Configuration
584    * @param <I> Vertex id
585    * @param <V> Vertex value
586    * @param <E> Edge value
587    * @throws IOException
588    */
589   @SuppressWarnings("unchecked")
590   public static <I extends WritableComparable, V extends Writable,
591   E extends Writable> void writeVertexToDataOutput(
592       DataOutput output,
593       Vertex<I, V, E> vertex,
594       ImmutableClassesGiraphConfiguration<I, V, E> conf)
595     throws IOException {
596     vertex.getId().write(output);
597     vertex.getValue().write(output);
598     ((OutEdges<I, E>) vertex.getEdges()).write(output);
599     output.writeBoolean(vertex.isHalted());
600   }
601 
602   /**
603    * Write class to data output. Also handles the case when class is null.
604    *
605    * @param clazz Class
606    * @param output Data output
607    * @param <T> Class type
608    */
609   public static <T> void writeClass(Class<T> clazz,
610       DataOutput output) throws IOException {
611     output.writeBoolean(clazz != null);
612     if (clazz != null) {
613       output.writeUTF(clazz.getName());
614     }
615   }
616 
617   /**
618    * Read class from data input.
619    * Matches {@link #writeClass(Class, DataOutput)}.
620    *
621    * @param input Data input
622    * @param <T> Class type
623    * @return Class, or null if null was written
624    */
625   @SuppressWarnings("unchecked")
626   public static <T> Class<T> readClass(DataInput input) throws IOException {
627     if (input.readBoolean()) {
628       String className = input.readUTF();
629       try {
630         return (Class<T>) Class.forName(className);
631       } catch (ClassNotFoundException e) {
632         throw new IllegalStateException("readClass: No class found " +
633             className);
634       }
635     } else {
636       return null;
637     }
638   }
639 
640   /**
641    * Write object to output stream
642    * @param object Object
643    * @param output Output stream
644    * @throws IOException
645    */
646   public static void writeWritableObject(
647     Writable object, DataOutput output)
648     throws IOException {
649     output.writeBoolean(object != null);
650     if (object != null) {
651       output.writeUTF(object.getClass().getName());
652       object.write(output);
653     }
654   }
655 
656   /**
657    * Reads object from input stream
658    * @param input Input stream
659    * @param conf Configuration
660    * @param <T> Object type
661    * @return Object
662    * @throws IOException
663    */
664   public static <T extends Writable>
665   T readWritableObject(DataInput input,
666       ImmutableClassesGiraphConfiguration conf) throws IOException {
667     if (input.readBoolean()) {
668       String className = input.readUTF();
669       try {
670         T object =
671             (T) ReflectionUtils.newInstance(Class.forName(className), conf);
672         object.readFields(input);
673         return object;
674       } catch (ClassNotFoundException e) {
675         throw new IllegalStateException("readWritableObject: No class found " +
676             className);
677       }
678     } else {
679       return null;
680     }
681   }
682 
683   /**
684    * Writes a list of Writable objects into output stream.
685    * This method is trying to optimize space occupied by class information only
686    * storing class object if it is different from the previous one
687    * as in most cases arrays tend to have objects of the same type inside.
688    * @param list serialized object
689    * @param output the output stream
690    * @throws IOException
691    */
692   public static void writeList(List<? extends Writable> list, DataOutput output)
693     throws IOException {
694     output.writeBoolean(list != null);
695     if (list != null) {
696       output.writeInt(list.size());
697       Class<? extends Writable> clazz = null;
698       for (Writable element : list) {
699         output.writeBoolean(element == null);
700         if (element != null) {
701           if (element.getClass() != clazz) {
702             clazz = element.getClass();
703             output.writeBoolean(true);
704             writeClass(clazz, output);
705           } else {
706             output.writeBoolean(false);
707           }
708           element.write(output);
709         }
710       }
711     }
712   }
713 
714   /**
715    * Reads list of Writable objects from data input stream.
716    * Input stream should have class information along with object data.
717    * @param input input stream
718    * @return deserialized list
719    * @throws IOException
720    */
721   public static List<? extends Writable> readList(DataInput input)
722     throws IOException {
723     try {
724       List<Writable> res = null;
725       if (input.readBoolean()) {
726         int size = input.readInt();
727         res = new ArrayList<>(size);
728         Class<? extends Writable> clazz = null;
729         for (int i = 0; i < size; i++) {
730           boolean isNull = input.readBoolean();
731           if (isNull) {
732             res.add(null);
733           } else {
734             boolean hasClassInfo = input.readBoolean();
735             if (hasClassInfo) {
736               clazz = readClass(input);
737             }
738             Writable element = clazz.newInstance();
739             element.readFields(input);
740             res.add(element);
741           }
742         }
743       }
744       return res;
745 
746     } catch (InstantiationException | IllegalAccessException e) {
747       throw new IllegalStateException("unable to instantiate object", e);
748     }
749   }
750 
751   /**
752    * Writes primitive int array of ints into output stream.
753    * Array can be null or empty.
754    * @param array array to be written
755    * @param dataOutput output stream
756    * @throws IOException
757    */
758   public static void writeIntArray(int[] array, DataOutput dataOutput)
759     throws IOException {
760     if (array != null) {
761       dataOutput.writeInt(array.length);
762       for (int r : array) {
763         dataOutput.writeInt(r);
764       }
765     } else {
766       dataOutput.writeInt(-1);
767     }
768   }
769 
770   /**
771    * Reads primitive int array from input stream.
772    * @param dataInput input stream to read from
773    * @return may return null or empty array.
774    * @throws IOException
775    */
776   public static int[] readIntArray(DataInput dataInput)
777     throws IOException {
778     int [] res = null;
779     int size = dataInput.readInt();
780     if (size >= 0) {
781       res = new int[size];
782       for (int i = 0; i < size; i++) {
783         res[i] = dataInput.readInt();
784       }
785     }
786     return res;
787   }
788 
789   /**
790    * Writes primitive long array of ints into output stream.
791    * Array can be null or empty.
792    * @param array array to be written
793    * @param dataOutput output stream
794    * @throws IOException
795    */
796   public static void writeLongArray(DataOutput dataOutput, long[] array)
797     throws IOException {
798     if (array != null) {
799       dataOutput.writeInt(array.length);
800       for (long r : array) {
801         dataOutput.writeLong(r);
802       }
803     } else {
804       dataOutput.writeInt(-1);
805     }
806   }
807   /**
808    * Reads primitive long array from input stream.
809    * @param dataInput input stream to read from
810    * @return may return null or empty array.
811    * @throws IOException
812    */
813   public static long[] readLongArray(DataInput dataInput)
814     throws IOException {
815     long [] res = null;
816     int size = dataInput.readInt();
817     if (size >= 0) {
818       res = new long[size];
819       for (int i = 0; i < size; i++) {
820         res[i] = dataInput.readLong();
821       }
822     }
823     return res;
824   }
825 
826   /**
827    * Writes enum into a stream, by serializing class name and it's index
828    * @param enumValue Enum value
829    * @param output Output stream
830    * @param <T> Enum type
831    */
832   public static <T extends Enum<T>> void writeEnum(T enumValue,
833       DataOutput output) throws IOException {
834     writeClass(
835         enumValue != null ? enumValue.getDeclaringClass() : null, output);
836     if (enumValue != null) {
837       Varint.writeUnsignedVarInt(enumValue.ordinal(), output);
838     }
839   }
840 
841   /**
842    * Reads enum from the stream, serialized by writeEnum
843    * @param input Input stream
844    * @param <T> Enum type
845    * @return Enum value
846    */
847   public static <T extends Enum<T>> T readEnum(DataInput input) throws
848       IOException {
849     Class<T> clazz = readClass(input);
850     if (clazz != null) {
851       int ordinal = Varint.readUnsignedVarInt(input);
852       try {
853         T[] values = (T[]) clazz.getDeclaredMethod("values").invoke(null);
854         return values[ordinal];
855       } catch (IllegalAccessException | IllegalArgumentException |
856           InvocationTargetException | NoSuchMethodException |
857           SecurityException e) {
858         throw new IOException("Cannot read enum", e);
859       }
860     } else {
861       return null;
862     }
863   }
864 
865 
866   /**
867    * Copy {@code from} into {@code to}, by serializing and deserializing it.
868    * Since it is creating streams inside, it's mostly useful for
869    * tests/non-performant code.
870    *
871    * @param from Object to copy from
872    * @param to Object to copy into
873    * @param <T> Type of the object
874    */
875   public static <T extends Writable> void copyInto(T from, T to) {
876     copyInto(from, to, false);
877   }
878 
879   /**
880    * Copy {@code from} into {@code to}, by serializing and deserializing it.
881    * Since it is creating streams inside, it's mostly useful for
882    * tests/non-performant code.
883    *
884    * @param from Object to copy from
885    * @param to Object to copy into
886    * @param checkOverRead if true, will add one more byte at the end of writing,
887    *                      to make sure read is not touching it. Useful for tests
888    * @param <T> Type of the object
889    */
890   public static <T extends Writable> void copyInto(
891       T from, T to, boolean checkOverRead) {
892     try {
893       if (from.getClass() != to.getClass()) {
894         throw new RuntimeException(
895             "Trying to copy from " + from.getClass() +
896             " into " + to.getClass());
897       }
898 
899       UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
900       from.write(out);
901       if (checkOverRead) {
902         out.writeByte(0);
903       }
904 
905       UnsafeByteArrayInputStream in =
906           new UnsafeByteArrayInputStream(out.getByteArray(), 0, out.getPos());
907       to.readFields(in);
908 
909       if (in.available() != (checkOverRead ? 1 : 0)) {
910         throw new RuntimeException(
911             "Serialization encountered issues with " + from.getClass() + ", " +
912             (in.available() - (checkOverRead ? 1 : 0)) + " fewer bytes read");
913       }
914     } catch (IOException e) {
915       throw new RuntimeException(e);
916     }
917   }
918 
919   /**
920    * Create a copy of Writable object, by serializing and deserializing it.
921    *
922    * @param reusableOut Reusable output stream to serialize into
923    * @param reusableIn Reusable input stream to deserialize out of
924    * @param original Original value of which to make a copy
925    * @param conf Configuration
926    * @param <T> Type of the object
927    * @return Copy of the original value
928    */
929   public static <T extends Writable> T createCopy(
930       UnsafeByteArrayOutputStream reusableOut,
931       UnsafeReusableByteArrayInput reusableIn, T original,
932       ImmutableClassesGiraphConfiguration conf) {
933     T copy = (T) createWritable(original.getClass(), conf);
934 
935     try {
936       reusableOut.reset();
937       original.write(reusableOut);
938       reusableIn.initialize(
939           reusableOut.getByteArray(), 0, reusableOut.getPos());
940       copy.readFields(reusableIn);
941 
942       if (reusableIn.available() != 0) {
943         throw new RuntimeException("Serialization of " +
944             original.getClass() + " encountered issues, " +
945             reusableIn.available() + " bytes left to be read");
946       }
947     } catch (IOException e) {
948       throw new IllegalStateException(
949           "IOException occurred while trying to create a copy " +
950           original.getClass(), e);
951     }
952     return copy;
953   }
954 
955   /**
956    * Create a copy of Writable object, by serializing and deserializing it.
957    *
958    * @param original Original value of which to make a copy
959    * @return Copy of the original value
960    * @param <T> Type of the object
961    */
962   public static final <T extends Writable> T createCopy(T original) {
963     return (T) createCopy(original, original.getClass(), null);
964   }
965 
966   /**
967    * Create a copy of Writable object, by serializing and deserializing it.
968    *
969    * @param original Original value of which to make a copy
970    * @param outputClass Expected copy class, needs to match original
971    * @param conf Configuration
972    * @return Copy of the original value
973    * @param <T> Type of the object
974    */
975   public static final <T extends Writable>
976   T createCopy(T original, Class<? extends T> outputClass,
977       ImmutableClassesGiraphConfiguration conf) {
978     T result = WritableUtils.createWritable(outputClass, conf);
979     copyInto(original, result);
980     return result;
981   }
982 
983   /**
984    * Create a copy of Writable object, by serializing and deserializing it.
985    *
986    * @param original Original value of which to make a copy
987    * @param classFactory Factory to create new empty object from
988    * @param conf Configuration
989    * @return Copy of the original value
990    * @param <T> Type of the object
991    */
992   public static final <T extends Writable>
993   T createCopy(T original, ValueFactory<T> classFactory,
994       ImmutableClassesGiraphConfiguration conf) {
995     T result = classFactory.newInstance();
996     copyInto(original, result);
997     return result;
998   }
999 
1000   /**
1001    * Serialize given writable object, and return it's size.
1002    *
1003    * @param w Writable object
1004    * @return it's size after serialization
1005    */
1006   public static int size(Writable w) {
1007     try {
1008       ExtendedByteArrayDataOutput out = new ExtendedByteArrayDataOutput();
1009       w.write(out);
1010       return out.getPos();
1011     } catch (IOException e) {
1012       throw new RuntimeException(e);
1013     }
1014   }
1015 
1016   /**
1017    * Serialize given writable to byte array,
1018    * using new instance of ExtendedByteArrayDataOutput.
1019    *
1020    * @param w Writable object
1021    * @return array of bytes
1022    * @param <T> Type of the object
1023    */
1024   public static <T extends Writable> byte[] toByteArray(T w) {
1025     try {
1026       ExtendedByteArrayDataOutput out = new ExtendedByteArrayDataOutput();
1027       w.write(out);
1028       return out.toByteArray();
1029     } catch (IOException e) {
1030       throw new RuntimeException(e);
1031     }
1032   }
1033 
1034   /**
1035    * Deserialize from given byte array into given writable,
1036    * using new instance of ExtendedByteArrayDataInput.
1037    *
1038    * @param data Byte array representing writable
1039    * @param to Object to fill
1040    * @param <T> Type of the object
1041    */
1042   public static <T extends Writable> void fromByteArray(byte[] data, T to) {
1043     try {
1044       ExtendedByteArrayDataInput in =
1045           new ExtendedByteArrayDataInput(data, 0, data.length);
1046       to.readFields(in);
1047 
1048       if (in.available() != 0) {
1049         throw new RuntimeException(
1050             "Serialization encountered issues, " + in.available() +
1051             " bytes left to be read");
1052       }
1053     } catch (IOException e) {
1054       throw new RuntimeException(e);
1055     }
1056   }
1057 
1058   /**
1059    * Serialize given writable to byte array,
1060    * using new instance of UnsafeByteArrayOutputStream.
1061    *
1062    * @param w Writable object
1063    * @return array of bytes
1064    * @param <T> Type of the object
1065    */
1066   public static <T extends Writable> byte[] toByteArrayUnsafe(T w) {
1067     try {
1068       UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
1069       w.write(out);
1070       return out.toByteArray();
1071     } catch (IOException e) {
1072       throw new RuntimeException(e);
1073     }
1074   }
1075 
1076   /**
1077    * Deserialize from given byte array into given writable,
1078    * using given reusable UnsafeReusableByteArrayInput.
1079    *
1080    * @param data Byte array representing writable
1081    * @param to Object to fill
1082    * @param reusableInput Reusable input to use
1083    * @param <T> Type of the object
1084    */
1085   public static <T extends Writable> void fromByteArrayUnsafe(
1086       byte[] data, T to, UnsafeReusableByteArrayInput reusableInput) {
1087     try {
1088       reusableInput.initialize(data, 0, data.length);
1089       to.readFields(reusableInput);
1090 
1091       if (reusableInput.available() != 0) {
1092         throw new RuntimeException(
1093             "Serialization encountered issues, " + reusableInput.available() +
1094             " bytes left to be read");
1095       }
1096     } catch (IOException e) {
1097       throw new RuntimeException(e);
1098     }
1099   }
1100 
1101   /**
1102    * First write a boolean saying whether an object is not null,
1103    * and if it's not write the object
1104    *
1105    * @param object Object to write
1106    * @param out DataOutput to write to
1107    * @param <T> Object type
1108    */
1109   public static <T extends Writable> void writeIfNotNullAndObject(T object,
1110       DataOutput out) throws IOException {
1111     out.writeBoolean(object != null);
1112     if (object != null) {
1113       object.write(out);
1114     }
1115   }
1116 
1117   /**
1118    * First read a boolean saying whether an object is not null,
1119    * and if it's not read the object
1120    *
1121    * @param reusableObject Reuse this object instance
1122    * @param objectClass Class of the object, to create if reusableObject is null
1123    * @param in DataInput to read from
1124    * @param <T> Object type
1125    * @return Object, or null
1126    */
1127   public static <T extends Writable> T readIfNotNullAndObject(T reusableObject,
1128       Class<T> objectClass, DataInput in) throws IOException {
1129     if (in.readBoolean()) {
1130       if (reusableObject == null) {
1131         reusableObject = ReflectionUtils.newInstance(objectClass);
1132       }
1133       reusableObject.readFields(in);
1134       return reusableObject;
1135     } else {
1136       return null;
1137     }
1138   }
1139 
1140 }