1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.comm.requests;
20
21 import java.io.IOException;
22
23 import org.apache.giraph.comm.GlobalCommType;
24 import org.apache.giraph.comm.ServerData;
25 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
26 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
27 import org.apache.giraph.utils.WritableUtils;
28 import org.apache.hadoop.io.LongWritable;
29 import org.apache.hadoop.io.Writable;
30
31 /**
32 * Request to send final aggregated values from worker which owns them to
33 * other workers
34 */
35 public class SendAggregatorsToWorkerRequest extends
36 ByteArrayWithSenderTaskIdRequest implements WorkerRequest {
37
38 /**
39 * Constructor
40 *
41 * @param data Serialized aggregator data
42 * @param senderTaskId Sender task id
43 */
44 public SendAggregatorsToWorkerRequest(byte[] data, int senderTaskId) {
45 super(data, senderTaskId);
46 }
47
48 /**
49 * Constructor used for reflection only
50 */
51 public SendAggregatorsToWorkerRequest() {
52 }
53
54 @Override
55 public void doRequest(ServerData serverData) {
56 UnsafeByteArrayInputStream input = getUnsafeByteArrayInput();
57 AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
58 try {
59 int num = input.readInt();
60 for (int i = 0; i < num; i++) {
61 String name = input.readUTF();
62 GlobalCommType type = GlobalCommType.values()[input.readByte()];
63 Writable value = WritableUtils.readWritableObject(input, conf);
64 if (type == GlobalCommType.SPECIAL_COUNT) {
65 aggregatorData.receivedRequestCountFromWorker(
66 ((LongWritable) value).get(),
67 getSenderTaskId());
68 } else {
69 aggregatorData.receiveValueFromMaster(name, type, value);
70 }
71 }
72 } catch (IOException e) {
73 throw new IllegalStateException("doRequest: " +
74 "IOException occurred while processing request", e);
75 }
76 aggregatorData.receivedRequestFromWorker();
77 }
78
79 @Override
80 public RequestType getType() {
81 return RequestType.SEND_AGGREGATORS_TO_WORKER_REQUEST;
82 }
83 }