This project has retired. For details please refer to its Attic page.
ReactiveJMapHistoDumper xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.utils;
20  
21  import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
22  import org.apache.giraph.conf.GiraphConstants;
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.master.MasterObserver;
25  import org.apache.giraph.metrics.AggregatedMetrics;
26  import org.apache.giraph.partition.PartitionStats;
27  import org.apache.giraph.worker.WorkerObserver;
28  import org.apache.log4j.Logger;
29  
30  import java.util.List;
31  
32  /**
33   * An observer for both worker and master that periodically checks if available
34   * memory on heap is below certain threshold, and if found to be the case
35   * dumps jmap -histo for the process
36   */
37  public class ReactiveJMapHistoDumper extends
38    DefaultImmutableClassesGiraphConfigurable implements
39    MasterObserver, WorkerObserver {
40    /** Logger */
41    private static final Logger LOG = Logger.getLogger(
42        ReactiveJMapHistoDumper.class);
43    /** Size of mb */
44    private static final int MB = 1024 * 1024;
45  
46    /** How many msec to sleep between calls */
47    private int sleepMillis;
48    /** How many lines of output to print */
49    private int linesToPrint;
50    /** How much free memory is expected */
51    private int minFreeMemory;
52  
53    /** The jmap printing thread */
54    private Thread thread;
55    /** Halt jmap thread */
56    private volatile boolean stop = false;
57    /** Path to jmap*/
58    private String jmapPath;
59  
60    @Override
61    public void preLoad() {
62      // This is called by both WorkerObserver and MasterObserver
63      startSupervisorThread();
64    }
65  
66    @Override
67    public void postSave() {
68      // This is called by both WorkerObserver and MasterObserver
69      joinSupervisorThread();
70    }
71  
72    @Override
73    public void preApplication() {
74    }
75  
76    @Override
77    public void postApplication() {
78    }
79  
80    /**
81     * Join the supervisor thread
82     */
83    private void joinSupervisorThread() {
84      stop = true;
85      try {
86        thread.join(sleepMillis + 5000);
87      } catch (InterruptedException e) {
88        LOG.error("Failed to join jmap thread");
89      }
90    }
91  
92    /**
93     * Start the supervisor thread
94     */
95    public void startSupervisorThread() {
96      stop = false;
97      final Runtime runtime = Runtime.getRuntime();
98      thread = ThreadUtils.startThread(new Runnable() {
99        @Override
100       public void run() {
101         while (!stop) {
102           long potentialMemory = (runtime.maxMemory() -
103               runtime.totalMemory()) + runtime.freeMemory();
104           if (potentialMemory / MB < minFreeMemory) {
105             JMap.heapHistogramDump(linesToPrint, jmapPath);
106           }
107           ThreadUtils.trySleep(sleepMillis);
108         }
109       }
110     }, "ReactiveJMapHistoDumperSupervisorThread");
111   }
112 
113   @Override
114   public void preSuperstep(long superstep) { }
115 
116   @Override
117   public void postSuperstep(long superstep) { }
118 
119   @Override
120   public void superstepMetricsUpdate(long superstep,
121       AggregatedMetrics aggregatedMetrics,
122       List<PartitionStats> partitionStatsList) { }
123 
124   @Override
125   public void applicationFailed(Exception e) { }
126 
127   @Override
128   public void setConf(ImmutableClassesGiraphConfiguration configuration) {
129     sleepMillis = GiraphConstants.JMAP_SLEEP_MILLIS.get(configuration);
130     linesToPrint = GiraphConstants.JMAP_PRINT_LINES.get(configuration);
131     minFreeMemory = GiraphConstants.MIN_FREE_MBS_ON_HEAP.get(configuration);
132     jmapPath = GiraphConstants.JMAP_PATH.get(configuration);
133   }
134 }