This project has retired. For details please refer to its Attic page.
      
 
OneMessageToManyIdsIterator xref
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.giraph.utils;
20  
21  import org.apache.hadoop.io.Writable;
22  import org.apache.hadoop.io.WritableComparable;
23  
24  import java.io.IOException;
25  
26  
27  
28  
29  
30  
31  
32  
33  public class OneMessageToManyIdsIterator<I extends WritableComparable,
34      M extends Writable> implements VertexIdMessageIterator<I, M> {
35    
36    private final ByteArrayOneMessageToManyIds<I, M> vertexIdMessages;
37    
38    private final ExtendedDataInput extendedDataInput;
39  
40    
41    private I vertexId;
42    
43    private M msg;
44    
45    private int idsToRead = 0;
46    
47    private int msgSize = 0;
48    
49    private boolean newMessage;
50  
51    
52  
53  
54  
55  
56    public OneMessageToManyIdsIterator(
57        final ByteArrayOneMessageToManyIds<I, M> vertexIdMessages) {
58      this.vertexIdMessages = vertexIdMessages;
59      this.extendedDataInput = vertexIdMessages.getConf()
60          .createExtendedDataInput(vertexIdMessages.extendedDataOutput);
61    }
62  
63    @Override
64    public I getCurrentVertexId() {
65      return vertexId;
66    }
67  
68    @Override
69    public M getCurrentMessage() {
70      return getCurrentData();
71    }
72  
73    @Override
74    public M getCurrentData() {
75      return msg;
76    }
77  
78    @Override
79    public M releaseCurrentData() {
80      M releasedData = msg;
81      msg = null;
82      return releasedData;
83    }
84  
85    @Override
86    public I releaseCurrentVertexId() {
87      I releasedVertexId = vertexId;
88      vertexId = null;
89      return releasedVertexId;
90    }
91  
92    @Override
93    public boolean hasNext() {
94      return extendedDataInput.available() > 0;
95    }
96  
97    
98  
99  
100   private void initialize() {
101     if (vertexId == null) {
102       vertexId = vertexIdMessages.getConf().createVertexId();
103     }
104     if (msg == null) {
105       msg = vertexIdMessages.createData();
106     }
107   }
108 
109   @Override
110   public void next() {
111     initialize();
112     try {
113       if (idsToRead == 0) {
114         newMessage = true; 
115         int initial = extendedDataInput.getPos();
116         msg.readFields(extendedDataInput);
117         msgSize = extendedDataInput.getPos() - initial;
118         idsToRead = extendedDataInput.readInt();
119       } else {
120         newMessage = false; 
121       }
122       vertexId.readFields(extendedDataInput);
123       idsToRead -= 1;
124     } catch (IOException e) {
125       throw new IllegalStateException("next: IOException", e);
126     }
127   }
128 
129   @Override
130   public int getCurrentMessageSize() {
131     return getCurrentDataSize();
132   }
133 
134   @Override
135   public int getCurrentDataSize() {
136     return msgSize;
137   }
138 
139   @Override
140   public boolean isNewMessage() {
141     return newMessage;
142   }
143 }