View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.zk;
19  
20  
21  import org.apache.giraph.bsp.BspService;
22  import org.apache.giraph.conf.GiraphConfiguration;
23  import org.apache.giraph.conf.GiraphConstants;
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.util.Tool;
26  import org.apache.hadoop.util.ToolRunner;
27  import org.apache.zookeeper.KeeperException;
28  import org.apache.zookeeper.WatchedEvent;
29  import org.apache.zookeeper.Watcher;
30  
31  import java.io.IOException;
32  import java.net.UnknownHostException;
33  import java.util.Arrays;
34  import java.util.List;
35  
36  import static java.lang.System.out;
37  import static org.apache.giraph.conf.GiraphConstants.ZOOKEEPER_SERVER_PORT;
38  
39  /**
40   * A Utility class to be used by Giraph admins to occasionally clean up the
41   * ZK remnants of jobs that have failed or were killed before finishing.
42   * Usage (note that defaults are used if giraph.XYZ args are missing):
43   * <code>
44   * bin/giraph-admin -Dgiraph.zkBaseNode=... -Dgiraph.zkList=...
45   * -Dgiraph.zkServerPort=... -cleanZk
46   * </code>
47   *
48   * Alternatively, the <code>Configuration</code> file will populate these fields
49   * as it would in a <code>bin/giraph</code> run.
50   *
51   * <strong>WARNING:</strong> Obviously, running this while actual Giraph jobs
52   * using your cluster are in progress is <strong>not recommended.</strong>
53   */
54  public class GiraphZooKeeperAdmin implements Watcher, Tool {
55    static {
56      Configuration.addDefaultResource("giraph-site.xml");
57    }
58  
59    /** The configuration for this admin run */
60    private Configuration conf;
61  
62    @Override
63    public Configuration getConf() {
64      return conf;
65    }
66  
67    @Override
68    public void setConf(Configuration conf) {
69      this.conf = conf;
70    }
71  
72    /**
73     * Clean the ZooKeeper of all failed and cancelled in-memory
74     * job remnants that pile up on the ZK quorum over time.
75     * @param args the input command line arguments, if any.
76     * @return the System.exit value to return to the console.
77     */
78    @Override
79    public int run(String[] args) {
80      final GiraphConfiguration giraphConf = new GiraphConfiguration(getConf());
81      final int zkPort = ZOOKEEPER_SERVER_PORT.get(giraphConf);
82      final String zkBasePath = giraphConf.get(
83        GiraphConstants.BASE_ZNODE_KEY, "") + BspService.BASE_DIR;
84      final String[] zkServerList;
85      String zkServerListStr = giraphConf.getZookeeperList();
86      if (zkServerListStr.isEmpty()) {
87        throw new IllegalStateException("GiraphZooKeeperAdmin requires a list " +
88          "of ZooKeeper servers to clean.");
89      }
90      zkServerList = zkServerListStr.split(",");
91  
92      out.println("[GIRAPH-ZKADMIN] Attempting to clean Zookeeper " +
93        "hosts at: " + Arrays.deepToString(zkServerList));
94      out.println("[GIRAPH-ZKADMIN] Connecting on port: " + zkPort);
95      out.println("[GIRAPH-ZKADMIN] to ZNode root path: " + zkBasePath);
96      try {
97        ZooKeeperExt zooKeeper = new ZooKeeperExt(
98          formatZkServerList(zkServerList, zkPort),
99          GiraphConstants.ZOOKEEPER_SESSION_TIMEOUT.getDefaultValue(),
100         GiraphConstants.ZOOKEEPER_OPS_MAX_ATTEMPTS.getDefaultValue(),
101         GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.getDefaultValue(),
102         this);
103       doZooKeeperCleanup(zooKeeper, zkBasePath);
104       return 0;
105     } catch (KeeperException e) {
106       System.err.println("[ERROR] Failed to do cleanup of " +
107         zkBasePath + " due to KeeperException: " + e.getMessage());
108     } catch (InterruptedException e) {
109       System.err.println("[ERROR] Failed to do cleanup of " +
110         zkBasePath + " due to InterruptedException: " + e.getMessage());
111     } catch (UnknownHostException e) {
112       System.err.println("[ERROR] Failed to do cleanup of " +
113         zkBasePath + " due to UnknownHostException: " + e.getMessage());
114     } catch (IOException e) {
115       System.err.println("[ERROR] Failed to do cleanup of " +
116         zkBasePath + " due to IOException: " + e.getMessage());
117     }
118     return -1;
119   }
120 
121   /** Implement watcher to receive event at the end of the cleaner run
122    * @param event the WatchedEvent returned by ZK after the cleaning job.
123    */
124   @Override
125   public final void process(WatchedEvent event) {
126     out.println("[GIRAPH-ZKADMIN] ZK event received: " + event);
127   }
128 
129   /**
130    * Cleans the ZooKeeper quorum of in-memory failed/killed job fragments.
131    * @param zooKeeper the connected ZK instance (session) to delete from.
132    * @param zkBasePath the base node to begin erasing from.
133    */
134   public void doZooKeeperCleanup(ZooKeeperExt zooKeeper, String zkBasePath)
135     throws KeeperException, InterruptedException {
136     try {
137       zooKeeper.deleteExt(zkBasePath, -1, false);
138       out.println("[GIRAPH-ZKADMIN] Deleted: " + zkBasePath);
139     } catch (KeeperException.NotEmptyException e) {
140       List<String> childList =
141         zooKeeper.getChildrenExt(zkBasePath, false, false, false);
142       for (String child : childList) {
143         String childPath = zkBasePath + "/" + child;
144         doZooKeeperCleanup(zooKeeper, childPath);
145       }
146       zooKeeper.deleteExt(zkBasePath, -1, false);
147       out.println("[GIRAPH-ZKADMIN] Deleted: " + zkBasePath);
148     }
149   }
150 
151   /** Forms ZK server list in a format the ZooKeeperExt object
152    * requires to connect to the quorum.
153    * @param zkServerList the CSV-style list of hostnames of Zk quorum members.
154    * @param zkPort the port the quorum is listening on.
155    * @return the formatted zkConnectList for use in the ZkExt constructor.
156    */
157   private String formatZkServerList(String[] zkServerList, int zkPort)
158     throws UnknownHostException {
159     StringBuffer zkConnectList = new StringBuffer();
160     for (String zkServer : zkServerList) {
161       if (!zkServer.equals("")) {
162         zkConnectList.append(zkServer + ":" + zkPort + ",");
163       }
164     }
165     return zkConnectList.substring(0, zkConnectList.length() - 1);
166   }
167 
168   /** Entry point from shell script
169    * @param args the command line arguments
170    */
171   public static void main(String[] args) throws Exception {
172     System.exit(ToolRunner.run(new GiraphZooKeeperAdmin(), args));
173   }
174 }