This project has retired. For details please refer to its Attic page.
RequestServerHandler xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty.handler;
20  
21  import org.apache.giraph.comm.flow_control.FlowControl;
22  import org.apache.giraph.comm.requests.WritableRequest;
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.graph.TaskInfo;
25  import org.apache.giraph.time.SystemTime;
26  import org.apache.giraph.time.Time;
27  import org.apache.giraph.time.Times;
28  import org.apache.log4j.Logger;
29  
30  import io.netty.buffer.ByteBuf;
31  import io.netty.channel.ChannelHandlerContext;
32  import io.netty.channel.ChannelInboundHandlerAdapter;
33  
34  import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED;
35  
36  /**
37   * Generic handler of requests.
38   *
39   * @param <R> Request type
40   */
41  public abstract class RequestServerHandler<R> extends
42    ChannelInboundHandlerAdapter {
43    /** Number of bytes in the encoded response */
44    public static final int RESPONSE_BYTES = 16;
45    /** Time class to use */
46    private static Time TIME = SystemTime.get();
47    /** Class logger */
48    private static final Logger LOG =
49        Logger.getLogger(RequestServerHandler.class);
50    /** Already closed first request? */
51    private static volatile boolean ALREADY_CLOSED_FIRST_REQUEST = false;
52    /** Flow control used in sending requests */
53    protected FlowControl flowControl;
54    /** Close connection on first request (used for simulating failure) */
55    private final boolean closeFirstRequest;
56    /** Request reserved map (for exactly one semantics) */
57    private final WorkerRequestReservedMap workerRequestReservedMap;
58    /** My task info */
59    private final TaskInfo myTaskInfo;
60    /** Start nanoseconds for the processing time */
61    private long startProcessingNanoseconds = -1;
62    /** Handler for uncaught exceptions */
63    private final Thread.UncaughtExceptionHandler exceptionHandler;
64  
65    /**
66     * Constructor
67     *
68     * @param workerRequestReservedMap Worker request reservation map
69     * @param conf Configuration
70     * @param myTaskInfo Current task info
71     * @param exceptionHandler Handles uncaught exceptions
72     */
73    public RequestServerHandler(
74        WorkerRequestReservedMap workerRequestReservedMap,
75        ImmutableClassesGiraphConfiguration conf,
76        TaskInfo myTaskInfo,
77        Thread.UncaughtExceptionHandler exceptionHandler) {
78      this.workerRequestReservedMap = workerRequestReservedMap;
79      closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
80      this.myTaskInfo = myTaskInfo;
81      this.exceptionHandler = exceptionHandler;
82    }
83  
84    @Override
85    public void channelRead(ChannelHandlerContext ctx, Object msg)
86      throws Exception {
87      if (LOG.isTraceEnabled()) {
88        LOG.trace("messageReceived: Got " + msg.getClass());
89      }
90  
91      WritableRequest request = (WritableRequest) msg;
92  
93      // Simulate a closed connection on the first request (if desired)
94      if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
95        LOG.info("messageReceived: Simulating closing channel on first " +
96            "request " + request.getRequestId() + " from " +
97            request.getClientId());
98        setAlreadyClosedFirstRequest();
99        ctx.close();
100       return;
101     }
102 
103     // Only execute this request exactly once
104     AckSignalFlag alreadyDone = AckSignalFlag.DUPLICATE_REQUEST;
105     if (workerRequestReservedMap.reserveRequest(
106         request.getClientId(),
107         request.getRequestId())) {
108       if (LOG.isDebugEnabled()) {
109         startProcessingNanoseconds = TIME.getNanoseconds();
110       }
111       processRequest((R) request);
112       if (LOG.isDebugEnabled()) {
113         LOG.debug("messageReceived: Processing client " +
114             request.getClientId() + ", " +
115             "requestId " + request.getRequestId() +
116             ", " +  request.getType() + " took " +
117             Times.getNanosSince(TIME, startProcessingNanoseconds) + " ns");
118       }
119       alreadyDone = AckSignalFlag.NEW_REQUEST;
120     } else {
121       LOG.info("messageReceived: Request id " +
122           request.getRequestId() + " from client " +
123           request.getClientId() +
124           " was already processed, " +
125           "not processing again.");
126     }
127 
128     // Send the response with the request id
129     ByteBuf buffer = ctx.alloc().buffer(RESPONSE_BYTES);
130     buffer.writeInt(myTaskInfo.getTaskId());
131     buffer.writeLong(request.getRequestId());
132     int signal =
133         flowControl.calculateResponse(alreadyDone, request.getClientId());
134     buffer.writeInt(signal);
135     ctx.write(buffer);
136   }
137 
138   /**
139    * Set the flag indicating already closed first request
140    */
141   private static void setAlreadyClosedFirstRequest() {
142     ALREADY_CLOSED_FIRST_REQUEST = true;
143   }
144 
145   /**
146    * Process request
147    *
148    * @param request Request to process
149    */
150   public abstract void processRequest(R request);
151 
152   @Override
153   public void channelActive(ChannelHandlerContext ctx) throws Exception {
154     if (LOG.isDebugEnabled()) {
155       LOG.debug("channelActive: Connected the channel on " +
156           ctx.channel().remoteAddress());
157     }
158     ctx.fireChannelActive();
159   }
160 
161   @Override
162   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
163     if (LOG.isDebugEnabled()) {
164       LOG.debug("channelInactive: Closed the channel on " +
165           ctx.channel().remoteAddress());
166     }
167     ctx.fireChannelInactive();
168   }
169 
170   @Override
171   public void exceptionCaught(
172       ChannelHandlerContext ctx, Throwable cause) throws Exception {
173     exceptionHandler.uncaughtException(Thread.currentThread(), cause);
174   }
175 
176   /**
177    * Factory for {@link RequestServerHandler}
178    */
179   public interface Factory {
180     /**
181      * Create new {@link RequestServerHandler}
182      *
183      * @param workerRequestReservedMap Worker request reservation map
184      * @param conf Configuration to use
185      * @param myTaskInfo Current task info
186      * @param exceptionHandler Handles uncaught exceptions
187      * @return New {@link RequestServerHandler}
188      */
189     RequestServerHandler newHandler(
190         WorkerRequestReservedMap workerRequestReservedMap,
191         ImmutableClassesGiraphConfiguration conf,
192         TaskInfo myTaskInfo,
193         Thread.UncaughtExceptionHandler exceptionHandler);
194 
195     /**
196      * Inform the factory about the flow control policy used (this method should
197      * be called before any call to `#newHandle()`)
198      *
199      * @param flowControl reference to flow control used
200      */
201     void setFlowControl(FlowControl flowControl);
202   }
203 }