View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.utils.io;
20  
21  import org.apache.giraph.utils.ExtendedByteArrayDataInput;
22  import org.apache.giraph.utils.ExtendedDataInput;
23  import org.apache.giraph.utils.ExtendedDataOutput;
24  
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.List;
28  
29  /**
30   * Implementations of {@link ExtendedDataInput} are limited because they can
31   * only handle up to 1GB of data. This {@link ExtendedDataInput} overcomes
32   * that limitation, with almost no additional cost when data is not huge.
33   *
34   * Goes in pair with {@link BigDataOutput}
35   */
36  public class BigDataInput implements ExtendedDataInput {
37    /** Empty data input */
38    private static final ExtendedDataInput EMPTY_INPUT =
39        new ExtendedByteArrayDataInput(new byte[0]);
40  
41    /** Input which we are currently reading from */
42    private ExtendedDataInput currentInput;
43    /** List of all data inputs which contain data */
44    private final List<ExtendedDataInput> dataInputs;
45    /** Which position within dataInputs are we currently reading from */
46    private int currentPositionInInputs;
47  
48    /**
49     * Constructor
50     *
51     * @param bigDataOutput {@link BigDataOutput} which we want to read data from
52     */
53    public BigDataInput(BigDataOutput bigDataOutput) {
54      dataInputs = new ArrayList<ExtendedDataInput>(
55          bigDataOutput.getNumberOfDataOutputs());
56      for (ExtendedDataOutput dataOutput : bigDataOutput.getDataOutputs()) {
57        dataInputs.add(bigDataOutput.getConf().createExtendedDataInput(
58            dataOutput.getByteArray(), 0, dataOutput.getPos()));
59      }
60      currentPositionInInputs = -1;
61      moveToNextDataInput();
62    }
63  
64    /** Start reading the following data input */
65    private void moveToNextDataInput() {
66      currentPositionInInputs++;
67      if (currentPositionInInputs < dataInputs.size()) {
68        currentInput = dataInputs.get(currentPositionInInputs);
69      } else {
70        currentInput = EMPTY_INPUT;
71      }
72    }
73  
74    /**
75     * Check if we read everything from the current data input, and move to the
76     * next one if needed.
77     */
78    private void checkIfShouldMoveToNextDataInput() {
79      if (currentInput.endOfInput()) {
80        moveToNextDataInput();
81      }
82    }
83  
84    @Override
85    public void readFully(byte[] b) throws IOException {
86      checkIfShouldMoveToNextDataInput();
87      currentInput.readFully(b);
88    }
89  
90    @Override
91    public void readFully(byte[] b, int off, int len) throws IOException {
92      checkIfShouldMoveToNextDataInput();
93      currentInput.readFully(b, off, len);
94    }
95  
96    @Override
97    public boolean readBoolean() throws IOException {
98      checkIfShouldMoveToNextDataInput();
99      return currentInput.readBoolean();
100   }
101 
102   @Override
103   public byte readByte() throws IOException {
104     checkIfShouldMoveToNextDataInput();
105     return currentInput.readByte();
106   }
107 
108   @Override
109   public int readUnsignedByte() throws IOException {
110     checkIfShouldMoveToNextDataInput();
111     return currentInput.readUnsignedByte();
112   }
113 
114   @Override
115   public short readShort() throws IOException {
116     checkIfShouldMoveToNextDataInput();
117     return currentInput.readShort();
118   }
119 
120   @Override
121   public int readUnsignedShort() throws IOException {
122     checkIfShouldMoveToNextDataInput();
123     return currentInput.readUnsignedShort();
124   }
125 
126   @Override
127   public char readChar() throws IOException {
128     checkIfShouldMoveToNextDataInput();
129     return currentInput.readChar();
130   }
131 
132   @Override
133   public int readInt() throws IOException {
134     checkIfShouldMoveToNextDataInput();
135     return currentInput.readInt();
136   }
137 
138   @Override
139   public long readLong() throws IOException {
140     checkIfShouldMoveToNextDataInput();
141     return currentInput.readLong();
142   }
143 
144   @Override
145   public float readFloat() throws IOException {
146     checkIfShouldMoveToNextDataInput();
147     return currentInput.readFloat();
148   }
149 
150   @Override
151   public double readDouble() throws IOException {
152     checkIfShouldMoveToNextDataInput();
153     return currentInput.readDouble();
154   }
155 
156   @Override
157   public String readLine() throws IOException {
158     checkIfShouldMoveToNextDataInput();
159     return currentInput.readLine();
160   }
161 
162   @Override
163   public String readUTF() throws IOException {
164     checkIfShouldMoveToNextDataInput();
165     return currentInput.readUTF();
166   }
167 
168   @Override
169   public int skipBytes(int n) throws IOException {
170     int bytesLeftToSkip = n;
171     while (bytesLeftToSkip > 0) {
172       int bytesSkipped = currentInput.skipBytes(bytesLeftToSkip);
173       bytesLeftToSkip -= bytesSkipped;
174       if (bytesLeftToSkip > 0) {
175         moveToNextDataInput();
176         if (endOfInput()) {
177           break;
178         }
179       }
180     }
181     return n - bytesLeftToSkip;
182   }
183 
184   @Override
185   public int getPos() {
186     int pos = 0;
187     for (int i = 0; i <= currentPositionInInputs; i++) {
188       pos += dataInputs.get(i).getPos();
189     }
190     return pos;
191   }
192 
193   @Override
194   public int available() {
195     throw new UnsupportedOperationException("available: " +
196         "Not supported with BigDataIO because overflow can happen");
197   }
198 
199   @Override
200   public boolean endOfInput() {
201     return currentInput == EMPTY_INPUT ||
202         (dataInputs.get(currentPositionInInputs).endOfInput() &&
203             currentPositionInInputs == dataInputs.size() - 1);
204   }
205 }