View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.metrics;
20  
21  import org.apache.giraph.graph.GraphTaskManager;
22  import org.apache.giraph.ooc.OutOfCoreEngine;
23  import org.apache.giraph.ooc.OutOfCoreIOCallable;
24  import org.apache.giraph.worker.BspServiceWorker;
25  
26  import com.google.common.collect.Maps;
27  
28  import java.io.PrintStream;
29  import java.util.Map;
30  
31  /**
32   * Map of a bunch of aggregated metrics
33   */
34  public class AggregatedMetrics {
35    /** Mapping from name to aggregated metric */
36    private Map<String, AggregatedMetric<?>> metrics = Maps.newHashMap();
37  
38    /**
39     * Add value from hostname for a metric.
40     *
41     * @param name String name of metric
42     * @param value long value to track
43     * @param hostnamePartitionId String host it came from
44     * @return this
45     */
46    public AggregatedMetrics add(String name, long value,
47                                 String hostnamePartitionId) {
48      AggregatedMetricLong aggregatedMetric =
49          (AggregatedMetricLong) metrics.get(name);
50      if (aggregatedMetric == null) {
51        aggregatedMetric = new AggregatedMetricLong();
52        metrics.put(name, aggregatedMetric);
53      }
54      aggregatedMetric.addItem(value, hostnamePartitionId);
55      return this;
56    }
57  
58    /**
59     * Add value from hostname for a metric.
60     *
61     * @param name String name of metric
62     * @param value double value to track
63     * @param hostnamePartitionId String host it came from
64     * @return this
65     */
66    public AggregatedMetrics add(String name, double value,
67                                 String hostnamePartitionId) {
68      AggregatedMetricDouble aggregatedMetric =
69          (AggregatedMetricDouble) metrics.get(name);
70      if (aggregatedMetric == null) {
71        aggregatedMetric = new AggregatedMetricDouble();
72        metrics.put(name, aggregatedMetric);
73      }
74      aggregatedMetric.addItem(value, hostnamePartitionId);
75      return this;
76    }
77  
78    /**
79     * Add metrics from worker.
80     *
81     * @param workerMetrics WorkerSuperstepMetrics from work
82     * @param hostname String hostname of worker
83     * @return this
84     */
85    public AggregatedMetrics add(WorkerSuperstepMetrics workerMetrics,
86                                 String hostname) {
87      add(GraphTaskManager.TIMER_SUPERSTEP_TIME,
88          workerMetrics.getSuperstepTimer(), hostname);
89      add(GraphTaskManager.TIMER_COMMUNICATION_TIME,
90          workerMetrics.getCommTimer(), hostname);
91      add(GraphTaskManager.TIMER_COMPUTE_ALL,
92          workerMetrics.getComputeAllTimer(), hostname);
93      add(GraphTaskManager.TIMER_TIME_TO_FIRST_MSG,
94          workerMetrics.getTimeToFirstMsg(), hostname);
95      add(BspServiceWorker.TIMER_WAIT_REQUESTS,
96          workerMetrics.getWaitRequestsTimer(), hostname);
97      add(OutOfCoreIOCallable.BYTES_LOAD_FROM_DISK,
98          workerMetrics.getBytesLoadedFromDisk(), hostname);
99      add(OutOfCoreIOCallable.BYTES_STORE_TO_DISK,
100         workerMetrics.getBytesStoredOnDisk(), hostname);
101     add(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY,
102         workerMetrics.getGraphPercentageInMemory(), hostname);
103     return this;
104   }
105 
106   /**
107    * Print the aggregated metrics to the stream provided.
108    *
109    * @param superstep long number of superstep.
110    * @param out PrintStream to write to.
111    * @return this
112    */
113   public AggregatedMetrics print(long superstep, PrintStream out) {
114     AggregatedMetric superstepTime = get(GraphTaskManager.TIMER_SUPERSTEP_TIME);
115     AggregatedMetric commTime = get(GraphTaskManager.TIMER_COMMUNICATION_TIME);
116     AggregatedMetric computeAll = get(GraphTaskManager.TIMER_COMPUTE_ALL);
117     AggregatedMetric timeToFirstMsg =
118         get(GraphTaskManager.TIMER_TIME_TO_FIRST_MSG);
119     AggregatedMetric waitRequestsMicros = get(
120         BspServiceWorker.TIMER_WAIT_REQUESTS);
121     AggregatedMetric bytesLoaded =
122         get(OutOfCoreIOCallable.BYTES_LOAD_FROM_DISK);
123     AggregatedMetric bytesStored =
124         get(OutOfCoreIOCallable.BYTES_STORE_TO_DISK);
125     AggregatedMetric graphInMem =
126         get(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY);
127 
128     out.println();
129     out.println("--- METRICS: superstep " + superstep + " ---");
130     printAggregatedMetric(out, "superstep time", "ms", superstepTime);
131     printAggregatedMetric(out, "compute all partitions", "ms", computeAll);
132     printAggregatedMetric(out, "network communication time", "ms", commTime);
133     printAggregatedMetric(out, "time to first message", "us", timeToFirstMsg);
134     printAggregatedMetric(out, "wait requests time", "us", waitRequestsMicros);
135     printAggregatedMetric(out, "bytes loaded from disk", "bytes", bytesLoaded);
136     printAggregatedMetric(out, "bytes stored to disk", "bytes", bytesStored);
137     printAggregatedMetric(out, "graph in mem", "%", graphInMem);
138 
139     return this;
140   }
141 
142   /**
143    * Print batch of lines for AggregatedMetric
144    *
145    * @param out PrintStream to write to
146    * @param header String header to print.
147    * @param unit String unit of metric
148    * @param aggregatedMetric AggregatedMetric to write
149    */
150   private void printAggregatedMetric(PrintStream out, String header,
151                                      String unit,
152                                      AggregatedMetric aggregatedMetric) {
153     if (aggregatedMetric.hasData()) {
154       out.println(header);
155       out.println("  mean: " + aggregatedMetric.mean() + " " + unit);
156       printValueFromHost(out, "  smallest: ", unit, aggregatedMetric.max());
157       printValueFromHost(out, "  largest: ", unit, aggregatedMetric.min());
158     } else {
159       out.println(header + ": NO DATA");
160     }
161   }
162 
163   /**
164    * Print a line for a value with the host it came from.
165    *
166    * @param out PrintStream to write to
167    * @param prefix String to write at beginning
168    * @param unit String unit of metric
169    * @param vh ValueWithHostname to write
170    */
171   private void printValueFromHost(PrintStream out, String prefix,
172                                   String unit, ValueWithHostname vh) {
173     out.println(prefix + vh.getValue() + ' ' + unit +
174         " from " + vh.getHostname());
175   }
176 
177   /**
178    * Get AggregatedMetric with given name.
179    *
180    * @param name String metric to lookup.
181    * @return AggregatedMetric for given metric name.
182    */
183   public AggregatedMetric get(String name) {
184     return metrics.get(name);
185   }
186 
187   /**
188    * Get map of all aggregated metrics.
189    *
190    * @return Map of all the aggregated metrics.
191    */
192   public Map<String, AggregatedMetric<?>> getAll() {
193     return metrics;
194   }
195 }