View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty.handler;
20  
21  import org.apache.giraph.comm.netty.NettyClient;
22  import org.apache.hadoop.conf.Configuration;
23  import org.apache.log4j.Logger;
24  
25  import io.netty.buffer.ByteBuf;
26  import io.netty.channel.ChannelHandlerContext;
27  import io.netty.channel.ChannelInboundHandlerAdapter;
28  import io.netty.util.ReferenceCountUtil;
29  
30  import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED;
31  
32  /**
33   * Generic handler of responses.
34   */
35  public class ResponseClientHandler extends ChannelInboundHandlerAdapter {
36    /** Class logger */
37    private static final Logger LOG =
38        Logger.getLogger(ResponseClientHandler.class);
39    /** Already dropped first response? (used if dropFirstResponse == true) */
40    private static volatile boolean ALREADY_DROPPED_FIRST_RESPONSE = false;
41    /** Drop first response (used for simulating failure) */
42    private final boolean dropFirstResponse;
43    /** Netty client that does the actual I/O and keeps track of open requests */
44    private final NettyClient nettyClient;
45  
46    /**
47     * Constructor.
48     * @param nettyClient Client that does the actual I/O
49     * @param conf Configuration
50     */
51    public ResponseClientHandler(NettyClient nettyClient, Configuration conf) {
52      this.nettyClient = nettyClient;
53      dropFirstResponse = NETTY_SIMULATE_FIRST_RESPONSE_FAILED.get(conf);
54    }
55  
56    @Override
57    public void channelRead(ChannelHandlerContext ctx, Object msg)
58      throws Exception {
59      if (!(msg instanceof ByteBuf)) {
60        throw new IllegalStateException("channelRead: Got a " +
61            "non-ByteBuf message " + msg);
62      }
63  
64      ByteBuf buf = (ByteBuf) msg;
65      int senderId = -1;
66      long requestId = -1;
67      int response = -1;
68      try {
69        senderId = buf.readInt();
70        requestId = buf.readLong();
71        response = buf.readInt();
72      } catch (IndexOutOfBoundsException e) {
73        throw new IllegalStateException(
74            "channelRead: Got IndexOutOfBoundsException ", e);
75      }
76      ReferenceCountUtil.release(buf);
77  
78      boolean shouldDrop = false;
79      // Simulate a failed response on the first response (if desired)
80      if (dropFirstResponse && !ALREADY_DROPPED_FIRST_RESPONSE) {
81        LOG.info("channelRead: Simulating dropped response " + response +
82            " for request " + requestId);
83        setAlreadyDroppedFirstResponse();
84        shouldDrop = true;
85      }
86  
87      nettyClient.messageReceived(senderId, requestId, response, shouldDrop);
88    }
89  
90    /**
91     * Set already dropped first response flag
92     */
93    private static void setAlreadyDroppedFirstResponse() {
94      ALREADY_DROPPED_FIRST_RESPONSE = true;
95    }
96  
97    @Override
98    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
99      if (LOG.isDebugEnabled()) {
100       LOG.debug("channelClosed: Closed the channel on " +
101           ctx.channel().remoteAddress());
102     }
103     ctx.fireChannelInactive();
104   }
105 
106   @Override
107   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
108     throws Exception {
109     LOG.warn("exceptionCaught: Channel failed with " +
110         "remote address " + ctx.channel().remoteAddress(), cause);
111   }
112 }
113