This project has been retired. For details, please refer to its
Attic page.
ByteArrayMessagesPerVertexStore xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.messages;
20
21 import com.google.common.collect.Iterators;
22
23 import java.io.DataInput;
24 import java.io.DataOutput;
25 import java.io.IOException;
26 import java.util.concurrent.ConcurrentMap;
27
28 import org.apache.giraph.bsp.CentralizedServiceWorker;
29 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
30 import org.apache.giraph.conf.MessageClasses;
31 import org.apache.giraph.factories.MessageValueFactory;
32 import org.apache.giraph.utils.RepresentativeByteStructIterator;
33 import org.apache.giraph.utils.VerboseByteStructMessageWrite;
34 import org.apache.giraph.utils.VertexIdIterator;
35 import org.apache.giraph.utils.VertexIdMessageBytesIterator;
36 import org.apache.giraph.utils.VertexIdMessageIterator;
37 import org.apache.giraph.utils.VertexIdMessages;
38 import org.apache.giraph.utils.WritableUtils;
39 import org.apache.giraph.utils.io.DataInputOutput;
40 import org.apache.hadoop.io.Writable;
41 import org.apache.hadoop.io.WritableComparable;
42
43
44
45
46
47
48
49
50
51 public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
52 M extends Writable> extends SimpleMessageStore<I, M, DataInputOutput> {
53
54
55
56
57
58
59
60 public ByteArrayMessagesPerVertexStore(
61 MessageValueFactory<M> messageValueFactory,
62 PartitionSplitInfo<I> partitionInfo,
63 ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
64 super(messageValueFactory, partitionInfo, config);
65 }
66
67 @Override
68 public boolean isPointerListEncoding() {
69 return false;
70 }
71
72
73
74
75
76
77
78
79
80
81 private DataInputOutput getDataInputOutput(
82 ConcurrentMap<I, DataInputOutput> partitionMap,
83 VertexIdIterator<I> iterator) {
84 DataInputOutput dataInputOutput =
85 partitionMap.get(iterator.getCurrentVertexId());
86 if (dataInputOutput == null) {
87 DataInputOutput newDataOutput = config.createMessagesInputOutput();
88 dataInputOutput = partitionMap.putIfAbsent(
89 iterator.releaseCurrentVertexId(), newDataOutput);
90 if (dataInputOutput == null) {
91 dataInputOutput = newDataOutput;
92 }
93 }
94 return dataInputOutput;
95 }
96
97 @Override
98 public void addPartitionMessages(
99 int partitionId, VertexIdMessages<I, M> messages) {
100 ConcurrentMap<I, DataInputOutput> partitionMap =
101 getOrCreatePartitionMap(partitionId);
102 VertexIdMessageBytesIterator<I, M> vertexIdMessageBytesIterator =
103 messages.getVertexIdMessageBytesIterator();
104
105
106
107
108
109 if (vertexIdMessageBytesIterator != null) {
110 while (vertexIdMessageBytesIterator.hasNext()) {
111 vertexIdMessageBytesIterator.next();
112 DataInputOutput dataInputOutput =
113 getDataInputOutput(partitionMap, vertexIdMessageBytesIterator);
114
115 synchronized (dataInputOutput) {
116 vertexIdMessageBytesIterator.writeCurrentMessageBytes(
117 dataInputOutput.getDataOutput());
118 }
119 }
120 } else {
121 try {
122 VertexIdMessageIterator<I, M> vertexIdMessageIterator =
123 messages.getVertexIdMessageIterator();
124 while (vertexIdMessageIterator.hasNext()) {
125 vertexIdMessageIterator.next();
126 DataInputOutput dataInputOutput =
127 getDataInputOutput(partitionMap, vertexIdMessageIterator);
128
129 synchronized (dataInputOutput) {
130 VerboseByteStructMessageWrite.verboseWriteCurrentMessage(
131 vertexIdMessageIterator, dataInputOutput.getDataOutput());
132 }
133 }
134 } catch (IOException e) {
135 throw new RuntimeException("addPartitionMessages: IOException while" +
136 " adding messages for a partition: " + e);
137 }
138 }
139 }
140
141 @Override
142 public void addMessage(I vertexId, M message) throws IOException {
143 ConcurrentMap<I, DataInputOutput> partitionMap =
144 getOrCreatePartitionMap(getPartitionId(vertexId));
145 DataInputOutput dataInputOutput = partitionMap.get(vertexId);
146 if (dataInputOutput == null) {
147 DataInputOutput newDataOutput = config.createMessagesInputOutput();
148 I copyId = WritableUtils.createCopy(vertexId);
149 dataInputOutput = partitionMap.putIfAbsent(copyId, newDataOutput);
150 if (dataInputOutput == null) {
151 dataInputOutput = newDataOutput;
152 }
153 }
154
155 synchronized (dataInputOutput) {
156 VerboseByteStructMessageWrite.verboseWriteCurrentMessage(
157 vertexId, message, dataInputOutput.getDataOutput());
158 }
159 }
160
161 @Override
162 protected Iterable<M> getMessagesAsIterable(
163 DataInputOutput dataInputOutput) {
164 return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
165 }
166
167 @Override
168 protected int getNumberOfMessagesIn(
169 ConcurrentMap<I, DataInputOutput> partitionMap) {
170 int numberOfMessages = 0;
171 for (DataInputOutput dataInputOutput : partitionMap.values()) {
172 numberOfMessages += Iterators.size(
173 new RepresentativeByteStructIterator<M>(
174 dataInputOutput.createDataInput()) {
175 @Override
176 protected M createWritable() {
177 return messageValueFactory.newInstance();
178 }
179 });
180 }
181 return numberOfMessages;
182 }
183
184 @Override
185 protected void writeMessages(DataInputOutput dataInputOutput,
186 DataOutput out) throws IOException {
187 dataInputOutput.write(out);
188 }
189
190 @Override
191 protected DataInputOutput readFieldsForMessages(DataInput in) throws
192 IOException {
193 DataInputOutput dataInputOutput = config.createMessagesInputOutput();
194 dataInputOutput.readFields(in);
195 return dataInputOutput;
196 }
197
198
199
200
201
202
203
204
205
206
207 public static <I extends WritableComparable, M extends Writable>
208 MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
209 CentralizedServiceWorker<I, ?, ?> service,
210 ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
211 return new Factory<I, M>(service, config);
212 }
213
214
215
216
217
218
219
220 public static class Factory<I extends WritableComparable, M extends Writable>
221 implements MessageStoreFactory<I, M, MessageStore<I, M>> {
222
223 private PartitionSplitInfo<I> partitionInfo;
224
225 private ImmutableClassesGiraphConfiguration<I, ?, ?> config;
226
227
228 public Factory() { }
229
230
231
232
233
234 public Factory(
235 PartitionSplitInfo<I> partitionInfo,
236 ImmutableClassesGiraphConfiguration<I, ?, ?> config
237 ) {
238 this.partitionInfo = partitionInfo;
239 this.config = config;
240 }
241
242 @Override
243 public MessageStore<I, M> newStore(
244 MessageClasses<I, M> messageClasses) {
245 return new ByteArrayMessagesPerVertexStore<I, M>(
246 messageClasses.createMessageValueFactory(config),
247 partitionInfo, config);
248 }
249
250 @Override
251 public void initialize(PartitionSplitInfo<I> partitionInfo,
252 ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
253 this.partitionInfo = partitionInfo;
254 this.config = conf;
255 }
256 }
257
258 }