View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty;
20  
21  import org.apache.giraph.metrics.GiraphMetrics;
22  import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
23  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
24  import org.apache.log4j.Logger;
25  
26  import io.netty.buffer.ByteBuf;
27  import io.netty.channel.ChannelHandlerContext;
28  import io.netty.channel.ChannelInboundHandlerAdapter;
29  import io.netty.channel.ChannelHandler.Sharable;
30  
31  
32  /**
33   * Keep track of the bytes received and provide some metrics when
34   * desired as part of the Netty Channel stack.
35   */
36  @Sharable
37  public class InboundByteCounter extends ChannelInboundHandlerAdapter implements
38      ByteCounter, ResetSuperstepMetricsObserver {
39    /** Class logger */
40    private static final Logger LOG =
41        Logger.getLogger(InboundByteCounter.class);
42    /** ByteCounter delegate object */
43    private final ByteCounterDelegate delegate = new ByteCounterDelegate(true);
44  
45    /** Constructor */
46    public InboundByteCounter() {
47      // Initialize Metrics
48      GiraphMetrics.get().addSuperstepResetObserver(this);
49    }
50  
51    public long getBytesReceived() {
52      return delegate.getBytesProcessed();
53    }
54  
55    /**
56     * @return Mbytes received / sec in the current interval
57     */
58    public double getMbytesPerSecReceived() {
59      return delegate.getMbytesPerSecProcessed();
60    }
61  
62    @Override
63    public void channelRead(ChannelHandlerContext ctx, Object msg)
64      throws Exception {
65      if (msg instanceof ByteBuf) {
66        ByteBuf buf = (ByteBuf) msg;
67        int receivedBytes = delegate.byteBookkeeper(buf);
68        if (LOG.isDebugEnabled()) {
69          LOG.debug("channelRead: " + ctx.channel().toString() + " buffer " +
70              "size = " + receivedBytes + ", total bytes = " +
71              getBytesReceived());
72        }
73      }
74      ctx.fireChannelRead(msg);
75    }
76  
77    @Override
78    public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
79      delegate.newSuperstep(superstepMetrics);
80    }
81  
82    @Override
83    public void resetAll() {
84      delegate.resetAll();
85    }
86  
87    @Override
88    public String getMetrics() {
89      return delegate.getMetrics();
90    }
91  
92    @Override
93    public String getMetricsWindow(int minMsecsWindow) {
94      return delegate.getMetricsWindow(minMsecsWindow);
95    }
96  }