1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.comm;
2021importstatic org.apache.giraph.conf.GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE;
22importstatic org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;
2324import java.util.Iterator;
2526import org.apache.giraph.bsp.CentralizedServiceWorker;
27import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
28import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
29import org.apache.giraph.comm.requests.WritableRequest;
30import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
31import org.apache.giraph.edge.Edge;
32import org.apache.giraph.factories.MessageValueFactory;
33import org.apache.giraph.graph.Vertex;
34import org.apache.giraph.partition.PartitionOwner;
35import org.apache.giraph.utils.ByteArrayVertexIdMessages;
36import org.apache.giraph.utils.PairList;
37import org.apache.giraph.utils.VertexIdMessages;
38import org.apache.giraph.worker.WorkerInfo;
39import org.apache.hadoop.io.Writable;
40import org.apache.hadoop.io.WritableComparable;
41import org.apache.log4j.Logger;
4243/**44 * Aggregates the messages to be sent to workers so they can be sent45 * in bulk. Not thread-safe.46 *47 * @param <I> Vertex id48 * @param <M> Message data49 */50 @SuppressWarnings("unchecked")
51publicclass SendMessageCache<I extends WritableComparable, M extends Writable>
52extends SendVertexIdDataCache<I, M, VertexIdMessages<I, M>> {
53/** Class logger */54privatestaticfinal Logger LOG =
55 Logger.getLogger(SendMessageCache.class);
56/** Messages sent during the last superstep */57protectedlong totalMsgsSentInSuperstep = 0;
58/** Message bytes sent during the last superstep */59protectedlong totalMsgBytesSentInSuperstep = 0;
60/** Max message size sent to a worker */61protectedfinalint maxMessagesSizePerWorker;
62/**NettyWorkerClientRequestProcessor for message sending */63protectedfinal NettyWorkerClientRequestProcessor<I, ?, ?> clientProcessor;
64/** Cached message value factory */65protected MessageValueFactory<M> messageValueFactory;
66/**67 * Constructor68 *69 * @param conf Giraph configuration70 * @param serviceWorker Service worker71 * @param processor NettyWorkerClientRequestProcessor72 * @param maxMsgSize Max message size sent to a worker73 */74publicSendMessageCache(ImmutableClassesGiraphConfiguration conf,
75 CentralizedServiceWorker<?, ?, ?> serviceWorker,
76 NettyWorkerClientRequestProcessor<I, ?, ?> processor,
77int maxMsgSize) {
78super(conf, serviceWorker, MAX_MSG_REQUEST_SIZE.get(conf),
79 ADDITIONAL_MSG_REQUEST_SIZE.get(conf));
80 maxMessagesSizePerWorker = maxMsgSize;
81 clientProcessor = processor;
82 messageValueFactory =
83 conf.createOutgoingMessageValueFactory();
84 }
8586 @Override
87public VertexIdMessages<I, M> createVertexIdData() {
88returnnew ByteArrayVertexIdMessages<I, M>(messageValueFactory);
89 }
9091/**92 * Add a message to the cache.93 *94 * @param workerInfo the remote worker destination95 * @param partitionId the remote Partition this message belongs to96 * @param destVertexId vertex id that is ultimate destination97 * @param message Message to send to remote worker98 * @return Size of messages for the worker.99 */100publicint addMessage(WorkerInfo workerInfo,
101int partitionId, I destVertexId, M message) {
102return addData(workerInfo, partitionId, destVertexId, message);
103 }
104105/**106 * Add a message to the cache with serialized ids.107 *108 * @param workerInfo The remote worker destination109 * @param partitionId The remote Partition this message belongs to110 * @param serializedId Serialized vertex id that is ultimate destination111 * @param idSerializerPos The end position of serialized id in the byte array112 * @param message Message to send to remote worker113 * @return Size of messages for the worker.114 */115protectedint addMessage(WorkerInfo workerInfo, int partitionId,
116 byte[] serializedId, int idSerializerPos, M message) {
117return addData(
118 workerInfo, partitionId, serializedId,
119 idSerializerPos, message);
120 }
121122/**123 * Gets the messages for a worker and removes it from the cache.124 *125 * @param workerInfo the address of the worker who owns the data126 * partitions that are receiving the messages127 * @return List of pairs (partitionId, ByteArrayVertexIdMessages),128 * where all partition ids belong to workerInfo129 */130protected PairList<Integer, VertexIdMessages<I, M>>
131 removeWorkerMessages(WorkerInfo workerInfo) {
132return removeWorkerData(workerInfo);
133 }
134135/**136 * Gets all the messages and removes them from the cache.137 *138 * @return All vertex messages for all partitions139 */140private PairList<WorkerInfo, PairList<
141 Integer, VertexIdMessages<I, M>>> removeAllMessages() {
142return removeAllData();
143 }
144145/**146 * Send a message to a target vertex id.147 *148 * @param destVertexId Target vertex id149 * @param message The message sent to the target150 */151publicvoid sendMessageRequest(I destVertexId, M message) {
152PartitionOwner owner =
153 getServiceWorker().getVertexPartitionOwner(destVertexId);
154WorkerInfo workerInfo = owner.getWorkerInfo();
155finalint partitionId = owner.getPartitionId();
156if (LOG.isTraceEnabled()) {
157 LOG.trace("sendMessageRequest: Send bytes (" + message.toString() +
158") to " + destVertexId + " on worker " + workerInfo);
159 }
160 ++totalMsgsSentInSuperstep;
161// Add the message to the cache162int workerMessageSize = addMessage(
163 workerInfo, partitionId, destVertexId, message);
164// Send a request if the cache of outgoing message to165// the remote worker 'workerInfo' is full enough to be flushed166if (workerMessageSize >= maxMessagesSizePerWorker) {
167 PairList<Integer, VertexIdMessages<I, M>>
168 workerMessages = removeWorkerMessages(workerInfo);
169WritableRequest writableRequest =
170new SendWorkerMessagesRequest<I, M>(workerMessages);
171 totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
172 clientProcessor.doRequest(workerInfo, writableRequest);
173// Notify sending174 getServiceWorker().getGraphTaskManager().notifySentMessages();
175 }
176 }
177178/**179 * An iterator wrapper on edges to return180 * target vertex ids.181 */182publicstaticclass TargetVertexIdIterator<I extends WritableComparable>
183implements Iterator<I> {
184/** An edge iterator */185privatefinal Iterator<Edge<I, Writable>> edgesIterator;
186187/**188 * Constructor.189 *190 * @param vertex The source vertex of the out edges191 */192publicTargetVertexIdIterator(Vertex<I, ?, ?> vertex) {
193 edgesIterator =
194 ((Vertex<I, Writable, Writable>) vertex).getEdges().iterator();
195 }
196197 @Override
198publicboolean hasNext() {
199return edgesIterator.hasNext();
200 }
201202 @Override
203public I next() {
204return edgesIterator.next().getTargetVertexId();
205 }
206207 @Override
208publicvoid remove() {
209thrownew UnsupportedOperationException();
210 }
211 }
212213/**214 * Send message to all its neighbors215 *216 * @param vertex The source vertex217 * @param message The message sent to a worker218 */219publicvoid sendMessageToAllRequest(Vertex<I, ?, ?> vertex, M message) {
220TargetVertexIdIterator targetVertexIterator =
221newTargetVertexIdIterator(vertex);
222 sendMessageToAllRequest(targetVertexIterator, message);
223 }
224225/**226 * Send message to the target ids in the iterator227 *228 * @param vertexIdIterator The iterator of target vertex ids229 * @param message The message sent to a worker230 */231publicvoid sendMessageToAllRequest(Iterator<I> vertexIdIterator, M message) {
232while (vertexIdIterator.hasNext()) {
233 sendMessageRequest(vertexIdIterator.next(), message);
234 }
235 }
236237/**238 * Flush the rest of the messages to the workers.239 */240publicvoid flush() {
241 PairList<WorkerInfo, PairList<Integer,
242 VertexIdMessages<I, M>>>
243 remainingMessageCache = removeAllMessages();
244 PairList<WorkerInfo, PairList<
245 Integer, VertexIdMessages<I, M>>>.Iterator
246 iterator = remainingMessageCache.getIterator();
247while (iterator.hasNext()) {
248 iterator.next();
249WritableRequest writableRequest =
250new SendWorkerMessagesRequest<I, M>(
251 iterator.getCurrentSecond());
252 totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
253 clientProcessor.doRequest(
254 iterator.getCurrentFirst(), writableRequest);
255 }
256 }
257258/**259 * Reset the message count per superstep.260 *261 * @return The message count sent in last superstep262 */263publiclong resetMessageCount() {
264long messagesSentInSuperstep = totalMsgsSentInSuperstep;
265 totalMsgsSentInSuperstep = 0;
266return messagesSentInSuperstep;
267 }
268269/**270 * Reset the message bytes count per superstep.271 *272 * @return The message count sent in last superstep273 */274publiclong resetMessageBytesCount() {
275long messageBytesSentInSuperstep = totalMsgBytesSentInSuperstep;
276 totalMsgBytesSentInSuperstep = 0;
277return messageBytesSentInSuperstep;
278 }
279 }