This project has retired. For details please refer to its
Attic page.
IdOneMessagePerVertexStore xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.comm.messages.primitives;
19
20 import com.google.common.collect.Lists;
21
22 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
23
24 import java.io.DataInput;
25 import java.io.DataOutput;
26 import java.io.IOException;
27 import java.util.Collections;
28 import java.util.Iterator;
29 import java.util.List;
30
31 import org.apache.giraph.combiner.MessageCombiner;
32 import org.apache.giraph.comm.messages.MessageStore;
33 import org.apache.giraph.comm.messages.PartitionSplitInfo;
34 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
35 import org.apache.giraph.factories.MessageValueFactory;
36 import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
37 import org.apache.giraph.types.ops.TypeOpsUtils;
38 import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
39 import org.apache.giraph.types.ops.collections.WritableWriter;
40 import org.apache.giraph.utils.EmptyIterable;
41 import org.apache.giraph.utils.VertexIdMessageIterator;
42 import org.apache.giraph.utils.VertexIdMessages;
43 import org.apache.hadoop.io.Writable;
44 import org.apache.hadoop.io.WritableComparable;
45
46
47
48
49
50
51
52
53
54
55
56
57 public class IdOneMessagePerVertexStore<I extends WritableComparable,
58 M extends Writable> implements MessageStore<I, M> {
59
60 private final Int2ObjectOpenHashMap<Basic2ObjectMap<I, M>> map;
61
62 private final MessageValueFactory<M> messageValueFactory;
63
64 private final MessageCombiner<? super I, M> messageCombiner;
65
66 private final PartitionSplitInfo<I> partitionInfo;
67
68 private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
69
70 private final PrimitiveIdTypeOps<I> idTypeOps;
71
72 private final WritableWriter<M> messageWriter = new WritableWriter<M>() {
73 @Override
74 public M readFields(DataInput in) throws IOException {
75 M message = messageValueFactory.newInstance();
76 message.readFields(in);
77 return message;
78 }
79
80 @Override
81 public void write(DataOutput out, M value) throws IOException {
82 value.write(out);
83 }
84 };
85
86
87
88
89
90
91
92
93
94 public IdOneMessagePerVertexStore(
95 MessageValueFactory<M> messageValueFactory,
96 PartitionSplitInfo<I> partitionInfo,
97 MessageCombiner<? super I, M> messageCombiner,
98 ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
99 this.partitionInfo = partitionInfo;
100 this.config = config;
101 this.messageValueFactory = messageValueFactory;
102 this.messageCombiner = messageCombiner;
103
104 idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOps(config.getVertexIdClass());
105
106 map = new Int2ObjectOpenHashMap<>();
107 for (int partitionId : partitionInfo.getPartitionIds()) {
108 Basic2ObjectMap<I, M> partitionMap = idTypeOps.create2ObjectOpenHashMap(
109 Math.max(10, (int) partitionInfo.getPartitionVertexCount(partitionId)),
110 messageWriter
111 );
112 map.put(partitionId, partitionMap);
113 }
114 }
115
116
117
118
119
120
121
122 private Basic2ObjectMap<I, M> getPartitionMap(I vertexId) {
123 return map.get(partitionInfo.getPartitionId(vertexId));
124 }
125
126 @Override
127 public void addPartitionMessages(
128 int partitionId,
129 VertexIdMessages<I, M> messages) {
130 Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
131 synchronized (partitionMap) {
132 VertexIdMessageIterator<I, M>
133 iterator = messages.getVertexIdMessageIterator();
134
135
136 while (iterator.hasNext()) {
137 iterator.next();
138 I vertexId = iterator.getCurrentVertexId();
139 M currentMessage =
140 partitionMap.get(iterator.getCurrentVertexId());
141 if (currentMessage == null) {
142 M newMessage = messageCombiner.createInitialMessage();
143 currentMessage = partitionMap.put(
144 iterator.getCurrentVertexId(), newMessage);
145 if (currentMessage == null) {
146 currentMessage = newMessage;
147 }
148 }
149 messageCombiner.combine(vertexId, currentMessage,
150 iterator.getCurrentMessage());
151 }
152 }
153 }
154
155
156
157
158
159
160
161
162 @Override
163 public void addMessage(I vertexId, M message) throws IOException {
164 Basic2ObjectMap<I, M> partitionMap = getPartitionMap(vertexId);
165 synchronized (partitionMap) {
166 M currentMessage = partitionMap.get(vertexId);
167 if (currentMessage == null) {
168 M newMessage = messageCombiner.createInitialMessage();
169 currentMessage = partitionMap.put(vertexId, newMessage);
170 if (currentMessage == null) {
171 currentMessage = newMessage;
172 }
173 }
174 messageCombiner.combine(vertexId, currentMessage, message);
175 }
176 }
177
178 @Override
179 public void clearPartition(int partitionId) {
180 map.get(partitionId).clear();
181 }
182
183 @Override
184 public boolean hasMessagesForVertex(I vertexId) {
185 return getPartitionMap(vertexId).containsKey(vertexId);
186 }
187
188 @Override
189 public boolean hasMessagesForPartition(int partitionId) {
190 Basic2ObjectMap<I, M> partitionMessages = map.get(partitionId);
191 return partitionMessages != null && partitionMessages.size() != 0;
192 }
193
194 @Override
195 public Iterable<M> getVertexMessages(I vertexId) {
196 Basic2ObjectMap<I, M> partitionMap = getPartitionMap(vertexId);
197 if (!partitionMap.containsKey(vertexId)) {
198 return EmptyIterable.get();
199 } else {
200 return Collections.singleton(partitionMap.get(vertexId));
201 }
202 }
203
204 @Override
205 public void clearVertexMessages(I vertexId) {
206 getPartitionMap(vertexId).remove(vertexId);
207 }
208
209 @Override
210 public void clearAll() {
211 map.clear();
212 }
213
214 @Override
215 public Iterable<I> getPartitionDestinationVertices(
216 int partitionId) {
217 Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
218 List<I> vertices =
219 Lists.newArrayListWithCapacity(partitionMap.size());
220 Iterator<I> iterator = partitionMap.fastKeyIterator();
221 while (iterator.hasNext()) {
222 vertices.add(idTypeOps.createCopy(iterator.next()));
223 }
224 return vertices;
225 }
226
227 @Override
228 public void writePartition(DataOutput out,
229 int partitionId) throws IOException {
230 Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
231 partitionMap.write(out);
232 }
233
234 @Override
235 public void readFieldsForPartition(DataInput in,
236 int partitionId) throws IOException {
237 Basic2ObjectMap<I, M> partitionMap = idTypeOps.create2ObjectOpenHashMap(
238 messageWriter);
239 partitionMap.readFields(in);
240 synchronized (map) {
241 map.put(partitionId, partitionMap);
242 }
243 }
244
245 @Override
246 public void finalizeStore() {
247 }
248
249 @Override
250 public boolean isPointerListEncoding() {
251 return false;
252 }
253 }