View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.requests;
20  
21  import java.io.DataInput;
22  import java.io.IOException;
23  
24  import org.apache.giraph.comm.GlobalCommType;
25  import org.apache.giraph.comm.ServerData;
26  import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
27  import org.apache.giraph.reducers.ReduceOperation;
28  import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
29  import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
30  import org.apache.giraph.utils.WritableUtils;
31  import org.apache.hadoop.io.LongWritable;
32  import org.apache.hadoop.io.Writable;
33  
34  /**
35   * Request to send final aggregatd values from master to worker which owns
36   * the aggregators
37   */
38  public class SendAggregatorsToOwnerRequest
39      extends ByteArrayWithSenderTaskIdRequest implements WorkerRequest {
40  
41    /**
42     * Constructor
43     *
44     * @param data Serialized aggregator data
45     * @param senderTaskId Sender task id
46     */
47    public SendAggregatorsToOwnerRequest(byte[] data, int senderTaskId) {
48      super(data, senderTaskId);
49    }
50  
51    /**
52     * Constructor used for reflection only
53     */
54    public SendAggregatorsToOwnerRequest() {
55    }
56  
57    @Override
58    public void doRequest(ServerData serverData) {
59      UnsafeByteArrayOutputStream reusedOut = new UnsafeByteArrayOutputStream();
60      UnsafeReusableByteArrayInput reusedIn = new UnsafeReusableByteArrayInput();
61  
62      DataInput input = getDataInput();
63      AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
64      try {
65        int num = input.readInt();
66        for (int i = 0; i < num; i++) {
67          String name = input.readUTF();
68          GlobalCommType type = GlobalCommType.values()[input.readByte()];
69          Writable value = WritableUtils.readWritableObject(input, conf);
70          if (type == GlobalCommType.SPECIAL_COUNT) {
71            aggregatorData.receivedRequestCountFromMaster(
72                ((LongWritable) value).get(),
73                getSenderTaskId());
74          } else {
75            aggregatorData.receiveValueFromMaster(name, type, value);
76  
77            if (type == GlobalCommType.REDUCE_OPERATIONS) {
78              ReduceOperation<Object, Writable> reduceOpCopy =
79                  (ReduceOperation<Object, Writable>)
80                  WritableUtils.createCopy(reusedOut, reusedIn, value, conf);
81  
82              serverData.getOwnerAggregatorData().registerReducer(
83                  name, reduceOpCopy);
84            }
85          }
86        }
87      } catch (IOException e) {
88        throw new IllegalStateException("doRequest: " +
89            "IOException occurred while processing request", e);
90      }
91      aggregatorData.receivedRequestFromMaster(getData());
92    }
93  
94    @Override
95    public RequestType getType() {
96      return RequestType.SEND_AGGREGATORS_TO_OWNER_REQUEST;
97    }
98  }