This project has retired. For details please refer to its Attic page.
ByteCounterDelegate xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty;
20  
21  import com.yammer.metrics.core.Histogram;
22  import com.yammer.metrics.core.Meter;
23  import com.yammer.metrics.core.NoOpHistogram;
24  import com.yammer.metrics.core.NoOpMeter;
25  import io.netty.buffer.ByteBuf;
26  import org.apache.giraph.metrics.MeterDesc;
27  import org.apache.giraph.metrics.MetricNames;
28  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
29  import org.apache.giraph.time.SystemTime;
30  import org.apache.giraph.time.Time;
31  
32  import java.text.DecimalFormat;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  /**
36   * Delegate Object to help keep track of the bytes processed and provide some
37   * metrics when desired as part of the Netty Channel stack.
38   */
39  public class ByteCounterDelegate implements ByteCounter {
40    /** Megabyte in bytes */
41    public static final double MEGABYTE = 1024f * 1024f;
42    /** Helper to format the doubles */
43    private static final DecimalFormat DOUBLE_FORMAT =
44        new DecimalFormat("#######.####");
45    /** Class timer */
46    private static final Time TIME = SystemTime.get();
47    /** Bytes processed during the most recent time interval */
48    private final AtomicLong bytesProcessed = new AtomicLong();
49    /** Aggregate bytes per superstep */
50    private final AtomicLong bytesProcessedPerSuperstep = new AtomicLong();
51    /** Total processed requests */
52    private final AtomicLong processedRequests = new AtomicLong();
53    /** Start time (for bandwidth calculation) */
54    private final AtomicLong startMsecs = new AtomicLong();
55    /** Last updated msecs for getMetricsWindow */
56    private final AtomicLong metricsWindowLastUpdatedMsecs = new AtomicLong();
57  
58    // Metrics
59    /** Meter of requests sent */
60    private Meter processedRequestsMeter = NoOpMeter.INSTANCE;
61    /** Histogram of bytes sent */
62    private Histogram processedBytesHist = NoOpHistogram.INSTANCE;
63  
64    /** Is it delegate for InBoundByteCounter */
65    private final boolean isInbound;
66  
67    /**
68     * Constructor to specify if delegate is created by InBound/ Outbound counter
69     *
70     * @param isInBound switch to specify if instantiated by inbound counter
71     */
72    public ByteCounterDelegate(boolean isInBound) {
73      this.isInbound = isInBound;
74    }
75  
76    /**
77     * Called by Inbound/ Outbound counters to refresh meters on a new superstep
78     *
79     * @param superstepMetrics superstepmetrics registry
80     */
81    public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
82      if (isInbound) {
83        processedRequestsMeter = superstepMetrics.getMeter(
84            MeterDesc.RECEIVED_REQUESTS);
85        processedBytesHist = superstepMetrics.getUniformHistogram(
86            MetricNames.RECEIVED_BYTES);
87      } else {
88        processedRequestsMeter = superstepMetrics.getMeter(
89            MeterDesc.SENT_REQUESTS);
90        processedBytesHist = superstepMetrics.getUniformHistogram(
91            MetricNames.SENT_BYTES);
92      }
93    }
94  
95    /**
96     * Updates properties based on bytes sent / received
97     *
98     * @param buf ByteBuf received by the counter
99     * @return number of readable bytes
100    */
101   public int byteBookkeeper(ByteBuf buf) {
102     int processedBytes = buf.readableBytes();
103     bytesProcessed.addAndGet(processedBytes);
104     bytesProcessedPerSuperstep.addAndGet(processedBytes);
105     processedBytesHist.update(processedBytes);
106     processedRequests.incrementAndGet();
107     processedRequestsMeter.mark();
108     return processedBytes;
109   }
110 
111   /**
112    * Reset all the bytes kept track of.
113    */
114   public void resetBytes() {
115     bytesProcessed.set(0);
116     processedRequests.set(0);
117   }
118 
119   /**
120    * Reset the start msecs.
121    */
122   public void resetStartMsecs() {
123     startMsecs.set(TIME.getMilliseconds());
124   }
125 
126   @Override
127   public void resetAll() {
128     resetBytes();
129     resetStartMsecs();
130   }
131 
132   /**
133    * Returns bytes processed per superstep.
134    * @return Number of bytes.
135    */
136   public long getBytesProcessedPerSuperstep() {
137     return bytesProcessedPerSuperstep.get();
138   }
139 
140   /**
141    * Set bytes processed per superstep to 0.
142    */
143   public void resetBytesProcessedPerSuperstep() {
144     bytesProcessedPerSuperstep.set(0);
145   }
146 
147   public long getBytesProcessed() {
148     return bytesProcessed.get();
149   }
150 
151   /**
152    * @return Mbytes processed / sec in the current interval
153    */
154   public double getMbytesPerSecProcessed() {
155     return bytesProcessed.get() * 1000f /
156         (1 + TIME.getMilliseconds() - startMsecs.get()) / MEGABYTE;
157   }
158 
159   /**
160    * Helper method used by getMetrics to create its return string
161    * @param mBytesProcessed mbytes processed
162    * @param mBytesProcessedPerReq mbytes processed per request
163    * @return A string containing all the metrics
164    */
165   public String getMetricsHelper(double mBytesProcessed,
166                                  double mBytesProcessedPerReq) {
167     if (isInbound) {
168       return "MBytes/sec received = " +
169           DOUBLE_FORMAT.format(getMbytesPerSecProcessed()) +
170           ", MBytesReceived = " + DOUBLE_FORMAT.format(mBytesProcessed) +
171           ", ave received req MBytes = " +
172           DOUBLE_FORMAT.format(mBytesProcessedPerReq) +
173           ", secs waited = " +
174           ((TIME.getMilliseconds() - startMsecs.get()) / 1000f);
175     } else {
176       return "MBytes/sec sent = " +
177           DOUBLE_FORMAT.format(getMbytesPerSecProcessed()) +
178           ", MBytesSent = " + DOUBLE_FORMAT.format(mBytesProcessed) +
179           ", ave sent req MBytes = " +
180           DOUBLE_FORMAT.format(mBytesProcessedPerReq) +
181           ", secs waited = " +
182           ((TIME.getMilliseconds() - startMsecs.get()) / 1000f);
183     }
184   }
185 
186   @Override
187   public String getMetrics() {
188     double mBytesProcessed = bytesProcessed.get() / MEGABYTE;
189     long curProcessedRequests = processedRequests.get();
190     double mBytesProcessedPerReq = (curProcessedRequests == 0) ? 0 :
191         mBytesProcessed / curProcessedRequests;
192 
193     return getMetricsHelper(mBytesProcessed, mBytesProcessedPerReq);
194   }
195 
196   @Override
197   public String getMetricsWindow(int minMsecsWindow) {
198     long lastUpdatedMsecs =  metricsWindowLastUpdatedMsecs.get();
199     long curMsecs = TIME.getMilliseconds();
200     if (curMsecs - lastUpdatedMsecs > minMsecsWindow) {
201       // Make sure that only one thread does this update
202       if (metricsWindowLastUpdatedMsecs.compareAndSet(lastUpdatedMsecs,
203           curMsecs)) {
204         String metrics = getMetrics();
205         resetAll();
206         return metrics;
207       }
208     }
209     return null;
210   }
211 }