This project has retired. For details please refer to its Attic page.
OneMessageToManyIdsIterator xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.utils;
20  
21  import org.apache.hadoop.io.Writable;
22  import org.apache.hadoop.io.WritableComparable;
23  
24  import java.io.IOException;
25  
26  /**
27   * VertexIdData iterator for
28   * {@link ByteArrayOneMessageToManyIds}
29   *
30   * @param <I> vertexId type
31   * @param <M> message type
32   */
33  public class OneMessageToManyIdsIterator<I extends WritableComparable,
34      M extends Writable> implements VertexIdMessageIterator<I, M> {
35    /** VertexIdMessages object to iterate over */
36    private final ByteArrayOneMessageToManyIds<I, M> vertexIdMessages;
37    /** Reader of the serialized edges */
38    private final ExtendedDataInput extendedDataInput;
39  
40    /** Current vertex Id*/
41    private I vertexId;
42    /** Current message */
43    private M msg;
44    /** Counts of ids left to read before next message */
45    private int idsToRead = 0;
46    /** Size of message read */
47    private int msgSize = 0;
48    /** Is current message newly read */
49    private boolean newMessage;
50  
51    /**
52     * Constructor
53     *
54     * @param vertexIdMessages vertexId messages object to iterate over
55     */
56    public OneMessageToManyIdsIterator(
57        final ByteArrayOneMessageToManyIds<I, M> vertexIdMessages) {
58      this.vertexIdMessages = vertexIdMessages;
59      this.extendedDataInput = vertexIdMessages.getConf()
60          .createExtendedDataInput(vertexIdMessages.extendedDataOutput);
61    }
62  
63    @Override
64    public I getCurrentVertexId() {
65      return vertexId;
66    }
67  
68    @Override
69    public M getCurrentMessage() {
70      return getCurrentData();
71    }
72  
73    @Override
74    public M getCurrentData() {
75      return msg;
76    }
77  
78    @Override
79    public M releaseCurrentData() {
80      M releasedData = msg;
81      msg = null;
82      return releasedData;
83    }
84  
85    @Override
86    public I releaseCurrentVertexId() {
87      I releasedVertexId = vertexId;
88      vertexId = null;
89      return releasedVertexId;
90    }
91  
92    @Override
93    public boolean hasNext() {
94      return extendedDataInput.available() > 0;
95    }
96  
97    /**
98     * Properly initialize vertexId & msg object before calling next()
99     */
100   private void initialize() {
101     if (vertexId == null) {
102       vertexId = vertexIdMessages.getConf().createVertexId();
103     }
104     if (msg == null) {
105       msg = vertexIdMessages.createData();
106     }
107   }
108 
109   @Override
110   public void next() {
111     initialize();
112     try {
113       if (idsToRead == 0) {
114         newMessage = true; // a new message is read
115         int initial = extendedDataInput.getPos();
116         msg.readFields(extendedDataInput);
117         msgSize = extendedDataInput.getPos() - initial;
118         idsToRead = extendedDataInput.readInt();
119       } else {
120         newMessage = false; // same as previous message
121       }
122       vertexId.readFields(extendedDataInput);
123       idsToRead -= 1;
124     } catch (IOException e) {
125       throw new IllegalStateException("next: IOException", e);
126     }
127   }
128 
129   @Override
130   public int getCurrentMessageSize() {
131     return getCurrentDataSize();
132   }
133 
134   @Override
135   public int getCurrentDataSize() {
136     return msgSize;
137   }
138 
139   @Override
140   public boolean isNewMessage() {
141     return newMessage;
142   }
143 }