This project has retired. For details please refer to its Attic page.
      
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.giraph.comm.messages.queue;
19  
20  import it.unimi.dsi.fastutil.ints.Int2IntArrayMap;
21  import it.unimi.dsi.fastutil.ints.Int2IntMap;
22  
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.ExecutorService;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.LinkedBlockingQueue;
30  import java.util.concurrent.Semaphore;
31  
32  import org.apache.giraph.comm.messages.MessageStore;
33  import org.apache.giraph.utils.ThreadUtils;
34  import org.apache.giraph.utils.VertexIdMessages;
35  import org.apache.hadoop.io.Writable;
36  import org.apache.hadoop.io.WritableComparable;
37  import org.apache.log4j.Logger;
38  
39  
40  
41  
42  
43  
44  
45  
46  
47  
48  public final class AsyncMessageStoreWrapper<I extends WritableComparable,
49      M extends Writable> implements MessageStore<I, M> {
50  
51    
52    private static final Logger LOG =
53        Logger.getLogger(AsyncMessageStoreWrapper.class);
54    
55  
56    private static final PartitionMessage SHUTDOWN_QUEUE_MESSAGE =
57        new PartitionMessage(-1, null);
58    
59    private static final PartitionMessage CLEAR_QUEUE_MESSAGE =
60        new PartitionMessage(-1, null);
61    
62    private static final ExecutorService EXECUTOR_SERVICE =
63        Executors.newCachedThreadPool(
64            ThreadUtils.createThreadFactory("AsyncMessageStoreWrapper-%d"));
65  
66    
67    private final int threadsCount;
68    
69    private final BlockingQueue<PartitionMessage<I, M>>[] queues;
70    
71    private final Int2IntMap partition2Queue;
72    
73    private Semaphore completionSemaphore;
74    
75    private final MessageStore<I, M> store;
76  
77    
78  
79  
80  
81  
82  
83  
84  
85  
86  
87  
88  
89    public AsyncMessageStoreWrapper(MessageStore<I, M> store,
90                                    Iterable<Integer> partitions,
91                                    int threadCount) {
92      this.store = store;
93      this.threadsCount = threadCount;
94      completionSemaphore = new Semaphore(1 - threadsCount);
95      queues = new BlockingQueue[threadsCount];
96      partition2Queue = new Int2IntArrayMap();
97      LOG.info("AsyncMessageStoreWrapper enabled. Threads= " + threadsCount);
98  
99      for (int i = 0; i < threadsCount; i++) {
100       queues[i] = new LinkedBlockingQueue<>();
101       EXECUTOR_SERVICE.submit(new MessageStoreQueueWorker(queues[i]));
102     }
103 
104     int cnt = 0;
105     for (int partitionId : partitions) {
106       partition2Queue.put(partitionId, cnt++ % threadsCount);
107     }
108 
109   }
110 
111   @Override
112   public boolean isPointerListEncoding() {
113     return store.isPointerListEncoding();
114   }
115 
116   @Override
117   public Iterable<M> getVertexMessages(I vertexId) {
118     return store.getVertexMessages(vertexId);
119   }
120 
121   @Override
122   public void clearVertexMessages(I vertexId) {
123     store.clearVertexMessages(vertexId);
124   }
125 
126   @Override
127   public void clearAll() {
128     try {
129       for (BlockingQueue<PartitionMessage<I, M>> queue : queues) {
130         queue.put(SHUTDOWN_QUEUE_MESSAGE);
131       }
132       completionSemaphore.acquire();
133     } catch (InterruptedException e) {
134       throw new RuntimeException(e);
135     }
136     store.clearAll();
137   }
138 
139   @Override
140   public boolean hasMessagesForVertex(I vertexId) {
141     return store.hasMessagesForVertex(vertexId);
142   }
143 
144   @Override
145   public boolean hasMessagesForPartition(int partitionId) {
146     return store.hasMessagesForPartition(partitionId);
147   }
148 
149   @Override
150   public void addPartitionMessages(
151       int partitionId, VertexIdMessages<I, M> messages) {
152     int hash = partition2Queue.get(partitionId);
153     try {
154       queues[hash].put(new PartitionMessage<>(partitionId, messages));
155     } catch (InterruptedException e) {
156       throw new RuntimeException(e);
157     }
158   }
159 
160   @Override
161   public void addMessage(I vertexId, M message) throws IOException {
162     
163     throw new UnsupportedOperationException();
164   }
165 
166   @Override
167   public void finalizeStore() {
168     store.finalizeStore();
169   }
170 
171   @Override
172   public Iterable<I> getPartitionDestinationVertices(int partitionId) {
173     return store.getPartitionDestinationVertices(partitionId);
174   }
175 
176   @Override
177   public void clearPartition(int partitionId) {
178     store.clearPartition(partitionId);
179   }
180 
181   @Override
182   public void writePartition(DataOutput out, int partitionId)
183     throws IOException {
184     store.writePartition(out, partitionId);
185   }
186 
187   @Override
188   public void readFieldsForPartition(DataInput in, int partitionId)
189     throws IOException {
190     store.readFieldsForPartition(in, partitionId);
191   }
192 
193   
194 
195 
196   public void waitToComplete() {
197     try {
198       for (BlockingQueue<PartitionMessage<I, M>> queue : queues) {
199         queue.put(CLEAR_QUEUE_MESSAGE);
200       }
201       completionSemaphore.acquire();
202       completionSemaphore = new Semaphore(1 - threadsCount);
203     } catch (InterruptedException e) {
204       throw new RuntimeException(e);
205     }
206   }
207 
208   
209 
210 
211 
212   private class MessageStoreQueueWorker implements Runnable {
213     
214 
215 
216     private final BlockingQueue<PartitionMessage<I, M>> queue;
217 
218     
219 
220 
221 
222     private MessageStoreQueueWorker(
223         BlockingQueue<PartitionMessage<I, M>> queue) {
224       this.queue = queue;
225     }
226 
227     @Override
228     public void run() {
229       PartitionMessage<I, M> message = null;
230       while (true) {
231         try {
232           message = queue.take();
233           if (message.getMessage() != null) {
234             int partitionId = message.getPartitionId();
235             store.addPartitionMessages(partitionId, message.getMessage());
236           } else {
237             completionSemaphore.release();
238             if (message == SHUTDOWN_QUEUE_MESSAGE) {
239               return;
240             }
241           }
242         } catch (InterruptedException e) {
243           LOG.error("MessageStoreQueueWorker.run: " + message, e);
244           return;
245         }
246       }
247     }
248   }
249 }