1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.comm;
2021import java.io.IOException;
22import java.util.Arrays;
23import java.util.Iterator;
2425import javax.annotation.concurrent.NotThreadSafe;
2627import org.apache.giraph.bsp.CentralizedServiceWorker;
28import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
29import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
30import org.apache.giraph.comm.requests.SendWorkerOneMessageToManyRequest;
31import org.apache.giraph.comm.requests.WritableRequest;
32import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
33import org.apache.giraph.partition.PartitionOwner;
34import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
35import org.apache.giraph.utils.ExtendedDataOutput;
36import org.apache.giraph.utils.PairList;
37import org.apache.giraph.utils.VertexIdMessages;
38import org.apache.giraph.worker.WorkerInfo;
39import org.apache.hadoop.io.Writable;
40import org.apache.hadoop.io.WritableComparable;
41import org.apache.log4j.Logger;
4243/**44 * Aggregates the messages to be sent to workers so they can be sent45 * in bulk.46 *47 * @param <I> Vertex id48 * @param <M> Message data49 */50 @NotThreadSafe
51 @SuppressWarnings("unchecked")
52publicclass SendOneMessageToManyCache<I extends WritableComparable,
53 M extends Writable> extends SendMessageCache<I, M> {
54/** Class logger */55privatestaticfinal Logger LOG =
56 Logger.getLogger(SendOneMessageToManyCache.class);
57/** Cache serialized one to many messages for each worker */58privatefinal ByteArrayOneMessageToManyIds<I, M>[] msgVidsCache;
59/** Tracking message-vertexIds sizes for each worker */60privatefinalint[] msgVidsSizes;
61/** Reused byte array to serialize target ids on each worker */62privatefinalExtendedDataOutput[] idSerializer;
63/** Reused int array to count target id distribution */64privatefinalint[] idCounter;
65/**66 * Reused int array to record the partition id67 * of the first target vertex id found on the worker.68 */69privatefinalint[] firstPartitionMap;
70/** The WorkerInfo list */71privatefinalWorkerInfo[] workerInfoList;
7273/**74 * Constructor75 *76 * @param conf Giraph configuration77 * @param serviceWorker Service worker78 * @param processor NettyWorkerClientRequestProcessor79 * @param maxMsgSize Max message size sent to a worker80 */81publicSendOneMessageToManyCache(ImmutableClassesGiraphConfiguration conf,
82 CentralizedServiceWorker<?, ?, ?> serviceWorker,
83 NettyWorkerClientRequestProcessor<I, ?, ?> processor,
84int maxMsgSize) {
85super(conf, serviceWorker, processor, maxMsgSize);
86int numWorkers = getNumWorkers();
87 msgVidsCache = newByteArrayOneMessageToManyIds[numWorkers];
88 msgVidsSizes = newint[numWorkers];
89 idSerializer = newExtendedDataOutput[numWorkers];
90// InitialBufferSizes is alo initialized based on the number of workers.91// As a result, initialBufferSizes is the same as idSerializer in length92int initialBufferSize = 0;
93for (int i = 0; i < this.idSerializer.length; i++) {
94 initialBufferSize = getSendWorkerInitialBufferSize(i);
95if (initialBufferSize > 0) {
96// InitialBufferSizes is from super class.97// Each element is for one worker.98 idSerializer[i] = conf.createExtendedDataOutput(initialBufferSize);
99 }
100 }
101 idCounter = newint[numWorkers];
102 firstPartitionMap = newint[numWorkers];
103// Get worker info list.104 workerInfoList = newWorkerInfo[numWorkers];
105// Remember there could be null in the array.106for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
107 workerInfoList[workerInfo.getTaskId()] = workerInfo;
108 }
109 }
110111/**112 * Reset ExtendedDataOutput array for id serialization113 * in next message-Vids encoding114 */115privatevoid resetIdSerializers() {
116for (int i = 0; i < this.idSerializer.length; i++) {
117if (idSerializer[i] != null) {
118 idSerializer[i].reset();
119 }
120 }
121 }
122123/**124 * Reset id counter for next message-vertexIds encoding125 */126privatevoid resetIdCounter() {
127 Arrays.fill(idCounter, 0);
128 }
129130/**131 * Add message with multiple target ids to message cache.132 *133 * @param workerInfo The remote worker destination134 * @param ids A byte array to hold serialized vertex ids135 * @param idPos The end position of ids136 * information in the byte array above137 * @param count The number of target ids138 * @param message Message to send to remote worker139 * @return The size of messages for the worker.140 */141privateint addOneToManyMessage(
142WorkerInfo workerInfo, byte[] ids, int idPos, int count, M message) {
143// Get the data collection144 ByteArrayOneMessageToManyIds<I, M> workerData =
145 msgVidsCache[workerInfo.getTaskId()];
146if (workerData == null) {
147 workerData = new ByteArrayOneMessageToManyIds<I, M>(
148 messageValueFactory);
149 workerData.setConf(getConf());
150 workerData.initialize(getSendWorkerInitialBufferSize(
151 workerInfo.getTaskId()));
152 msgVidsCache[workerInfo.getTaskId()] = workerData;
153 }
154 workerData.add(ids, idPos, count, message);
155// Update the size of cached, outgoing data per worker156 msgVidsSizes[workerInfo.getTaskId()] =
157 workerData.getSize();
158return msgVidsSizes[workerInfo.getTaskId()];
159 }
160161/**162 * Gets the messages + vertexIds for a worker and removes it from the cache.163 * Here the {@link org.apache.giraph.utils.ByteArrayOneMessageToManyIds}164 * returned could be null.But when invoking this method, we also check if165 * the data size sent to this worker is above the threshold.166 * Therefore, it doesn't matter if the result is null or not.167 *168 * @param workerInfo Target worker to which one messages - many ids are sent169 * @return {@link org.apache.giraph.utils.ByteArrayOneMessageToManyIds}170 * that belong to the workerInfo171 */172private ByteArrayOneMessageToManyIds<I, M>
173 removeWorkerMsgVids(WorkerInfo workerInfo) {
174 ByteArrayOneMessageToManyIds<I, M> workerData =
175 msgVidsCache[workerInfo.getTaskId()];
176if (workerData != null) {
177 msgVidsCache[workerInfo.getTaskId()] = null;
178 msgVidsSizes[workerInfo.getTaskId()] = 0;
179 }
180return workerData;
181 }
182183/**184 * Gets all messages - vertexIds and removes them from the cache.185 *186 * @return All vertex messages for all workers187 */188private PairList<WorkerInfo, ByteArrayOneMessageToManyIds<I, M>>
189 removeAllMsgVids() {
190 PairList<WorkerInfo, ByteArrayOneMessageToManyIds<I, M>> allData =
191new PairList<WorkerInfo, ByteArrayOneMessageToManyIds<I, M>>();
192 allData.initialize(msgVidsCache.length);
193for (WorkerInfo workerInfo : getWorkerPartitions().keySet()) {
194 ByteArrayOneMessageToManyIds<I, M> workerData =
195 removeWorkerMsgVids(workerInfo);
196if (workerData != null && !workerData.isEmpty()) {
197 allData.add(workerInfo, workerData);
198 }
199 }
200return allData;
201 }
202203 @Override
204publicvoid sendMessageToAllRequest(Iterator<I> vertexIdIterator, M message) {
205// This is going to be reused through every message sending206 resetIdSerializers();
207 resetIdCounter();
208// Count messages209int currentMachineId = 0;
210PartitionOwner owner = null;
211WorkerInfo workerInfo = null;
212 I vertexId = null;
213while (vertexIdIterator.hasNext()) {
214 vertexId = vertexIdIterator.next();
215 owner = getServiceWorker().getVertexPartitionOwner(vertexId);
216 workerInfo = owner.getWorkerInfo();
217 currentMachineId = workerInfo.getTaskId();
218// Serialize this target vertex id219try {
220 vertexId.write(idSerializer[currentMachineId]);
221 } catch (IOException e) {
222thrownew IllegalStateException(
223"Failed to serialize the target vertex id.");
224 }
225 idCounter[currentMachineId]++;
226// Record the first partition id in the worker which message send to.227// If idCounter shows there is only one target on this worker228// then this is the partition number of the target vertex.229if (idCounter[currentMachineId] == 1) {
230 firstPartitionMap[currentMachineId] = owner.getPartitionId();
231 }
232 }
233// Add the message to the cache234int idSerializerPos = 0;
235int workerMessageSize = 0;
236 byte[] serializedId = null;
237WritableRequest writableRequest = null;
238for (int i = 0; i < idCounter.length; i++) {
239if (idCounter[i] == 1) {
240 serializedId = idSerializer[i].getByteArray();
241 idSerializerPos = idSerializer[i].getPos();
242// Add the message to the cache243 workerMessageSize = addMessage(workerInfoList[i],
244 firstPartitionMap[i], serializedId, idSerializerPos, message);
245246if (LOG.isTraceEnabled()) {
247 LOG.trace("sendMessageToAllRequest: Send bytes (" +
248 message.toString() + ") to one target in worker " +
249 workerInfoList[i]);
250 }
251 ++totalMsgsSentInSuperstep;
252if (workerMessageSize >= maxMessagesSizePerWorker) {
253 PairList<Integer, VertexIdMessages<I, M>>
254 workerMessages = removeWorkerMessages(workerInfoList[i]);
255 writableRequest = new SendWorkerMessagesRequest<>(workerMessages);
256 totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
257 clientProcessor.doRequest(workerInfoList[i], writableRequest);
258// Notify sending259 getServiceWorker().getGraphTaskManager().notifySentMessages();
260 }
261 } elseif (idCounter[i] > 1) {
262 serializedId = idSerializer[i].getByteArray();
263 idSerializerPos = idSerializer[i].getPos();
264 workerMessageSize = addOneToManyMessage(
265 workerInfoList[i], serializedId, idSerializerPos, idCounter[i],
266 message);
267268if (LOG.isTraceEnabled()) {
269 LOG.trace("sendMessageToAllRequest: Send bytes (" +
270 message.toString() + ") to all targets in worker" +
271 workerInfoList[i]);
272 }
273 totalMsgsSentInSuperstep += idCounter[i];
274if (workerMessageSize >= maxMessagesSizePerWorker) {
275 ByteArrayOneMessageToManyIds<I, M> workerMsgVids =
276 removeWorkerMsgVids(workerInfoList[i]);
277 writableRequest = new SendWorkerOneMessageToManyRequest<>(
278 workerMsgVids, getConf());
279 totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
280 clientProcessor.doRequest(workerInfoList[i], writableRequest);
281// Notify sending282 getServiceWorker().getGraphTaskManager().notifySentMessages();
283 }
284 }
285 }
286 }
287288 @Override
289publicvoid flush() {
290super.flush();
291 PairList<WorkerInfo, ByteArrayOneMessageToManyIds<I, M>>
292 remainingMsgVidsCache = removeAllMsgVids();
293 PairList<WorkerInfo,
294 ByteArrayOneMessageToManyIds<I, M>>.Iterator
295 msgIdsIterator = remainingMsgVidsCache.getIterator();
296while (msgIdsIterator.hasNext()) {
297 msgIdsIterator.next();
298WritableRequest writableRequest =
299new SendWorkerOneMessageToManyRequest<>(
300 msgIdsIterator.getCurrentSecond(), getConf());
301 totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
302 clientProcessor.doRequest(
303 msgIdsIterator.getCurrentFirst(), writableRequest);
304 }
305 }
306 }