View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty;
20  
21  import com.yammer.metrics.core.Histogram;
22  import com.yammer.metrics.core.Meter;
23  import com.yammer.metrics.core.NoOpHistogram;
24  import com.yammer.metrics.core.NoOpMeter;
25  import io.netty.buffer.ByteBuf;
26  import org.apache.giraph.metrics.MeterDesc;
27  import org.apache.giraph.metrics.MetricNames;
28  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
29  import org.apache.giraph.time.SystemTime;
30  import org.apache.giraph.time.Time;
31  
32  import java.text.DecimalFormat;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  /**
36   * Delegate Object to help keep track of the bytes processed and provide some
37   * metrics when desired as part of the Netty Channel stack.
38   */
39  public class ByteCounterDelegate implements ByteCounter {
40    /** Megabyte in bytes */
41    public static final double MEGABYTE = 1024f * 1024f;
42    /** Helper to format the doubles */
43    private static final DecimalFormat DOUBLE_FORMAT =
44        new DecimalFormat("#######.####");
45    /** Class timer */
46    private static final Time TIME = SystemTime.get();
47    /** All bytes ever processed */
48    private final AtomicLong bytesProcessed = new AtomicLong();
49    /** Total processed requests */
50    private final AtomicLong processedRequests = new AtomicLong();
51    /** Start time (for bandwidth calculation) */
52    private final AtomicLong startMsecs = new AtomicLong();
53    /** Last updated msecs for getMetricsWindow */
54    private final AtomicLong metricsWindowLastUpdatedMsecs = new AtomicLong();
55  
56    // Metrics
57    /** Meter of requests sent */
58    private Meter processedRequestsMeter = NoOpMeter.INSTANCE;
59    /** Histogram of bytes sent */
60    private Histogram processedBytesHist = NoOpHistogram.INSTANCE;
61  
62    /** Is it delegate for InBoundByteCounter */
63    private final boolean isInbound;
64  
65    /**
66     * Constructor to specify if delegate is created by InBound/ Outbound counter
67     *
68     * @param isInBound switch to specify if instantiated by inbound counter
69     */
70    public ByteCounterDelegate(boolean isInBound) {
71      this.isInbound = isInBound;
72    }
73  
74    /**
75     * Called by Inbound/ Outbound counters to refresh meters on a new superstep
76     *
77     * @param superstepMetrics superstepmetrics registry
78     */
79    public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
80      if (isInbound) {
81        processedRequestsMeter = superstepMetrics.getMeter(
82            MeterDesc.RECEIVED_REQUESTS);
83        processedBytesHist = superstepMetrics.getUniformHistogram(
84            MetricNames.RECEIVED_BYTES);
85      } else {
86        processedRequestsMeter = superstepMetrics.getMeter(
87            MeterDesc.SENT_REQUESTS);
88        processedBytesHist = superstepMetrics.getUniformHistogram(
89            MetricNames.SENT_BYTES);
90      }
91    }
92  
93    /**
94     * Updates properties based on bytes sent / received
95     *
96     * @param buf ByteBuf received by the counter
97     * @return number of readable bytes
98     */
99    public int byteBookkeeper(ByteBuf buf) {
100     int processedBytes = buf.readableBytes();
101     bytesProcessed.addAndGet(processedBytes);
102     processedBytesHist.update(processedBytes);
103     processedRequests.incrementAndGet();
104     processedRequestsMeter.mark();
105     return processedBytes;
106   }
107 
108   /**
109    * Reset all the bytes kept track of.
110    */
111   public void resetBytes() {
112     bytesProcessed.set(0);
113     processedRequests.set(0);
114   }
115 
116   /**
117    * Reset the start msecs.
118    */
119   public void resetStartMsecs() {
120     startMsecs.set(TIME.getMilliseconds());
121   }
122 
123   @Override
124   public void resetAll() {
125     resetBytes();
126     resetStartMsecs();
127   }
128 
129   public long getBytesProcessed() {
130     return bytesProcessed.get();
131   }
132 
133   /**
134    * @return Mbytes processed / sec in the current interval
135    */
136   public double getMbytesPerSecProcessed() {
137     return bytesProcessed.get() * 1000f /
138         (1 + TIME.getMilliseconds() - startMsecs.get()) / MEGABYTE;
139   }
140 
141   /**
142    * Helper method used by getMetrics to create its return string
143    * @param mBytesProcessed mbytes processed
144    * @param mBytesProcessedPerReq mbytes processed per request
145    * @return A string containing all the metrics
146    */
147   public String getMetricsHelper(double mBytesProcessed,
148                                  double mBytesProcessedPerReq) {
149     if (isInbound) {
150       return "MBytes/sec received = " +
151           DOUBLE_FORMAT.format(getMbytesPerSecProcessed()) +
152           ", MBytesReceived = " + DOUBLE_FORMAT.format(mBytesProcessed) +
153           ", ave received req MBytes = " +
154           DOUBLE_FORMAT.format(mBytesProcessedPerReq) +
155           ", secs waited = " +
156           ((TIME.getMilliseconds() - startMsecs.get()) / 1000f);
157     } else {
158       return "MBytes/sec sent = " +
159           DOUBLE_FORMAT.format(getMbytesPerSecProcessed()) +
160           ", MBytesSent = " + DOUBLE_FORMAT.format(mBytesProcessed) +
161           ", ave sent req MBytes = " +
162           DOUBLE_FORMAT.format(mBytesProcessedPerReq) +
163           ", secs waited = " +
164           ((TIME.getMilliseconds() - startMsecs.get()) / 1000f);
165     }
166   }
167 
168   @Override
169   public String getMetrics() {
170     double mBytesProcessed = bytesProcessed.get() / MEGABYTE;
171     long curProcessedRequests = processedRequests.get();
172     double mBytesProcessedPerReq = (curProcessedRequests == 0) ? 0 :
173         mBytesProcessed / curProcessedRequests;
174 
175     return getMetricsHelper(mBytesProcessed, mBytesProcessedPerReq);
176   }
177 
178   @Override
179   public String getMetricsWindow(int minMsecsWindow) {
180     long lastUpdatedMsecs =  metricsWindowLastUpdatedMsecs.get();
181     long curMsecs = TIME.getMilliseconds();
182     if (curMsecs - lastUpdatedMsecs > minMsecsWindow) {
183       // Make sure that only one thread does this update
184       if (metricsWindowLastUpdatedMsecs.compareAndSet(lastUpdatedMsecs,
185           curMsecs)) {
186         String metrics = getMetrics();
187         resetAll();
188         return metrics;
189       }
190     }
191     return null;
192   }
193 }