View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.utils;
19  
20  import java.io.IOException;
21  import java.io.OutputStream;
22  import java.io.UTFDataFormatException;
23  import java.lang.reflect.Field;
24  import java.util.Arrays;
25  
26  import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BOOLEAN;
27  import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BYTE;
28  import static org.apache.giraph.utils.ByteUtils.SIZE_OF_CHAR;
29  import static org.apache.giraph.utils.ByteUtils.SIZE_OF_SHORT;
30  import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
31  import static org.apache.giraph.utils.ByteUtils.SIZE_OF_LONG;
32  import static org.apache.giraph.utils.ByteUtils.SIZE_OF_FLOAT;
33  import static org.apache.giraph.utils.ByteUtils.SIZE_OF_DOUBLE;
34  
35  /**
36   * Byte array output stream that uses Unsafe methods to serialize/deserialize
37   * much faster
38   */
39  public class UnsafeByteArrayOutputStream extends OutputStream
40    implements ExtendedDataOutput {
41    static {
42      try {
43        Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
44        field.setAccessible(true);
45        UNSAFE = (sun.misc.Unsafe) field.get(null);
46        // Checkstyle exception due to needing to check if unsafe is allowed
47        // CHECKSTYLE: stop IllegalCatch
48      } catch (Exception e) {
49        // CHECKSTYLE: resume IllegalCatch
50        throw new RuntimeException("UnsafeByteArrayOutputStream: Failed to " +
51            "get unsafe", e);
52      }
53    }
54  
55    /** Default number of bytes */
56    private static final int DEFAULT_BYTES = 32;
57    /** Access to the unsafe class */
58    private static final sun.misc.Unsafe UNSAFE;
59  
60    /** Offset of a byte array */
61    private static final long BYTE_ARRAY_OFFSET  =
62        UNSAFE.arrayBaseOffset(byte[].class);
63  
64    /** Byte buffer */
65    private byte[] buf;
66    /** Position in the buffer */
67    private int pos = 0;
68  
69    /**
70     * Constructor
71     */
72    public UnsafeByteArrayOutputStream() {
73      this(DEFAULT_BYTES);
74    }
75  
76    /**
77     * Constructor
78     *
79     * @param size Initial size of the underlying byte array
80     */
81    public UnsafeByteArrayOutputStream(int size) {
82      buf = new byte[size];
83    }
84  
85    /**
86     * Constructor to take in a buffer
87     *
88     * @param buf Buffer to start with, or if null, create own buffer
89     */
90    public UnsafeByteArrayOutputStream(byte[] buf) {
91      if (buf == null) {
92        this.buf = new byte[DEFAULT_BYTES];
93      } else {
94        this.buf = buf;
95      }
96    }
97  
98    /**
99     * Constructor to take in a buffer with a given position into that buffer
100    *
101    * @param buf Buffer to start with
102    * @param pos Position to write at the buffer
103    */
104   public UnsafeByteArrayOutputStream(byte[] buf, int pos) {
105     this(buf);
106     this.pos = pos;
107   }
108 
109   /**
110    * Ensure that this buffer has enough remaining space to add the size.
111    * Creates and copies to a new buffer if necessary
112    *
113    * @param size Size to add
114    */
115   private void ensureSize(int size) {
116     if (pos + size > buf.length) {
117       byte[] newBuf = new byte[(buf.length + size) << 1];
118       System.arraycopy(buf, 0, newBuf, 0, pos);
119       buf = newBuf;
120     }
121   }
122 
123   @Override
124   public byte[] getByteArray() {
125     return buf;
126   }
127 
128   @Override
129   public byte[] toByteArray() {
130     return Arrays.copyOf(buf, pos);
131   }
132 
133   @Override
134   public byte[] toByteArray(int offset, int length) {
135     if (offset + length > pos) {
136       throw new IndexOutOfBoundsException(String.format("Offset: %d + " +
137           "Length: %d exceeds the size of buf : %d", offset, length, pos));
138     }
139     return Arrays.copyOfRange(buf, offset, length);
140   }
141 
142   @Override
143   public void reset() {
144     pos = 0;
145   }
146 
147   @Override
148   public int getPos() {
149     return pos;
150   }
151 
152   @Override
153   public void write(int b) throws IOException {
154     ensureSize(SIZE_OF_BYTE);
155     buf[pos] = (byte) b;
156     pos += SIZE_OF_BYTE;
157   }
158 
159   @Override
160   public void write(byte[] b) throws IOException {
161     ensureSize(b.length);
162     System.arraycopy(b, 0, buf, pos, b.length);
163     pos += b.length;
164   }
165 
166   @Override
167   public void write(byte[] b, int off, int len) throws IOException {
168     ensureSize(len);
169     System.arraycopy(b, off, buf, pos, len);
170     pos += len;
171   }
172 
173   @Override
174   public void writeBoolean(boolean v) throws IOException {
175     ensureSize(SIZE_OF_BOOLEAN);
176     UNSAFE.putBoolean(buf, BYTE_ARRAY_OFFSET + pos, v);
177     pos += SIZE_OF_BOOLEAN;
178   }
179 
180   @Override
181   public void writeByte(int v) throws IOException {
182     ensureSize(SIZE_OF_BYTE);
183     UNSAFE.putByte(buf, BYTE_ARRAY_OFFSET + pos, (byte) v);
184     pos += SIZE_OF_BYTE;
185   }
186 
187   @Override
188   public void writeShort(int v) throws IOException {
189     ensureSize(SIZE_OF_SHORT);
190     UNSAFE.putShort(buf, BYTE_ARRAY_OFFSET + pos, (short) v);
191     pos += SIZE_OF_SHORT;
192   }
193 
194   @Override
195   public void writeChar(int v) throws IOException {
196     ensureSize(SIZE_OF_CHAR);
197     UNSAFE.putChar(buf, BYTE_ARRAY_OFFSET + pos, (char) v);
198     pos += SIZE_OF_CHAR;
199   }
200 
201   @Override
202   public void writeInt(int v) throws IOException {
203     ensureSize(SIZE_OF_INT);
204     UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, v);
205     pos += SIZE_OF_INT;
206   }
207 
208   @Override
209   public void ensureWritable(int minSize) {
210     if ((pos + minSize) > buf.length) {
211       buf = Arrays.copyOf(buf, Math.max(buf.length << 1, pos + minSize));
212     }
213   }
214 
215   @Override
216   public void skipBytes(int bytesToSkip) {
217     ensureWritable(bytesToSkip);
218     pos += bytesToSkip;
219   }
220 
221   @Override
222   public void writeInt(int pos, int value) {
223     if (pos + SIZE_OF_INT > this.pos) {
224       throw new IndexOutOfBoundsException(
225           "writeInt: Tried to write int to position " + pos +
226               " but current length is " + this.pos);
227     }
228     UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, value);
229   }
230 
231   @Override
232   public void writeLong(long v) throws IOException {
233     ensureSize(SIZE_OF_LONG);
234     UNSAFE.putLong(buf, BYTE_ARRAY_OFFSET + pos, v);
235     pos += SIZE_OF_LONG;
236   }
237 
238   @Override
239   public void writeFloat(float v) throws IOException {
240     ensureSize(SIZE_OF_FLOAT);
241     UNSAFE.putFloat(buf, BYTE_ARRAY_OFFSET + pos, v);
242     pos += SIZE_OF_FLOAT;
243   }
244 
245   @Override
246   public void writeDouble(double v) throws IOException {
247     ensureSize(SIZE_OF_DOUBLE);
248     UNSAFE.putDouble(buf, BYTE_ARRAY_OFFSET + pos, v);
249     pos += SIZE_OF_DOUBLE;
250   }
251 
252   @Override
253   public void writeBytes(String s) throws IOException {
254     // Note that this code is mostly copied from DataOutputStream
255     int len = s.length();
256     ensureSize(len);
257     for (int i = 0; i < len; i++) {
258       int v = s.charAt(i);
259       writeByte(v);
260     }
261   }
262 
263   @Override
264   public void writeChars(String s) throws IOException {
265     // Note that this code is mostly copied from DataOutputStream
266     int len = s.length();
267     ensureSize(len * SIZE_OF_CHAR);
268     for (int i = 0; i < len; i++) {
269       int v = s.charAt(i);
270       writeChar(v);
271     }
272   }
273 
274   @Override
275   public void writeUTF(String s) throws IOException {
276     // Note that this code is mostly copied from DataOutputStream
277     int strlen = s.length();
278     int utflen = 0;
279     int c;
280 
281     /* use charAt instead of copying String to char array */
282     for (int i = 0; i < strlen; i++) {
283       c = s.charAt(i);
284       if ((c >= 0x0001) && (c <= 0x007F)) {
285         utflen++;
286       } else if (c > 0x07FF) {
287         utflen += 3;
288       } else {
289         utflen += 2;
290       }
291     }
292 
293     if (utflen > 65535) {
294       throw new UTFDataFormatException(
295           "encoded string too long: " + utflen + " bytes");
296     }
297 
298     ensureSize(utflen + SIZE_OF_SHORT);
299     writeShort(utflen);
300 
301     int i = 0;
302     for (i = 0; i < strlen; i++) {
303       c = s.charAt(i);
304       if (!((c >= 0x0001) && (c <= 0x007F))) {
305         break;
306       }
307       buf[pos++] = (byte) c;
308     }
309 
310     for (; i < strlen; i++) {
311       c = s.charAt(i);
312       if ((c >= 0x0001) && (c <= 0x007F)) {
313         buf[pos++] = (byte) c;
314 
315       } else if (c > 0x07FF) {
316         buf[pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
317         buf[pos++] = (byte) (0x80 | ((c >>  6) & 0x3F));
318         buf[pos++] = (byte) (0x80 | ((c >>  0) & 0x3F));
319       } else {
320         buf[pos++] = (byte) (0xC0 | ((c >>  6) & 0x1F));
321         buf[pos++] = (byte) (0x80 | ((c >>  0) & 0x3F));
322       }
323     }
324   }
325 }