1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.comm.netty.handler;
2021import org.apache.giraph.comm.flow_control.FlowControl;
22import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23import org.apache.giraph.comm.ServerData;
24import org.apache.giraph.comm.requests.WorkerRequest;
25import org.apache.giraph.graph.TaskInfo;
26import org.apache.hadoop.io.Writable;
27import org.apache.hadoop.io.WritableComparable;
2829/**30 * Handler for requests on worker31 *32 * @param <I> Vertex id33 * @param <V> Vertex data34 * @param <E> Edge data35 * @param <M> Message data36 */37publicclass WorkerRequestServerHandler<I extends WritableComparable,
38 V extends Writable, E extends Writable, M extends Writable> extends39 RequestServerHandler<WorkerRequest<I, V, E>> {
40/** Data that can be accessed for handling requests */41privatefinal ServerData<I, V, E> serverData;
4243/**44 * Constructor with external server data45 *46 * @param serverData Data held by the server47 * @param workerRequestReservedMap Worker request reservation map48 * @param conf Configuration49 * @param myTaskInfo Current task info50 * @param exceptionHandler Handles uncaught exceptions51 * @param flowControl Reference to the flow control used52 */53publicWorkerRequestServerHandler(ServerData<I, V, E> serverData,
54WorkerRequestReservedMap workerRequestReservedMap,
55ImmutableClassesGiraphConfiguration conf,
56TaskInfo myTaskInfo,
57 Thread.UncaughtExceptionHandler exceptionHandler,
58FlowControl flowControl) {
59super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
60this.serverData = serverData;
61this.flowControl = flowControl;
62 }
6364 @Override
65publicvoid processRequest(WorkerRequest<I, V, E> request) {
66 request.doRequest(serverData);
67 }
6869/**Factory for {@link WorkerRequestServerHandler} */70publicstaticclass Factory<I extends WritableComparable,
71 V extends Writable, E extends Writable> implements72 RequestServerHandler.Factory {
73/** Data that can be accessed for handling requests */74privatefinal ServerData<I, V, E> serverData;
75/** Flow control used in sending requests */76privateFlowControl flowControl;
7778/**79 * Constructor80 *81 * @param serverData Data held by the server82 */83publicFactory(ServerData<I, V, E> serverData) {
84this.serverData = serverData;
85 }
8687 @Override
88publicRequestServerHandler newHandler(
89WorkerRequestReservedMap workerRequestReservedMap,
90ImmutableClassesGiraphConfiguration conf,
91TaskInfo myTaskInfo,
92 Thread.UncaughtExceptionHandler exceptionHandler) {
93returnnew WorkerRequestServerHandler<I, V, E, Writable>(serverData,
94 workerRequestReservedMap, conf, myTaskInfo, exceptionHandler,
95 flowControl);
96 }
9798 @Override
99publicvoid setFlowControl(FlowControl flowControl) {
100this.flowControl = flowControl;
101 }
102 }
103 }