This project has retired. For details please refer to its
Attic page.
LongDoubleMessageStore xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.messages.primitives;
20
21 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
22 import it.unimi.dsi.fastutil.longs.Long2DoubleMap;
23 import it.unimi.dsi.fastutil.longs.Long2DoubleOpenHashMap;
24 import it.unimi.dsi.fastutil.longs.LongIterator;
25 import it.unimi.dsi.fastutil.objects.ObjectIterator;
26
27 import java.io.DataInput;
28 import java.io.DataOutput;
29 import java.io.IOException;
30 import java.util.Collections;
31 import java.util.List;
32
33 import org.apache.giraph.combiner.MessageCombiner;
34 import org.apache.giraph.comm.messages.MessageStore;
35 import org.apache.giraph.comm.messages.PartitionSplitInfo;
36 import org.apache.giraph.utils.EmptyIterable;
37 import org.apache.giraph.utils.VertexIdMessageIterator;
38 import org.apache.giraph.utils.VertexIdMessages;
39 import org.apache.hadoop.io.DoubleWritable;
40 import org.apache.hadoop.io.LongWritable;
41
42 import com.google.common.collect.Lists;
43
44
45
46
47
48
49
50 public class LongDoubleMessageStore
51 implements MessageStore<LongWritable, DoubleWritable> {
52
53 private final Int2ObjectOpenHashMap<Long2DoubleOpenHashMap> map;
54
55 private final
56 MessageCombiner<? super LongWritable, DoubleWritable> messageCombiner;
57
58 private final PartitionSplitInfo<LongWritable> partitionInfo;
59
60
61
62
63
64
65
66 public LongDoubleMessageStore(
67 PartitionSplitInfo<LongWritable> partitionInfo,
68 MessageCombiner<? super LongWritable, DoubleWritable> messageCombiner
69 ) {
70 this.partitionInfo = partitionInfo;
71 this.messageCombiner = messageCombiner;
72
73 map = new Int2ObjectOpenHashMap<Long2DoubleOpenHashMap>();
74 for (int partitionId : partitionInfo.getPartitionIds()) {
75 Long2DoubleOpenHashMap partitionMap = new Long2DoubleOpenHashMap(
76 (int) partitionInfo.getPartitionVertexCount(partitionId));
77 map.put(partitionId, partitionMap);
78 }
79 }
80
81 @Override
82 public boolean isPointerListEncoding() {
83 return false;
84 }
85
86
87
88
89
90
91
92 private Long2DoubleOpenHashMap getPartitionMap(LongWritable vertexId) {
93 return map.get(partitionInfo.getPartitionId(vertexId));
94 }
95
96 @Override
97 public void addPartitionMessages(int partitionId,
98 VertexIdMessages<LongWritable, DoubleWritable> messages) {
99 LongWritable reusableVertexId = new LongWritable();
100 DoubleWritable reusableMessage = new DoubleWritable();
101 DoubleWritable reusableCurrentMessage = new DoubleWritable();
102
103 Long2DoubleOpenHashMap partitionMap = map.get(partitionId);
104 synchronized (partitionMap) {
105 VertexIdMessageIterator<LongWritable, DoubleWritable> iterator =
106 messages.getVertexIdMessageIterator();
107 while (iterator.hasNext()) {
108 iterator.next();
109 long vertexId = iterator.getCurrentVertexId().get();
110 double message = iterator.getCurrentMessage().get();
111 if (partitionMap.containsKey(vertexId)) {
112 reusableVertexId.set(vertexId);
113 reusableMessage.set(message);
114 reusableCurrentMessage.set(partitionMap.get(vertexId));
115 messageCombiner.combine(reusableVertexId, reusableCurrentMessage,
116 reusableMessage);
117 message = reusableCurrentMessage.get();
118 }
119
120 partitionMap.put(vertexId, message);
121 }
122 }
123 }
124
125 @Override
126 public void addMessage(
127 LongWritable vertexId,
128 DoubleWritable message
129 ) throws IOException {
130 Long2DoubleOpenHashMap partitionMap = getPartitionMap(vertexId);
131 synchronized (partitionMap) {
132 double originalValue = partitionMap.get(vertexId.get());
133 DoubleWritable originalMessage = new DoubleWritable(originalValue);
134 messageCombiner.combine(vertexId, originalMessage, message);
135 partitionMap.put(vertexId.get(), originalMessage.get());
136 }
137 }
138
139 @Override
140 public void finalizeStore() {
141 }
142
143 @Override
144 public void clearPartition(int partitionId) {
145 map.get(partitionId).clear();
146 }
147
148 @Override
149 public boolean hasMessagesForVertex(LongWritable vertexId) {
150 return getPartitionMap(vertexId).containsKey(vertexId.get());
151 }
152
153 @Override
154 public boolean hasMessagesForPartition(int partitionId) {
155 Long2DoubleOpenHashMap partitionMessages = map.get(partitionId);
156 return partitionMessages != null && !partitionMessages.isEmpty();
157 }
158
159 @Override
160 public Iterable<DoubleWritable> getVertexMessages(
161 LongWritable vertexId) {
162 Long2DoubleOpenHashMap partitionMap = getPartitionMap(vertexId);
163 if (!partitionMap.containsKey(vertexId.get())) {
164 return EmptyIterable.get();
165 } else {
166 return Collections.singleton(
167 new DoubleWritable(partitionMap.get(vertexId.get())));
168 }
169 }
170
171 @Override
172 public void clearVertexMessages(LongWritable vertexId) {
173 getPartitionMap(vertexId).remove(vertexId.get());
174 }
175
176 @Override
177 public void clearAll() {
178 map.clear();
179 }
180
181 @Override
182 public Iterable<LongWritable> getPartitionDestinationVertices(
183 int partitionId) {
184 Long2DoubleOpenHashMap partitionMap = map.get(partitionId);
185 List<LongWritable> vertices =
186 Lists.newArrayListWithCapacity(partitionMap.size());
187 LongIterator iterator = partitionMap.keySet().iterator();
188 while (iterator.hasNext()) {
189 vertices.add(new LongWritable(iterator.nextLong()));
190 }
191 return vertices;
192 }
193
194 @Override
195 public void writePartition(DataOutput out,
196 int partitionId) throws IOException {
197 Long2DoubleOpenHashMap partitionMap = map.get(partitionId);
198 out.writeInt(partitionMap.size());
199 ObjectIterator<Long2DoubleMap.Entry> iterator =
200 partitionMap.long2DoubleEntrySet().fastIterator();
201 while (iterator.hasNext()) {
202 Long2DoubleMap.Entry entry = iterator.next();
203 out.writeLong(entry.getLongKey());
204 out.writeDouble(entry.getDoubleValue());
205 }
206 }
207
208 @Override
209 public void readFieldsForPartition(DataInput in,
210 int partitionId) throws IOException {
211 int size = in.readInt();
212 Long2DoubleOpenHashMap partitionMap = new Long2DoubleOpenHashMap(size);
213 while (size-- > 0) {
214 long vertexId = in.readLong();
215 double message = in.readDouble();
216 partitionMap.put(vertexId, message);
217 }
218 synchronized (map) {
219 map.put(partitionId, partitionMap);
220 }
221 }
222 }