This project has retired. For details please refer to its
Attic page.
WritableUtils xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.utils;
20
21 import static org.apache.hadoop.util.ReflectionUtils.newInstance;
22
23 import java.io.ByteArrayInputStream;
24 import java.io.ByteArrayOutputStream;
25 import java.io.DataInput;
26 import java.io.DataInputStream;
27 import java.io.DataOutput;
28 import java.io.DataOutputStream;
29 import java.io.IOException;
30 import java.lang.reflect.InvocationTargetException;
31 import java.util.ArrayList;
32 import java.util.List;
33
34 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
35 import org.apache.giraph.edge.Edge;
36 import org.apache.giraph.edge.OutEdges;
37 import org.apache.giraph.factories.ValueFactory;
38 import org.apache.giraph.graph.Vertex;
39 import org.apache.giraph.zk.ZooKeeperExt;
40 import org.apache.giraph.zk.ZooKeeperExt.PathStat;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.io.NullWritable;
43 import org.apache.hadoop.io.Writable;
44 import org.apache.hadoop.io.WritableComparable;
45 import org.apache.zookeeper.CreateMode;
46 import org.apache.zookeeper.KeeperException;
47 import org.apache.zookeeper.ZooDefs.Ids;
48 import org.apache.zookeeper.data.Stat;
49
50
51
52
53 public class WritableUtils {
54
55
56
/**
 * Hidden constructor: this class only exposes static helpers.
 */
private WritableUtils() {
  // Intentionally empty.
}
58
59
60
61
62
63
64
65
66 public static <W extends Writable> W createWritable(Class<W> klass) {
67 return createWritable(klass, null);
68 }
69
70
71
72
73
74
75
76
77
78 public static <W extends Writable> W createWritable(
79 Class<W> klass,
80 ImmutableClassesGiraphConfiguration configuration) {
81 W result;
82 if (NullWritable.class.equals(klass)) {
83 result = (W) NullWritable.get();
84 } else {
85 result = ReflectionUtils.newInstance(klass);
86 }
87 ConfigurationUtils.configureIfPossible(result, configuration);
88 return result;
89 }
90
91
92
93
94
95
96
97
98 public static void readFieldsFromByteArray(
99 byte[] byteArray, Writable... writableObjects) {
100 DataInputStream inputStream =
101 new DataInputStream(new ByteArrayInputStream(byteArray));
102 try {
103 for (Writable writableObject : writableObjects) {
104 writableObject.readFields(inputStream);
105 }
106 } catch (IOException e) {
107 throw new IllegalStateException(
108 "readFieldsFromByteArray: IOException", e);
109 }
110 }
111
112
113
114
115
116
117
118
119
120
121 public static void readFieldsFromZnode(ZooKeeperExt zkExt,
122 String zkPath,
123 boolean watch,
124 Stat stat,
125 Writable... writableObjects) {
126 try {
127 byte[] zkData = zkExt.getData(zkPath, false, stat);
128 readFieldsFromByteArray(zkData, writableObjects);
129 } catch (KeeperException e) {
130 throw new IllegalStateException(
131 "readFieldsFromZnode: KeeperException on " + zkPath, e);
132 } catch (InterruptedException e) {
133 throw new IllegalStateException(
134 "readFieldsFromZnode: InterrruptedStateException on " + zkPath, e);
135 }
136 }
137
138
139
140
141
142
143
144 public static byte[] writeToByteArray(Writable... writableObjects) {
145 ByteArrayOutputStream outputStream =
146 new ByteArrayOutputStream();
147 DataOutput output = new DataOutputStream(outputStream);
148 try {
149 for (Writable writableObject : writableObjects) {
150 writableObject.write(output);
151 }
152 } catch (IOException e) {
153 throw new IllegalStateException(
154 "writeToByteArray: IOStateException", e);
155 }
156 return outputStream.toByteArray();
157 }
158
159
160
161
162
163
164
165
166
167 public static void readFieldsFromByteArrayWithSize(
168 byte[] byteArray, Writable writableObject, boolean unsafe) {
169 ExtendedDataInput extendedDataInput;
170 if (unsafe) {
171 extendedDataInput = new UnsafeByteArrayInputStream(byteArray);
172 } else {
173 extendedDataInput = new ExtendedByteArrayDataInput(byteArray);
174 }
175 try {
176 extendedDataInput.readInt();
177 writableObject.readFields(extendedDataInput);
178 } catch (IOException e) {
179 throw new IllegalStateException(
180 "readFieldsFromByteArrayWithSize: IOException", e);
181 }
182 }
183
184
185
186
187
188
189
190
191
192 public static byte[] writeToByteArrayWithSize(Writable writableObject,
193 boolean unsafe) {
194 return writeToByteArrayWithSize(writableObject, null, unsafe);
195 }
196
197
198
199
200
201
202
203
204
205
206 public static byte[] writeToByteArrayWithSize(Writable writableObject,
207 byte[] buffer,
208 boolean unsafe) {
209 ExtendedDataOutput extendedDataOutput;
210 if (unsafe) {
211 extendedDataOutput = new UnsafeByteArrayOutputStream(buffer);
212 } else {
213 extendedDataOutput = new ExtendedByteArrayDataOutput(buffer);
214 }
215 try {
216 extendedDataOutput.writeInt(-1);
217 writableObject.write(extendedDataOutput);
218 extendedDataOutput.writeInt(0, extendedDataOutput.getPos());
219 } catch (IOException e) {
220 throw new IllegalStateException("writeToByteArrayWithSize: " +
221 "IOException", e);
222 }
223
224 return extendedDataOutput.getByteArray();
225 }
226
227
228
229
230
231
232
233
234
235
236 public static PathStat writeToZnode(ZooKeeperExt zkExt,
237 String zkPath,
238 int version,
239 Writable... writableObjects) {
240 try {
241 byte[] byteArray = writeToByteArray(writableObjects);
242 return zkExt.createOrSetExt(zkPath,
243 byteArray,
244 Ids.OPEN_ACL_UNSAFE,
245 CreateMode.PERSISTENT,
246 true,
247 version);
248 } catch (KeeperException e) {
249 throw new IllegalStateException(
250 "writeToZnode: KeeperException on " + zkPath, e);
251 } catch (InterruptedException e) {
252 throw new IllegalStateException(
253 "writeToZnode: InterruptedException on " + zkPath, e);
254 }
255 }
256
257
258
259
260
261
262
263 public static byte[] writeListToByteArray(
264 List<? extends Writable> writableList) {
265 ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
266 DataOutput output = new DataOutputStream(outputStream);
267 try {
268 output.writeInt(writableList.size());
269 for (Writable writable : writableList) {
270 writable.write(output);
271 }
272 } catch (IOException e) {
273 throw new IllegalStateException(
274 "writeListToByteArray: IOException", e);
275 }
276 return outputStream.toByteArray();
277 }
278
279
280
281
282
283
284
285
286
287
288 public static PathStat writeListToZnode(
289 ZooKeeperExt zkExt,
290 String zkPath,
291 int version,
292 List<? extends Writable> writableList) {
293 try {
294 return zkExt.createOrSetExt(
295 zkPath,
296 writeListToByteArray(writableList),
297 Ids.OPEN_ACL_UNSAFE,
298 CreateMode.PERSISTENT,
299 true,
300 version);
301 } catch (KeeperException e) {
302 throw new IllegalStateException(
303 "writeListToZnode: KeeperException on " + zkPath, e);
304 } catch (InterruptedException e) {
305 throw new IllegalStateException(
306 "writeListToZnode: InterruptedException on " + zkPath, e);
307 }
308 }
309
310
311
312
313
314
315
316
317
318
319 public static <T extends Writable> List<T> readListFieldsFromByteArray(
320 byte[] byteArray,
321 Class<? extends T> writableClass,
322 Configuration conf) {
323 try {
324 DataInputStream inputStream =
325 new DataInputStream(new ByteArrayInputStream(byteArray));
326 int size = inputStream.readInt();
327 List<T> writableList = new ArrayList<T>(size);
328 for (int i = 0; i < size; ++i) {
329 T writable = newInstance(writableClass, conf);
330 writable.readFields(inputStream);
331 writableList.add(writable);
332 }
333 return writableList;
334 } catch (IOException e) {
335 throw new IllegalStateException(
336 "readListFieldsFromZnode: IOException", e);
337 }
338 }
339
340
341
342
343
344
345
346
347
348
349
350
351
352 public static <T extends Writable> List<T> readListFieldsFromZnode(
353 ZooKeeperExt zkExt,
354 String zkPath,
355 boolean watch,
356 Stat stat,
357 Class<? extends T> writableClass,
358 Configuration conf) {
359 try {
360 byte[] zkData = zkExt.getData(zkPath, false, stat);
361 return WritableUtils.<T>readListFieldsFromByteArray(zkData,
362 writableClass, conf);
363 } catch (KeeperException e) {
364 throw new IllegalStateException(
365 "readListFieldsFromZnode: KeeperException on " + zkPath, e);
366 } catch (InterruptedException e) {
367 throw new IllegalStateException(
368 "readListFieldsFromZnode: InterruptedException on " + zkPath,
369 e);
370 }
371 }
372
373
374
375
376
377
378
379 public static void writeExtendedDataOutput(
380 ExtendedDataOutput extendedDataOutput, DataOutput out)
381 throws IOException {
382 out.writeInt(extendedDataOutput.getPos());
383 out.write(
384 extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
385 }
386
387
388
389
390
391
392
393
394 public static ExtendedDataOutput readExtendedDataOutput(DataInput in,
395 ImmutableClassesGiraphConfiguration conf) throws IOException {
396 int size = in.readInt();
397 byte[] buf = new byte[size];
398 in.readFully(buf);
399 return conf.createExtendedDataOutput(buf, size);
400 }
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415 public static <I extends WritableComparable, V extends Writable,
416 E extends Writable> byte[] writeVertexToByteArray(
417 Vertex<I, V, E> vertex,
418 byte[] buffer,
419 boolean unsafe,
420 ImmutableClassesGiraphConfiguration<I, V, E> conf) {
421 ExtendedDataOutput extendedDataOutput;
422 if (unsafe) {
423 extendedDataOutput = new UnsafeByteArrayOutputStream(buffer);
424 } else {
425 extendedDataOutput = new ExtendedByteArrayDataOutput(buffer);
426 }
427 try {
428 extendedDataOutput.writeInt(-1);
429 writeVertexToDataOutput(extendedDataOutput, vertex, conf);
430 extendedDataOutput.writeInt(0, extendedDataOutput.getPos());
431 } catch (IOException e) {
432 throw new IllegalStateException("writeVertexToByteArray: " +
433 "IOException", e);
434 }
435
436 return extendedDataOutput.getByteArray();
437 }
438
439
440
441
442
443
444
445
446
447
448
449
450
451 public static <I extends WritableComparable, V extends Writable,
452 E extends Writable> byte[] writeVertexToByteArray(
453 Vertex<I, V, E> vertex,
454 boolean unsafe,
455 ImmutableClassesGiraphConfiguration<I, V, E> conf) {
456 return writeVertexToByteArray(vertex, null, unsafe, conf);
457 }
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472 public static <I extends WritableComparable, V extends Writable,
473 E extends Writable> void reinitializeVertexFromByteArray(
474 byte[] byteArray,
475 Vertex<I, V, E> vertex,
476 boolean unsafe,
477 ImmutableClassesGiraphConfiguration<I, V, E> conf) {
478 ExtendedDataInput extendedDataInput;
479 if (unsafe) {
480 extendedDataInput = new UnsafeByteArrayInputStream(byteArray);
481 } else {
482 extendedDataInput = new ExtendedByteArrayDataInput(byteArray);
483 }
484 try {
485 extendedDataInput.readInt();
486 reinitializeVertexFromDataInput(extendedDataInput, vertex, conf);
487 } catch (IOException e) {
488 throw new IllegalStateException(
489 "readFieldsFromByteArrayWithSize: IOException", e);
490 }
491 }
492
493
494
495
496
497
498
499
500
501
502 public static <I extends WritableComparable, E extends Writable>
503 void writeEdge(DataOutput out, Edge<I, E> edge) throws IOException {
504 edge.getTargetVertexId().write(out);
505 edge.getValue().write(out);
506 }
507
508
509
510
511
512
513
514
515
516
517 public static <I extends WritableComparable, E extends Writable>
518 void readEdge(DataInput in, Edge<I, E> edge) throws IOException {
519 edge.getTargetVertexId().readFields(in);
520 edge.getValue().readFields(in);
521 }
522
523
524
525
526
527
528
529
530
531
532
533
534
535 @SuppressWarnings("unchecked")
536 public static <I extends WritableComparable, V extends Writable,
537 E extends Writable> void reinitializeVertexFromDataInput(
538 DataInput input,
539 Vertex<I, V, E> vertex,
540 ImmutableClassesGiraphConfiguration<I, V, E> conf)
541 throws IOException {
542 vertex.getId().readFields(input);
543 vertex.getValue().readFields(input);
544 ((OutEdges<I, E>) vertex.getEdges()).readFields(input);
545 if (input.readBoolean()) {
546 vertex.voteToHalt();
547 } else {
548 vertex.wakeUp();
549 }
550 }
551
552
553
554
555
556
557
558
559
560
561
562
563 public static <I extends WritableComparable, V extends Writable,
564 E extends Writable> Vertex<I, V, E>
565 readVertexFromDataInput(
566 DataInput input,
567 ImmutableClassesGiraphConfiguration<I, V, E> conf)
568 throws IOException {
569 Vertex<I, V, E> vertex = conf.createVertex();
570 I id = conf.createVertexId();
571 V value = conf.createVertexValue();
572 OutEdges<I, E> edges = conf.createOutEdges();
573 vertex.initialize(id, value, edges);
574 reinitializeVertexFromDataInput(input, vertex, conf);
575 return vertex;
576 }
577
578
579
580
581
582
583
584
585
586
587
588
589 @SuppressWarnings("unchecked")
590 public static <I extends WritableComparable, V extends Writable,
591 E extends Writable> void writeVertexToDataOutput(
592 DataOutput output,
593 Vertex<I, V, E> vertex,
594 ImmutableClassesGiraphConfiguration<I, V, E> conf)
595 throws IOException {
596 vertex.getId().write(output);
597 vertex.getValue().write(output);
598 ((OutEdges<I, E>) vertex.getEdges()).write(output);
599 output.writeBoolean(vertex.isHalted());
600 }
601
602
603
604
605
606
607
608
609 public static <T> void writeClass(Class<T> clazz,
610 DataOutput output) throws IOException {
611 output.writeBoolean(clazz != null);
612 if (clazz != null) {
613 output.writeUTF(clazz.getName());
614 }
615 }
616
617
618
619
620
621
622
623
624
625 @SuppressWarnings("unchecked")
626 public static <T> Class<T> readClass(DataInput input) throws IOException {
627 if (input.readBoolean()) {
628 String className = input.readUTF();
629 try {
630 return (Class<T>) Class.forName(className);
631 } catch (ClassNotFoundException e) {
632 throw new IllegalStateException("readClass: No class found " +
633 className);
634 }
635 } else {
636 return null;
637 }
638 }
639
640
641
642
643
644
645
646 public static void writeWritableObject(
647 Writable object, DataOutput output)
648 throws IOException {
649 output.writeBoolean(object != null);
650 if (object != null) {
651 output.writeUTF(object.getClass().getName());
652 object.write(output);
653 }
654 }
655
656
657
658
659
660
661
662
663
664 public static <T extends Writable>
665 T readWritableObject(DataInput input,
666 ImmutableClassesGiraphConfiguration conf) throws IOException {
667 if (input.readBoolean()) {
668 String className = input.readUTF();
669 try {
670 T object =
671 (T) ReflectionUtils.newInstance(Class.forName(className), conf);
672 object.readFields(input);
673 return object;
674 } catch (ClassNotFoundException e) {
675 throw new IllegalStateException("readWritableObject: No class found " +
676 className);
677 }
678 } else {
679 return null;
680 }
681 }
682
683
684
685
686
687
688
689
690
691
692 public static void writeList(List<? extends Writable> list, DataOutput output)
693 throws IOException {
694 output.writeBoolean(list != null);
695 if (list != null) {
696 output.writeInt(list.size());
697 Class<? extends Writable> clazz = null;
698 for (Writable element : list) {
699 output.writeBoolean(element == null);
700 if (element != null) {
701 if (element.getClass() != clazz) {
702 clazz = element.getClass();
703 output.writeBoolean(true);
704 writeClass(clazz, output);
705 } else {
706 output.writeBoolean(false);
707 }
708 element.write(output);
709 }
710 }
711 }
712 }
713
714
715
716
717
718
719
720
721 public static List<? extends Writable> readList(DataInput input)
722 throws IOException {
723 try {
724 List<Writable> res = null;
725 if (input.readBoolean()) {
726 int size = input.readInt();
727 res = new ArrayList<>(size);
728 Class<? extends Writable> clazz = null;
729 for (int i = 0; i < size; i++) {
730 boolean isNull = input.readBoolean();
731 if (isNull) {
732 res.add(null);
733 } else {
734 boolean hasClassInfo = input.readBoolean();
735 if (hasClassInfo) {
736 clazz = readClass(input);
737 }
738 Writable element = clazz.newInstance();
739 element.readFields(input);
740 res.add(element);
741 }
742 }
743 }
744 return res;
745
746 } catch (InstantiationException | IllegalAccessException e) {
747 throw new IllegalStateException("unable to instantiate object", e);
748 }
749 }
750
751
752
753
754
755
756
757
758 public static void writeIntArray(int[] array, DataOutput dataOutput)
759 throws IOException {
760 if (array != null) {
761 dataOutput.writeInt(array.length);
762 for (int r : array) {
763 dataOutput.writeInt(r);
764 }
765 } else {
766 dataOutput.writeInt(-1);
767 }
768 }
769
770
771
772
773
774
775
776 public static int[] readIntArray(DataInput dataInput)
777 throws IOException {
778 int [] res = null;
779 int size = dataInput.readInt();
780 if (size >= 0) {
781 res = new int[size];
782 for (int i = 0; i < size; i++) {
783 res[i] = dataInput.readInt();
784 }
785 }
786 return res;
787 }
788
789
790
791
792
793
794
795
796 public static void writeLongArray(DataOutput dataOutput, long[] array)
797 throws IOException {
798 if (array != null) {
799 dataOutput.writeInt(array.length);
800 for (long r : array) {
801 dataOutput.writeLong(r);
802 }
803 } else {
804 dataOutput.writeInt(-1);
805 }
806 }
807
808
809
810
811
812
813 public static long[] readLongArray(DataInput dataInput)
814 throws IOException {
815 long [] res = null;
816 int size = dataInput.readInt();
817 if (size >= 0) {
818 res = new long[size];
819 for (int i = 0; i < size; i++) {
820 res[i] = dataInput.readLong();
821 }
822 }
823 return res;
824 }
825
826
827
828
829
830
831
832 public static <T extends Enum<T>> void writeEnum(T enumValue,
833 DataOutput output) throws IOException {
834 writeClass(
835 enumValue != null ? enumValue.getDeclaringClass() : null, output);
836 if (enumValue != null) {
837 Varint.writeUnsignedVarInt(enumValue.ordinal(), output);
838 }
839 }
840
841
842
843
844
845
846
847 public static <T extends Enum<T>> T readEnum(DataInput input) throws
848 IOException {
849 Class<T> clazz = readClass(input);
850 if (clazz != null) {
851 int ordinal = Varint.readUnsignedVarInt(input);
852 try {
853 T[] values = (T[]) clazz.getDeclaredMethod("values").invoke(null);
854 return values[ordinal];
855 } catch (IllegalAccessException | IllegalArgumentException |
856 InvocationTargetException | NoSuchMethodException |
857 SecurityException e) {
858 throw new IOException("Cannot read enum", e);
859 }
860 } else {
861 return null;
862 }
863 }
864
865
866
867
868
869
870
871
872
873
874
875 public static <T extends Writable> void copyInto(T from, T to) {
876 copyInto(from, to, false);
877 }
878
879
880
881
882
883
884
885
886
887
888
889
890 public static <T extends Writable> void copyInto(
891 T from, T to, boolean checkOverRead) {
892 try {
893 if (from.getClass() != to.getClass()) {
894 throw new RuntimeException(
895 "Trying to copy from " + from.getClass() +
896 " into " + to.getClass());
897 }
898
899 UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
900 from.write(out);
901 if (checkOverRead) {
902 out.writeByte(0);
903 }
904
905 UnsafeByteArrayInputStream in =
906 new UnsafeByteArrayInputStream(out.getByteArray(), 0, out.getPos());
907 to.readFields(in);
908
909 if (in.available() != (checkOverRead ? 1 : 0)) {
910 throw new RuntimeException(
911 "Serialization encountered issues with " + from.getClass() + ", " +
912 (in.available() - (checkOverRead ? 1 : 0)) + " fewer bytes read");
913 }
914 } catch (IOException e) {
915 throw new RuntimeException(e);
916 }
917 }
918
919
920
921
922
923
924
925
926
927
928
929 public static <T extends Writable> T createCopy(
930 UnsafeByteArrayOutputStream reusableOut,
931 UnsafeReusableByteArrayInput reusableIn, T original,
932 ImmutableClassesGiraphConfiguration conf) {
933 T copy = (T) createWritable(original.getClass(), conf);
934
935 try {
936 reusableOut.reset();
937 original.write(reusableOut);
938 reusableIn.initialize(
939 reusableOut.getByteArray(), 0, reusableOut.getPos());
940 copy.readFields(reusableIn);
941
942 if (reusableIn.available() != 0) {
943 throw new RuntimeException("Serialization of " +
944 original.getClass() + " encountered issues, " +
945 reusableIn.available() + " bytes left to be read");
946 }
947 } catch (IOException e) {
948 throw new IllegalStateException(
949 "IOException occurred while trying to create a copy " +
950 original.getClass(), e);
951 }
952 return copy;
953 }
954
955
956
957
958
959
960
961
962 public static final <T extends Writable> T createCopy(T original) {
963 return (T) createCopy(original, original.getClass(), null);
964 }
965
966
967
968
969
970
971
972
973
974
975 public static final <T extends Writable>
976 T createCopy(T original, Class<? extends T> outputClass,
977 ImmutableClassesGiraphConfiguration conf) {
978 T result = WritableUtils.createWritable(outputClass, conf);
979 copyInto(original, result);
980 return result;
981 }
982
983
984
985
986
987
988
989
990
991
992 public static final <T extends Writable>
993 T createCopy(T original, ValueFactory<T> classFactory,
994 ImmutableClassesGiraphConfiguration conf) {
995 T result = classFactory.newInstance();
996 copyInto(original, result);
997 return result;
998 }
999
1000
1001
1002
1003
1004
1005
1006 public static int size(Writable w) {
1007 try {
1008 ExtendedByteArrayDataOutput out = new ExtendedByteArrayDataOutput();
1009 w.write(out);
1010 return out.getPos();
1011 } catch (IOException e) {
1012 throw new RuntimeException(e);
1013 }
1014 }
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024 public static <T extends Writable> byte[] toByteArray(T w) {
1025 try {
1026 ExtendedByteArrayDataOutput out = new ExtendedByteArrayDataOutput();
1027 w.write(out);
1028 return out.toByteArray();
1029 } catch (IOException e) {
1030 throw new RuntimeException(e);
1031 }
1032 }
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042 public static <T extends Writable> void fromByteArray(byte[] data, T to) {
1043 try {
1044 ExtendedByteArrayDataInput in =
1045 new ExtendedByteArrayDataInput(data, 0, data.length);
1046 to.readFields(in);
1047
1048 if (in.available() != 0) {
1049 throw new RuntimeException(
1050 "Serialization encountered issues, " + in.available() +
1051 " bytes left to be read");
1052 }
1053 } catch (IOException e) {
1054 throw new RuntimeException(e);
1055 }
1056 }
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066 public static <T extends Writable> byte[] toByteArrayUnsafe(T w) {
1067 try {
1068 UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
1069 w.write(out);
1070 return out.toByteArray();
1071 } catch (IOException e) {
1072 throw new RuntimeException(e);
1073 }
1074 }
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085 public static <T extends Writable> void fromByteArrayUnsafe(
1086 byte[] data, T to, UnsafeReusableByteArrayInput reusableInput) {
1087 try {
1088 reusableInput.initialize(data, 0, data.length);
1089 to.readFields(reusableInput);
1090
1091 if (reusableInput.available() != 0) {
1092 throw new RuntimeException(
1093 "Serialization encountered issues, " + reusableInput.available() +
1094 " bytes left to be read");
1095 }
1096 } catch (IOException e) {
1097 throw new RuntimeException(e);
1098 }
1099 }
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109 public static <T extends Writable> void writeIfNotNullAndObject(T object,
1110 DataOutput out) throws IOException {
1111 out.writeBoolean(object != null);
1112 if (object != null) {
1113 object.write(out);
1114 }
1115 }
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127 public static <T extends Writable> T readIfNotNullAndObject(T reusableObject,
1128 Class<T> objectClass, DataInput in) throws IOException {
1129 if (in.readBoolean()) {
1130 if (reusableObject == null) {
1131 reusableObject = ReflectionUtils.newInstance(objectClass);
1132 }
1133 reusableObject.readFields(in);
1134 return reusableObject;
1135 } else {
1136 return null;
1137 }
1138 }
1139
1140 }