This project has retired. For details please refer to its Attic page.
AsyncMessageStoreWrapper xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.comm.messages.queue;
19  
20  import it.unimi.dsi.fastutil.ints.Int2IntArrayMap;
21  import it.unimi.dsi.fastutil.ints.Int2IntMap;
22  
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.ExecutorService;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.LinkedBlockingQueue;
30  import java.util.concurrent.Semaphore;
31  
32  import org.apache.giraph.comm.messages.MessageStore;
33  import org.apache.giraph.utils.ThreadUtils;
34  import org.apache.giraph.utils.VertexIdMessages;
35  import org.apache.hadoop.io.Writable;
36  import org.apache.hadoop.io.WritableComparable;
37  import org.apache.log4j.Logger;
38  
39  /**
40   * This class decouples message receiving and processing
41   * into separate threads thus reducing contention.
42   * It does not provide message store functionality itself, rather
43   * providing a wrapper around existing message stores that
44   * can now be used in async mode with only slight modifications.
45   * @param <I> Vertex id
46   * @param <M> Message data
47   */
48  public final class AsyncMessageStoreWrapper<I extends WritableComparable,
49      M extends Writable> implements MessageStore<I, M> {
50  
51    /** Logger */
52    private static final Logger LOG =
53        Logger.getLogger(AsyncMessageStoreWrapper.class);
54    /** Pass this id to clear the queues and shutdown all threads
55     * started by this processor */
56    private static final PartitionMessage SHUTDOWN_QUEUE_MESSAGE =
57        new PartitionMessage(-1, null);
58    /** Pass this message to clear the queues but keep threads alive */
59    private static final PartitionMessage CLEAR_QUEUE_MESSAGE =
60        new PartitionMessage(-1, null);
61    /** Executor that processes messages in background */
62    private static final ExecutorService EXECUTOR_SERVICE =
63        Executors.newCachedThreadPool(
64            ThreadUtils.createThreadFactory("AsyncMessageStoreWrapper-%d"));
65  
66    /** Number of threads that will process messages in background */
67    private final int threadsCount;
68    /** Queue that temporary stores messages */
69    private final BlockingQueue<PartitionMessage<I, M>>[] queues;
70    /** Map from partition id to thread that process this partition */
71    private final Int2IntMap partition2Queue;
72    /** Signals that all procesing is done */
73    private Semaphore completionSemaphore;
74    /** Underlying message store */
75    private final MessageStore<I, M> store;
76  
77    /**
78     * Constructs async wrapper around existing message store
79     * object. Requires partition list and number of threads
80     * to properly initialize background threads and assign partitions.
81     * Partitions are assigned to threads in round-robin fashion.
82     * It guarantees that all threads have almost the same number of
83     * partitions (+-1) no matter how partitions are assigned to this worker.
84     * @param store underlying message store to be used in computation
85     * @param partitions partitions assigned to this worker
86     * @param threadCount number of threads that will be used to process
87     *                    messages.
88     */
89    public AsyncMessageStoreWrapper(MessageStore<I, M> store,
90                                    Iterable<Integer> partitions,
91                                    int threadCount) {
92      this.store = store;
93      this.threadsCount = threadCount;
94      completionSemaphore = new Semaphore(1 - threadsCount);
95      queues = new BlockingQueue[threadsCount];
96      partition2Queue = new Int2IntArrayMap();
97      LOG.info("AsyncMessageStoreWrapper enabled. Threads= " + threadsCount);
98  
99      for (int i = 0; i < threadsCount; i++) {
100       queues[i] = new LinkedBlockingQueue<>();
101       EXECUTOR_SERVICE.submit(new MessageStoreQueueWorker(queues[i]));
102     }
103 
104     int cnt = 0;
105     for (int partitionId : partitions) {
106       partition2Queue.put(partitionId, cnt++ % threadsCount);
107     }
108 
109   }
110 
111   @Override
112   public boolean isPointerListEncoding() {
113     return store.isPointerListEncoding();
114   }
115 
116   @Override
117   public Iterable<M> getVertexMessages(I vertexId) {
118     return store.getVertexMessages(vertexId);
119   }
120 
121   @Override
122   public void clearVertexMessages(I vertexId) {
123     store.clearVertexMessages(vertexId);
124   }
125 
126   @Override
127   public void clearAll() {
128     try {
129       for (BlockingQueue<PartitionMessage<I, M>> queue : queues) {
130         queue.put(SHUTDOWN_QUEUE_MESSAGE);
131       }
132       completionSemaphore.acquire();
133     } catch (InterruptedException e) {
134       throw new RuntimeException(e);
135     }
136     store.clearAll();
137   }
138 
139   @Override
140   public boolean hasMessagesForVertex(I vertexId) {
141     return store.hasMessagesForVertex(vertexId);
142   }
143 
144   @Override
145   public boolean hasMessagesForPartition(int partitionId) {
146     return store.hasMessagesForPartition(partitionId);
147   }
148 
149   @Override
150   public void addPartitionMessages(
151       int partitionId, VertexIdMessages<I, M> messages) {
152     int hash = partition2Queue.get(partitionId);
153     try {
154       queues[hash].put(new PartitionMessage<>(partitionId, messages));
155     } catch (InterruptedException e) {
156       throw new RuntimeException(e);
157     }
158   }
159 
160   @Override
161   public void addMessage(I vertexId, M message) throws IOException {
162     // TODO: implement if LocalBlockRunner needs async message store
163     throw new UnsupportedOperationException();
164   }
165 
166   @Override
167   public void finalizeStore() {
168     store.finalizeStore();
169   }
170 
171   @Override
172   public Iterable<I> getPartitionDestinationVertices(int partitionId) {
173     return store.getPartitionDestinationVertices(partitionId);
174   }
175 
176   @Override
177   public void clearPartition(int partitionId) {
178     store.clearPartition(partitionId);
179   }
180 
181   @Override
182   public void writePartition(DataOutput out, int partitionId)
183     throws IOException {
184     store.writePartition(out, partitionId);
185   }
186 
187   @Override
188   public void readFieldsForPartition(DataInput in, int partitionId)
189     throws IOException {
190     store.readFieldsForPartition(in, partitionId);
191   }
192 
193   /**
194    * Wait till all messages are processed and all queues are empty.
195    */
196   public void waitToComplete() {
197     try {
198       for (BlockingQueue<PartitionMessage<I, M>> queue : queues) {
199         queue.put(CLEAR_QUEUE_MESSAGE);
200       }
201       completionSemaphore.acquire();
202       completionSemaphore = new Semaphore(1 - threadsCount);
203     } catch (InterruptedException e) {
204       throw new RuntimeException(e);
205     }
206   }
207 
208   /**
209    * This runnable has logic for background thread
210    * that actually does message processing.
211    */
212   private class MessageStoreQueueWorker implements Runnable {
213     /**
214      * Queue assigned to this background thread.
215      */
216     private final BlockingQueue<PartitionMessage<I, M>> queue;
217 
218     /**
219      * Constructs runnable.
220      * @param queue where messages are put by client
221      */
222     private MessageStoreQueueWorker(
223         BlockingQueue<PartitionMessage<I, M>> queue) {
224       this.queue = queue;
225     }
226 
227     @Override
228     public void run() {
229       PartitionMessage<I, M> message = null;
230       while (true) {
231         try {
232           message = queue.take();
233           if (message.getMessage() != null) {
234             int partitionId = message.getPartitionId();
235             store.addPartitionMessages(partitionId, message.getMessage());
236           } else {
237             completionSemaphore.release();
238             if (message == SHUTDOWN_QUEUE_MESSAGE) {
239               return;
240             }
241           }
242         } catch (InterruptedException e) {
243           LOG.error("MessageStoreQueueWorker.run: " + message, e);
244           return;
245         }
246       }
247     }
248   }
249 }