This project has retired. For details please refer to its Attic page.
WorkerSuperstepMetrics xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.metrics;
20  
21  import org.apache.giraph.graph.GraphTaskManager;
22  import org.apache.giraph.ooc.OutOfCoreEngine;
23  import org.apache.giraph.ooc.OutOfCoreIOCallable;
24  import org.apache.giraph.worker.BspServiceWorker;
25  import org.apache.hadoop.io.Writable;
26  
27  import com.yammer.metrics.core.Gauge;
28  
29  import java.io.DataInput;
30  import java.io.DataOutput;
31  import java.io.IOException;
32  import java.io.PrintStream;
33  import java.util.concurrent.TimeUnit;
34  
35  /**
36   * Per-superstep metrics for a Worker.
37   */
38  public class WorkerSuperstepMetrics implements Writable {
39    /** Total network communication time */
40    private LongAndTimeUnit commTimer;
41    /** Time for all compute calls to complete */
42    private LongAndTimeUnit computeAllTimer;
43    /** Time till first message gets flushed */
44    private LongAndTimeUnit timeToFirstMsg;
45    /** Total superstep time */
46    private LongAndTimeUnit superstepTimer;
47    /** Time spent waiting for other workers to finish */
48    private LongAndTimeUnit waitRequestsTimer;
49    /** Time spent doing GC in a superstep */
50    private LongAndTimeUnit superstepGCTimer;
51    /** Number of bytes loaded from disk to memory in out-of-core mechanism */
52    private long bytesLoadedFromDisk;
53    /** Number of bytes stored from memory to disk in out-of-core mechanism */
54    private long bytesStoredOnDisk;
55    /** Percentage of graph kept in memory */
56    private double graphPercentageInMemory;
57  
58    /**
59     * Constructor
60     */
61    public WorkerSuperstepMetrics() {
62      commTimer = new LongAndTimeUnit();
63      computeAllTimer = new LongAndTimeUnit();
64      timeToFirstMsg = new LongAndTimeUnit();
65      superstepTimer = new LongAndTimeUnit();
66      waitRequestsTimer = new LongAndTimeUnit();
67      superstepGCTimer = new LongAndTimeUnit();
68      superstepGCTimer.setTimeUnit(TimeUnit.MILLISECONDS);
69      bytesLoadedFromDisk = 0;
70      bytesStoredOnDisk = 0;
71      graphPercentageInMemory = 100;
72    }
73  
74    /**
75     * Read metric values from global MetricsRegistry.
76     *
77     * @return this object, for chaining
78     */
79    public WorkerSuperstepMetrics readFromRegistry() {
80      readGiraphTimer(GraphTaskManager.TIMER_COMMUNICATION_TIME, commTimer);
81      readGiraphTimer(GraphTaskManager.TIMER_COMPUTE_ALL, computeAllTimer);
82      readGiraphTimer(GraphTaskManager.TIMER_TIME_TO_FIRST_MSG, timeToFirstMsg);
83      readGiraphTimer(GraphTaskManager.TIMER_SUPERSTEP_TIME, superstepTimer);
84      readGiraphTimer(BspServiceWorker.TIMER_WAIT_REQUESTS, waitRequestsTimer);
85      SuperstepMetricsRegistry registry = GiraphMetrics.get().perSuperstep();
86      superstepGCTimer.setValue(
87          registry.getCounter(GraphTaskManager.TIMER_SUPERSTEP_GC_TIME).count());
88      bytesLoadedFromDisk =
89          registry.getCounter(OutOfCoreIOCallable.BYTES_LOAD_FROM_DISK).count();
90      bytesStoredOnDisk =
91          registry.getCounter(OutOfCoreIOCallable.BYTES_STORE_TO_DISK).count();
92      Gauge<Double> gauge =
93          registry.getExistingGauge(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY);
94      if (gauge != null) {
95        graphPercentageInMemory = gauge.value();
96      }
97      return this;
98    }
99  
100   /**
101    * Read data from GiraphTimer into a LongAndTimeUnit.
102    *
103    * @param name String name of Gauge to retrieve.
104    * @param data LongAndTimeUnit to read data into.
105    */
106   private void readGiraphTimer(String name, LongAndTimeUnit data) {
107     Gauge<Long> gauge = GiraphMetrics.get().perSuperstep().
108         getExistingGauge(name);
109     if (gauge instanceof GiraphTimer) {
110       GiraphTimer giraphTimer = (GiraphTimer) gauge;
111       data.setTimeUnit(giraphTimer.getTimeUnit());
112       data.setValue(giraphTimer.value());
113     } else if (gauge != null) {
114       throw new IllegalStateException(name + " is not a GiraphTimer");
115     }
116   }
117 
118   /**
119    * Human readable dump of metrics stored here.
120    *
121    * @param superstep long number of superstep.
122    * @param out PrintStream to write to.
123    * @return this object, for chaining
124    */
125   public WorkerSuperstepMetrics print(long superstep, PrintStream out) {
126     out.println();
127     out.println("--- METRICS: superstep " + superstep + " ---");
128     out.println("  superstep time: " + superstepTimer);
129     out.println("  compute all partitions: " + computeAllTimer);
130     out.println("  time spent in gc: " + superstepGCTimer);
131     out.println("  bytes transferred in out-of-core: " +
132         (bytesLoadedFromDisk + bytesStoredOnDisk));
133     out.println("  network communication time: " + commTimer);
134     out.println("  time to first message: " + timeToFirstMsg);
135     out.println("  wait on requests time: " + waitRequestsTimer);
136     return this;
137   }
138 
139   /**
140    * @return Communication timer
141    */
142   public long getCommTimer() {
143     return commTimer.getValue();
144   }
145 
146   /**
147    * @return Total compute timer
148    */
149   public long getComputeAllTimer() {
150     return computeAllTimer.getValue();
151   }
152 
153   /**
154    * @return timer between start time and first message flushed.
155    */
156   public long getTimeToFirstMsg() {
157     return timeToFirstMsg.getValue();
158   }
159 
160   /**
161    * @return timer for superstep time
162    */
163   public long getSuperstepTimer() {
164     return superstepTimer.getValue();
165   }
166 
167   /**
168    * @return timer waiting for other workers
169    */
170   public long getWaitRequestsTimer() {
171     return waitRequestsTimer.getValue();
172   }
173 
174   /**
175    * @return number of bytes loaded from disk by out-of-core mechanism (if any
176    *         is used)
177    */
178   public long getBytesLoadedFromDisk() {
179     return bytesLoadedFromDisk;
180   }
181 
182   /**
183    * @return number of bytes stored on disk by out-of-core mechanism (if any is
184    *         used)
185    */
186   public long getBytesStoredOnDisk() {
187     return bytesStoredOnDisk;
188   }
189 
190   /**
191    * @return a rough estimate of percentage of graph in memory
192    */
193   public double getGraphPercentageInMemory() {
194     return graphPercentageInMemory;
195   }
196 
197   @Override
198   public void readFields(DataInput dataInput) throws IOException {
199     commTimer.setValue(dataInput.readLong());
200     computeAllTimer.setValue(dataInput.readLong());
201     timeToFirstMsg.setValue(dataInput.readLong());
202     superstepTimer.setValue(dataInput.readLong());
203     waitRequestsTimer.setValue(dataInput.readLong());
204     bytesLoadedFromDisk = dataInput.readLong();
205     bytesStoredOnDisk = dataInput.readLong();
206     graphPercentageInMemory = dataInput.readDouble();
207   }
208 
209   @Override
210   public void write(DataOutput dataOutput) throws IOException {
211     dataOutput.writeLong(commTimer.getValue());
212     dataOutput.writeLong(computeAllTimer.getValue());
213     dataOutput.writeLong(timeToFirstMsg.getValue());
214     dataOutput.writeLong(superstepTimer.getValue());
215     dataOutput.writeLong(waitRequestsTimer.getValue());
216     dataOutput.writeLong(bytesLoadedFromDisk);
217     dataOutput.writeLong(bytesStoredOnDisk);
218     dataOutput.writeDouble(graphPercentageInMemory);
219   }
220 }