View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty.handler;
20  
21  import org.apache.giraph.comm.flow_control.FlowControl;
22  import org.apache.giraph.comm.requests.WritableRequest;
23  import org.apache.giraph.conf.GiraphConstants;
24  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
25  import org.apache.giraph.graph.TaskInfo;
26  import org.apache.giraph.time.SystemTime;
27  import org.apache.giraph.time.Time;
28  import org.apache.giraph.time.Times;
29  import org.apache.log4j.Logger;
30  
31  import io.netty.buffer.ByteBuf;
32  import io.netty.channel.ChannelHandlerContext;
33  import io.netty.channel.ChannelInboundHandlerAdapter;
34  
35  import java.util.concurrent.atomic.AtomicBoolean;
36  
37  import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED;
38  
39  /**
40   * Generic handler of requests.
41   *
42   * @param <R> Request type
43   */
44  public abstract class RequestServerHandler<R> extends
45    ChannelInboundHandlerAdapter {
46    /** Number of bytes in the encoded response */
47    public static final int RESPONSE_BYTES = 16;
48    /** Time class to use */
49    private static Time TIME = SystemTime.get();
50    /** Class logger */
51    private static final Logger LOG =
52        Logger.getLogger(RequestServerHandler.class);
53    /** Already closed first request? */
54    private static volatile boolean ALREADY_CLOSED_FIRST_REQUEST = false;
55    /** Flow control used in sending requests */
56    protected FlowControl flowControl;
57    /** Close connection on first request (used for simulating failure) */
58    private final boolean closeFirstRequest;
59    /** Request reserved map (for exactly one semantics) */
60    private final WorkerRequestReservedMap workerRequestReservedMap;
61    /** My task info */
62    private final TaskInfo myTaskInfo;
63    /** Start nanoseconds for the processing time */
64    private long startProcessingNanoseconds = -1;
65    /** Handler for uncaught exceptions */
66    private final Thread.UncaughtExceptionHandler exceptionHandler;
67    /** Whether it is the first time reading/handling a request*/
68    private final AtomicBoolean firstRead = new AtomicBoolean(true);
69    /** Cached value for NETTY_AUTO_READ configuration option */
70    private final boolean nettyAutoRead;
71  
72    /**
73     * Constructor
74     *
75     * @param workerRequestReservedMap Worker request reservation map
76     * @param conf Configuration
77     * @param myTaskInfo Current task info
78     * @param exceptionHandler Handles uncaught exceptions
79     */
80    public RequestServerHandler(
81        WorkerRequestReservedMap workerRequestReservedMap,
82        ImmutableClassesGiraphConfiguration conf,
83        TaskInfo myTaskInfo,
84        Thread.UncaughtExceptionHandler exceptionHandler) {
85      this.workerRequestReservedMap = workerRequestReservedMap;
86      closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
87      this.myTaskInfo = myTaskInfo;
88      this.exceptionHandler = exceptionHandler;
89      this.nettyAutoRead = GiraphConstants.NETTY_AUTO_READ.get(conf);
90    }
91  
92    @Override
93    public void channelRead(ChannelHandlerContext ctx, Object msg)
94      throws Exception {
95      if (LOG.isTraceEnabled()) {
96        LOG.trace("messageReceived: Got " + msg.getClass());
97      }
98  
99      WritableRequest request = (WritableRequest) msg;
100 
101     // Simulate a closed connection on the first request (if desired)
102     if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
103       LOG.info("messageReceived: Simulating closing channel on first " +
104           "request " + request.getRequestId() + " from " +
105           request.getClientId());
106       setAlreadyClosedFirstRequest();
107       ctx.close();
108       return;
109     }
110 
111     // Only execute this request exactly once
112     AckSignalFlag alreadyDone = AckSignalFlag.DUPLICATE_REQUEST;
113     if (workerRequestReservedMap.reserveRequest(
114         request.getClientId(),
115         request.getRequestId())) {
116       if (LOG.isDebugEnabled()) {
117         startProcessingNanoseconds = TIME.getNanoseconds();
118       }
119       processRequest((R) request);
120       if (LOG.isDebugEnabled()) {
121         LOG.debug("messageReceived: Processing client " +
122             request.getClientId() + ", " +
123             "requestId " + request.getRequestId() +
124             ", " +  request.getType() + " took " +
125             Times.getNanosSince(TIME, startProcessingNanoseconds) + " ns");
126       }
127       alreadyDone = AckSignalFlag.NEW_REQUEST;
128     } else {
129       LOG.info("messageReceived: Request id " +
130           request.getRequestId() + " from client " +
131           request.getClientId() +
132           " was already processed, " +
133           "not processing again.");
134     }
135 
136     // Send the response with the request id
137     ByteBuf buffer = ctx.alloc().buffer(RESPONSE_BYTES);
138     buffer.writeInt(myTaskInfo.getTaskId());
139     buffer.writeLong(request.getRequestId());
140     int signal =
141         flowControl.calculateResponse(alreadyDone, request.getClientId());
142     buffer.writeInt(signal);
143     ctx.write(buffer);
144     // NettyServer is bootstrapped with auto-read set to true by default. After
145     // the first request is processed, we set auto-read to false. This prevents
146     // netty from reading requests continuously and putting them in off-heap
147     // memory. Instead, we will call `read` on requests one by one, so that the
148     // lower level transport layer handles the congestion if the rate of
149     // incoming requests is more than the available processing capability.
150     if (!nettyAutoRead && firstRead.compareAndSet(true, false)) {
151       ctx.channel().config().setAutoRead(false);
152     }
153   }
154 
155   @Override
156   public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
157     if (!nettyAutoRead) {
158       ctx.read();
159     } else {
160       super.channelReadComplete(ctx);
161     }
162   }
163 
164   /**
165    * Set the flag indicating already closed first request
166    */
167   private static void setAlreadyClosedFirstRequest() {
168     ALREADY_CLOSED_FIRST_REQUEST = true;
169   }
170 
171   /**
172    * Process request
173    *
174    * @param request Request to process
175    */
176   public abstract void processRequest(R request);
177 
178   @Override
179   public void channelActive(ChannelHandlerContext ctx) throws Exception {
180     if (LOG.isDebugEnabled()) {
181       LOG.debug("channelActive: Connected the channel on " +
182           ctx.channel().remoteAddress());
183     }
184     ctx.fireChannelActive();
185   }
186 
187   @Override
188   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
189     if (LOG.isDebugEnabled()) {
190       LOG.debug("channelInactive: Closed the channel on " +
191           ctx.channel().remoteAddress());
192     }
193     ctx.fireChannelInactive();
194   }
195 
196   @Override
197   public void exceptionCaught(
198       ChannelHandlerContext ctx, Throwable cause) throws Exception {
199     exceptionHandler.uncaughtException(Thread.currentThread(), cause);
200   }
201 
202   /**
203    * Factory for {@link RequestServerHandler}
204    */
205   public interface Factory {
206     /**
207      * Create new {@link RequestServerHandler}
208      *
209      * @param workerRequestReservedMap Worker request reservation map
210      * @param conf Configuration to use
211      * @param myTaskInfo Current task info
212      * @param exceptionHandler Handles uncaught exceptions
213      * @return New {@link RequestServerHandler}
214      */
215     RequestServerHandler newHandler(
216         WorkerRequestReservedMap workerRequestReservedMap,
217         ImmutableClassesGiraphConfiguration conf,
218         TaskInfo myTaskInfo,
219         Thread.UncaughtExceptionHandler exceptionHandler);
220 
221     /**
222      * Inform the factory about the flow control policy used (this method should
223      * be called before any call to `#newHandle()`)
224      *
225      * @param flowControl reference to flow control used
226      */
227     void setFlowControl(FlowControl flowControl);
228   }
229 }