This project has retired. For details please refer to its
Attic page.
IntFloatMessageStore xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.messages.primitives;
20
21 import com.google.common.collect.Lists;
22
23 import it.unimi.dsi.fastutil.ints.Int2FloatMap;
24 import it.unimi.dsi.fastutil.ints.Int2FloatOpenHashMap;
25 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
26 import it.unimi.dsi.fastutil.ints.IntIterator;
27 import it.unimi.dsi.fastutil.objects.ObjectIterator;
28
29 import java.io.DataInput;
30 import java.io.DataOutput;
31 import java.io.IOException;
32 import java.util.Collections;
33 import java.util.List;
34
35 import org.apache.giraph.combiner.MessageCombiner;
36 import org.apache.giraph.comm.messages.MessageStore;
37 import org.apache.giraph.comm.messages.PartitionSplitInfo;
38 import org.apache.giraph.utils.EmptyIterable;
39 import org.apache.giraph.utils.VertexIdMessageIterator;
40 import org.apache.giraph.utils.VertexIdMessages;
41 import org.apache.hadoop.io.FloatWritable;
42 import org.apache.hadoop.io.IntWritable;
43
44
45
46
47
48
49
50 public class IntFloatMessageStore
51 implements MessageStore<IntWritable, FloatWritable> {
52
53 private final Int2ObjectOpenHashMap<Int2FloatOpenHashMap> map;
54
55 private final
56 MessageCombiner<? super IntWritable, FloatWritable> messageCombiner;
57
58 private final PartitionSplitInfo<IntWritable> partitionInfo;
59
60
61
62
63
64
65
66 public IntFloatMessageStore(
67 PartitionSplitInfo<IntWritable> partitionInfo,
68 MessageCombiner<? super IntWritable, FloatWritable> messageCombiner
69 ) {
70 this.partitionInfo = partitionInfo;
71 this.messageCombiner = messageCombiner;
72
73 map = new Int2ObjectOpenHashMap<Int2FloatOpenHashMap>();
74 for (int partitionId : partitionInfo.getPartitionIds()) {
75 Int2FloatOpenHashMap partitionMap = new Int2FloatOpenHashMap(
76 (int) partitionInfo.getPartitionVertexCount(partitionId));
77 map.put(partitionId, partitionMap);
78 }
79 }
80
81 @Override
82 public boolean isPointerListEncoding() {
83 return false;
84 }
85
86
87
88
89
90
91
92 private Int2FloatOpenHashMap getPartitionMap(IntWritable vertexId) {
93 return map.get(partitionInfo.getPartitionId(vertexId));
94 }
95
96 @Override
97 public void addPartitionMessages(int partitionId,
98 VertexIdMessages<IntWritable, FloatWritable> messages) {
99 IntWritable reusableVertexId = new IntWritable();
100 FloatWritable reusableMessage = new FloatWritable();
101 FloatWritable reusableCurrentMessage = new FloatWritable();
102
103 Int2FloatOpenHashMap partitionMap = map.get(partitionId);
104 synchronized (partitionMap) {
105 VertexIdMessageIterator<IntWritable, FloatWritable>
106 iterator = messages.getVertexIdMessageIterator();
107 while (iterator.hasNext()) {
108 iterator.next();
109 int vertexId = iterator.getCurrentVertexId().get();
110 float message = iterator.getCurrentMessage().get();
111 if (partitionMap.containsKey(vertexId)) {
112 reusableVertexId.set(vertexId);
113 reusableMessage.set(message);
114 reusableCurrentMessage.set(partitionMap.get(vertexId));
115 messageCombiner.combine(reusableVertexId, reusableCurrentMessage,
116 reusableMessage);
117 message = reusableCurrentMessage.get();
118 }
119
120 partitionMap.put(vertexId, message);
121 }
122 }
123 }
124
125 @Override
126 public void addMessage(
127 IntWritable vertexId,
128 FloatWritable message
129 ) throws IOException {
130 Int2FloatOpenHashMap partitionMap = getPartitionMap(vertexId);
131 synchronized (partitionMap) {
132 float originalValue = partitionMap.get(vertexId.get());
133 FloatWritable originalMessage = new FloatWritable(originalValue);
134 messageCombiner.combine(vertexId, originalMessage, message);
135 partitionMap.put(vertexId.get(), originalMessage.get());
136 }
137 }
138
139 @Override
140 public void finalizeStore() {
141 }
142
143 @Override
144 public void clearPartition(int partitionId) {
145 map.get(partitionId).clear();
146 }
147
148 @Override
149 public boolean hasMessagesForVertex(IntWritable vertexId) {
150 return getPartitionMap(vertexId).containsKey(vertexId.get());
151 }
152
153 @Override
154 public boolean hasMessagesForPartition(int partitionId) {
155 Int2FloatOpenHashMap partitionMessages = map.get(partitionId);
156 return partitionMessages != null && !partitionMessages.isEmpty();
157 }
158
159 @Override
160 public Iterable<FloatWritable> getVertexMessages(
161 IntWritable vertexId) {
162 Int2FloatOpenHashMap partitionMap = getPartitionMap(vertexId);
163 if (!partitionMap.containsKey(vertexId.get())) {
164 return EmptyIterable.get();
165 } else {
166 return Collections.singleton(
167 new FloatWritable(partitionMap.get(vertexId.get())));
168 }
169 }
170
171 @Override
172 public void clearVertexMessages(IntWritable vertexId) {
173 getPartitionMap(vertexId).remove(vertexId.get());
174 }
175
176 @Override
177 public void clearAll() {
178 map.clear();
179 }
180
181 @Override
182 public Iterable<IntWritable> getPartitionDestinationVertices(
183 int partitionId) {
184 Int2FloatOpenHashMap partitionMap = map.get(partitionId);
185 List<IntWritable> vertices =
186 Lists.newArrayListWithCapacity(partitionMap.size());
187 IntIterator iterator = partitionMap.keySet().iterator();
188 while (iterator.hasNext()) {
189 vertices.add(new IntWritable(iterator.nextInt()));
190 }
191 return vertices;
192 }
193
194 @Override
195 public void writePartition(DataOutput out,
196 int partitionId) throws IOException {
197 Int2FloatOpenHashMap partitionMap = map.get(partitionId);
198 out.writeInt(partitionMap.size());
199 ObjectIterator<Int2FloatMap.Entry> iterator =
200 partitionMap.int2FloatEntrySet().fastIterator();
201 while (iterator.hasNext()) {
202 Int2FloatMap.Entry entry = iterator.next();
203 out.writeInt(entry.getIntKey());
204 out.writeFloat(entry.getFloatValue());
205 }
206 }
207
208 @Override
209 public void readFieldsForPartition(DataInput in,
210 int partitionId) throws IOException {
211 int size = in.readInt();
212 Int2FloatOpenHashMap partitionMap = new Int2FloatOpenHashMap(size);
213 while (size-- > 0) {
214 int vertexId = in.readInt();
215 float message = in.readFloat();
216 partitionMap.put(vertexId, message);
217 }
218 synchronized (map) {
219 map.put(partitionId, partitionMap);
220 }
221 }
222 }