This project has retired. For details please refer to its Attic page.
RequestEncoder xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty.handler;
20  
21  import io.netty.buffer.ByteBufOutputStream;
22  import org.apache.giraph.comm.requests.WritableRequest;
23  import org.apache.giraph.conf.GiraphConfiguration;
24  import org.apache.giraph.conf.GiraphConstants;
25  import org.apache.giraph.time.SystemTime;
26  import org.apache.giraph.time.Time;
27  import org.apache.giraph.time.Times;
28  import org.apache.log4j.Logger;
29  
30  import io.netty.buffer.ByteBuf;
31  import io.netty.channel.ChannelHandlerContext;
32  import io.netty.channel.ChannelOutboundHandlerAdapter;
33  import io.netty.channel.ChannelPromise;
34  
35  import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BYTE;
36  import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
37  
38  /**
39   * Requests have a request type and an encoded request.
40   */
41  public class RequestEncoder extends ChannelOutboundHandlerAdapter {
42    /** Class logger */
43    private static final Logger LOG = Logger.getLogger(RequestEncoder.class);
44    /** Time class to use */
45    private static final Time TIME = SystemTime.get();
46    /** Buffer starting size */
47    private final int bufferStartingSize;
48    /** Start nanoseconds for the encoding time */
49    private long startEncodingNanoseconds = -1;
50  
51    /**
52     * Constructor.
53     *
54     * @param conf Giraph configuration
55     */
56    public RequestEncoder(GiraphConfiguration conf) {
57      bufferStartingSize =
58          GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE.get(conf);
59    }
60  
61    @Override
62    public void write(ChannelHandlerContext ctx, Object msg,
63      ChannelPromise promise) throws Exception {
64      if (!(msg instanceof WritableRequest)) {
65        throw new IllegalArgumentException(
66            "encode: Got a message of type " + msg.getClass());
67      }
68  
69      // Encode the request
70      if (LOG.isDebugEnabled()) {
71        startEncodingNanoseconds = TIME.getNanoseconds();
72      }
73  
74      ByteBuf buf;
75      WritableRequest request = (WritableRequest) msg;
76      int requestSize = request.getSerializedSize();
77      if (requestSize == WritableRequest.UNKNOWN_SIZE) {
78        buf = ctx.alloc().buffer(bufferStartingSize);
79      } else {
80        requestSize +=  SIZE_OF_INT + SIZE_OF_BYTE;
81        buf = ctx.alloc().buffer(requestSize);
82      }
83      ByteBufOutputStream output = new ByteBufOutputStream(buf);
84  
85      // This will later be filled with the correct size of serialized request
86      output.writeInt(0);
87      output.writeByte(request.getType().ordinal());
88      try {
89        request.write(output);
90      } catch (IndexOutOfBoundsException e) {
91        LOG.error("write: Most likely the size of request was not properly " +
92            "specified (this buffer is too small) - see getSerializedSize() " +
93            "in " + request.getType().getRequestClass());
94        throw new IllegalStateException(e);
95      }
96      output.flush();
97      output.close();
98  
99      // Set the correct size at the end
100     buf.setInt(0, buf.writerIndex() - SIZE_OF_INT);
101     if (LOG.isDebugEnabled()) {
102       LOG.debug("write: Client " + request.getClientId() + ", " +
103           "requestId " + request.getRequestId() +
104           ", size = " + buf.readableBytes() + ", " +
105           request.getType() + " took " +
106           Times.getNanosSince(TIME, startEncodingNanoseconds) + " ns");
107     }
108     ctx.write(buf, promise);
109   }
110 }