View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.worker;
20  
21  import org.apache.giraph.conf.BooleanConfOption;
22  import org.apache.giraph.conf.FloatConfOption;
23  import org.apache.giraph.conf.GiraphConfiguration;
24  import org.apache.giraph.conf.IntConfOption;
25  import org.apache.giraph.utils.MemoryUtils;
26  import org.apache.giraph.zk.ZooKeeperExt;
27  import org.apache.log4j.Logger;
28  import org.apache.zookeeper.CreateMode;
29  import org.apache.zookeeper.KeeperException;
30  import org.apache.zookeeper.ZooDefs;
31  
32  import java.util.concurrent.atomic.AtomicLong;
33  
34  /**
35   * Memory observer to help synchronize when full gcs are happening across all
36   * the workers
37   */
38  public class MemoryObserver {
39    /** Whether or not to use memory observer */
40    public static final BooleanConfOption USE_MEMORY_OBSERVER =
41        new BooleanConfOption("giraph.memoryObserver.enabled", false,
42            "Whether or not to use memory observer");
43    /** For which fraction of free memory will we issue manual gc calls */
44    public static final FloatConfOption FREE_MEMORY_FRACTION_FOR_GC =
45        new FloatConfOption("giraph.memoryObserver.freeMemoryFractionForGc", 0.1f,
46            "For which fraction of free memory will we issue manual gc calls");
47    /** Minimum milliseconds between two manual gc calls */
48    public static final IntConfOption MIN_MS_BETWEEN_FULL_GCS =
49        new IntConfOption("giraph.memoryObserver.minMsBetweenFullGcs", 60 * 1000,
50            "Minimum milliseconds between two manual gc calls");
51  
52    /** Logger */
53    private static final Logger LOG = Logger.getLogger(MemoryObserver.class);
54    /** How long does memory observer thread sleep for */
55    private static final int MEMORY_OBSERVER_SLEEP_MS = 1000;
56  
57    /** When was the last manual gc call */
58    private final AtomicLong lastManualGc = new AtomicLong();
59    /** Zookeeper */
60    private final ZooKeeperExt zk;
61    /** Path on zookeeper for memory observer files */
62    private final String zkPath;
63    /** Value of conf setting MIN_MS_BETWEEN_FULL_GCS */
64    private final int minMsBetweenFullGcs;
65  
66    /**
67     * Constructor
68     *
69     * @param zk Zookeeper
70     * @param zkPath Path on zookeeper for memory observer files
71     * @param conf Configration
72     */
73    public MemoryObserver(final ZooKeeperExt zk,
74        final String zkPath, GiraphConfiguration conf) {
75      this.zk = zk;
76      this.zkPath = zkPath;
77      minMsBetweenFullGcs = MIN_MS_BETWEEN_FULL_GCS.get(conf);
78  
79      if (!USE_MEMORY_OBSERVER.get(conf)) {
80        return;
81      }
82  
83      try {
84        // Create base path for memory observer nodes
85        zk.createOnceExt(zkPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
86            CreateMode.PERSISTENT, true);
87      } catch (KeeperException | InterruptedException e) {
88        LOG.info("Exception occurred", e);
89      }
90      setWatcher();
91  
92      final float freeMemoryFractionForGc =
93          FREE_MEMORY_FRACTION_FOR_GC.get(conf);
94      Thread thread = new Thread(new Runnable() {
95        @Override
96        public void run() {
97  
98          while (true) {
99            double freeMemoryFraction = MemoryUtils.freeMemoryFraction();
100           long msFromLastGc = System.currentTimeMillis() - lastManualGc.get();
101           if (msFromLastGc > minMsBetweenFullGcs &&
102               freeMemoryFraction < freeMemoryFractionForGc) {
103             try {
104               if (LOG.isInfoEnabled()) {
105                 LOG.info("Notifying others about low memory (" +
106                     freeMemoryFraction + "% free)");
107               }
108               zk.createExt(
109                   zkPath + "/" + System.currentTimeMillis(),
110                   new byte[0],
111                   ZooDefs.Ids.OPEN_ACL_UNSAFE,
112                   CreateMode.EPHEMERAL,
113                   false);
114             } catch (KeeperException | InterruptedException e) {
115               LOG.warn("Exception occurred", e);
116             }
117           }
118           try {
119             Thread.sleep(MEMORY_OBSERVER_SLEEP_MS);
120           } catch (InterruptedException e) {
121             LOG.warn("Exception occurred", e);
122             return;
123           }
124         }
125       }
126     });
127     thread.setName("memory-observer");
128     thread.setDaemon(true);
129     thread.start();
130   }
131 
132   /** Set watcher on memory observer folder */
133   private void setWatcher() {
134     try {
135       // Set a watcher on this path
136       zk.getChildrenExt(zkPath, true, false, false);
137     } catch (KeeperException | InterruptedException e) {
138       LOG.warn("Exception occurred", e);
139     }
140   }
141 
142   /** Manually call gc, if enough time from last call has passed */
143   public void callGc() {
144     long last = lastManualGc.get();
145     if (System.currentTimeMillis() - last > minMsBetweenFullGcs &&
146         lastManualGc.compareAndSet(last, System.currentTimeMillis())) {
147       if (LOG.isInfoEnabled()) {
148         LOG.info("Calling gc manually");
149       }
150       System.gc();
151       if (LOG.isInfoEnabled()) {
152         LOG.info("Manual gc call done");
153       }
154     }
155     setWatcher();
156   }
157 }