View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.zk;
20  
21  import java.io.IOException;
22  
23  import org.apache.hadoop.util.Progressable;
24  import org.apache.log4j.Logger;
25  import org.apache.zookeeper.KeeperException;
26  import org.apache.zookeeper.CreateMode;
27  import org.apache.zookeeper.data.ACL;
28  import org.apache.zookeeper.data.Stat;
29  
30  import java.util.ArrayList;
31  import java.util.Collections;
32  import java.util.Comparator;
33  import java.util.List;
34  
35  import org.apache.zookeeper.Watcher;
36  import org.apache.zookeeper.ZooKeeper;
37  
38  /**
39   * ZooKeeper provides only atomic operations.  ZooKeeperExt provides additional
40   * non-atomic operations that are useful.  It also provides wrappers to
41   * deal with ConnectionLossException.  All methods of this class
42   * should be thread-safe.
43   */
44  public class ZooKeeperExt {
45    /** Internal logger */
46    private static final Logger LOG = Logger.getLogger(ZooKeeperExt.class);
47    /** Length of the ZK sequence number */
48    private static final int SEQUENCE_NUMBER_LENGTH = 10;
49    /** Internal ZooKeeper */
50    private final ZooKeeper zooKeeper;
51    /** Ensure we have progress */
52    private final Progressable progressable;
53    /** Number of max attempts to retry when failing due to connection loss */
54    private final int maxRetryAttempts;
55    /** Milliseconds to wait before trying again due to connection loss */
56    private final int retryWaitMsecs;
57  
58    /**
59     * Constructor to connect to ZooKeeper, does not make progress
60     *
61     * @param connectString Comma separated host:port pairs, each corresponding
62     *        to a zk server. e.g.
63     *        "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional
64     *        chroot suffix is used the example would look
65     *        like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
66     *        where the client would be rooted at "/app/a" and all paths
67     *        would be relative to this root - ie getting/setting/etc...
68     *        "/foo/bar" would result in operations being run on
69     *        "/app/a/foo/bar" (from the server perspective).
70     * @param sessionTimeout Session timeout in milliseconds
71     * @param maxRetryAttempts Max retry attempts during connection loss
72     * @param retryWaitMsecs Msecs to wait when retrying due to connection
73     *        loss
74     * @param watcher A watcher object which will be notified of state changes,
75     *        may also be notified for node events
76     * @throws IOException
77     */
78    public ZooKeeperExt(String connectString,
79                        int sessionTimeout,
80                        int maxRetryAttempts,
81                        int retryWaitMsecs,
82                        Watcher watcher) throws IOException {
83      this(connectString, sessionTimeout, maxRetryAttempts,
84          retryWaitMsecs, watcher, null);
85    }
86  
87    /**
88     * Constructor to connect to ZooKeeper, make progress
89     *
90     * @param connectString Comma separated host:port pairs, each corresponding
91     *        to a zk server. e.g.
92     *        "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional
93     *        chroot suffix is used the example would look
94     *        like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
95     *        where the client would be rooted at "/app/a" and all paths
96     *        would be relative to this root - ie getting/setting/etc...
97     *        "/foo/bar" would result in operations being run on
98     *        "/app/a/foo/bar" (from the server perspective).
99     * @param sessionTimeout Session timeout in milliseconds
100    * @param maxRetryAttempts Max retry attempts during connection loss
101    * @param retryWaitMsecs Msecs to wait when retrying due to connection
102    *        loss
103    * @param watcher A watcher object which will be notified of state changes,
104    *        may also be notified for node events
105    * @param progressable Makes progress for longer operations
106    * @throws IOException
107    */
108   public ZooKeeperExt(String connectString,
109       int sessionTimeout,
110       int maxRetryAttempts,
111       int retryWaitMsecs,
112       Watcher watcher,
113       Progressable progressable) throws IOException {
114     this.zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
115     this.progressable = progressable;
116     this.maxRetryAttempts = maxRetryAttempts;
117     this.retryWaitMsecs = retryWaitMsecs;
118   }
119 
120   /**
121    * Provides a possibility of a creating a path consisting of more than one
122    * znode (not atomic).  If recursive is false, operates exactly the
123    * same as create().
124    *
125    * @param path path to create
126    * @param data data to set on the final znode
127    * @param acl acls on each znode created
128    * @param createMode only affects the final znode
129    * @param recursive if true, creates all ancestors
130    * @return Actual created path
131    * @throws KeeperException
132    * @throws InterruptedException
133    */
134   public String createExt(
135       final String path,
136       byte[] data,
137       List<ACL> acl,
138       CreateMode createMode,
139       boolean recursive) throws KeeperException, InterruptedException {
140     if (LOG.isDebugEnabled()) {
141       LOG.debug("createExt: Creating path " + path);
142     }
143 
144     int attempt = 0;
145     while (attempt < maxRetryAttempts) {
146       try {
147         if (!recursive) {
148           return zooKeeper.create(path, data, acl, createMode);
149         }
150 
151         try {
152           return zooKeeper.create(path, data, acl, createMode);
153         } catch (KeeperException.NoNodeException e) {
154           if (LOG.isDebugEnabled()) {
155             LOG.debug("createExt: Cannot directly create node " + path);
156           }
157         }
158 
159         int pos = path.indexOf("/", 1);
160         for (; pos != -1; pos = path.indexOf("/", pos + 1)) {
161           try {
162             if (progressable != null) {
163               progressable.progress();
164             }
165             String filePath = path.substring(0, pos);
166             if (zooKeeper.exists(filePath, false) == null) {
167               zooKeeper.create(
168                   filePath, null, acl, CreateMode.PERSISTENT);
169             }
170           } catch (KeeperException.NodeExistsException e) {
171             if (LOG.isDebugEnabled()) {
172               LOG.debug("createExt: Znode " + path.substring(0, pos) +
173                   " already exists");
174             }
175           }
176         }
177         return zooKeeper.create(path, data, acl, createMode);
178       } catch (KeeperException.ConnectionLossException e) {
179         LOG.warn("createExt: Connection loss on attempt " + attempt + ", " +
180             "waiting " + retryWaitMsecs + " msecs before retrying.", e);
181       }
182       ++attempt;
183       Thread.sleep(retryWaitMsecs);
184     }
185     throw new IllegalStateException("createExt: Failed to create " + path  +
186         " after " + attempt + " tries!");
187   }
188 
189   /**
190    * Data structure for handling the output of createOrSet()
191    */
192   public static class PathStat {
193     /** Path to created znode (if any) */
194     private String path;
195     /** Stat from set znode (if any) */
196     private Stat stat;
197 
198     /**
199      * Put in results from createOrSet()
200      *
201      * @param path Path to created znode (or null)
202      * @param stat Stat from set znode (if set)
203      */
204     public PathStat(String path, Stat stat) {
205       this.path = path;
206       this.stat = stat;
207     }
208 
209     /**
210      * Get the path of the created znode if it was created.
211      *
212      * @return Path of created znode or null if not created
213      */
214     public String getPath() {
215       return path;
216     }
217 
218     /**
219      * Get the stat of the set znode if set
220      *
221      * @return Stat of set znode or null if not set
222      */
223     public Stat getStat() {
224       return stat;
225     }
226   }
227 
228   /**
229    * Create a znode.  Set the znode if the created znode already exists.
230    *
231    * @param path path to create
232    * @param data data to set on the final znode
233    * @param acl acls on each znode created
234    * @param createMode only affects the final znode
235    * @param recursive if true, creates all ancestors
236    * @param version Version to set if setting
237    * @return Path of created znode or Stat of set znode
238    * @throws InterruptedException
239    * @throws KeeperException
240    */
241   public PathStat createOrSetExt(final String path,
242       byte[] data,
243       List<ACL> acl,
244       CreateMode createMode,
245       boolean recursive,
246       int version) throws KeeperException, InterruptedException {
247     String createdPath = null;
248     Stat setStat = null;
249     try {
250       createdPath = createExt(path, data, acl, createMode, recursive);
251     } catch (KeeperException.NodeExistsException e) {
252       if (LOG.isDebugEnabled()) {
253         LOG.debug("createOrSet: Node exists on path " + path);
254       }
255       setStat = zooKeeper.setData(path, data, version);
256     }
257     return new PathStat(createdPath, setStat);
258   }
259 
260   /**
261    * Create a znode if there is no other znode there
262    *
263    * @param path path to create
264    * @param data data to set on the final znode
265    * @param acl acls on each znode created
266    * @param createMode only affects the final znode
267    * @param recursive if true, creates all ancestors
268    * @return Path of created znode or Stat of set znode
269    * @throws InterruptedException
270    * @throws KeeperException
271    */
272   public PathStat createOnceExt(final String path,
273       byte[] data,
274       List<ACL> acl,
275       CreateMode createMode,
276       boolean recursive) throws KeeperException, InterruptedException {
277     String createdPath = null;
278     Stat setStat = null;
279     try {
280       createdPath = createExt(path, data, acl, createMode, recursive);
281     } catch (KeeperException.NodeExistsException e) {
282       if (LOG.isDebugEnabled()) {
283         LOG.debug("createOnceExt: Node already exists on path " + path);
284       }
285     }
286     return new PathStat(createdPath, setStat);
287   }
288 
289   /**
290    * Delete a path recursively.  When the deletion is recursive, it is a
291    * non-atomic operation, hence, not part of ZooKeeper.
292    * @param path path to remove (i.e. /tmp will remove /tmp/1 and /tmp/2)
293    * @param version expected version (-1 for all)
294    * @param recursive if true, remove all children, otherwise behave like
295    *        remove()
296    * @throws InterruptedException
297    * @throws KeeperException
298    */
299   public void deleteExt(final String path, int version, boolean recursive)
300     throws InterruptedException, KeeperException {
301     int attempt = 0;
302     while (attempt < maxRetryAttempts) {
303       try {
304         if (!recursive) {
305           zooKeeper.delete(path, version);
306           return;
307         }
308 
309         try {
310           zooKeeper.delete(path, version);
311           return;
312         } catch (KeeperException.NotEmptyException e) {
313           if (LOG.isDebugEnabled()) {
314             LOG.debug("deleteExt: Cannot directly remove node " + path);
315           }
316         }
317 
318         List<String> childList = zooKeeper.getChildren(path, false);
319         for (String child : childList) {
320           if (progressable != null) {
321             progressable.progress();
322           }
323           deleteExt(path + "/" + child, -1, true);
324         }
325 
326         zooKeeper.delete(path, version);
327         return;
328       } catch (KeeperException.ConnectionLossException e) {
329         LOG.warn("deleteExt: Connection loss on attempt " +
330             attempt + ", waiting " + retryWaitMsecs +
331             " msecs before retrying.", e);
332       }
333       ++attempt;
334       Thread.sleep(retryWaitMsecs);
335     }
336     throw new IllegalStateException("deleteExt: Failed to delete " + path  +
337         " after " + attempt + " tries!");
338   }
339 
340   /**
341    * Return the stat of the node of the given path. Return null if no such a
342    * node exists.
343    * <p>
344    * If the watch is true and the call is successful (no exception is thrown),
345    * a watch will be left on the node with the given path. The watch will be
346    * triggered by a successful operation that creates/delete the node or sets
347    * the data on the node.
348    *
349    * @param path
350    *                the node path
351    * @param watch
352    *                whether need to watch this node
353    * @return the stat of the node of the given path; return null if no such a
354    *         node exists.
355    * @throws KeeperException If the server signals an error
356    * @throws InterruptedException If the server transaction is interrupted.
357    */
358   public Stat exists(String path, boolean watch) throws KeeperException,
359       InterruptedException {
360     int attempt = 0;
361     while (attempt < maxRetryAttempts) {
362       try {
363         return zooKeeper.exists(path, watch);
364       } catch (KeeperException.ConnectionLossException e) {
365         LOG.warn("exists: Connection loss on attempt " +
366             attempt + ", waiting " + retryWaitMsecs +
367             " msecs before retrying.", e);
368       }
369       ++attempt;
370       Thread.sleep(retryWaitMsecs);
371     }
372     throw new IllegalStateException("exists: Failed to check " + path  +
373         " after " + attempt + " tries!");
374   }
375 
376   /**
377    * Return the stat of the node of the given path. Return null if no such a
378    * node exists.
379    * <p>
380    * If the watch is non-null and the call is successful (no exception is
381    * thrown), a watch will be left on the node with the given path. The
382    * watch will be triggered by a successful operation that
383    * creates/delete the node or sets the data on the node.
384    *
385    * @param path the node path
386    * @param watcher explicit watcher
387    * @return the stat of the node of the given path; return null if no such a
388    *         node exists.
389    * @throws KeeperException If the server signals an error
390    * @throws InterruptedException If the server transaction is interrupted.
391    * @throws IllegalArgumentException if an invalid path is specified
392    */
393   public Stat exists(final String path, Watcher watcher)
394     throws KeeperException, InterruptedException {
395     int attempt = 0;
396     while (attempt < maxRetryAttempts) {
397       try {
398         return zooKeeper.exists(path, watcher);
399       } catch (KeeperException.ConnectionLossException e) {
400         LOG.warn("exists: Connection loss on attempt " +
401             attempt + ", waiting " + retryWaitMsecs +
402             " msecs before retrying.", e);
403       }
404       ++attempt;
405       Thread.sleep(retryWaitMsecs);
406     }
407     throw new IllegalStateException("exists: Failed to check " + path  +
408         " after " + attempt + " tries!");
409   }
410 
411   /**
412    * Return the data and the stat of the node of the given path.
413    * <p>
414    * If the watch is non-null and the call is successful (no exception is
415    * thrown), a watch will be left on the node with the given path. The watch
416    * will be triggered by a successful operation that sets data on the node, or
417    * deletes the node.
418    * <p>
419    * A KeeperException with error code KeeperException.NoNode will be thrown
420    * if no node with the given path exists.
421    *
422    * @param path the given path
423    * @param watcher explicit watcher
424    * @param stat the stat of the node
425    * @return the data of the node
426    * @throws KeeperException If the server signals an error with a non-zero
427    *         error code
428    * @throws InterruptedException If the server transaction is interrupted.
429    * @throws IllegalArgumentException if an invalid path is specified
430    */
431   public byte[] getData(final String path, Watcher watcher, Stat stat)
432     throws KeeperException, InterruptedException {
433     int attempt = 0;
434     while (attempt < maxRetryAttempts) {
435       try {
436         return zooKeeper.getData(path, watcher, stat);
437       } catch (KeeperException.ConnectionLossException e) {
438         LOG.warn("getData: Connection loss on attempt " +
439             attempt + ", waiting " + retryWaitMsecs +
440             " msecs before retrying.", e);
441       }
442       ++attempt;
443       Thread.sleep(retryWaitMsecs);
444     }
445     throw new IllegalStateException("getData: Failed to get " + path  +
446         " after " + attempt + " tries!");
447   }
448 
449   /**
450    * Return the data and the stat of the node of the given path.
451    * <p>
452    * If the watch is true and the call is successful (no exception is
453    * thrown), a watch will be left on the node with the given path. The watch
454    * will be triggered by a successful operation that sets data on the node, or
455    * deletes the node.
456    * <p>
457    * A KeeperException with error code KeeperException.NoNode will be thrown
458    * if no node with the given path exists.
459    *
460    * @param path the given path
461    * @param watch whether need to watch this node
462    * @param stat the stat of the node
463    * @return the data of the node
464    * @throws KeeperException If the server signals an error with a non-zero
465    *         error code
466    * @throws InterruptedException If the server transaction is interrupted.
467    */
468   public byte[] getData(String path, boolean watch, Stat stat)
469     throws KeeperException, InterruptedException {
470     int attempt = 0;
471     while (attempt < maxRetryAttempts) {
472       try {
473         return zooKeeper.getData(path, watch, stat);
474       } catch (KeeperException.ConnectionLossException e) {
475         LOG.warn("getData: Connection loss on attempt " +
476             attempt + ", waiting " + retryWaitMsecs +
477             " msecs before retrying.", e);
478       }
479       ++attempt;
480       Thread.sleep(retryWaitMsecs);
481     }
482     throw new IllegalStateException("getData: Failed to get " + path  +
483         " after " + attempt + " tries!");
484   }
485 
486   /**
487    * Get the children of the path with extensions.
488    * Extension 1: Sort the children based on sequence number
489    * Extension 2: Get the full path instead of relative path
490    *
491    * @param path path to znode
492    * @param watch set the watch?
493    * @param sequenceSorted sort by the sequence number
494    * @param fullPath if true, get the fully znode path back
495    * @return list of children
496    * @throws InterruptedException
497    * @throws KeeperException
498    */
499   public List<String> getChildrenExt(
500       final String path,
501       boolean watch,
502       boolean sequenceSorted,
503       boolean fullPath) throws KeeperException, InterruptedException {
504     int attempt = 0;
505     while (attempt < maxRetryAttempts) {
506       try {
507         List<String> childList = zooKeeper.getChildren(path, watch);
508         /* Sort children according to the sequence number, if desired */
509         if (sequenceSorted) {
510           Collections.sort(childList, new Comparator<String>() {
511             public int compare(String s1, String s2) {
512               if ((s1.length() <= SEQUENCE_NUMBER_LENGTH) ||
513                   (s2.length() <= SEQUENCE_NUMBER_LENGTH)) {
514                 throw new RuntimeException(
515                     "getChildrenExt: Invalid length for sequence " +
516                         " sorting > " +
517                         SEQUENCE_NUMBER_LENGTH +
518                         " for s1 (" +
519                         s1.length() + ") or s2 (" + s2.length() + ")");
520               }
521               int s1sequenceNumber = Integer.parseInt(
522                   s1.substring(s1.length() -
523                       SEQUENCE_NUMBER_LENGTH));
524               int s2sequenceNumber = Integer.parseInt(
525                   s2.substring(s2.length() -
526                       SEQUENCE_NUMBER_LENGTH));
527               return s1sequenceNumber - s2sequenceNumber;
528             }
529           });
530         }
531         if (fullPath) {
532           List<String> fullChildList = new ArrayList<String>();
533           for (String child : childList) {
534             fullChildList.add(path + "/" + child);
535           }
536           return fullChildList;
537         }
538         return childList;
539       } catch (KeeperException.ConnectionLossException e) {
540         LOG.warn("getChildrenExt: Connection loss on attempt " +
541             attempt + ", waiting " + retryWaitMsecs +
542             " msecs before retrying.", e);
543       }
544       ++attempt;
545       Thread.sleep(retryWaitMsecs);
546     }
547     throw new IllegalStateException("createExt: Failed to create " + path  +
548         " after " + attempt + " tries!");
549   }
550 
551   /**
552    * Close this client object. Once the client is closed, its session becomes
553    * invalid. All the ephemeral nodes in the ZooKeeper server associated with
554    * the session will be removed. The watches left on those nodes (and on
555    * their parents) will be triggered.
556    *
557    * @throws InterruptedException
558    */
559   public void close() throws InterruptedException {
560     zooKeeper.close();
561   }
562 }