This project has retired. For details please refer to its
Attic page.
AsyncMessageStoreWrapper xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.comm.messages.queue;
19
20 import it.unimi.dsi.fastutil.ints.Int2IntArrayMap;
21 import it.unimi.dsi.fastutil.ints.Int2IntMap;
22
23 import java.io.DataInput;
24 import java.io.DataOutput;
25 import java.io.IOException;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.Semaphore;
31
32 import org.apache.giraph.comm.messages.MessageStore;
33 import org.apache.giraph.utils.ThreadUtils;
34 import org.apache.giraph.utils.VertexIdMessages;
35 import org.apache.hadoop.io.Writable;
36 import org.apache.hadoop.io.WritableComparable;
37 import org.apache.log4j.Logger;
38
39
40
41
42
43
44
45
46
47
48 public final class AsyncMessageStoreWrapper<I extends WritableComparable,
49 M extends Writable> implements MessageStore<I, M> {
50
51
52 private static final Logger LOG =
53 Logger.getLogger(AsyncMessageStoreWrapper.class);
54
55
56 private static final PartitionMessage SHUTDOWN_QUEUE_MESSAGE =
57 new PartitionMessage(-1, null);
58
59 private static final PartitionMessage CLEAR_QUEUE_MESSAGE =
60 new PartitionMessage(-1, null);
61
62 private static final ExecutorService EXECUTOR_SERVICE =
63 Executors.newCachedThreadPool(
64 ThreadUtils.createThreadFactory("AsyncMessageStoreWrapper-%d"));
65
66
67 private final int threadsCount;
68
69 private final BlockingQueue<PartitionMessage<I, M>>[] queues;
70
71 private final Int2IntMap partition2Queue;
72
73 private Semaphore completionSemaphore;
74
75 private final MessageStore<I, M> store;
76
77
78
79
80
81
82
83
84
85
86
87
88
89 public AsyncMessageStoreWrapper(MessageStore<I, M> store,
90 Iterable<Integer> partitions,
91 int threadCount) {
92 this.store = store;
93 this.threadsCount = threadCount;
94 completionSemaphore = new Semaphore(1 - threadsCount);
95 queues = new BlockingQueue[threadsCount];
96 partition2Queue = new Int2IntArrayMap();
97 LOG.info("AsyncMessageStoreWrapper enabled. Threads= " + threadsCount);
98
99 for (int i = 0; i < threadsCount; i++) {
100 queues[i] = new LinkedBlockingQueue<>();
101 EXECUTOR_SERVICE.submit(new MessageStoreQueueWorker(queues[i]));
102 }
103
104 int cnt = 0;
105 for (int partitionId : partitions) {
106 partition2Queue.put(partitionId, cnt++ % threadsCount);
107 }
108
109 }
110
111 @Override
112 public boolean isPointerListEncoding() {
113 return store.isPointerListEncoding();
114 }
115
116 @Override
117 public Iterable<M> getVertexMessages(I vertexId) {
118 return store.getVertexMessages(vertexId);
119 }
120
121 @Override
122 public void clearVertexMessages(I vertexId) {
123 store.clearVertexMessages(vertexId);
124 }
125
126 @Override
127 public void clearAll() {
128 try {
129 for (BlockingQueue<PartitionMessage<I, M>> queue : queues) {
130 queue.put(SHUTDOWN_QUEUE_MESSAGE);
131 }
132 completionSemaphore.acquire();
133 } catch (InterruptedException e) {
134 throw new RuntimeException(e);
135 }
136 store.clearAll();
137 }
138
139 @Override
140 public boolean hasMessagesForVertex(I vertexId) {
141 return store.hasMessagesForVertex(vertexId);
142 }
143
144 @Override
145 public boolean hasMessagesForPartition(int partitionId) {
146 return store.hasMessagesForPartition(partitionId);
147 }
148
149 @Override
150 public void addPartitionMessages(
151 int partitionId, VertexIdMessages<I, M> messages) {
152 int hash = partition2Queue.get(partitionId);
153 try {
154 queues[hash].put(new PartitionMessage<>(partitionId, messages));
155 } catch (InterruptedException e) {
156 throw new RuntimeException(e);
157 }
158 }
159
160 @Override
161 public void addMessage(I vertexId, M message) throws IOException {
162
163 throw new UnsupportedOperationException();
164 }
165
166 @Override
167 public void finalizeStore() {
168 store.finalizeStore();
169 }
170
171 @Override
172 public Iterable<I> getPartitionDestinationVertices(int partitionId) {
173 return store.getPartitionDestinationVertices(partitionId);
174 }
175
176 @Override
177 public void clearPartition(int partitionId) {
178 store.clearPartition(partitionId);
179 }
180
181 @Override
182 public void writePartition(DataOutput out, int partitionId)
183 throws IOException {
184 store.writePartition(out, partitionId);
185 }
186
187 @Override
188 public void readFieldsForPartition(DataInput in, int partitionId)
189 throws IOException {
190 store.readFieldsForPartition(in, partitionId);
191 }
192
193
194
195
196 public void waitToComplete() {
197 try {
198 for (BlockingQueue<PartitionMessage<I, M>> queue : queues) {
199 queue.put(CLEAR_QUEUE_MESSAGE);
200 }
201 completionSemaphore.acquire();
202 completionSemaphore = new Semaphore(1 - threadsCount);
203 } catch (InterruptedException e) {
204 throw new RuntimeException(e);
205 }
206 }
207
208
209
210
211
212 private class MessageStoreQueueWorker implements Runnable {
213
214
215
216 private final BlockingQueue<PartitionMessage<I, M>> queue;
217
218
219
220
221
222 private MessageStoreQueueWorker(
223 BlockingQueue<PartitionMessage<I, M>> queue) {
224 this.queue = queue;
225 }
226
227 @Override
228 public void run() {
229 PartitionMessage<I, M> message = null;
230 while (true) {
231 try {
232 message = queue.take();
233 if (message.getMessage() != null) {
234 int partitionId = message.getPartitionId();
235 store.addPartitionMessages(partitionId, message.getMessage());
236 } else {
237 completionSemaphore.release();
238 if (message == SHUTDOWN_QUEUE_MESSAGE) {
239 return;
240 }
241 }
242 } catch (InterruptedException e) {
243 LOG.error("MessageStoreQueueWorker.run: " + message, e);
244 return;
245 }
246 }
247 }
248 }
249 }