/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.comm.requests;
2021import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
2223import java.io.DataInput;
24import java.io.DataOutput;
25import java.io.IOException;
26import java.util.Map.Entry;
2728import org.apache.giraph.bsp.CentralizedServiceWorker;
29import org.apache.giraph.comm.ServerData;
30import org.apache.giraph.comm.messages.MessageStore;
31import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
32import org.apache.giraph.partition.PartitionOwner;
33import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
34import org.apache.giraph.utils.ByteArrayVertexIdMessages;
35import org.apache.giraph.utils.VertexIdMessageIterator;
36import org.apache.hadoop.io.Writable;
37import org.apache.hadoop.io.WritableComparable;
3839/**40 * Send a collection of ByteArrayOneMessageToManyIds messages to a worker.41 *42 * @param <I> Vertex id43 * @param <M> Message data44 */45 @SuppressWarnings("unchecked")
46publicclass SendWorkerOneMessageToManyRequest<I extends WritableComparable,
47 M extends Writable> extends WritableRequest<I, Writable, Writable>
48implements WorkerRequest<I, Writable, Writable> {
49/**ByteArrayOneMessageToManyIds encoding of vertexId & messages */50protected ByteArrayOneMessageToManyIds<I, M> oneMessageToManyIds;
5152/**53 * Constructor used for reflection only.54 */55publicSendWorkerOneMessageToManyRequest() { }
5657/**58 * Constructor used to send request.59 *60 * @param oneMessageToManyIds ByteArrayOneMessageToManyIds61 * @param conf ImmutableClassesGiraphConfiguration62 */63publicSendWorkerOneMessageToManyRequest(
64 ByteArrayOneMessageToManyIds<I, M> oneMessageToManyIds,
65ImmutableClassesGiraphConfiguration conf) {
66this.oneMessageToManyIds = oneMessageToManyIds;
67 setConf(conf);
68 }
6970 @Override
71publicRequestType getType() {
72return RequestType.SEND_WORKER_ONE_MESSAGE_TO_MANY_REQUEST;
73 }
7475 @Override
76publicvoid readFieldsRequest(DataInput input) throws IOException {
77 oneMessageToManyIds = new ByteArrayOneMessageToManyIds<>(
78 getConf().<M>createOutgoingMessageValueFactory());
79 oneMessageToManyIds.setConf(getConf());
80 oneMessageToManyIds.readFields(input);
81 }
8283 @Override
84publicvoid writeRequest(DataOutput output) throws IOException {
85this.oneMessageToManyIds.write(output);
86 }
8788 @Override
89publicint getSerializedSize() {
90returnsuper.getSerializedSize() +
91this.oneMessageToManyIds.getSerializedSize();
92 }
9394 @Override
95publicvoid doRequest(ServerData serverData) {
96 MessageStore<I, M> messageStore = serverData.getIncomingMessageStore();
97if (messageStore.isPointerListEncoding()) {
98// if message store is pointer list based then send data as is99 messageStore.addPartitionMessages(-1, oneMessageToManyIds);
100 } else { // else split the data per partition and send individually101 CentralizedServiceWorker<I, ?, ?> serviceWorker =
102 serverData.getServiceWorker();
103// Get the initial size of ByteArrayVertexIdMessages per partition104// on this worker. To make sure every ByteArrayVertexIdMessages to have105// enough space to store the messages, we divide the original106// ByteArrayOneMessageToManyIds message size by the number of partitions107// and double the size108// (Assume the major component in ByteArrayOneMessageToManyIds message109// is a id list. Now each target id has a copy of message,110// therefore we double the buffer size)111// to get the initial size of ByteArrayVertexIdMessages.112int initialSize = oneMessageToManyIds.getSize() /
113 serverData.getPartitionStore().getNumPartitions() * 2;
114// Create ByteArrayVertexIdMessages for115// message reformatting.116 Int2ObjectOpenHashMap<ByteArrayVertexIdMessages> partitionIdMsgs =
117new Int2ObjectOpenHashMap<>();
118119// Put data from ByteArrayOneMessageToManyIds120// to ByteArrayVertexIdMessages121 VertexIdMessageIterator<I, M> vertexIdMessageIterator =
122 oneMessageToManyIds.getVertexIdMessageIterator();
123while (vertexIdMessageIterator.hasNext()) {
124 vertexIdMessageIterator.next();
125 M msg = vertexIdMessageIterator.getCurrentMessage();
126 I vertexId = vertexIdMessageIterator.getCurrentVertexId();
127PartitionOwner owner =
128 serviceWorker.getVertexPartitionOwner(vertexId);
129int partitionId = owner.getPartitionId();
130 ByteArrayVertexIdMessages<I, M> idMsgs = partitionIdMsgs
131 .get(partitionId);
132if (idMsgs == null) {
133 idMsgs = new ByteArrayVertexIdMessages<>(
134 getConf().<M>createOutgoingMessageValueFactory());
135 idMsgs.setConf(getConf());
136 idMsgs.initialize(initialSize);
137 partitionIdMsgs.put(partitionId, idMsgs);
138 }
139 idMsgs.add(vertexId, msg);
140 }
141142// Read ByteArrayVertexIdMessages and write to message store143for (Entry<Integer, ByteArrayVertexIdMessages> idMsgs :
144 partitionIdMsgs.entrySet()) {
145if (!idMsgs.getValue().isEmpty()) {
146 serverData.getIncomingMessageStore().addPartitionMessages(
147 idMsgs.getKey(), idMsgs.getValue());
148 }
149 }
150 }
151 }
152 }