This project has retired. For details please refer to its Attic page.
BigDataOutput xref
View Javadoc

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  
19  package org.apache.giraph.utils.io;
20  
21  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22  import org.apache.giraph.utils.ExtendedDataOutput;
23  import org.apache.hadoop.io.Writable;
24  
25  import com.google.common.collect.Iterables;
26  import com.google.common.collect.Lists;
27  
28  import java.io.DataInput;
29  import java.io.DataOutput;
30  import java.io.IOException;
31  import java.util.ArrayList;
32  import java.util.List;
33  
34  /**
35   * Implementations of {@link ExtendedDataOutput} are limited because they can
36   * only handle up to 1GB of data. This {@link DataOutput} overcomes that
37   * limitation, with almost no additional cost when data is not huge.
38   *
39   * Goes in pair with {@link BigDataInput}
40   */
41  public class BigDataOutput implements DataOutput, Writable {
42    /** Default initial size of the stream */
43    private static final int DEFAULT_INITIAL_SIZE = 16;
44    /** Max allowed size of the stream */
45    private static final int MAX_SIZE = 1 << 25;
46    /**
47     * Create a new stream when we have less then this number of bytes left in
48     * the stream. Should be larger than the largest serialized primitive.
49     */
50    private static final int SIZE_DELTA = 100;
51  
52    /** Data output which we are currently writing to */
53    protected ExtendedDataOutput currentDataOutput;
54    /** List of filled outputs, will be null until we get a lot of data */
55    protected List<ExtendedDataOutput> dataOutputs;
56    /** Configuration */
57    protected final ImmutableClassesGiraphConfiguration conf;
58  
59    /**
60     * Constructor
61     *
62     * @param conf Configuration
63     */
64    public BigDataOutput(ImmutableClassesGiraphConfiguration conf) {
65      this(DEFAULT_INITIAL_SIZE, conf);
66    }
67  
68    /**
69     * Constructor
70     *
71     * @param initialSize Initial size of data output
72     * @param conf        Configuration
73     */
74    public BigDataOutput(int initialSize,
75        ImmutableClassesGiraphConfiguration conf) {
76      this.conf = conf;
77      dataOutputs = null;
78      currentDataOutput = createOutput(initialSize);
79    }
80  
81    /**
82     * Get max size for single data output
83     *
84     * @return Max size for single data output
85     */
86    protected int getMaxSize() {
87      return MAX_SIZE;
88    }
89  
90    /**
91     * Create next data output
92     *
93     * @param size Size of data output to create
94     * @return Created data output
95     */
96    protected ExtendedDataOutput createOutput(int size) {
97      return conf.createExtendedDataOutput(size);
98    }
99  
100   /**
101    * Get DataOutput which data should be written to. If current DataOutput is
102    * full it will create a new one.
103    *
104    * @return DataOutput which data should be written to
105    */
106   private ExtendedDataOutput getDataOutputToWriteTo() {
107     return getDataOutputToWriteTo(SIZE_DELTA);
108   }
109 
110   /**
111    * Get DataOutput which data should be written to. If current DataOutput is
112    * full it will create a new one.
113    *
114    * @param additionalSize How many additional bytes we need space for
115    * @return DataOutput which data should be written to
116    */
117   private ExtendedDataOutput getDataOutputToWriteTo(int additionalSize) {
118     if (currentDataOutput.getPos() + additionalSize > getMaxSize()) {
119       if (dataOutputs == null) {
120         dataOutputs = new ArrayList<>(1);
121       }
122       dataOutputs.add(currentDataOutput);
123       currentDataOutput = createOutput(getMaxSize());
124     }
125     return currentDataOutput;
126   }
127 
128   /**
129    * Get number of DataOutputs which contain written data.
130    *
131    * @return Number of DataOutputs which contain written data
132    */
133   public int getNumberOfDataOutputs() {
134     return (dataOutputs == null) ? 1 : dataOutputs.size() + 1;
135   }
136 
137   /**
138    * Get DataOutputs which contain written data.
139    *
140    * @return DataOutputs which contain written data
141    */
142   public Iterable<ExtendedDataOutput> getDataOutputs() {
143     ArrayList<ExtendedDataOutput> currentList =
144         Lists.newArrayList(currentDataOutput);
145     if (dataOutputs == null) {
146       return currentList;
147     } else {
148       return Iterables.concat(dataOutputs, currentList);
149     }
150   }
151 
152   public ImmutableClassesGiraphConfiguration getConf() {
153     return conf;
154   }
155 
156   /**
157    * Get number of bytes written to this data output
158    *
159    * @return Size in bytes
160    */
161   public long getSize() {
162     long size = currentDataOutput.getPos();
163     if (dataOutputs != null) {
164       for (ExtendedDataOutput dataOutput : dataOutputs) {
165         size += dataOutput.getPos();
166       }
167     }
168     return size;
169   }
170 
171   @Override
172   public void write(int b) throws IOException {
173     getDataOutputToWriteTo().write(b);
174   }
175 
176   @Override
177   public void write(byte[] b) throws IOException {
178     write(b, 0, b.length);
179   }
180 
181   @Override
182   public void write(byte[] b, int off, int len) throws IOException {
183     if (len <= getMaxSize()) {
184       getDataOutputToWriteTo(len).write(b, off, len);
185     } else {
186       // When we try to write more bytes than the biggest size of single data
187       // output, we need to split up the byte array into multiple chunks
188       while (len > 0) {
189         int toWrite = Math.min(getMaxSize(), len);
190         write(b, off, toWrite);
191         len -= toWrite;
192         off += toWrite;
193       }
194     }
195   }
196 
197   @Override
198   public void writeBoolean(boolean v) throws IOException {
199     getDataOutputToWriteTo().writeBoolean(v);
200   }
201 
202   @Override
203   public void writeByte(int v) throws IOException {
204     getDataOutputToWriteTo().writeByte(v);
205   }
206 
207   @Override
208   public void writeShort(int v) throws IOException {
209     getDataOutputToWriteTo().writeShort(v);
210   }
211 
212   @Override
213   public void writeChar(int v) throws IOException {
214     getDataOutputToWriteTo().writeChar(v);
215   }
216 
217   @Override
218   public void writeInt(int v) throws IOException {
219     getDataOutputToWriteTo().writeInt(v);
220   }
221 
222   @Override
223   public void writeLong(long v) throws IOException {
224     getDataOutputToWriteTo().writeLong(v);
225   }
226 
227   @Override
228   public void writeFloat(float v) throws IOException {
229     getDataOutputToWriteTo().writeFloat(v);
230   }
231 
232   @Override
233   public void writeDouble(double v) throws IOException {
234     getDataOutputToWriteTo().writeDouble(v);
235   }
236 
237   @Override
238   public void writeBytes(String s) throws IOException {
239     getDataOutputToWriteTo().writeBytes(s);
240   }
241 
242   @Override
243   public void writeChars(String s) throws IOException {
244     getDataOutputToWriteTo().writeChars(s);
245   }
246 
247   @Override
248   public void writeUTF(String s) throws IOException {
249     getDataOutputToWriteTo().writeUTF(s);
250   }
251 
252   /**
253    * Write one of data outputs to another data output
254    *
255    * @param dataOutput Data output to write
256    * @param out        Data output to write to
257    */
258   private void writeExtendedDataOutput(ExtendedDataOutput dataOutput,
259       DataOutput out) throws IOException {
260     out.writeInt(dataOutput.getPos());
261     out.write(dataOutput.getByteArray(), 0, dataOutput.getPos());
262   }
263 
264   /**
265    * Read data output from data input
266    *
267    * @param in Data input to read from
268    * @return Data output read
269    */
270   private ExtendedDataOutput readExtendedDataOutput(
271       DataInput in) throws IOException {
272     int length = in.readInt();
273     byte[] data = new byte[length];
274     in.readFully(data);
275     return conf.createExtendedDataOutput(data, data.length);
276   }
277 
278   @Override
279   public void write(DataOutput out) throws IOException {
280     if (dataOutputs == null) {
281       out.writeInt(0);
282     } else {
283       out.writeInt(dataOutputs.size());
284       for (ExtendedDataOutput stream : dataOutputs) {
285         writeExtendedDataOutput(stream, out);
286       }
287     }
288     writeExtendedDataOutput(currentDataOutput, out);
289   }
290 
291   @Override
292   public void readFields(DataInput in) throws IOException {
293     int size = in.readInt();
294     if (size == 0) {
295       dataOutputs = null;
296     } else {
297       dataOutputs = new ArrayList<ExtendedDataOutput>(size);
298       while (size-- > 0) {
299         dataOutputs.add(readExtendedDataOutput(in));
300       }
301     }
302     currentDataOutput = readExtendedDataOutput(in);
303   }
304 }