This project has retired. For details please refer to its
        
        Attic page.
      
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.giraph.utils.io;
20  
21  import org.apache.giraph.utils.ExtendedDataInput;
22  import org.apache.giraph.utils.ExtendedDataOutput;
23  import org.apache.giraph.utils.UnsafeByteArrayInputStream;
24  
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.List;
28  
29  
30  
31  
32  
33  
34  
35  
36  public class BigDataInput implements ExtendedDataInput {
37    
38    private static final ExtendedDataInput EMPTY_INPUT =
39        new UnsafeByteArrayInputStream(new byte[0]);
40  
41    
42    private ExtendedDataInput currentInput;
43    
44    private final List<ExtendedDataInput> dataInputs;
45    
46    private int currentPositionInInputs;
47  
48    
49  
50  
51  
52  
53    public BigDataInput(BigDataOutput bigDataOutput) {
54      dataInputs = new ArrayList<ExtendedDataInput>(
55          bigDataOutput.getNumberOfDataOutputs());
56      for (ExtendedDataOutput dataOutput : bigDataOutput.getDataOutputs()) {
57        dataInputs.add(bigDataOutput.getConf().createExtendedDataInput(
58            dataOutput.getByteArray(), 0, dataOutput.getPos()));
59      }
60      currentPositionInInputs = -1;
61      moveToNextDataInput();
62    }
63  
64    
65    private void moveToNextDataInput() {
66      currentPositionInInputs++;
67      if (currentPositionInInputs < dataInputs.size()) {
68        currentInput = dataInputs.get(currentPositionInInputs);
69      } else {
70        currentInput = EMPTY_INPUT;
71      }
72    }
73  
74    
75  
76  
77  
78    private void checkIfShouldMoveToNextDataInput() {
79      if (currentInput.endOfInput()) {
80        moveToNextDataInput();
81      }
82    }
83  
84    @Override
85    public void readFully(byte[] b) throws IOException {
86      readFully(b, 0, b.length);
87    }
88  
89    @Override
90    public void readFully(byte[] b, int off, int len) throws IOException {
91      checkIfShouldMoveToNextDataInput();
92      int available = currentInput.available();
93      if (len <= available) {
94        currentInput.readFully(b, off, len);
95      } else {
96        
97        
98        currentInput.readFully(b, off, available);
99        readFully(b, off + available, len - available);
100     }
101   }
102 
103   @Override
104   public boolean readBoolean() throws IOException {
105     checkIfShouldMoveToNextDataInput();
106     return currentInput.readBoolean();
107   }
108 
109   @Override
110   public byte readByte() throws IOException {
111     checkIfShouldMoveToNextDataInput();
112     return currentInput.readByte();
113   }
114 
115   @Override
116   public int readUnsignedByte() throws IOException {
117     checkIfShouldMoveToNextDataInput();
118     return currentInput.readUnsignedByte();
119   }
120 
121   @Override
122   public short readShort() throws IOException {
123     checkIfShouldMoveToNextDataInput();
124     return currentInput.readShort();
125   }
126 
127   @Override
128   public int readUnsignedShort() throws IOException {
129     checkIfShouldMoveToNextDataInput();
130     return currentInput.readUnsignedShort();
131   }
132 
133   @Override
134   public char readChar() throws IOException {
135     checkIfShouldMoveToNextDataInput();
136     return currentInput.readChar();
137   }
138 
139   @Override
140   public int readInt() throws IOException {
141     checkIfShouldMoveToNextDataInput();
142     return currentInput.readInt();
143   }
144 
145   @Override
146   public long readLong() throws IOException {
147     checkIfShouldMoveToNextDataInput();
148     return currentInput.readLong();
149   }
150 
151   @Override
152   public float readFloat() throws IOException {
153     checkIfShouldMoveToNextDataInput();
154     return currentInput.readFloat();
155   }
156 
157   @Override
158   public double readDouble() throws IOException {
159     checkIfShouldMoveToNextDataInput();
160     return currentInput.readDouble();
161   }
162 
163   @Override
164   public String readLine() throws IOException {
165     checkIfShouldMoveToNextDataInput();
166     return currentInput.readLine();
167   }
168 
169   @Override
170   public String readUTF() throws IOException {
171     checkIfShouldMoveToNextDataInput();
172     return currentInput.readUTF();
173   }
174 
175   @Override
176   public int skipBytes(int n) throws IOException {
177     int bytesLeftToSkip = n;
178     while (bytesLeftToSkip > 0) {
179       int bytesSkipped = currentInput.skipBytes(bytesLeftToSkip);
180       bytesLeftToSkip -= bytesSkipped;
181       if (bytesLeftToSkip > 0) {
182         moveToNextDataInput();
183         if (endOfInput()) {
184           break;
185         }
186       }
187     }
188     return n - bytesLeftToSkip;
189   }
190 
191   @Override
192   public int getPos() {
193     int pos = 0;
194     for (int i = 0; i <= currentPositionInInputs; i++) {
195       pos += dataInputs.get(i).getPos();
196     }
197     return pos;
198   }
199 
200   @Override
201   public int available() {
202     throw new UnsupportedOperationException("available: " +
203         "Not supported with BigDataIO because overflow can happen");
204   }
205 
206   @Override
207   public boolean endOfInput() {
208     return currentInput == EMPTY_INPUT ||
209         (dataInputs.get(currentPositionInInputs).endOfInput() &&
210             currentPositionInInputs == dataInputs.size() - 1);
211   }
212 }