/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.comm.requests;
2021import java.io.IOException;
2223import org.apache.giraph.comm.GlobalCommType;
24import org.apache.giraph.comm.ServerData;
25import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
26import org.apache.giraph.reducers.ReduceOperation;
27import org.apache.giraph.utils.UnsafeByteArrayInputStream;
28import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
29import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
30import org.apache.giraph.utils.WritableUtils;
31import org.apache.hadoop.io.LongWritable;
32import org.apache.hadoop.io.Writable;
3334/**35 * Request to send final aggregatd values from master to worker which owns36 * the aggregators37 */38publicclassSendAggregatorsToOwnerRequest39extendsByteArrayWithSenderTaskIdRequestimplementsWorkerRequest {
4041/**42 * Constructor43 *44 * @param data Serialized aggregator data45 * @param senderTaskId Sender task id46 */47publicSendAggregatorsToOwnerRequest(byte[] data, int senderTaskId) {
48super(data, senderTaskId);
49 }
5051/**52 * Constructor used for reflection only53 */54publicSendAggregatorsToOwnerRequest() {
55 }
5657 @Override
58publicvoid doRequest(ServerData serverData) {
59UnsafeByteArrayOutputStream reusedOut = newUnsafeByteArrayOutputStream();
60UnsafeReusableByteArrayInput reusedIn = newUnsafeReusableByteArrayInput();
6162UnsafeByteArrayInputStream input = getUnsafeByteArrayInput();
63AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
64try {
65int num = input.readInt();
66for (int i = 0; i < num; i++) {
67 String name = input.readUTF();
68GlobalCommType type = GlobalCommType.values()[input.readByte()];
69 Writable value = WritableUtils.readWritableObject(input, conf);
70if (type == GlobalCommType.SPECIAL_COUNT) {
71 aggregatorData.receivedRequestCountFromMaster(
72 ((LongWritable) value).get(),
73 getSenderTaskId());
74 } else {
75 aggregatorData.receiveValueFromMaster(name, type, value);
7677if (type == GlobalCommType.REDUCE_OPERATIONS) {
78 ReduceOperation<Object, Writable> reduceOpCopy =
79 (ReduceOperation<Object, Writable>)
80 WritableUtils.createCopy(reusedOut, reusedIn, value, conf);
8182 serverData.getOwnerAggregatorData().registerReducer(
83 name, reduceOpCopy);
84 }
85 }
86 }
87 } catch (IOException e) {
88thrownew IllegalStateException("doRequest: " +
89"IOException occurred while processing request", e);
90 }
91 aggregatorData.receivedRequestFromMaster(getData());
92 }
9394 @Override
95publicRequestType getType() {
96return RequestType.SEND_AGGREGATORS_TO_OWNER_REQUEST;
97 }
98 }