This project has retired. For details please refer to its
Attic page.
SimpleMessageStore xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.messages;
20
21 import com.google.common.collect.MapMaker;
22 import com.google.common.collect.Maps;
23
24 import java.io.DataInput;
25 import java.io.DataOutput;
26 import java.io.IOException;
27 import java.util.Collections;
28 import java.util.Map;
29 import java.util.concurrent.ConcurrentMap;
30
31 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
32 import org.apache.giraph.factories.MessageValueFactory;
33 import org.apache.hadoop.io.Writable;
34 import org.apache.hadoop.io.WritableComparable;
35
36
37
38
39
40
41
42
43
44
45
46 public abstract class SimpleMessageStore<I extends WritableComparable,
47 M extends Writable, T> implements MessageStore<I, M> {
48
49 protected final MessageValueFactory<M> messageValueFactory;
50
51 protected final PartitionSplitInfo<I> partitionInfo;
52
53 protected final ConcurrentMap<Integer, ConcurrentMap<I, T>> map;
54
55 protected final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
56
57
58
59
60
61
62
63
64 public SimpleMessageStore(
65 MessageValueFactory<M> messageValueFactory,
66 PartitionSplitInfo<I> partitionInfo,
67 ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
68 this.messageValueFactory = messageValueFactory;
69 this.partitionInfo = partitionInfo;
70 this.config = config;
71 map = new MapMaker().concurrencyLevel(
72 config.getNettyServerExecutionConcurrency()).makeMap();
73 }
74
75
76
77
78
79
80
81 protected abstract Iterable<M> getMessagesAsIterable(T messages);
82
83
84
85
86
87
88
89 protected abstract int getNumberOfMessagesIn(
90 ConcurrentMap<I, T> partitionMap);
91
92
93
94
95
96
97
98
99 protected abstract void writeMessages(T messages, DataOutput out) throws
100 IOException;
101
102
103
104
105
106
107
108
109 protected abstract T readFieldsForMessages(DataInput in) throws IOException;
110
111
112
113
114
115
116
117 protected int getPartitionId(I vertexId) {
118 return partitionInfo.getPartitionId(vertexId);
119 }
120
121
122
123
124
125
126
127
128
129 protected ConcurrentMap<I, T> getOrCreatePartitionMap(int partitionId) {
130 ConcurrentMap<I, T> partitionMap = map.get(partitionId);
131 if (partitionMap == null) {
132 ConcurrentMap<I, T> tmpMap = new MapMaker().concurrencyLevel(
133 config.getNettyServerExecutionConcurrency()).makeMap();
134 partitionMap = map.putIfAbsent(partitionId, tmpMap);
135 if (partitionMap == null) {
136 partitionMap = tmpMap;
137 }
138 }
139 return partitionMap;
140 }
141
142 @Override
143 public void finalizeStore() {
144 }
145
146 @Override
147 public Iterable<I> getPartitionDestinationVertices(int partitionId) {
148 ConcurrentMap<I, ?> partitionMap = map.get(partitionId);
149 return (partitionMap == null) ? Collections.<I>emptyList() :
150 partitionMap.keySet();
151 }
152
153 @Override
154 public boolean hasMessagesForVertex(I vertexId) {
155 ConcurrentMap<I, ?> partitionMap =
156 map.get(getPartitionId(vertexId));
157 return partitionMap != null && partitionMap.containsKey(vertexId);
158 }
159
160 @Override
161 public Iterable<M> getVertexMessages(I vertexId) {
162 ConcurrentMap<I, T> partitionMap = map.get(getPartitionId(vertexId));
163 if (partitionMap == null) {
164 return Collections.<M>emptyList();
165 }
166 T messages = partitionMap.get(vertexId);
167 return (messages == null) ? Collections.<M>emptyList() :
168 getMessagesAsIterable(messages);
169 }
170
171 @Override
172 public void writePartition(DataOutput out,
173 int partitionId) throws IOException {
174 ConcurrentMap<I, T> partitionMap = map.get(partitionId);
175 out.writeBoolean(partitionMap != null);
176 if (partitionMap != null) {
177 out.writeInt(partitionMap.size());
178 for (Map.Entry<I, T> entry : partitionMap.entrySet()) {
179 entry.getKey().write(out);
180 writeMessages(entry.getValue(), out);
181 }
182 }
183 }
184
185 @Override
186 public void readFieldsForPartition(DataInput in,
187 int partitionId) throws IOException {
188 if (in.readBoolean()) {
189 ConcurrentMap<I, T> partitionMap = Maps.newConcurrentMap();
190 int numVertices = in.readInt();
191 for (int v = 0; v < numVertices; v++) {
192 I vertexId = config.createVertexId();
193 vertexId.readFields(in);
194 partitionMap.put(vertexId, readFieldsForMessages(in));
195 }
196 map.put(partitionId, partitionMap);
197 }
198 }
199
200 @Override
201 public void clearVertexMessages(I vertexId) {
202 ConcurrentMap<I, ?> partitionMap =
203 map.get(getPartitionId(vertexId));
204 if (partitionMap != null) {
205 partitionMap.remove(vertexId);
206 }
207 }
208
209 @Override
210 public void clearPartition(int partitionId) {
211 map.remove(partitionId);
212 }
213
214 @Override
215 public boolean hasMessagesForPartition(int partitionId) {
216 ConcurrentMap<I, T> partitionMessages = map.get(partitionId);
217 return partitionMessages != null && !partitionMessages.isEmpty();
218 }
219
220 @Override
221 public void clearAll() {
222 map.clear();
223 }
224 }