This project has retired. For details please refer to its Attic page.
MemoryObserver xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.worker;
20  
21  import org.apache.giraph.conf.BooleanConfOption;
22  import org.apache.giraph.conf.FloatConfOption;
23  import org.apache.giraph.conf.GiraphConfiguration;
24  import org.apache.giraph.conf.IntConfOption;
25  import org.apache.giraph.utils.MemoryUtils;
26  import org.apache.giraph.utils.ThreadUtils;
27  import org.apache.giraph.zk.ZooKeeperExt;
28  import org.apache.log4j.Logger;
29  import org.apache.zookeeper.CreateMode;
30  import org.apache.zookeeper.KeeperException;
31  import org.apache.zookeeper.ZooDefs;
32  
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  /**
36   * Memory observer to help synchronize when full gcs are happening across all
37   * the workers
38   */
39  public class MemoryObserver {
40    /** Whether or not to use memory observer */
41    public static final BooleanConfOption USE_MEMORY_OBSERVER =
42        new BooleanConfOption("giraph.memoryObserver.enabled", false,
43            "Whether or not to use memory observer");
44    /** For which fraction of free memory will we issue manual gc calls */
45    public static final FloatConfOption FREE_MEMORY_FRACTION_FOR_GC =
46        new FloatConfOption("giraph.memoryObserver.freeMemoryFractionForGc", 0.1f,
47            "For which fraction of free memory will we issue manual gc calls");
48    /** Minimum milliseconds between two manual gc calls */
49    public static final IntConfOption MIN_MS_BETWEEN_FULL_GCS =
50        new IntConfOption("giraph.memoryObserver.minMsBetweenFullGcs", 60 * 1000,
51            "Minimum milliseconds between two manual gc calls");
52  
53    /** Logger */
54    private static final Logger LOG = Logger.getLogger(MemoryObserver.class);
55    /** How long does memory observer thread sleep for */
56    private static final int MEMORY_OBSERVER_SLEEP_MS = 1000;
57  
58    /** When was the last manual gc call */
59    private final AtomicLong lastManualGc = new AtomicLong();
60    /** Zookeeper */
61    private final ZooKeeperExt zk;
62    /** Path on zookeeper for memory observer files */
63    private final String zkPath;
64    /** Value of conf setting MIN_MS_BETWEEN_FULL_GCS */
65    private final int minMsBetweenFullGcs;
66  
67    /**
68     * Constructor
69     *
70     * @param zk Zookeeper
71     * @param zkPath Path on zookeeper for memory observer files
72     * @param conf Configration
73     */
74    public MemoryObserver(final ZooKeeperExt zk,
75        final String zkPath, GiraphConfiguration conf) {
76      this.zk = zk;
77      this.zkPath = zkPath;
78      minMsBetweenFullGcs = MIN_MS_BETWEEN_FULL_GCS.get(conf);
79  
80      if (!USE_MEMORY_OBSERVER.get(conf)) {
81        return;
82      }
83  
84      try {
85        // Create base path for memory observer nodes
86        zk.createOnceExt(zkPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
87            CreateMode.PERSISTENT, true);
88      } catch (KeeperException | InterruptedException e) {
89        LOG.info("Exception occurred", e);
90      }
91      setWatcher();
92  
93      final float freeMemoryFractionForGc =
94          FREE_MEMORY_FRACTION_FOR_GC.get(conf);
95      ThreadUtils.startThread(new Runnable() {
96        @Override
97        public void run() {
98  
99          while (true) {
100           double freeMemoryFraction = MemoryUtils.freeMemoryFraction();
101           long msFromLastGc = System.currentTimeMillis() - lastManualGc.get();
102           if (msFromLastGc > minMsBetweenFullGcs &&
103               freeMemoryFraction < freeMemoryFractionForGc) {
104             try {
105               if (LOG.isInfoEnabled()) {
106                 LOG.info("Notifying others about low memory (" +
107                     freeMemoryFraction + "% free)");
108               }
109               zk.createExt(
110                   zkPath + "/" + System.currentTimeMillis(),
111                   new byte[0],
112                   ZooDefs.Ids.OPEN_ACL_UNSAFE,
113                   CreateMode.EPHEMERAL,
114                   false);
115             } catch (KeeperException | InterruptedException e) {
116               LOG.warn("Exception occurred", e);
117             }
118           }
119           if (!ThreadUtils.trySleep(MEMORY_OBSERVER_SLEEP_MS)) {
120             return;
121           }
122         }
123       }
124     }, "memory-observer");
125   }
126 
127   /** Set watcher on memory observer folder */
128   private void setWatcher() {
129     try {
130       // Set a watcher on this path
131       zk.getChildrenExt(zkPath, true, false, false);
132     } catch (KeeperException | InterruptedException e) {
133       LOG.warn("Exception occurred", e);
134     }
135   }
136 
137   /** Manually call gc, if enough time from last call has passed */
138   public void callGc() {
139     long last = lastManualGc.get();
140     if (System.currentTimeMillis() - last > minMsBetweenFullGcs &&
141         lastManualGc.compareAndSet(last, System.currentTimeMillis())) {
142       if (LOG.isInfoEnabled()) {
143         LOG.info("Calling gc manually");
144       }
145       System.gc();
146       if (LOG.isInfoEnabled()) {
147         LOG.info("Manual gc call done");
148       }
149     }
150     setWatcher();
151   }
152 }