/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.comm.netty.handler;
2021import org.apache.giraph.comm.flow_control.FlowControl;
22import org.apache.giraph.comm.requests.MasterRequest;
23import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24import org.apache.giraph.graph.TaskInfo;
25import org.apache.giraph.master.MasterGlobalCommHandler;
2627/** Handler for requests on master */28publicclassMasterRequestServerHandlerextends29 RequestServerHandler<MasterRequest> {
30/** Aggregator handler */31privatefinalMasterGlobalCommHandler commHandler;
3233/**34 * Constructor35 *36 * @param workerRequestReservedMap Worker request reservation map37 * @param conf Configuration38 * @param myTaskInfo Current task info39 * @param commHandler Master communication handler40 * @param exceptionHandler Handles uncaught exceptions41 * @param flowControl Reference to the flow control used42 */43publicMasterRequestServerHandler(
44WorkerRequestReservedMap workerRequestReservedMap,
45ImmutableClassesGiraphConfiguration conf,
46TaskInfo myTaskInfo,
47MasterGlobalCommHandler commHandler,
48 Thread.UncaughtExceptionHandler exceptionHandler,
49FlowControl flowControl) {
50super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
51this.commHandler = commHandler;
52this.flowControl = flowControl;
53 }
5455 @Override
56publicvoid processRequest(MasterRequest request) {
57 request.doRequest(commHandler);
58 }
5960/**61 * Factory for {@link MasterRequestServerHandler}62 */63publicstaticclassFactoryimplements RequestServerHandler.Factory {
64/** Master aggregator handler */65privatefinalMasterGlobalCommHandler commHandler;
66/** Flow control used in sending requests */67privateFlowControl flowControl;
6869/**70 * Constructor71 *72 * @param commHandler Master global communication handler73 */74publicFactory(MasterGlobalCommHandler commHandler) {
75this.commHandler = commHandler;
76 }
7778 @Override
79publicRequestServerHandler newHandler(
80WorkerRequestReservedMap workerRequestReservedMap,
81ImmutableClassesGiraphConfiguration conf,
82TaskInfo myTaskInfo,
83 Thread.UncaughtExceptionHandler exceptionHandler) {
84returnnewMasterRequestServerHandler(workerRequestReservedMap, conf,
85 myTaskInfo, commHandler, exceptionHandler, flowControl);
86 }
8788 @Override
89publicvoid setFlowControl(FlowControl flowControl) {
90this.flowControl = flowControl;
91 }
92 }
93 }