This project has retired. For details please refer to its Attic page.
UnsafeByteArrayOutputStream xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.utils;
19  
20  import com.esotericsoftware.kryo.io.Output;
21  
22  import java.io.IOException;
23  import java.io.UTFDataFormatException;
24  import java.lang.reflect.Field;
25  import java.util.Arrays;
26  
27  import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BOOLEAN;
28  import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BYTE;
29  import static org.apache.giraph.utils.ByteUtils.SIZE_OF_CHAR;
30  import static org.apache.giraph.utils.ByteUtils.SIZE_OF_SHORT;
31  import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
32  import static org.apache.giraph.utils.ByteUtils.SIZE_OF_LONG;
33  import static org.apache.giraph.utils.ByteUtils.SIZE_OF_FLOAT;
34  import static org.apache.giraph.utils.ByteUtils.SIZE_OF_DOUBLE;
35  
36  /**
37   * Byte array output stream that uses Unsafe methods to serialize/deserialize
38   * much faster.
39   *
40   * This stream now extends com.esotericsoftware.kryo.io.Output so that kryo
41   * serialization can directly write to this stream without using an
42   * additional buffer, providing a faster serialization.
43   *
44   * Users of this class has to explicitly close the stream to avoid style check
45   * errors even though close is no-op when the underlying stream is not set.
46   */
47  public class UnsafeByteArrayOutputStream extends Output
48    implements ExtendedDataOutput {
49    static {
50      try {
51        Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
52        field.setAccessible(true);
53        UNSAFE = (sun.misc.Unsafe) field.get(null);
54        // Checkstyle exception due to needing to check if unsafe is allowed
55        // CHECKSTYLE: stop IllegalCatch
56      } catch (Exception e) {
57        // CHECKSTYLE: resume IllegalCatch
58        throw new RuntimeException("UnsafeByteArrayOutputStream: Failed to " +
59            "get unsafe", e);
60      }
61    }
62  
63    /** Default number of bytes */
64    private static final int DEFAULT_BYTES = 32;
65    /** Access to the unsafe class */
66    private static final sun.misc.Unsafe UNSAFE;
67  
68    /** Offset of a byte array */
69    private static final long BYTE_ARRAY_OFFSET  =
70        UNSAFE.arrayBaseOffset(byte[].class);
71  
72    /**
73     * Constructor
74     */
75    public UnsafeByteArrayOutputStream() {
76      this(DEFAULT_BYTES);
77    }
78  
79    /**
80     * Constructor
81     *
82     * @param size Initial size of the underlying byte array
83     */
84    public UnsafeByteArrayOutputStream(int size) {
85      buffer = new byte[size];
86      capacity = size;
87    }
88  
89    /**
90     * Constructor to take in a buffer
91     *
92     * @param buf Buffer to start with, or if null, create own buffer
93     */
94    public UnsafeByteArrayOutputStream(byte[] buf) {
95      if (buf == null) {
96        this.buffer = new byte[DEFAULT_BYTES];
97      } else {
98        this.buffer = buf;
99      }
100     capacity = this.buffer.length;
101   }
102 
103   /**
104    * Constructor to take in a buffer with a given position into that buffer
105    *
106    * @param buf Buffer to start with
107    * @param pos Position to write at the buffer
108    */
109   public UnsafeByteArrayOutputStream(byte[] buf, int pos) {
110     this(buf);
111     this.position = pos;
112   }
113 
114   /**
115    * Ensure that this buffer has enough remaining space to add the size.
116    * Creates and copies to a new buffer if necessary
117    *
118    * @param size Size to add
119    */
120   @Override
121   protected boolean require(int size) {
122     if (position + size > buffer.length) {
123       byte[] newBuf = new byte[(buffer.length + size) << 1];
124       System.arraycopy(buffer, 0, newBuf, 0, position);
125       buffer = newBuf;
126       capacity = buffer.length;
127       return true;
128     }
129     return false;
130   }
131 
132   @Override
133   public byte[] getByteArray() {
134     return buffer;
135   }
136 
137   @Override
138   public byte[] toByteArray() {
139     return Arrays.copyOf(buffer, position);
140   }
141 
142   @Override
143   public byte[] toByteArray(int offset, int length) {
144     if (offset + length > position) {
145       throw new IndexOutOfBoundsException(String.format("Offset: %d + " +
146         "Length: %d exceeds the size of buffer : %d",
147             offset, length, position));
148     }
149     return Arrays.copyOfRange(buffer, offset, length);
150   }
151 
152   @Override
153   public void reset() {
154     position = 0;
155   }
156 
157   @Override
158   public int getPos() {
159     return position;
160   }
161 
162   @Override
163   public void write(int b) {
164     require(SIZE_OF_BYTE);
165     buffer[position] = (byte) b;
166     position += SIZE_OF_BYTE;
167   }
168 
169   @Override
170   public void write(byte[] b) {
171     require(b.length);
172     System.arraycopy(b, 0, buffer, position, b.length);
173     position += b.length;
174   }
175 
176   @Override
177   public void write(byte[] b, int off, int len) {
178     require(len);
179     System.arraycopy(b, off, buffer, position, len);
180     position += len;
181   }
182 
183   @Override
184   public void writeBoolean(boolean v) {
185     require(SIZE_OF_BOOLEAN);
186     UNSAFE.putBoolean(buffer, BYTE_ARRAY_OFFSET + position, v);
187     position += SIZE_OF_BOOLEAN;
188   }
189 
190   @Override
191   public void writeByte(int v) {
192     require(SIZE_OF_BYTE);
193     UNSAFE.putByte(buffer, BYTE_ARRAY_OFFSET + position, (byte) v);
194     position += SIZE_OF_BYTE;
195   }
196 
197   @Override
198   public void writeShort(int v) {
199     require(SIZE_OF_SHORT);
200     UNSAFE.putShort(buffer, BYTE_ARRAY_OFFSET + position, (short) v);
201     position += SIZE_OF_SHORT;
202   }
203 
204   @Override
205   public void writeChar(int v) throws IOException {
206     require(SIZE_OF_CHAR);
207     UNSAFE.putChar(buffer, BYTE_ARRAY_OFFSET + position, (char) v);
208     position += SIZE_OF_CHAR;
209   }
210 
211   @Override
212   public void writeChar(char v) {
213     require(SIZE_OF_CHAR);
214     UNSAFE.putChar(buffer, BYTE_ARRAY_OFFSET + position, v);
215     position += SIZE_OF_CHAR;
216   }
217 
218   @Override
219   public void writeInt(int v) {
220     require(SIZE_OF_INT);
221     UNSAFE.putInt(buffer, BYTE_ARRAY_OFFSET + position, v);
222     position += SIZE_OF_INT;
223   }
224 
225   @Override
226   public void ensureWritable(int minSize) {
227     if ((position + minSize) > buffer.length) {
228       buffer = Arrays.copyOf(buffer,
229                 Math.max(buffer.length << 1, position + minSize));
230     }
231   }
232 
233   @Override
234   public void skipBytes(int bytesToSkip) {
235     ensureWritable(bytesToSkip);
236     position += bytesToSkip;
237   }
238 
239   @Override
240   public void writeInt(int pos, int value) {
241     if (pos + SIZE_OF_INT > this.position) {
242       throw new IndexOutOfBoundsException(
243           "writeInt: Tried to write int to position " + pos +
244               " but current length is " + this.position);
245     }
246     UNSAFE.putInt(buffer, BYTE_ARRAY_OFFSET + pos, value);
247   }
248 
249   @Override
250   public void writeLong(long v) {
251     require(SIZE_OF_LONG);
252     UNSAFE.putLong(buffer, BYTE_ARRAY_OFFSET + position, v);
253     position += SIZE_OF_LONG;
254   }
255 
256   @Override
257   public void writeFloat(float v) {
258     require(SIZE_OF_FLOAT);
259     UNSAFE.putFloat(buffer, BYTE_ARRAY_OFFSET + position, v);
260     position += SIZE_OF_FLOAT;
261   }
262 
263   @Override
264   public void writeDouble(double v) {
265     require(SIZE_OF_DOUBLE);
266     UNSAFE.putDouble(buffer, BYTE_ARRAY_OFFSET + position, v);
267     position += SIZE_OF_DOUBLE;
268   }
269 
270   @Override
271   public void writeBytes(String s) throws IOException {
272     // Note that this code is mostly copied from DataOutputStream
273     int len = s.length();
274     require(len);
275     for (int i = 0; i < len; i++) {
276       int v = s.charAt(i);
277       writeByte(v);
278     }
279   }
280 
281   @Override
282   public void writeChars(String s) throws IOException {
283     // Note that this code is mostly copied from DataOutputStream
284     int len = s.length();
285     require(len * SIZE_OF_CHAR);
286     for (int i = 0; i < len; i++) {
287       int v = s.charAt(i);
288       writeChar(v);
289     }
290   }
291 
292   @Override
293   public void writeUTF(String s) throws IOException {
294     // Note that this code is mostly copied from DataOutputStream
295     int strlen = s.length();
296     int utflen = 0;
297     int c;
298 
299     /* use charAt instead of copying String to char array */
300     for (int i = 0; i < strlen; i++) {
301       c = s.charAt(i);
302       if ((c >= 0x0001) && (c <= 0x007F)) {
303         utflen++;
304       } else if (c > 0x07FF) {
305         utflen += 3;
306       } else {
307         utflen += 2;
308       }
309     }
310 
311     if (utflen > 65535) {
312       throw new UTFDataFormatException(
313           "encoded string too long: " + utflen + " bytes");
314     }
315 
316     require(utflen + SIZE_OF_SHORT);
317     writeShort(utflen);
318 
319     int i = 0;
320     for (i = 0; i < strlen; i++) {
321       c = s.charAt(i);
322       if (!((c >= 0x0001) && (c <= 0x007F))) {
323         break;
324       }
325       buffer[position++] = (byte) c;
326     }
327 
328     for (; i < strlen; i++) {
329       c = s.charAt(i);
330       if ((c >= 0x0001) && (c <= 0x007F)) {
331         buffer[position++] = (byte) c;
332 
333       } else if (c > 0x07FF) {
334         buffer[position++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
335         buffer[position++] = (byte) (0x80 | ((c >>  6) & 0x3F));
336         buffer[position++] = (byte) (0x80 | ((c >>  0) & 0x3F));
337       } else {
338         buffer[position++] = (byte) (0xC0 | ((c >>  6) & 0x1F));
339         buffer[position++] = (byte) (0x80 | ((c >>  0) & 0x3F));
340       }
341     }
342   }
343 }