1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.comm.requests;
2021import org.apache.giraph.utils.VertexIdData;
22import org.apache.giraph.utils.PairList;
23import org.apache.hadoop.io.WritableComparable;
24import org.apache.log4j.Logger;
2526import java.io.DataInput;
27import java.io.DataOutput;
28import java.io.IOException;
2930/**31 * Abstract request to send a collection of data, indexed by vertex id,32 * for a partition.33 *34 * @param <I> Vertex id35 * @param <T> Data36 * @param <B> Specialization of37 * {@link org.apache.giraph.utils.VertexIdData} for T38 */39 @SuppressWarnings("unchecked")
40publicabstractclass SendWorkerDataRequest<I extends WritableComparable, T,
41 B extends VertexIdData<I, T>>
42extendsWritableRequestimplementsWorkerRequest {
43/** Class logger */44privatestaticfinal Logger LOG =
45 Logger.getLogger(SendWorkerDataRequest.class);
46/**47 * All data for a group of vertices, organized by partition, which48 * are owned by a single (destination) worker. This data is all49 * destined for this worker.50 * */51protected PairList<Integer, B> partitionVertexData;
5253/**54 * Constructor used for reflection only55 */56publicSendWorkerDataRequest() { }
5758/**59 * Constructor used to send request.60 *61 * @param partVertData Map of remote partitions => VertexIdData62 */63publicSendWorkerDataRequest(
64 PairList<Integer, B> partVertData) {
65this.partitionVertexData = partVertData;
66 }
6768/**69 * Create a new {@link org.apache.giraph.utils.VertexIdData}70 * specialized for the use case.71 *72 * @return A new instance of73 * {@link org.apache.giraph.utils.VertexIdData}74 */75publicabstract B createVertexIdData();
7677 @Override
78publicvoid readFieldsRequest(DataInput input) throws IOException {
79int numPartitions = input.readInt();
80 partitionVertexData = new PairList<Integer, B>();
81 partitionVertexData.initialize(numPartitions);
82while (numPartitions-- > 0) {
83finalint partitionId = input.readInt();
84 B vertexIdData = createVertexIdData();
85 vertexIdData.setConf(getConf());
86 vertexIdData.readFields(input);
87 partitionVertexData.add(partitionId, vertexIdData);
88 }
89 }
9091 @Override
92publicvoid writeRequest(DataOutput output) throws IOException {
93 output.writeInt(partitionVertexData.getSize());
94 PairList<Integer, B>.Iterator
95 iterator = partitionVertexData.getIterator();
96while (iterator.hasNext()) {
97 iterator.next();
98 output.writeInt(iterator.getCurrentFirst());
99 iterator.getCurrentSecond().write(output);
100 }
101 }
102103 @Override
104publicint getSerializedSize() {
105int size = super.getSerializedSize() + 4;
106 PairList<Integer, B>.Iterator iterator = partitionVertexData.getIterator();
107while (iterator.hasNext()) {
108 iterator.next();
109 size += 4 + iterator.getCurrentSecond().getSerializedSize();
110 }
111return size;
112 }
113 }
114