This project has retired. For details please refer to its Attic page.
ResponseEncoder xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty.handler;
20  
21  import io.netty.buffer.ByteBufOutputStream;
22  /*if[HADOOP_NON_SECURE]
23  else[HADOOP_NON_SECURE]*/
24  import org.apache.giraph.comm.requests.RequestType;
25  /*end[HADOOP_NON_SECURE]*/
26  import org.apache.giraph.comm.requests.WritableRequest;
27  import org.apache.log4j.Logger;
28  import io.netty.buffer.ByteBuf;
29  import io.netty.channel.ChannelPromise;
30  import io.netty.channel.ChannelHandlerContext;
31  import io.netty.channel.ChannelOutboundHandlerAdapter;
32  
33  import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
34  
35  /**
36   * How a server should respond to a client. Currently only used for
37   * responding to client's SASL messages, and removed after client
38   * authenticates.
39   */
40  public class ResponseEncoder extends ChannelOutboundHandlerAdapter {
41    /** Class logger. */
42    private static final Logger LOG = Logger.getLogger(ResponseEncoder.class);
43  
44    @Override
45    public void write(ChannelHandlerContext ctx, Object msg,
46      ChannelPromise promise) throws Exception {
47      if (LOG.isDebugEnabled()) {
48        LOG.debug("write(" + ctx + "," + msg);
49      }
50  
51      if (!(msg instanceof WritableRequest)) {
52        throw new IllegalArgumentException(
53            "encode: cannot encode message of type " + msg.getClass() +
54                " since it is not an instance of an implementation of " +
55                " WritableRequest.");
56      }
57      @SuppressWarnings("unchecked")
58      WritableRequest writableRequest = (WritableRequest) msg;
59  
60      ByteBuf buf = ctx.alloc().buffer(10);
61      ByteBufOutputStream output = new ByteBufOutputStream(buf);
62  
63      if (LOG.isDebugEnabled()) {
64        LOG.debug("encode: Encoding a message of type " + msg.getClass());
65      }
66  
67      // Space is reserved now to be filled later by the serialize request size
68      output.writeInt(0);
69      // write type of object.
70      output.writeByte(writableRequest.getType().ordinal());
71      // write the object itself.
72      writableRequest.write(output);
73  
74      output.flush();
75      output.close();
76  
77      // Set the correct size at the end.
78      buf.setInt(0, buf.writerIndex() - SIZE_OF_INT);
79  
80      if (LOG.isDebugEnabled()) {
81        LOG.debug("encode: Encoding a message of type " + msg.getClass());
82      }
83      ctx.write(buf, promise);
84  /*if[HADOOP_NON_SECURE]
85  else[HADOOP_NON_SECURE]*/
86      if (writableRequest.getType() == RequestType.SASL_COMPLETE_REQUEST) {
87        // We are sending to the client a SASL_COMPLETE response (created by
88        // the SaslServer handler). The SaslServer handler has removed itself
89        // from the pipeline after creating this response, and now it's time for
90        // the ResponseEncoder to remove itself also.
91        if (LOG.isDebugEnabled()) {
92          LOG.debug("encode: Removing RequestEncoder handler: no longer needed," +
93              " since client: " + ctx.channel().remoteAddress() + " has " +
94              "completed authenticating.");
95        }
96        ctx.pipeline().remove(this);
97      }
98  /*end[HADOOP_NON_SECURE]*/
99      ctx.write(buf, promise);
100   }
101 }
102