/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.comm.requests;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.giraph.comm.ServerData;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

2930/**31 * Send a collection of vertex messages for a partition. It adds messages to32 * current message store and it should be used only during partition exchange.33 *34 * @param <I> Vertex id35 * @param <V> Vertex data36 * @param <E> Edge data37 * @param <M> Message data38 */39publicclass SendPartitionCurrentMessagesRequest<I extends WritableComparable,
40 V extends Writable, E extends Writable, M extends Writable> extends41 WritableRequest<I, V, E> implements WorkerRequest<I, V, E> {
42/** Destination partition for these vertices' messages*/43privateint partitionId;
44/** Map of destination vertex ID's to message lists */45private ByteArrayVertexIdMessages<I, M> vertexIdMessageMap;
4647/** Constructor used for reflection only */48publicSendPartitionCurrentMessagesRequest() { }
4950/**51 * Constructor used to send request.52 *53 * @param partitionId Partition to send the request to54 * @param vertexIdMessages Map of messages to send55 */56publicSendPartitionCurrentMessagesRequest(int partitionId,
57 ByteArrayVertexIdMessages<I, M> vertexIdMessages) {
58super();
59this.partitionId = partitionId;
60this.vertexIdMessageMap = vertexIdMessages;
61 }
6263 @Override
64publicRequestType getType() {
65return RequestType.SEND_PARTITION_CURRENT_MESSAGES_REQUEST;
66 }
6768 @Override
69publicvoid readFieldsRequest(DataInput input) throws IOException {
70 partitionId = input.readInt();
71// At this moment the Computation class have already been replaced with72// the new one, and we deal with messages from previous superstep73 vertexIdMessageMap = new ByteArrayVertexIdMessages<>(
74 getConf().<M>createIncomingMessageValueFactory());
75 vertexIdMessageMap.setConf(getConf());
76 vertexIdMessageMap.initialize();
77 vertexIdMessageMap.readFields(input);
78 }
7980 @Override
81publicvoid writeRequest(DataOutput output) throws IOException {
82 output.writeInt(partitionId);
83 vertexIdMessageMap.write(output);
84 }
8586 @Override
87publicvoid doRequest(ServerData<I, V, E> serverData) {
88 serverData.<M>getCurrentMessageStore().addPartitionMessages(partitionId,
89 vertexIdMessageMap);
90 }
9192 @Override
93publicint getSerializedSize() {
94returnsuper.getSerializedSize() + 4 +
95 vertexIdMessageMap.getSerializedSize();
96 }
97 }