/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.comm;

import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE;
import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;

import java.util.Iterator;

import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.PairList;
import org.apache.giraph.utils.VertexIdMessages;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;

42  /**
43   * Aggregates the messages to be sent to workers so they can be sent
44   * in bulk.  Not thread-safe.
45   *
46   * @param <I> Vertex id
47   * @param <M> Message data
48   */
49  @SuppressWarnings("unchecked")
50  public class SendMessageCache<I extends WritableComparable, M extends Writable>
51      extends SendVertexIdDataCache<I, M, VertexIdMessages<I, M>> {
52    /** Class logger */
53    private static final Logger LOG =
54        Logger.getLogger(SendMessageCache.class);
55    /** Messages sent during the last superstep */
56    protected long totalMsgsSentInSuperstep = 0;
57    /** Message bytes sent during the last superstep */
58    protected long totalMsgBytesSentInSuperstep = 0;
59    /** Max message size sent to a worker */
60    protected final int maxMessagesSizePerWorker;
61    /** NettyWorkerClientRequestProcessor for message sending */
62    protected final NettyWorkerClientRequestProcessor<I, ?, ?> clientProcessor;
63    /**
64     * Constructor
65     *
66     * @param conf Giraph configuration
67     * @param serviceWorker Service worker
68     * @param processor NettyWorkerClientRequestProcessor
69     * @param maxMsgSize Max message size sent to a worker
70     */
71    public SendMessageCache(ImmutableClassesGiraphConfiguration conf,
72        CentralizedServiceWorker<?, ?, ?> serviceWorker,
73        NettyWorkerClientRequestProcessor<I, ?, ?> processor,
74        int maxMsgSize) {
75      super(conf, serviceWorker, MAX_MSG_REQUEST_SIZE.get(conf),
76          ADDITIONAL_MSG_REQUEST_SIZE.get(conf));
77      maxMessagesSizePerWorker = maxMsgSize;
78      clientProcessor = processor;
79    }
80  
81    @Override
82    public VertexIdMessages<I, M> createVertexIdData() {
83      return new ByteArrayVertexIdMessages<I, M>(
84          getConf().<M>createOutgoingMessageValueFactory());
85    }
86  
87    /**
88     * Add a message to the cache.
89     *
90     * @param workerInfo the remote worker destination
91     * @param partitionId the remote Partition this message belongs to
92     * @param destVertexId vertex id that is ultimate destination
93     * @param message Message to send to remote worker
94     * @return Size of messages for the worker.
95     */
96    public int addMessage(WorkerInfo workerInfo,
97                          int partitionId, I destVertexId, M message) {
98      return addData(workerInfo, partitionId, destVertexId, message);
99    }
100 
101   /**
102    * Add a message to the cache with serialized ids.
103    *
104    * @param workerInfo The remote worker destination
105    * @param partitionId The remote Partition this message belongs to
106    * @param serializedId Serialized vertex id that is ultimate destination
107    * @param idSerializerPos The end position of serialized id in the byte array
108    * @param message Message to send to remote worker
109    * @return Size of messages for the worker.
110    */
111   protected int addMessage(WorkerInfo workerInfo, int partitionId,
112       byte[] serializedId, int idSerializerPos, M message) {
113     return addData(
114       workerInfo, partitionId, serializedId,
115       idSerializerPos, message);
116   }
117 
118   /**
119    * Gets the messages for a worker and removes it from the cache.
120    *
121    * @param workerInfo the address of the worker who owns the data
122    *                   partitions that are receiving the messages
123    * @return List of pairs (partitionId, ByteArrayVertexIdMessages),
124    *         where all partition ids belong to workerInfo
125    */
126   protected PairList<Integer, VertexIdMessages<I, M>>
127   removeWorkerMessages(WorkerInfo workerInfo) {
128     return removeWorkerData(workerInfo);
129   }
130 
131   /**
132    * Gets all the messages and removes them from the cache.
133    *
134    * @return All vertex messages for all partitions
135    */
136   private PairList<WorkerInfo, PairList<
137       Integer, VertexIdMessages<I, M>>> removeAllMessages() {
138     return removeAllData();
139   }
140 
141   /**
142    * Send a message to a target vertex id.
143    *
144    * @param destVertexId Target vertex id
145    * @param message The message sent to the target
146    */
147   public void sendMessageRequest(I destVertexId, M message) {
148     PartitionOwner owner =
149       getServiceWorker().getVertexPartitionOwner(destVertexId);
150     WorkerInfo workerInfo = owner.getWorkerInfo();
151     final int partitionId = owner.getPartitionId();
152     if (LOG.isTraceEnabled()) {
153       LOG.trace("sendMessageRequest: Send bytes (" + message.toString() +
154         ") to " + destVertexId + " on worker " + workerInfo);
155     }
156     ++totalMsgsSentInSuperstep;
157     // Add the message to the cache
158     int workerMessageSize = addMessage(
159       workerInfo, partitionId, destVertexId, message);
160     // Send a request if the cache of outgoing message to
161     // the remote worker 'workerInfo' is full enough to be flushed
162     if (workerMessageSize >= maxMessagesSizePerWorker) {
163       PairList<Integer, VertexIdMessages<I, M>>
164         workerMessages = removeWorkerMessages(workerInfo);
165       WritableRequest writableRequest =
166         new SendWorkerMessagesRequest<I, M>(workerMessages);
167       totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
168       clientProcessor.doRequest(workerInfo, writableRequest);
169       // Notify sending
170       getServiceWorker().getGraphTaskManager().notifySentMessages();
171     }
172   }
173 
174   /**
175    * An iterator wrapper on edges to return
176    * target vertex ids.
177    */
178   public static class TargetVertexIdIterator<I extends WritableComparable>
179       implements Iterator<I> {
180     /** An edge iterator */
181     private final Iterator<Edge<I, Writable>> edgesIterator;
182 
183     /**
184      * Constructor.
185      *
186      * @param vertex The source vertex of the out edges
187      */
188     public TargetVertexIdIterator(Vertex<I, ?, ?> vertex) {
189       edgesIterator =
190         ((Vertex<I, Writable, Writable>) vertex).getEdges().iterator();
191     }
192 
193     @Override
194     public boolean hasNext() {
195       return edgesIterator.hasNext();
196     }
197 
198     @Override
199     public I next() {
200       return edgesIterator.next().getTargetVertexId();
201     }
202 
203     @Override
204     public void remove() {
205       throw new UnsupportedOperationException();
206     }
207   }
208 
209   /**
210    * Send message to all its neighbors
211    *
212    * @param vertex The source vertex
213    * @param message The message sent to a worker
214    */
215   public void sendMessageToAllRequest(Vertex<I, ?, ?> vertex, M message) {
216     TargetVertexIdIterator targetVertexIterator =
217       new TargetVertexIdIterator(vertex);
218     sendMessageToAllRequest(targetVertexIterator, message);
219   }
220 
221   /**
222    * Send message to the target ids in the iterator
223    *
224    * @param vertexIdIterator The iterator of target vertex ids
225    * @param message The message sent to a worker
226    */
227   public void sendMessageToAllRequest(Iterator<I> vertexIdIterator, M message) {
228     while (vertexIdIterator.hasNext()) {
229       sendMessageRequest(vertexIdIterator.next(), message);
230     }
231   }
232 
233   /**
234    * Flush the rest of the messages to the workers.
235    */
236   public void flush() {
237     PairList<WorkerInfo, PairList<Integer,
238         VertexIdMessages<I, M>>>
239     remainingMessageCache = removeAllMessages();
240     PairList<WorkerInfo, PairList<
241         Integer, VertexIdMessages<I, M>>>.Iterator
242     iterator = remainingMessageCache.getIterator();
243     while (iterator.hasNext()) {
244       iterator.next();
245       WritableRequest writableRequest =
246         new SendWorkerMessagesRequest<I, M>(
247           iterator.getCurrentSecond());
248       totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
249       clientProcessor.doRequest(
250         iterator.getCurrentFirst(), writableRequest);
251     }
252   }
253 
254   /**
255    * Reset the message count per superstep.
256    *
257    * @return The message count sent in last superstep
258    */
259   public long resetMessageCount() {
260     long messagesSentInSuperstep = totalMsgsSentInSuperstep;
261     totalMsgsSentInSuperstep = 0;
262     return messagesSentInSuperstep;
263   }
264 
265   /**
266    * Reset the message bytes count per superstep.
267    *
268    * @return The message count sent in last superstep
269    */
270   public long resetMessageBytesCount() {
271     long messageBytesSentInSuperstep = totalMsgBytesSentInSuperstep;
272     totalMsgBytesSentInSuperstep = 0;
273     return messageBytesSentInSuperstep;
274   }
275 }