This project has retired. For details please refer to its Attic page.
MasterRequestServerHandler xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty.handler;
20  
21  import org.apache.giraph.comm.flow_control.FlowControl;
22  import org.apache.giraph.comm.requests.MasterRequest;
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.graph.TaskInfo;
25  import org.apache.giraph.master.MasterGlobalCommHandler;
26  
27  /** Handler for requests on master */
28  public class MasterRequestServerHandler extends
29      RequestServerHandler<MasterRequest> {
30    /** Aggregator handler */
31    private final MasterGlobalCommHandler commHandler;
32  
33    /**
34     * Constructor
35     *
36     * @param workerRequestReservedMap Worker request reservation map
37     * @param conf                     Configuration
38     * @param myTaskInfo               Current task info
39     * @param commHandler              Master communication handler
40     * @param exceptionHandler         Handles uncaught exceptions
41     * @param flowControl              Reference to the flow control used
42     */
43    public MasterRequestServerHandler(
44        WorkerRequestReservedMap workerRequestReservedMap,
45        ImmutableClassesGiraphConfiguration conf,
46        TaskInfo myTaskInfo,
47        MasterGlobalCommHandler commHandler,
48        Thread.UncaughtExceptionHandler exceptionHandler,
49        FlowControl flowControl) {
50      super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
51      this.commHandler = commHandler;
52      this.flowControl = flowControl;
53    }
54  
55    @Override
56    public void processRequest(MasterRequest request) {
57      request.doRequest(commHandler);
58    }
59  
60    /**
61     * Factory for {@link MasterRequestServerHandler}
62     */
63    public static class Factory implements RequestServerHandler.Factory {
64      /** Master aggregator handler */
65      private final MasterGlobalCommHandler commHandler;
66      /** Flow control used in sending requests */
67      private FlowControl flowControl;
68  
69      /**
70       * Constructor
71       *
72       * @param commHandler Master global communication handler
73       */
74      public Factory(MasterGlobalCommHandler commHandler) {
75        this.commHandler = commHandler;
76      }
77  
78      @Override
79      public RequestServerHandler newHandler(
80          WorkerRequestReservedMap workerRequestReservedMap,
81          ImmutableClassesGiraphConfiguration conf,
82          TaskInfo myTaskInfo,
83          Thread.UncaughtExceptionHandler exceptionHandler) {
84        return new MasterRequestServerHandler(workerRequestReservedMap, conf,
85            myTaskInfo, commHandler, exceptionHandler, flowControl);
86      }
87  
88      @Override
89      public void setFlowControl(FlowControl flowControl) {
90        this.flowControl = flowControl;
91      }
92    }
93  }