This project has retired. For details please refer to its Attic page.
WorkerRequestServerHandler xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty.handler;
20  
21  import org.apache.giraph.comm.flow_control.FlowControl;
22  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23  import org.apache.giraph.comm.ServerData;
24  import org.apache.giraph.comm.requests.WorkerRequest;
25  import org.apache.giraph.graph.TaskInfo;
26  import org.apache.hadoop.io.Writable;
27  import org.apache.hadoop.io.WritableComparable;
28  
29  /**
30   * Handler for requests on worker
31   *
32   * @param <I> Vertex id
33   * @param <V> Vertex data
34   * @param <E> Edge data
35   * @param <M> Message data
36   */
37  public class WorkerRequestServerHandler<I extends WritableComparable,
38      V extends Writable, E extends Writable, M extends Writable> extends
39      RequestServerHandler<WorkerRequest<I, V, E>> {
40    /** Data that can be accessed for handling requests */
41    private final ServerData<I, V, E> serverData;
42  
43    /**
44     * Constructor with external server data
45     *
46     * @param serverData               Data held by the server
47     * @param workerRequestReservedMap Worker request reservation map
48     * @param conf                     Configuration
49     * @param myTaskInfo               Current task info
50     * @param exceptionHandler         Handles uncaught exceptions
51     * @param flowControl              Reference to the flow control used
52     */
53    public WorkerRequestServerHandler(ServerData<I, V, E> serverData,
54        WorkerRequestReservedMap workerRequestReservedMap,
55        ImmutableClassesGiraphConfiguration conf,
56        TaskInfo myTaskInfo,
57        Thread.UncaughtExceptionHandler exceptionHandler,
58        FlowControl flowControl) {
59      super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
60      this.serverData = serverData;
61      this.flowControl = flowControl;
62    }
63  
64    @Override
65    public void processRequest(WorkerRequest<I, V, E> request) {
66      request.doRequest(serverData);
67    }
68  
69    /** Factory for {@link WorkerRequestServerHandler} */
70    public static class Factory<I extends WritableComparable,
71        V extends Writable, E extends Writable> implements
72        RequestServerHandler.Factory {
73      /** Data that can be accessed for handling requests */
74      private final ServerData<I, V, E> serverData;
75      /** Flow control used in sending requests */
76      private FlowControl flowControl;
77  
78      /**
79       * Constructor
80       *
81       * @param serverData Data held by the server
82       */
83      public Factory(ServerData<I, V, E> serverData) {
84        this.serverData = serverData;
85      }
86  
87      @Override
88      public RequestServerHandler newHandler(
89          WorkerRequestReservedMap workerRequestReservedMap,
90          ImmutableClassesGiraphConfiguration conf,
91          TaskInfo myTaskInfo,
92          Thread.UncaughtExceptionHandler exceptionHandler) {
93        return new WorkerRequestServerHandler<I, V, E, Writable>(serverData,
94            workerRequestReservedMap, conf, myTaskInfo, exceptionHandler,
95            flowControl);
96      }
97  
98      @Override
99      public void setFlowControl(FlowControl flowControl) {
100       this.flowControl = flowControl;
101     }
102   }
103 }