This project has retired. For details please refer to its Attic page.
InProcessZooKeeperRunner xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.zk;
19  
20  import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
21  import org.apache.giraph.conf.GiraphConstants;
22  import org.apache.giraph.utils.ThreadUtils;
23  import org.apache.log4j.Logger;
24  import org.apache.zookeeper.jmx.ManagedUtil;
25  import org.apache.zookeeper.server.DatadirCleanupManager;
26  import org.apache.zookeeper.server.ServerCnxnFactory;
27  import org.apache.zookeeper.server.ZooKeeperServer;
28  import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
29  import org.apache.zookeeper.server.quorum.QuorumPeerMain;
30  
31  import javax.management.JMException;
32  import java.io.File;
33  import java.io.IOException;
34  
35  /**
36   * Zookeeper wrapper that starts zookeeper withing master process.
37   */
38  public class InProcessZooKeeperRunner
39      extends DefaultImmutableClassesGiraphConfigurable
40      implements ZooKeeperRunner {
41  
42    /** Class logger */
43    private static final Logger LOG =
44        Logger.getLogger(InProcessZooKeeperRunner.class);
45    /**
46     * Wrapper for zookeeper quorum.
47     */
48    private QuorumRunner quorumRunner = new QuorumRunner();
49  
50    @Override
51    public int start(String zkDir, ZookeeperConfig config) throws IOException {
52      return quorumRunner.start(config);
53    }
54  
55    @Override
56    public void stop() {
57      try {
58        quorumRunner.stop();
59      } catch (InterruptedException e) {
60        LOG.error("Unable to cleanly shutdown zookeeper", e);
61      }
62    }
63  
64    @Override
65    public void cleanup() {
66    }
67  
68    /**
69     * Wrapper around zookeeper quorum. Does not necessarily
70     * starts quorum, if there is only one server in config file
71     * will only start zookeeper.
72     */
73    private static class QuorumRunner extends QuorumPeerMain {
74  
75      /**
76       * ZooKeeper server wrapper.
77       */
78      private ZooKeeperServerRunner serverRunner;
79  
80      /**
81       * Starts quorum and/or zookeeper service.
82       * @param config quorum and zookeeper configuration
83       * @return zookeeper port
84       * @throws IOException if can't start zookeeper
85       */
86      public int start(ZookeeperConfig config) throws IOException {
87        serverRunner = new ZooKeeperServerRunner();
88        //Make sure zookeeper starts first and purge manager last
89        //This is important because zookeeper creates a folder
90        //strucutre on the local disk. Purge manager also tries
91        //to create it but from a different thread and can run into
92        //race condition. See FileTxnSnapLog source code for details.
93        int port = serverRunner.start(config);
94        // Start and schedule the the purge task
95        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
96            config
97                .getDataDir(), config.getDataLogDir(),
98            GiraphConstants.ZOOKEEPER_SNAP_RETAIN_COUNT,
99            GiraphConstants.ZOOKEEPER_PURGE_INTERVAL);
100       purgeMgr.start();
101 
102       return port;
103     }
104 
105     /**
106      * Stop quorum and/or zookeeper.
107      * @throws InterruptedException
108      */
109     public void stop() throws InterruptedException {
110       if (quorumPeer != null) {
111         quorumPeer.shutdown();
112         quorumPeer.join();
113       } else if (serverRunner != null) {
114         serverRunner.stop();
115       } else {
116         LOG.warn("Neither quorum nor server is set");
117       }
118     }
119   }
120 
121   /**
122    * Wrapper around zookeeper service.
123    */
124   public static class ZooKeeperServerRunner  {
125     /**
126      * Reference to zookeeper factory.
127      */
128     private ServerCnxnFactory cnxnFactory;
129     /**
130      * Reference to zookeeper server.
131      */
132     private ZooKeeperServer zkServer;
133 
134     /**
135      * Start zookeeper service.
136      * @param config zookeeper configuration
137      * formatted properly
138      * @return the port zookeeper has started on.
139      * @throws IOException
140      */
141     public int start(ZookeeperConfig config) throws IOException {
142       LOG.warn("Either no config or no quorum defined in config, " +
143           "running in process");
144       try {
145         ManagedUtil.registerLog4jMBeans();
146       } catch (JMException e) {
147         LOG.warn("Unable to register log4j JMX control", e);
148       }
149 
150       runFromConfig(config);
151       ThreadUtils.startThread(new Runnable() {
152         @Override
153         public void run() {
154           try {
155             cnxnFactory.join();
156             if (zkServer.isRunning()) {
157               zkServer.shutdown();
158             }
159           } catch (InterruptedException e) {
160             LOG.error(e.getMessage(), e);
161           }
162 
163         }
164       }, "zk-thread");
165       return zkServer.getClientPort();
166     }
167 
168 
169     /**
170      * Run from a ServerConfig.
171      * @param config ServerConfig to use.
172      * @throws IOException
173      */
174     public void runFromConfig(ZookeeperConfig config) throws IOException {
175       LOG.info("Starting server");
176       try {
177         // Note that this thread isn't going to be doing anything else,
178         // so rather than spawning another thread, we will just call
179         // run() in this thread.
180         // create a file logger url from the command line args
181         zkServer = new ZooKeeperServer();
182 
183         FileTxnSnapLog ftxn = new FileTxnSnapLog(new
184             File(config.getDataLogDir()), new File(config.getDataDir()));
185         zkServer.setTxnLogFactory(ftxn);
186         zkServer.setTickTime(GiraphConstants.DEFAULT_ZOOKEEPER_TICK_TIME);
187         zkServer.setMinSessionTimeout(config.getMinSessionTimeout());
188         zkServer.setMaxSessionTimeout(config.getMaxSessionTimeout());
189         cnxnFactory = ServerCnxnFactory.createFactory();
190         cnxnFactory.configure(config.getClientPortAddress(),
191             GiraphConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS);
192         cnxnFactory.startup(zkServer);
193       } catch (InterruptedException e) {
194         // warn, but generally this is ok
195         LOG.warn("Server interrupted", e);
196       }
197     }
198 
199 
200     /**
201      * Stop zookeeper service.
202      */
203     public void stop() {
204       cnxnFactory.shutdown();
205     }
206   }
207 }