This project has retired. For details please refer to its
Attic page.
OneMessageToManyIdsIterator xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.utils;
20
21 import org.apache.hadoop.io.Writable;
22 import org.apache.hadoop.io.WritableComparable;
23
24 import java.io.IOException;
25
26
27
28
29
30
31
32
33 public class OneMessageToManyIdsIterator<I extends WritableComparable,
34 M extends Writable> implements VertexIdMessageIterator<I, M> {
35
36 private final ByteArrayOneMessageToManyIds<I, M> vertexIdMessages;
37
38 private final ExtendedDataInput extendedDataInput;
39
40
41 private I vertexId;
42
43 private M msg;
44
45 private int idsToRead = 0;
46
47 private int msgSize = 0;
48
49 private boolean newMessage;
50
51
52
53
54
55
56 public OneMessageToManyIdsIterator(
57 final ByteArrayOneMessageToManyIds<I, M> vertexIdMessages) {
58 this.vertexIdMessages = vertexIdMessages;
59 this.extendedDataInput = vertexIdMessages.getConf()
60 .createExtendedDataInput(vertexIdMessages.extendedDataOutput);
61 }
62
63 @Override
64 public I getCurrentVertexId() {
65 return vertexId;
66 }
67
68 @Override
69 public M getCurrentMessage() {
70 return getCurrentData();
71 }
72
73 @Override
74 public M getCurrentData() {
75 return msg;
76 }
77
78 @Override
79 public M releaseCurrentData() {
80 M releasedData = msg;
81 msg = null;
82 return releasedData;
83 }
84
85 @Override
86 public I releaseCurrentVertexId() {
87 I releasedVertexId = vertexId;
88 vertexId = null;
89 return releasedVertexId;
90 }
91
92 @Override
93 public boolean hasNext() {
94 return extendedDataInput.available() > 0;
95 }
96
97
98
99
100 private void initialize() {
101 if (vertexId == null) {
102 vertexId = vertexIdMessages.getConf().createVertexId();
103 }
104 if (msg == null) {
105 msg = vertexIdMessages.createData();
106 }
107 }
108
109 @Override
110 public void next() {
111 initialize();
112 try {
113 if (idsToRead == 0) {
114 newMessage = true;
115 int initial = extendedDataInput.getPos();
116 msg.readFields(extendedDataInput);
117 msgSize = extendedDataInput.getPos() - initial;
118 idsToRead = extendedDataInput.readInt();
119 } else {
120 newMessage = false;
121 }
122 vertexId.readFields(extendedDataInput);
123 idsToRead -= 1;
124 } catch (IOException e) {
125 throw new IllegalStateException("next: IOException", e);
126 }
127 }
128
129 @Override
130 public int getCurrentMessageSize() {
131 return getCurrentDataSize();
132 }
133
134 @Override
135 public int getCurrentDataSize() {
136 return msgSize;
137 }
138
139 @Override
140 public boolean isNewMessage() {
141 return newMessage;
142 }
143 }