View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.utils.io;
20  
21  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22  import org.apache.giraph.utils.ExtendedDataOutput;
23  import org.apache.hadoop.io.Writable;
24  
25  import com.google.common.collect.Iterables;
26  import com.google.common.collect.Lists;
27  
28  import java.io.DataInput;
29  import java.io.DataOutput;
30  import java.io.IOException;
31  import java.util.ArrayList;
32  import java.util.List;
33  
34  /**
35   * Implementations of {@link ExtendedDataOutput} are limited because they can
36   * only handle up to 1GB of data. This {@link DataOutput} overcomes that
37   * limitation, with almost no additional cost when data is not huge.
38   *
39   * Goes in pair with {@link BigDataInput}
40   */
41  public class BigDataOutput implements DataOutput, Writable {
42    /** Default initial size of the stream */
43    private static final int DEFAULT_INITIAL_SIZE = 16;
44    /** Max allowed size of the stream */
45    private static final int MAX_SIZE = 1 << 25;
46    /**
47     * Create a new stream when we have less then this number of bytes left in
48     * the stream. Should be larger than the largest serialized primitive.
49     */
50    private static final int SIZE_DELTA = 100;
51  
52    /** Data output which we are currently writing to */
53    private ExtendedDataOutput currentDataOutput;
54    /** List of filled outputs, will be null until we get a lot of data */
55    private List<ExtendedDataOutput> dataOutputs;
56    /** Configuration */
57    private final ImmutableClassesGiraphConfiguration conf;
58  
59    /**
60     * Constructor
61     *
62     * @param conf Configuration
63     */
64    public BigDataOutput(ImmutableClassesGiraphConfiguration conf) {
65      this(DEFAULT_INITIAL_SIZE, conf);
66    }
67  
68    /**
69     * Constructor
70     *
71     * @param initialSize Initial size of data output
72     * @param conf        Configuration
73     */
74    public BigDataOutput(int initialSize,
75        ImmutableClassesGiraphConfiguration conf) {
76      this.conf = conf;
77      dataOutputs = null;
78      currentDataOutput = conf.createExtendedDataOutput(initialSize);
79    }
80  
81    /**
82     * Get DataOutput which data should be written to. If current DataOutput is
83     * full it will create a new one.
84     *
85     * @return DataOutput which data should be written to
86     */
87    private ExtendedDataOutput getDataOutputToWriteTo() {
88      if (currentDataOutput.getPos() + SIZE_DELTA < MAX_SIZE) {
89        return currentDataOutput;
90      } else {
91        if (dataOutputs == null) {
92          dataOutputs = new ArrayList<ExtendedDataOutput>(1);
93        }
94        dataOutputs.add(currentDataOutput);
95        currentDataOutput = conf.createExtendedDataOutput(MAX_SIZE);
96        return currentDataOutput;
97      }
98    }
99  
100   /**
101    * Get number of DataOutputs which contain written data.
102    *
103    * @return Number of DataOutputs which contain written data
104    */
105   public int getNumberOfDataOutputs() {
106     return (dataOutputs == null) ? 1 : dataOutputs.size() + 1;
107   }
108 
109   /**
110    * Get DataOutputs which contain written data.
111    *
112    * @return DataOutputs which contain written data
113    */
114   public Iterable<ExtendedDataOutput> getDataOutputs() {
115     ArrayList<ExtendedDataOutput> currentList =
116         Lists.newArrayList(currentDataOutput);
117     if (dataOutputs == null) {
118       return currentList;
119     } else {
120       return Iterables.concat(dataOutputs, currentList);
121     }
122   }
123 
124   public ImmutableClassesGiraphConfiguration getConf() {
125     return conf;
126   }
127 
128   /**
129    * Get number of bytes written to this data output
130    *
131    * @return Size in bytes
132    */
133   public long getSize() {
134     long size = currentDataOutput.getPos();
135     if (dataOutputs != null) {
136       for (ExtendedDataOutput dataOutput : dataOutputs) {
137         size += dataOutput.getPos();
138       }
139     }
140     return size;
141   }
142 
143   @Override
144   public void write(int b) throws IOException {
145     getDataOutputToWriteTo().write(b);
146   }
147 
148   @Override
149   public void write(byte[] b) throws IOException {
150     getDataOutputToWriteTo().write(b);
151   }
152 
153   @Override
154   public void write(byte[] b, int off, int len) throws IOException {
155     getDataOutputToWriteTo().write(b, off, len);
156   }
157 
158   @Override
159   public void writeBoolean(boolean v) throws IOException {
160     getDataOutputToWriteTo().writeBoolean(v);
161   }
162 
163   @Override
164   public void writeByte(int v) throws IOException {
165     getDataOutputToWriteTo().writeByte(v);
166   }
167 
168   @Override
169   public void writeShort(int v) throws IOException {
170     getDataOutputToWriteTo().writeShort(v);
171   }
172 
173   @Override
174   public void writeChar(int v) throws IOException {
175     getDataOutputToWriteTo().writeChar(v);
176   }
177 
178   @Override
179   public void writeInt(int v) throws IOException {
180     getDataOutputToWriteTo().writeInt(v);
181   }
182 
183   @Override
184   public void writeLong(long v) throws IOException {
185     getDataOutputToWriteTo().writeLong(v);
186   }
187 
188   @Override
189   public void writeFloat(float v) throws IOException {
190     getDataOutputToWriteTo().writeFloat(v);
191   }
192 
193   @Override
194   public void writeDouble(double v) throws IOException {
195     getDataOutputToWriteTo().writeDouble(v);
196   }
197 
198   @Override
199   public void writeBytes(String s) throws IOException {
200     getDataOutputToWriteTo().writeBytes(s);
201   }
202 
203   @Override
204   public void writeChars(String s) throws IOException {
205     getDataOutputToWriteTo().writeChars(s);
206   }
207 
208   @Override
209   public void writeUTF(String s) throws IOException {
210     getDataOutputToWriteTo().writeUTF(s);
211   }
212 
213   /**
214    * Write one of data outputs to another data output
215    *
216    * @param dataOutput Data output to write
217    * @param out        Data output to write to
218    */
219   private void writeExtendedDataOutput(ExtendedDataOutput dataOutput,
220       DataOutput out) throws IOException {
221     out.writeInt(dataOutput.getPos());
222     out.write(dataOutput.getByteArray(), 0, dataOutput.getPos());
223   }
224 
225   /**
226    * Read data output from data input
227    *
228    * @param in Data input to read from
229    * @return Data output read
230    */
231   private ExtendedDataOutput readExtendedDataOutput(
232       DataInput in) throws IOException {
233     int length = in.readInt();
234     byte[] data = new byte[length];
235     in.readFully(data);
236     return conf.createExtendedDataOutput(data, data.length);
237   }
238 
239   @Override
240   public void write(DataOutput out) throws IOException {
241     if (dataOutputs == null) {
242       out.writeInt(0);
243     } else {
244       out.writeInt(dataOutputs.size());
245       for (ExtendedDataOutput stream : dataOutputs) {
246         writeExtendedDataOutput(stream, out);
247       }
248     }
249     writeExtendedDataOutput(currentDataOutput, out);
250   }
251 
252   @Override
253   public void readFields(DataInput in) throws IOException {
254     int size = in.readInt();
255     if (size == 0) {
256       dataOutputs = null;
257     } else {
258       dataOutputs = new ArrayList<ExtendedDataOutput>(size);
259       while (size-- > 0) {
260         dataOutputs.add(readExtendedDataOutput(in));
261       }
262     }
263     currentDataOutput = readExtendedDataOutput(in);
264   }
265 }