This project has retired. For details please refer to its Attic page.
BigDataInput xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.utils.io;
20  
import org.apache.giraph.utils.ExtendedDataInput;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
28  
29  /**
30   * Implementations of {@link ExtendedDataInput} are limited because they can
31   * only handle up to 1GB of data. This {@link ExtendedDataInput} overcomes
32   * that limitation, with almost no additional cost when data is not huge.
33   *
34   * Goes in pair with {@link BigDataOutput}
35   */
36  public class BigDataInput implements ExtendedDataInput {
37    /** Empty data input */
38    private static final ExtendedDataInput EMPTY_INPUT =
39        new UnsafeByteArrayInputStream(new byte[0]);
40  
41    /** Input which we are currently reading from */
42    private ExtendedDataInput currentInput;
43    /** List of all data inputs which contain data */
44    private final List<ExtendedDataInput> dataInputs;
45    /** Which position within dataInputs are we currently reading from */
46    private int currentPositionInInputs;
47  
48    /**
49     * Constructor
50     *
51     * @param bigDataOutput {@link BigDataOutput} which we want to read data from
52     */
53    public BigDataInput(BigDataOutput bigDataOutput) {
54      dataInputs = new ArrayList<ExtendedDataInput>(
55          bigDataOutput.getNumberOfDataOutputs());
56      for (ExtendedDataOutput dataOutput : bigDataOutput.getDataOutputs()) {
57        dataInputs.add(bigDataOutput.getConf().createExtendedDataInput(
58            dataOutput.getByteArray(), 0, dataOutput.getPos()));
59      }
60      currentPositionInInputs = -1;
61      moveToNextDataInput();
62    }
63  
64    /** Start reading the following data input */
65    private void moveToNextDataInput() {
66      currentPositionInInputs++;
67      if (currentPositionInInputs < dataInputs.size()) {
68        currentInput = dataInputs.get(currentPositionInInputs);
69      } else {
70        currentInput = EMPTY_INPUT;
71      }
72    }
73  
74    /**
75     * Check if we read everything from the current data input, and move to the
76     * next one if needed.
77     */
78    private void checkIfShouldMoveToNextDataInput() {
79      if (currentInput.endOfInput()) {
80        moveToNextDataInput();
81      }
82    }
83  
84    @Override
85    public void readFully(byte[] b) throws IOException {
86      readFully(b, 0, b.length);
87    }
88  
89    @Override
90    public void readFully(byte[] b, int off, int len) throws IOException {
91      checkIfShouldMoveToNextDataInput();
92      int available = currentInput.available();
93      if (len <= available) {
94        currentInput.readFully(b, off, len);
95      } else {
96        // When we are trying to read more bytes than there are in single chunk
97        // we need to read part by part
98        currentInput.readFully(b, off, available);
99        readFully(b, off + available, len - available);
100     }
101   }
102 
103   @Override
104   public boolean readBoolean() throws IOException {
105     checkIfShouldMoveToNextDataInput();
106     return currentInput.readBoolean();
107   }
108 
109   @Override
110   public byte readByte() throws IOException {
111     checkIfShouldMoveToNextDataInput();
112     return currentInput.readByte();
113   }
114 
115   @Override
116   public int readUnsignedByte() throws IOException {
117     checkIfShouldMoveToNextDataInput();
118     return currentInput.readUnsignedByte();
119   }
120 
121   @Override
122   public short readShort() throws IOException {
123     checkIfShouldMoveToNextDataInput();
124     return currentInput.readShort();
125   }
126 
127   @Override
128   public int readUnsignedShort() throws IOException {
129     checkIfShouldMoveToNextDataInput();
130     return currentInput.readUnsignedShort();
131   }
132 
133   @Override
134   public char readChar() throws IOException {
135     checkIfShouldMoveToNextDataInput();
136     return currentInput.readChar();
137   }
138 
139   @Override
140   public int readInt() throws IOException {
141     checkIfShouldMoveToNextDataInput();
142     return currentInput.readInt();
143   }
144 
145   @Override
146   public long readLong() throws IOException {
147     checkIfShouldMoveToNextDataInput();
148     return currentInput.readLong();
149   }
150 
151   @Override
152   public float readFloat() throws IOException {
153     checkIfShouldMoveToNextDataInput();
154     return currentInput.readFloat();
155   }
156 
157   @Override
158   public double readDouble() throws IOException {
159     checkIfShouldMoveToNextDataInput();
160     return currentInput.readDouble();
161   }
162 
163   @Override
164   public String readLine() throws IOException {
165     checkIfShouldMoveToNextDataInput();
166     return currentInput.readLine();
167   }
168 
169   @Override
170   public String readUTF() throws IOException {
171     checkIfShouldMoveToNextDataInput();
172     return currentInput.readUTF();
173   }
174 
175   @Override
176   public int skipBytes(int n) throws IOException {
177     int bytesLeftToSkip = n;
178     while (bytesLeftToSkip > 0) {
179       int bytesSkipped = currentInput.skipBytes(bytesLeftToSkip);
180       bytesLeftToSkip -= bytesSkipped;
181       if (bytesLeftToSkip > 0) {
182         moveToNextDataInput();
183         if (endOfInput()) {
184           break;
185         }
186       }
187     }
188     return n - bytesLeftToSkip;
189   }
190 
191   @Override
192   public int getPos() {
193     int pos = 0;
194     for (int i = 0; i <= currentPositionInInputs; i++) {
195       pos += dataInputs.get(i).getPos();
196     }
197     return pos;
198   }
199 
200   @Override
201   public int available() {
202     throw new UnsupportedOperationException("available: " +
203         "Not supported with BigDataIO because overflow can happen");
204   }
205 
206   @Override
207   public boolean endOfInput() {
208     return currentInput == EMPTY_INPUT ||
209         (dataInputs.get(currentPositionInInputs).endOfInput() &&
210             currentPositionInInputs == dataInputs.size() - 1);
211   }
212 }