View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.zk;
19  
20  import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
21  import org.apache.giraph.conf.GiraphConstants;
22  import org.apache.log4j.Logger;
23  import org.apache.zookeeper.jmx.ManagedUtil;
24  import org.apache.zookeeper.server.DatadirCleanupManager;
25  import org.apache.zookeeper.server.ServerCnxnFactory;
26  import org.apache.zookeeper.server.ZooKeeperServer;
27  import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
28  import org.apache.zookeeper.server.quorum.QuorumPeerMain;
29  
30  import javax.management.JMException;
31  import java.io.File;
32  import java.io.IOException;
33  
34  /**
35   * Zookeeper wrapper that starts zookeeper within the master process.
36   */
37  public class InProcessZooKeeperRunner
38      extends DefaultImmutableClassesGiraphConfigurable
39      implements ZooKeeperRunner {
40  
41    /** Class logger */
42    private static final Logger LOG =
43        Logger.getLogger(InProcessZooKeeperRunner.class);
44    /**
45     * Wrapper for zookeeper quorum.
46     */
47    private QuorumRunner quorumRunner = new QuorumRunner();
48  
49    @Override
50    public int start(String zkDir, ZookeeperConfig config) throws IOException {
51      return quorumRunner.start(config);
52    }
53  
54    @Override
55    public void stop() {
56      try {
57        quorumRunner.stop();
58      } catch (InterruptedException e) {
59        LOG.error("Unable to cleanly shutdown zookeeper", e);
60      }
61    }
62  
63    @Override
64    public void cleanup() {
65    }
66  
67    /**
68     * Wrapper around zookeeper quorum. Does not necessarily
69     * starts quorum, if there is only one server in config file
70     * will only start zookeeper.
71     */
72    private static class QuorumRunner extends QuorumPeerMain {
73  
74      /**
75       * ZooKeeper server wrapper.
76       */
77      private ZooKeeperServerRunner serverRunner;
78  
79      /**
80       * Starts quorum and/or zookeeper service.
81       * @param config quorum and zookeeper configuration
82       * @return zookeeper port
83       * @throws IOException if can't start zookeeper
84       */
85      public int start(ZookeeperConfig config) throws IOException {
86        serverRunner = new ZooKeeperServerRunner();
87        //Make sure zookeeper starts first and purge manager last
88        //This is important because zookeeper creates a folder
89        //strucutre on the local disk. Purge manager also tries
90        //to create it but from a different thread and can run into
91        //race condition. See FileTxnSnapLog source code for details.
92        int port = serverRunner.start(config);
93        // Start and schedule the the purge task
94        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
95            config
96                .getDataDir(), config.getDataLogDir(),
97            GiraphConstants.ZOOKEEPER_SNAP_RETAIN_COUNT,
98            GiraphConstants.ZOOKEEPER_PURGE_INTERVAL);
99        purgeMgr.start();
100 
101       return port;
102     }
103 
104     /**
105      * Stop quorum and/or zookeeper.
106      * @throws InterruptedException
107      */
108     public void stop() throws InterruptedException {
109       if (quorumPeer != null) {
110         quorumPeer.shutdown();
111         quorumPeer.join();
112       } else if (serverRunner != null) {
113         serverRunner.stop();
114       } else {
115         LOG.warn("Neither quorum nor server is set");
116       }
117     }
118   }
119 
120   /**
121    * Wrapper around zookeeper service.
122    */
123   public static class ZooKeeperServerRunner  {
124     /**
125      * Reference to zookeeper factory.
126      */
127     private ServerCnxnFactory cnxnFactory;
128     /**
129      * Reference to zookeeper server.
130      */
131     private ZooKeeperServer zkServer;
132 
133     /**
134      * Start zookeeper service.
135      * @param config zookeeper configuration
136      * formatted properly
137      * @return the port zookeeper has started on.
138      * @throws IOException
139      */
140     public int start(ZookeeperConfig config) throws IOException {
141       LOG.warn("Either no config or no quorum defined in config, " +
142           "running in process");
143       try {
144         ManagedUtil.registerLog4jMBeans();
145       } catch (JMException e) {
146         LOG.warn("Unable to register log4j JMX control", e);
147       }
148 
149       runFromConfig(config);
150       Thread zkThread = new Thread(new Runnable() {
151         @Override
152         public void run() {
153           try {
154             cnxnFactory.join();
155             if (zkServer.isRunning()) {
156               zkServer.shutdown();
157             }
158           } catch (InterruptedException e) {
159             LOG.error(e.getMessage(), e);
160           }
161 
162         }
163       });
164       zkThread.setDaemon(true);
165       zkThread.start();
166       return zkServer.getClientPort();
167     }
168 
169 
170     /**
171      * Run from a ServerConfig.
172      * @param config ServerConfig to use.
173      * @throws IOException
174      */
175     public void runFromConfig(ZookeeperConfig config) throws IOException {
176       LOG.info("Starting server");
177       try {
178         // Note that this thread isn't going to be doing anything else,
179         // so rather than spawning another thread, we will just call
180         // run() in this thread.
181         // create a file logger url from the command line args
182         zkServer = new ZooKeeperServer();
183 
184         FileTxnSnapLog ftxn = new FileTxnSnapLog(new
185             File(config.getDataLogDir()), new File(config.getDataDir()));
186         zkServer.setTxnLogFactory(ftxn);
187         zkServer.setTickTime(GiraphConstants.DEFAULT_ZOOKEEPER_TICK_TIME);
188         zkServer.setMinSessionTimeout(config.getMinSessionTimeout());
189         zkServer.setMaxSessionTimeout(config.getMaxSessionTimeout());
190         cnxnFactory = ServerCnxnFactory.createFactory();
191         cnxnFactory.configure(config.getClientPortAddress(),
192             GiraphConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS);
193         cnxnFactory.startup(zkServer);
194       } catch (InterruptedException e) {
195         // warn, but generally this is ok
196         LOG.warn("Server interrupted", e);
197       }
198     }
199 
200 
201     /**
202      * Stop zookeeper service.
203      */
204     public void stop() {
205       cnxnFactory.shutdown();
206     }
207   }
208 }