View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.comm.messages.queue;
19  
20  import it.unimi.dsi.fastutil.ints.Int2IntArrayMap;
21  import it.unimi.dsi.fastutil.ints.Int2IntMap;
22  import org.apache.giraph.comm.messages.MessageStore;
23  import org.apache.giraph.utils.ThreadUtils;
24  import org.apache.giraph.utils.VertexIdMessages;
25  import org.apache.hadoop.io.Writable;
26  import org.apache.hadoop.io.WritableComparable;
27  import org.apache.log4j.Logger;
28  
29  import java.io.DataInput;
30  import java.io.DataOutput;
31  import java.io.IOException;
32  import java.util.concurrent.BlockingQueue;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.Executors;
35  import java.util.concurrent.LinkedBlockingQueue;
36  import java.util.concurrent.Semaphore;
37  
38  /**
39   * This class decouples message receiving and processing
40   * into separate threads thus reducing contention.
41   * It does not provide message store functionality itself, rather
42   * providing a wrapper around existing message stores that
43   * can now be used in async mode with only slight modifications.
44   * @param <I> Vertex id
45   * @param <M> Message data
46   */
47  public final class AsyncMessageStoreWrapper<I extends WritableComparable,
48      M extends Writable> implements MessageStore<I, M> {
49  
50    /** Logger */
51    private static final Logger LOG =
52        Logger.getLogger(AsyncMessageStoreWrapper.class);
53    /** Pass this id to clear the queues and shutdown all threads
54     * started by this processor */
55    private static final PartitionMessage SHUTDOWN_QUEUE_MESSAGE =
56        new PartitionMessage(-1, null);
57    /** Pass this message to clear the queues but keep threads alive */
58    private static final PartitionMessage CLEAR_QUEUE_MESSAGE =
59        new PartitionMessage(-1, null);
60    /** Executor that processes messages in background */
61    private static final ExecutorService EXECUTOR_SERVICE =
62        Executors.newCachedThreadPool(
63            ThreadUtils.createThreadFactory("AsyncMessageStoreWrapper-%d"));
64  
65    /** Number of threads that will process messages in background */
66    private final int threadsCount;
67    /** Queue that temporary stores messages */
68    private final BlockingQueue<PartitionMessage<I, M>>[] queues;
69    /** Map from partition id to thread that process this partition */
70    private final Int2IntMap partition2Queue;
71    /** Signals that all procesing is done */
72    private Semaphore completionSemaphore;
73    /** Underlying message store */
74    private final MessageStore<I, M> store;
75  
76    /**
77     * Constructs async wrapper around existing message store
78     * object. Requires partition list and number of threads
79     * to properly initialize background threads and assign partitions.
80     * Partitions are assigned to threads in round-robin fashion.
81     * It guarantees that all threads have almost the same number of
82     * partitions (+-1) no matter how partitions are assigned to this worker.
83     * @param store underlying message store to be used in computation
84     * @param partitions partitions assigned to this worker
85     * @param threadCount number of threads that will be used to process
86     *                    messages.
87     */
88    public AsyncMessageStoreWrapper(MessageStore<I, M> store,
89                                    Iterable<Integer> partitions,
90                                    int threadCount) {
91      this.store = store;
92      this.threadsCount = threadCount;
93      completionSemaphore = new Semaphore(1 - threadsCount);
94      queues = new BlockingQueue[threadsCount];
95      partition2Queue = new Int2IntArrayMap();
96      LOG.info("AsyncMessageStoreWrapper enabled. Threads= " + threadsCount);
97  
98      for (int i = 0; i < threadsCount; i++) {
99        queues[i] = new LinkedBlockingQueue<>();
100       EXECUTOR_SERVICE.submit(new MessageStoreQueueWorker(queues[i]));
101     }
102 
103     int cnt = 0;
104     for (int partitionId : partitions) {
105       partition2Queue.put(partitionId, cnt++ % threadsCount);
106     }
107 
108   }
109 
110   @Override
111   public boolean isPointerListEncoding() {
112     return store.isPointerListEncoding();
113   }
114 
115   @Override
116   public Iterable<M> getVertexMessages(I vertexId) {
117     return store.getVertexMessages(vertexId);
118   }
119 
120   @Override
121   public void clearVertexMessages(I vertexId) {
122     store.clearVertexMessages(vertexId);
123   }
124 
125   @Override
126   public void clearAll() {
127     try {
128       for (BlockingQueue<PartitionMessage<I, M>> queue : queues) {
129         queue.put(SHUTDOWN_QUEUE_MESSAGE);
130       }
131       completionSemaphore.acquire();
132     } catch (InterruptedException e) {
133       throw new RuntimeException(e);
134     }
135     store.clearAll();
136   }
137 
138   @Override
139   public boolean hasMessagesForVertex(I vertexId) {
140     return store.hasMessagesForVertex(vertexId);
141   }
142 
143   @Override
144   public boolean hasMessagesForPartition(int partitionId) {
145     return store.hasMessagesForPartition(partitionId);
146   }
147 
148   @Override
149   public void addPartitionMessages(
150       int partitionId, VertexIdMessages<I, M> messages) {
151     int hash = partition2Queue.get(partitionId);
152     try {
153       queues[hash].put(new PartitionMessage<>(partitionId, messages));
154     } catch (InterruptedException e) {
155       throw new RuntimeException(e);
156     }
157   }
158 
159   @Override
160   public void finalizeStore() {
161     store.finalizeStore();
162   }
163 
164   @Override
165   public Iterable<I> getPartitionDestinationVertices(int partitionId) {
166     return store.getPartitionDestinationVertices(partitionId);
167   }
168 
169   @Override
170   public void clearPartition(int partitionId) {
171     store.clearPartition(partitionId);
172   }
173 
174   @Override
175   public void writePartition(DataOutput out, int partitionId)
176     throws IOException {
177     store.writePartition(out, partitionId);
178   }
179 
180   @Override
181   public void readFieldsForPartition(DataInput in, int partitionId)
182     throws IOException {
183     store.readFieldsForPartition(in, partitionId);
184   }
185 
186   /**
187    * Wait till all messages are processed and all queues are empty.
188    */
189   public void waitToComplete() {
190     try {
191       for (BlockingQueue<PartitionMessage<I, M>> queue : queues) {
192         queue.put(CLEAR_QUEUE_MESSAGE);
193       }
194       completionSemaphore.acquire();
195       completionSemaphore = new Semaphore(1 - threadsCount);
196     } catch (InterruptedException e) {
197       throw new RuntimeException(e);
198     }
199   }
200 
201   /**
202    * This runnable has logic for background thread
203    * that actually does message processing.
204    */
205   private class MessageStoreQueueWorker implements Runnable {
206     /**
207      * Queue assigned to this background thread.
208      */
209     private final BlockingQueue<PartitionMessage<I, M>> queue;
210 
211     /**
212      * Constructs runnable.
213      * @param queue where messages are put by client
214      */
215     private MessageStoreQueueWorker(
216         BlockingQueue<PartitionMessage<I, M>> queue) {
217       this.queue = queue;
218     }
219 
220     @Override
221     public void run() {
222       PartitionMessage<I, M> message = null;
223       while (true) {
224         try {
225           message = queue.take();
226           if (message.getMessage() != null) {
227             int partitionId = message.getPartitionId();
228             store.addPartitionMessages(partitionId, message.getMessage());
229           } else {
230             completionSemaphore.release();
231             if (message == SHUTDOWN_QUEUE_MESSAGE) {
232               return;
233             }
234           }
235         } catch (InterruptedException e) {
236           LOG.error("MessageStoreQueueWorker.run: " + message, e);
237           return;
238         }
239       }
240     }
241   }
242 }