This project has retired. For details please refer to its Attic page.
RequestDecoder xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty.handler;
20  
21  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22  import org.apache.giraph.comm.netty.InboundByteCounter;
23  import org.apache.giraph.comm.requests.RequestType;
24  import org.apache.giraph.comm.requests.WritableRequest;
25  import org.apache.giraph.time.SystemTime;
26  import org.apache.giraph.time.Time;
27  import org.apache.giraph.time.Times;
28  import org.apache.giraph.utils.ReflectionUtils;
29  import org.apache.giraph.utils.RequestUtils;
30  import org.apache.log4j.Logger;
31  
32  import io.netty.buffer.ByteBuf;
33  import io.netty.channel.ChannelHandlerContext;
34  import io.netty.channel.ChannelInboundHandlerAdapter;
35  import io.netty.util.ReferenceCountUtil;
36  
37  /**
38   * Decodes encoded requests from the client.
39   */
40  public class RequestDecoder extends ChannelInboundHandlerAdapter {
41    /** Class logger */
42    private static final Logger LOG =
43        Logger.getLogger(RequestDecoder.class);
44    /** Time class to use */
45    private static final Time TIME = SystemTime.get();
46    /** Configuration */
47    private final ImmutableClassesGiraphConfiguration conf;
48    /** In bound byte counter to output */
49    private final InboundByteCounter byteCounter;
50    /** Start nanoseconds for the decoding time */
51    private long startDecodingNanoseconds = -1;
52    /**
53     * Constructor.
54     *
55     * @param conf Configuration
56     * @param byteCounter Keeps track of the decoded bytes
57     */
58    public RequestDecoder(ImmutableClassesGiraphConfiguration conf,
59      InboundByteCounter byteCounter) {
60      this.conf = conf;
61      this.byteCounter = byteCounter;
62    }
63  
64    @Override
65    public void channelRead(ChannelHandlerContext ctx, Object msg)
66      throws Exception {
67      if (!(msg instanceof ByteBuf)) {
68        throw new IllegalStateException("decode: Got illegal message " + msg);
69      }
70      // Output metrics every 1/2 minute
71      String metrics = byteCounter.getMetricsWindow(30000);
72      if (metrics != null) {
73        if (LOG.isInfoEnabled()) {
74          LOG.info("decode: Server window metrics " + metrics);
75        }
76      }
77  
78      if (LOG.isDebugEnabled()) {
79        startDecodingNanoseconds = TIME.getNanoseconds();
80      }
81  
82      // Decode the request
83      ByteBuf buf = (ByteBuf) msg;
84      int enumValue = buf.readByte();
85      RequestType type = RequestType.values()[enumValue];
86      Class<? extends WritableRequest> requestClass = type.getRequestClass();
87      WritableRequest request =
88          ReflectionUtils.newInstance(requestClass, conf);
89      request = RequestUtils.decodeWritableRequest(buf, request);
90  
91      if (LOG.isDebugEnabled()) {
92        LOG.debug("decode: Client " + request.getClientId() +
93            ", requestId " + request.getRequestId() +
94            ", " +  request.getType() + ", with size " +
95            buf.writerIndex() + " took " +
96            Times.getNanosSince(TIME, startDecodingNanoseconds) + " ns");
97      }
98      ReferenceCountUtil.release(buf);
99      // fire writableRequest object to upstream handlers
100     ctx.fireChannelRead(request);
101   }
102 }