View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.requests;
20  
21  import java.io.DataInput;
22  import java.io.IOException;
23  
24  import org.apache.giraph.comm.GlobalCommType;
25  import org.apache.giraph.comm.ServerData;
26  import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
27  import org.apache.hadoop.io.LongWritable;
28  import org.apache.hadoop.io.Writable;
29  
30  /**
31   * Request to send partial aggregated values for current superstep (values
32   * which were computed by one worker's vertices)
33   */
34  public class SendWorkerAggregatorsRequest extends
35      ByteArrayWithSenderTaskIdRequest implements WorkerRequest {
36  
37    /**
38     * Constructor
39     *
40     * @param data Serialized aggregator data
41     * @param senderTaskId Sender task id
42     */
43    public SendWorkerAggregatorsRequest(byte[] data, int senderTaskId) {
44      super(data, senderTaskId);
45    }
46  
47    /**
48     * Constructor used for reflection only
49     */
50    public SendWorkerAggregatorsRequest() {
51    }
52  
53    @Override
54    public void doRequest(ServerData serverData) {
55      DataInput input = getDataInput();
56      OwnerAggregatorServerData aggregatorData =
57          serverData.getOwnerAggregatorData();
58      try {
59        int num = input.readInt();
60        for (int i = 0; i < num; i++) {
61          String name = input.readUTF();
62          GlobalCommType type = GlobalCommType.values()[input.readByte()];
63          if (type == GlobalCommType.SPECIAL_COUNT) {
64            LongWritable value = new LongWritable();
65            value.readFields(input);
66            aggregatorData.receivedRequestCountFromWorker(
67                value.get(),
68                getSenderTaskId());
69          } else if (type == GlobalCommType.REDUCED_VALUE) {
70            Writable value = aggregatorData.createInitialValue(name);
71            value.readFields(input);
72            aggregatorData.reduce(name, value);
73          } else {
74            throw new IllegalStateException(
75                "SendWorkerAggregatorsRequest received " + type);
76          }
77        }
78      } catch (IOException e) {
79        throw new IllegalStateException("doRequest: " +
80            "IOException occurred while processing request", e);
81      }
82      aggregatorData.receivedRequestFromWorker();
83    }
84  
85    @Override
86    public RequestType getType() {
87      return RequestType.SEND_WORKER_AGGREGATORS_REQUEST;
88    }
89  }