This project has retired. For details please refer to its
Attic page.
IdByteArrayMessageStore xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.comm.messages.primitives;
19
20 import com.google.common.collect.Lists;
21
22 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
23
24 import java.io.DataInput;
25 import java.io.DataOutput;
26 import java.io.IOException;
27 import java.util.Iterator;
28 import java.util.List;
29
30 import org.apache.giraph.comm.messages.MessageStore;
31 import org.apache.giraph.comm.messages.MessagesIterable;
32 import org.apache.giraph.comm.messages.PartitionSplitInfo;
33 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
34 import org.apache.giraph.factories.MessageValueFactory;
35 import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
36 import org.apache.giraph.types.ops.TypeOpsUtils;
37 import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
38 import org.apache.giraph.types.ops.collections.WritableWriter;
39 import org.apache.giraph.utils.EmptyIterable;
40 import org.apache.giraph.utils.VerboseByteStructMessageWrite;
41 import org.apache.giraph.utils.VertexIdMessageBytesIterator;
42 import org.apache.giraph.utils.VertexIdMessageIterator;
43 import org.apache.giraph.utils.VertexIdMessages;
44 import org.apache.giraph.utils.io.DataInputOutput;
45 import org.apache.hadoop.io.Writable;
46 import org.apache.hadoop.io.WritableComparable;
47
48
49
50
51
52
53
54
55
56
57 public class IdByteArrayMessageStore<I extends WritableComparable,
58 M extends Writable> implements MessageStore<I, M> {
59
60 protected final MessageValueFactory<M> messageValueFactory;
61
62 private final Int2ObjectOpenHashMap<Basic2ObjectMap<I, DataInputOutput>> map;
63
64 private final PartitionSplitInfo<I> partitionInfo;
65
66 private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
67
68 private final PrimitiveIdTypeOps<I> idTypeOps;
69
70 private final WritableWriter<DataInputOutput>
71 dataInputOutputWriter = new WritableWriter<DataInputOutput>() {
72 @Override
73 public DataInputOutput readFields(DataInput in) throws IOException {
74 DataInputOutput dataInputOutput = config.createMessagesInputOutput();
75 dataInputOutput.readFields(in);
76 return dataInputOutput;
77 }
78
79 @Override
80 public void write(DataOutput out, DataInputOutput value)
81 throws IOException {
82 value.write(out);
83 }
84 };
85
86
87
88
89
90
91
92
93 public IdByteArrayMessageStore(MessageValueFactory<M> messageValueFactory,
94 PartitionSplitInfo<I> partitionInfo,
95 ImmutableClassesGiraphConfiguration<I, ?, ?> config
96 ) {
97 this.messageValueFactory = messageValueFactory;
98 this.partitionInfo = partitionInfo;
99 this.config = config;
100
101 idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOps(config.getVertexIdClass());
102
103 map = new Int2ObjectOpenHashMap<Basic2ObjectMap<I, DataInputOutput>>();
104 for (int partitionId : partitionInfo.getPartitionIds()) {
105 int capacity = Math.max(10,
106 (int) partitionInfo.getPartitionVertexCount(partitionId));
107 Basic2ObjectMap<I, DataInputOutput> partitionMap =
108 idTypeOps.create2ObjectOpenHashMap(
109 capacity,
110 dataInputOutputWriter);
111
112 map.put(partitionId, partitionMap);
113 }
114 }
115
116
117
118
119
120
121
122 private Basic2ObjectMap<I, DataInputOutput> getPartitionMap(I vertexId) {
123 return map.get(partitionInfo.getPartitionId(vertexId));
124 }
125
126
127
128
129
130
131
132
133 private DataInputOutput getDataInputOutput(
134 Basic2ObjectMap<I, DataInputOutput> partitionMap,
135 I vertexId) {
136 DataInputOutput dataInputOutput = partitionMap.get(vertexId);
137 if (dataInputOutput == null) {
138 dataInputOutput = config.createMessagesInputOutput();
139 partitionMap.put(vertexId, dataInputOutput);
140 }
141 return dataInputOutput;
142 }
143
144 @Override
145 public void addPartitionMessages(int partitionId,
146 VertexIdMessages<I, M> messages) {
147 Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId);
148 synchronized (partitionMap) {
149 VertexIdMessageBytesIterator<I, M> vertexIdMessageBytesIterator =
150 messages.getVertexIdMessageBytesIterator();
151
152
153
154
155
156 if (vertexIdMessageBytesIterator != null) {
157 while (vertexIdMessageBytesIterator.hasNext()) {
158 vertexIdMessageBytesIterator.next();
159 DataInputOutput dataInputOutput = getDataInputOutput(
160 partitionMap, vertexIdMessageBytesIterator.getCurrentVertexId());
161 vertexIdMessageBytesIterator.writeCurrentMessageBytes(
162 dataInputOutput.getDataOutput());
163 }
164 } else {
165 try {
166 VertexIdMessageIterator<I, M> iterator =
167 messages.getVertexIdMessageIterator();
168 while (iterator.hasNext()) {
169 iterator.next();
170 DataInputOutput dataInputOutput =
171 getDataInputOutput(partitionMap, iterator.getCurrentVertexId());
172
173 VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
174 dataInputOutput.getDataOutput());
175 }
176 } catch (IOException e) {
177 throw new RuntimeException("addPartitionMessages: IOException while" +
178 " adding message for a partition: " + e);
179 }
180 }
181 }
182 }
183
184
185
186
187
188
189
190
191 @Override
192 public void addMessage(I vertexId, M message) throws IOException {
193 Basic2ObjectMap<I, DataInputOutput> partitionMap =
194 getPartitionMap(vertexId);
195 synchronized (partitionMap) {
196 DataInputOutput dataInputOutput = getDataInputOutput(
197 partitionMap, vertexId);
198 VerboseByteStructMessageWrite.verboseWriteCurrentMessage(
199 vertexId, message, dataInputOutput.getDataOutput());
200 }
201 }
202
203 @Override
204 public void clearPartition(int partitionId) {
205 map.get(partitionId).clear();
206 }
207
208 @Override
209 public boolean hasMessagesForVertex(I vertexId) {
210 return getPartitionMap(vertexId).containsKey(vertexId);
211 }
212
213 @Override
214 public boolean hasMessagesForPartition(int partitionId) {
215 Basic2ObjectMap<I, DataInputOutput> partitionMessages =
216 map.get(partitionId);
217 return partitionMessages != null && partitionMessages.size() != 0;
218 }
219
220 @Override
221 public Iterable<M> getVertexMessages(I vertexId) {
222 DataInputOutput dataInputOutput = getPartitionMap(vertexId).get(vertexId);
223 if (dataInputOutput == null) {
224 return EmptyIterable.get();
225 } else {
226 return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
227 }
228 }
229
230 @Override
231 public void clearVertexMessages(I vertexId) {
232 getPartitionMap(vertexId).remove(vertexId);
233 }
234
235 @Override
236 public void clearAll() {
237 map.clear();
238 }
239
240 @Override
241 public Iterable<I> getPartitionDestinationVertices(int partitionId) {
242 Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId);
243 List<I> vertices = Lists.newArrayListWithCapacity(partitionMap.size());
244 Iterator<I> iterator = partitionMap.fastKeyIterator();
245 while (iterator.hasNext()) {
246 vertices.add(idTypeOps.createCopy(iterator.next()));
247 }
248 return vertices;
249 }
250
251 @Override
252 public void writePartition(DataOutput out, int partitionId)
253 throws IOException {
254 Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId);
255 partitionMap.write(out);
256 }
257
258 @Override
259 public void readFieldsForPartition(DataInput in, int partitionId)
260 throws IOException {
261 Basic2ObjectMap<I, DataInputOutput> partitionMap =
262 idTypeOps.create2ObjectOpenHashMap(dataInputOutputWriter);
263 partitionMap.readFields(in);
264 synchronized (map) {
265 map.put(partitionId, partitionMap);
266 }
267 }
268
269 @Override
270 public void finalizeStore() {
271 }
272
273 @Override
274 public boolean isPointerListEncoding() {
275 return false;
276 }
277 }