1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.zk;
192021import org.apache.giraph.bsp.BspService;
22import org.apache.giraph.conf.GiraphConfiguration;
23import org.apache.giraph.conf.GiraphConstants;
24import org.apache.hadoop.conf.Configuration;
25import org.apache.hadoop.util.Tool;
26import org.apache.hadoop.util.ToolRunner;
27import org.apache.zookeeper.KeeperException;
28import org.apache.zookeeper.WatchedEvent;
29import org.apache.zookeeper.Watcher;
3031import java.io.IOException;
32import java.net.UnknownHostException;
33import java.util.Arrays;
34import java.util.List;
3536importstatic java.lang.System.out;
37importstatic org.apache.giraph.conf.GiraphConstants.ZOOKEEPER_SERVER_PORT;
3839/**40 * A Utility class to be used by Giraph admins to occasionally clean up the41 * ZK remnants of jobs that have failed or were killed before finishing.42 * Usage (note that defaults are used if giraph.XYZ args are missing):43 * <code>44 * bin/giraph-admin -Dgiraph.zkBaseNode=... -Dgiraph.zkList=...45 * -Dgiraph.zkServerPort=... -cleanZk46 * </code>47 *48 * alterantely, the <code>Configuration</code> file will populate these fields49 * as it would in a <code>bin/giraph</code> run.50 *51 * <strong>WARNING:</strong> Obviously, running this while actual Giraph jobs52 * using your cluster are in progress is <strong>not recommended.</strong>53 */54publicclassGiraphZooKeeperAdminimplements Watcher, Tool {
55static {
56 Configuration.addDefaultResource("giraph-site.xml");
57 }
5859/** The configuration for this admin run */60private Configuration conf;
6162 @Override
63public Configuration getConf() {
64return conf;
65 }
6667 @Override
68publicvoid setConf(Configuration conf) {
69this.conf = conf;
70 }
7172/**73 * Clean the ZooKeeper of all failed and cancelled in-memory74 * job remnants that pile up on the ZK quorum over time.75 * @param args the input command line arguments, if any.76 * @return the System.exit value to return to the console.77 */78 @Override
79publicint run(String[] args) {
80finalGiraphConfiguration giraphConf = newGiraphConfiguration(getConf());
81finalint zkPort = ZOOKEEPER_SERVER_PORT.get(giraphConf);
82final String zkBasePath = giraphConf.get(
83 GiraphConstants.BASE_ZNODE_KEY, "") + BspService.BASE_DIR;
84final String[] zkServerList;
85 String zkServerListStr = giraphConf.getZookeeperList();
86if (zkServerListStr.isEmpty()) {
87thrownew IllegalStateException("GiraphZooKeeperAdmin requires a list " +
88"of ZooKeeper servers to clean.");
89 }
90 zkServerList = zkServerListStr.split(",");
9192 out.println("[GIRAPH-ZKADMIN] Attempting to clean Zookeeper " +
93"hosts at: " + Arrays.deepToString(zkServerList));
94 out.println("[GIRAPH-ZKADMIN] Connecting on port: " + zkPort);
95 out.println("[GIRAPH-ZKADMIN] to ZNode root path: " + zkBasePath);
96try {
97ZooKeeperExt zooKeeper = newZooKeeperExt(
98 formatZkServerList(zkServerList, zkPort),
99 GiraphConstants.ZOOKEEPER_SESSION_TIMEOUT.getDefaultValue(),
100 GiraphConstants.ZOOKEEPER_OPS_MAX_ATTEMPTS.getDefaultValue(),
101 GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.getDefaultValue(),
102this);
103 doZooKeeperCleanup(zooKeeper, zkBasePath);
104return 0;
105 } catch (KeeperException e) {
106 System.err.println("[ERROR] Failed to do cleanup of " +
107 zkBasePath + " due to KeeperException: " + e.getMessage());
108 } catch (InterruptedException e) {
109 System.err.println("[ERROR] Failed to do cleanup of " +
110 zkBasePath + " due to InterruptedException: " + e.getMessage());
111 } catch (UnknownHostException e) {
112 System.err.println("[ERROR] Failed to do cleanup of " +
113 zkBasePath + " due to UnknownHostException: " + e.getMessage());
114 } catch (IOException e) {
115 System.err.println("[ERROR] Failed to do cleanup of " +
116 zkBasePath + " due to IOException: " + e.getMessage());
117 }
118return -1;
119 }
120121/** Implement watcher to receive event at the end of the cleaner run122 * @param event the WatchedEvent returned by ZK after the cleaning job.123 */124 @Override
125publicfinalvoid process(WatchedEvent event) {
126 out.println("[GIRAPH-ZKADMIN] ZK event received: " + event);
127 }
128129/**130 * Cleans the ZooKeeper quorum of in-memory failed/killed job fragments.131 * @param zooKeeper the connected ZK instance (session) to delete from.132 * @param zkBasePath the base node to begin erasing from.133 * @throws KeeperException134 * @throws InterruptedException135 */136publicvoid doZooKeeperCleanup(ZooKeeperExt zooKeeper, String zkBasePath)
137throws KeeperException, InterruptedException {
138try {
139 zooKeeper.deleteExt(zkBasePath, -1, false);
140 out.println("[GIRAPH-ZKADMIN] Deleted: " + zkBasePath);
141 } catch (KeeperException.NotEmptyException e) {
142 List<String> childList =
143 zooKeeper.getChildrenExt(zkBasePath, false, false, false);
144for (String child : childList) {
145 String childPath = zkBasePath + "/" + child;
146 doZooKeeperCleanup(zooKeeper, childPath);
147 }
148 zooKeeper.deleteExt(zkBasePath, -1, false);
149 out.println("[GIRAPH-ZKADMIN] Deleted: " + zkBasePath);
150 }
151 }
152153/** Forms ZK server list in a format the ZooKeeperExt object154 * requires to connect to the quorum.155 * @param zkServerList the CSV-style list of hostnames of Zk quorum members.156 * @param zkPort the port the quorum is listening on.157 * @return the formatted zkConnectList for use in the ZkExt constructor.158 * @throws UnknownHostException159 */160private String formatZkServerList(String[] zkServerList, int zkPort)
161throws UnknownHostException {
162 StringBuffer zkConnectList = new StringBuffer();
163for (String zkServer : zkServerList) {
164if (!zkServer.equals("")) {
165 zkConnectList.append(zkServer + ":" + zkPort + ",");
166 }
167 }
168return zkConnectList.substring(0, zkConnectList.length() - 1);
169 }
170171/** Entry point from shell script172 * @param args the command line arguments173 * @throws Exception174 */175publicstaticvoid main(String[] args) throws Exception {
176 System.exit(ToolRunner.run(newGiraphZooKeeperAdmin(), args));
177 }
178 }