This project has retired. For details please refer to its Attic page.
SendMessageCache xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm;
20  
21  import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE;
22  import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;
23  
24  import java.util.Iterator;
25  
26  import org.apache.giraph.bsp.CentralizedServiceWorker;
27  import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
28  import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
29  import org.apache.giraph.comm.requests.WritableRequest;
30  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
31  import org.apache.giraph.edge.Edge;
32  import org.apache.giraph.factories.MessageValueFactory;
33  import org.apache.giraph.graph.Vertex;
34  import org.apache.giraph.partition.PartitionOwner;
35  import org.apache.giraph.utils.ByteArrayVertexIdMessages;
36  import org.apache.giraph.utils.PairList;
37  import org.apache.giraph.utils.VertexIdMessages;
38  import org.apache.giraph.worker.WorkerInfo;
39  import org.apache.hadoop.io.Writable;
40  import org.apache.hadoop.io.WritableComparable;
41  import org.apache.log4j.Logger;
42  
43  /**
44   * Aggregates the messages to be sent to workers so they can be sent
45   * in bulk.  Not thread-safe.
46   *
47   * @param <I> Vertex id
48   * @param <M> Message data
49   */
50  @SuppressWarnings("unchecked")
51  public class SendMessageCache<I extends WritableComparable, M extends Writable>
52      extends SendVertexIdDataCache<I, M, VertexIdMessages<I, M>> {
53    /** Class logger */
54    private static final Logger LOG =
55        Logger.getLogger(SendMessageCache.class);
56    /** Messages sent during the last superstep */
57    protected long totalMsgsSentInSuperstep = 0;
58    /** Message bytes sent during the last superstep */
59    protected long totalMsgBytesSentInSuperstep = 0;
60    /** Max message size sent to a worker */
61    protected final int maxMessagesSizePerWorker;
62    /** NettyWorkerClientRequestProcessor for message sending */
63    protected final NettyWorkerClientRequestProcessor<I, ?, ?> clientProcessor;
64    /** Cached message value factory */
65    protected MessageValueFactory<M> messageValueFactory;
66    /**
67     * Constructor
68     *
69     * @param conf Giraph configuration
70     * @param serviceWorker Service worker
71     * @param processor NettyWorkerClientRequestProcessor
72     * @param maxMsgSize Max message size sent to a worker
73     */
74    public SendMessageCache(ImmutableClassesGiraphConfiguration conf,
75        CentralizedServiceWorker<?, ?, ?> serviceWorker,
76        NettyWorkerClientRequestProcessor<I, ?, ?> processor,
77        int maxMsgSize) {
78      super(conf, serviceWorker, MAX_MSG_REQUEST_SIZE.get(conf),
79          ADDITIONAL_MSG_REQUEST_SIZE.get(conf));
80      maxMessagesSizePerWorker = maxMsgSize;
81      clientProcessor = processor;
82      messageValueFactory =
83              conf.createOutgoingMessageValueFactory();
84    }
85  
86    @Override
87    public VertexIdMessages<I, M> createVertexIdData() {
88      return new ByteArrayVertexIdMessages<I, M>(messageValueFactory);
89    }
90  
91    /**
92     * Add a message to the cache.
93     *
94     * @param workerInfo the remote worker destination
95     * @param partitionId the remote Partition this message belongs to
96     * @param destVertexId vertex id that is ultimate destination
97     * @param message Message to send to remote worker
98     * @return Size of messages for the worker.
99     */
100   public int addMessage(WorkerInfo workerInfo,
101                         int partitionId, I destVertexId, M message) {
102     return addData(workerInfo, partitionId, destVertexId, message);
103   }
104 
105   /**
106    * Add a message to the cache with serialized ids.
107    *
108    * @param workerInfo The remote worker destination
109    * @param partitionId The remote Partition this message belongs to
110    * @param serializedId Serialized vertex id that is ultimate destination
111    * @param idSerializerPos The end position of serialized id in the byte array
112    * @param message Message to send to remote worker
113    * @return Size of messages for the worker.
114    */
115   protected int addMessage(WorkerInfo workerInfo, int partitionId,
116       byte[] serializedId, int idSerializerPos, M message) {
117     return addData(
118       workerInfo, partitionId, serializedId,
119       idSerializerPos, message);
120   }
121 
122   /**
123    * Gets the messages for a worker and removes it from the cache.
124    *
125    * @param workerInfo the address of the worker who owns the data
126    *                   partitions that are receiving the messages
127    * @return List of pairs (partitionId, ByteArrayVertexIdMessages),
128    *         where all partition ids belong to workerInfo
129    */
130   protected PairList<Integer, VertexIdMessages<I, M>>
131   removeWorkerMessages(WorkerInfo workerInfo) {
132     return removeWorkerData(workerInfo);
133   }
134 
135   /**
136    * Gets all the messages and removes them from the cache.
137    *
138    * @return All vertex messages for all partitions
139    */
140   private PairList<WorkerInfo, PairList<
141       Integer, VertexIdMessages<I, M>>> removeAllMessages() {
142     return removeAllData();
143   }
144 
145   /**
146    * Send a message to a target vertex id.
147    *
148    * @param destVertexId Target vertex id
149    * @param message The message sent to the target
150    */
151   public void sendMessageRequest(I destVertexId, M message) {
152     PartitionOwner owner =
153       getServiceWorker().getVertexPartitionOwner(destVertexId);
154     WorkerInfo workerInfo = owner.getWorkerInfo();
155     final int partitionId = owner.getPartitionId();
156     if (LOG.isTraceEnabled()) {
157       LOG.trace("sendMessageRequest: Send bytes (" + message.toString() +
158         ") to " + destVertexId + " on worker " + workerInfo);
159     }
160     ++totalMsgsSentInSuperstep;
161     // Add the message to the cache
162     int workerMessageSize = addMessage(
163       workerInfo, partitionId, destVertexId, message);
164     // Send a request if the cache of outgoing message to
165     // the remote worker 'workerInfo' is full enough to be flushed
166     if (workerMessageSize >= maxMessagesSizePerWorker) {
167       PairList<Integer, VertexIdMessages<I, M>>
168         workerMessages = removeWorkerMessages(workerInfo);
169       WritableRequest writableRequest =
170         new SendWorkerMessagesRequest<I, M>(workerMessages);
171       totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
172       clientProcessor.doRequest(workerInfo, writableRequest);
173       // Notify sending
174       getServiceWorker().getGraphTaskManager().notifySentMessages();
175     }
176   }
177 
178   /**
179    * An iterator wrapper on edges to return
180    * target vertex ids.
181    */
182   public static class TargetVertexIdIterator<I extends WritableComparable>
183       implements Iterator<I> {
184     /** An edge iterator */
185     private final Iterator<Edge<I, Writable>> edgesIterator;
186 
187     /**
188      * Constructor.
189      *
190      * @param vertex The source vertex of the out edges
191      */
192     public TargetVertexIdIterator(Vertex<I, ?, ?> vertex) {
193       edgesIterator =
194         ((Vertex<I, Writable, Writable>) vertex).getEdges().iterator();
195     }
196 
197     @Override
198     public boolean hasNext() {
199       return edgesIterator.hasNext();
200     }
201 
202     @Override
203     public I next() {
204       return edgesIterator.next().getTargetVertexId();
205     }
206 
207     @Override
208     public void remove() {
209       throw new UnsupportedOperationException();
210     }
211   }
212 
213   /**
214    * Send message to all its neighbors
215    *
216    * @param vertex The source vertex
217    * @param message The message sent to a worker
218    */
219   public void sendMessageToAllRequest(Vertex<I, ?, ?> vertex, M message) {
220     TargetVertexIdIterator targetVertexIterator =
221       new TargetVertexIdIterator(vertex);
222     sendMessageToAllRequest(targetVertexIterator, message);
223   }
224 
225   /**
226    * Send message to the target ids in the iterator
227    *
228    * @param vertexIdIterator The iterator of target vertex ids
229    * @param message The message sent to a worker
230    */
231   public void sendMessageToAllRequest(Iterator<I> vertexIdIterator, M message) {
232     while (vertexIdIterator.hasNext()) {
233       sendMessageRequest(vertexIdIterator.next(), message);
234     }
235   }
236 
237   /**
238    * Flush the rest of the messages to the workers.
239    */
240   public void flush() {
241     PairList<WorkerInfo, PairList<Integer,
242         VertexIdMessages<I, M>>>
243     remainingMessageCache = removeAllMessages();
244     PairList<WorkerInfo, PairList<
245         Integer, VertexIdMessages<I, M>>>.Iterator
246     iterator = remainingMessageCache.getIterator();
247     while (iterator.hasNext()) {
248       iterator.next();
249       WritableRequest writableRequest =
250         new SendWorkerMessagesRequest<I, M>(
251           iterator.getCurrentSecond());
252       totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
253       clientProcessor.doRequest(
254         iterator.getCurrentFirst(), writableRequest);
255     }
256   }
257 
258   /**
259    * Reset the message count per superstep.
260    *
261    * @return The message count sent in last superstep
262    */
263   public long resetMessageCount() {
264     long messagesSentInSuperstep = totalMsgsSentInSuperstep;
265     totalMsgsSentInSuperstep = 0;
266     return messagesSentInSuperstep;
267   }
268 
269   /**
270    * Reset the message bytes count per superstep.
271    *
272    * @return The message count sent in last superstep
273    */
274   public long resetMessageBytesCount() {
275     long messageBytesSentInSuperstep = totalMsgBytesSentInSuperstep;
276     totalMsgBytesSentInSuperstep = 0;
277     return messageBytesSentInSuperstep;
278   }
279 }