1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.zk;
20
21 import java.io.IOException;
22
23 import org.apache.hadoop.util.Progressable;
24 import org.apache.log4j.Logger;
25 import org.apache.zookeeper.KeeperException;
26 import org.apache.zookeeper.CreateMode;
27 import org.apache.zookeeper.data.ACL;
28 import org.apache.zookeeper.data.Stat;
29
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.Comparator;
33 import java.util.List;
34
35 import org.apache.zookeeper.Watcher;
36 import org.apache.zookeeper.ZooKeeper;
37
38 /**
39 * ZooKeeper provides only atomic operations. ZooKeeperExt provides additional
40 * non-atomic operations that are useful. It also provides wrappers to
41 * deal with ConnectionLossException. All methods of this class
42 * should be thread-safe.
43 */
44 public class ZooKeeperExt {
45 /** Length of the ZK sequence number */
46 public static final int SEQUENCE_NUMBER_LENGTH = 10;
47 /** Internal logger */
48 private static final Logger LOG = Logger.getLogger(ZooKeeperExt.class);
49 /** Internal ZooKeeper */
50 private final ZooKeeper zooKeeper;
51 /** Ensure we have progress */
52 private final Progressable progressable;
53 /** Number of max attempts to retry when failing due to connection loss */
54 private final int maxRetryAttempts;
55 /** Milliseconds to wait before trying again due to connection loss */
56 private final int retryWaitMsecs;
57
58 /**
59 * Constructor to connect to ZooKeeper, does not make progress
60 *
61 * @param connectString Comma separated host:port pairs, each corresponding
62 * to a zk server. e.g.
63 * "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional
64 * chroot suffix is used the example would look
65 * like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
66 * where the client would be rooted at "/app/a" and all paths
67 * would be relative to this root - ie getting/setting/etc...
68 * "/foo/bar" would result in operations being run on
69 * "/app/a/foo/bar" (from the server perspective).
70 * @param sessionTimeout Session timeout in milliseconds
71 * @param maxRetryAttempts Max retry attempts during connection loss
72 * @param retryWaitMsecs Msecs to wait when retrying due to connection
73 * loss
74 * @param watcher A watcher object which will be notified of state changes,
75 * may also be notified for node events
76 * @throws IOException
77 */
78 public ZooKeeperExt(String connectString,
79 int sessionTimeout,
80 int maxRetryAttempts,
81 int retryWaitMsecs,
82 Watcher watcher) throws IOException {
83 this(connectString, sessionTimeout, maxRetryAttempts,
84 retryWaitMsecs, watcher, null);
85 }
86
87 /**
88 * Constructor to connect to ZooKeeper, make progress
89 *
90 * @param connectString Comma separated host:port pairs, each corresponding
91 * to a zk server. e.g.
92 * "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional
93 * chroot suffix is used the example would look
94 * like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
95 * where the client would be rooted at "/app/a" and all paths
96 * would be relative to this root - ie getting/setting/etc...
97 * "/foo/bar" would result in operations being run on
98 * "/app/a/foo/bar" (from the server perspective).
99 * @param sessionTimeout Session timeout in milliseconds
100 * @param maxRetryAttempts Max retry attempts during connection loss
101 * @param retryWaitMsecs Msecs to wait when retrying due to connection
102 * loss
103 * @param watcher A watcher object which will be notified of state changes,
104 * may also be notified for node events
105 * @param progressable Makes progress for longer operations
106 * @throws IOException
107 */
108 public ZooKeeperExt(String connectString,
109 int sessionTimeout,
110 int maxRetryAttempts,
111 int retryWaitMsecs,
112 Watcher watcher,
113 Progressable progressable) throws IOException {
114 this.zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
115 this.progressable = progressable;
116 this.maxRetryAttempts = maxRetryAttempts;
117 this.retryWaitMsecs = retryWaitMsecs;
118 }
119
120 /**
121 * Provides a possibility of a creating a path consisting of more than one
122 * znode (not atomic). If recursive is false, operates exactly the
123 * same as create().
124 *
125 * @param path path to create
126 * @param data data to set on the final znode
127 * @param acl acls on each znode created
128 * @param createMode only affects the final znode
129 * @param recursive if true, creates all ancestors
130 * @return Actual created path
131 * @throws KeeperException
132 * @throws InterruptedException
133 */
134 public String createExt(
135 final String path,
136 byte[] data,
137 List<ACL> acl,
138 CreateMode createMode,
139 boolean recursive) throws KeeperException, InterruptedException {
140 if (LOG.isDebugEnabled()) {
141 LOG.debug("createExt: Creating path " + path);
142 }
143
144 int attempt = 0;
145 while (attempt < maxRetryAttempts) {
146 try {
147 if (!recursive) {
148 return zooKeeper.create(path, data, acl, createMode);
149 }
150
151 try {
152 return zooKeeper.create(path, data, acl, createMode);
153 } catch (KeeperException.NoNodeException e) {
154 if (LOG.isDebugEnabled()) {
155 LOG.debug("createExt: Cannot directly create node " + path);
156 }
157 }
158
159 int pos = path.indexOf("/", 1);
160 for (; pos != -1; pos = path.indexOf("/", pos + 1)) {
161 try {
162 if (progressable != null) {
163 progressable.progress();
164 }
165 String filePath = path.substring(0, pos);
166 if (zooKeeper.exists(filePath, false) == null) {
167 zooKeeper.create(
168 filePath, null, acl, CreateMode.PERSISTENT);
169 }
170 } catch (KeeperException.NodeExistsException e) {
171 if (LOG.isDebugEnabled()) {
172 LOG.debug("createExt: Znode " + path.substring(0, pos) +
173 " already exists");
174 }
175 }
176 }
177 return zooKeeper.create(path, data, acl, createMode);
178 } catch (KeeperException.ConnectionLossException e) {
179 LOG.warn("createExt: Connection loss on attempt " + attempt + ", " +
180 "waiting " + retryWaitMsecs + " msecs before retrying.", e);
181 }
182 ++attempt;
183 Thread.sleep(retryWaitMsecs);
184 }
185 throw new IllegalStateException("createExt: Failed to create " + path +
186 " after " + attempt + " tries!");
187 }
188
189 /**
190 * Data structure for handling the output of createOrSet()
191 */
192 public static class PathStat {
193 /** Path to created znode (if any) */
194 private String path;
195 /** Stat from set znode (if any) */
196 private Stat stat;
197
198 /**
199 * Put in results from createOrSet()
200 *
201 * @param path Path to created znode (or null)
202 * @param stat Stat from set znode (if set)
203 */
204 public PathStat(String path, Stat stat) {
205 this.path = path;
206 this.stat = stat;
207 }
208
209 /**
210 * Get the path of the created znode if it was created.
211 *
212 * @return Path of created znode or null if not created
213 */
214 public String getPath() {
215 return path;
216 }
217
218 /**
219 * Get the stat of the set znode if set
220 *
221 * @return Stat of set znode or null if not set
222 */
223 public Stat getStat() {
224 return stat;
225 }
226 }
227
228 /**
229 * Create a znode. Set the znode if the created znode already exists.
230 *
231 * @param path path to create
232 * @param data data to set on the final znode
233 * @param acl acls on each znode created
234 * @param createMode only affects the final znode
235 * @param recursive if true, creates all ancestors
236 * @param version Version to set if setting
237 * @return Path of created znode or Stat of set znode
238 * @throws InterruptedException
239 * @throws KeeperException
240 */
241 public PathStat createOrSetExt(final String path,
242 byte[] data,
243 List<ACL> acl,
244 CreateMode createMode,
245 boolean recursive,
246 int version) throws KeeperException, InterruptedException {
247 String createdPath = null;
248 Stat setStat = null;
249 try {
250 createdPath = createExt(path, data, acl, createMode, recursive);
251 } catch (KeeperException.NodeExistsException e) {
252 if (LOG.isDebugEnabled()) {
253 LOG.debug("createOrSet: Node exists on path " + path);
254 }
255 setStat = zooKeeper.setData(path, data, version);
256 }
257 return new PathStat(createdPath, setStat);
258 }
259
260 /**
261 * Create a znode if there is no other znode there
262 *
263 * @param path path to create
264 * @param data data to set on the final znode
265 * @param acl acls on each znode created
266 * @param createMode only affects the final znode
267 * @param recursive if true, creates all ancestors
268 * @return Path of created znode or Stat of set znode
269 * @throws InterruptedException
270 * @throws KeeperException
271 */
272 public PathStat createOnceExt(final String path,
273 byte[] data,
274 List<ACL> acl,
275 CreateMode createMode,
276 boolean recursive) throws KeeperException, InterruptedException {
277 String createdPath = null;
278 Stat setStat = null;
279 try {
280 createdPath = createExt(path, data, acl, createMode, recursive);
281 } catch (KeeperException.NodeExistsException e) {
282 if (LOG.isDebugEnabled()) {
283 LOG.debug("createOnceExt: Node already exists on path " + path);
284 }
285 }
286 return new PathStat(createdPath, setStat);
287 }
288
289 /**
290 * Delete a path recursively. When the deletion is recursive, it is a
291 * non-atomic operation, hence, not part of ZooKeeper.
292 * @param path path to remove (i.e. /tmp will remove /tmp/1 and /tmp/2)
293 * @param version expected version (-1 for all)
294 * @param recursive if true, remove all children, otherwise behave like
295 * remove()
296 * @throws InterruptedException
297 * @throws KeeperException
298 */
299 public void deleteExt(final String path, int version, boolean recursive)
300 throws InterruptedException, KeeperException {
301 int attempt = 0;
302 while (attempt < maxRetryAttempts) {
303 try {
304 if (!recursive) {
305 zooKeeper.delete(path, version);
306 return;
307 }
308
309 try {
310 zooKeeper.delete(path, version);
311 return;
312 } catch (KeeperException.NotEmptyException e) {
313 if (LOG.isDebugEnabled()) {
314 LOG.debug("deleteExt: Cannot directly remove node " + path);
315 }
316 }
317
318 List<String> childList = zooKeeper.getChildren(path, false);
319 for (String child : childList) {
320 if (progressable != null) {
321 progressable.progress();
322 }
323 deleteExt(path + "/" + child, -1, true);
324 }
325
326 zooKeeper.delete(path, version);
327 return;
328 } catch (KeeperException.ConnectionLossException e) {
329 LOG.warn("deleteExt: Connection loss on attempt " +
330 attempt + ", waiting " + retryWaitMsecs +
331 " msecs before retrying.", e);
332 }
333 ++attempt;
334 Thread.sleep(retryWaitMsecs);
335 }
336 throw new IllegalStateException("deleteExt: Failed to delete " + path +
337 " after " + attempt + " tries!");
338 }
339
340 /**
341 * Return the stat of the node of the given path. Return null if no such a
342 * node exists.
343 * <p>
344 * If the watch is true and the call is successful (no exception is thrown),
345 * a watch will be left on the node with the given path. The watch will be
346 * triggered by a successful operation that creates/delete the node or sets
347 * the data on the node.
348 *
349 * @param path
350 * the node path
351 * @param watch
352 * whether need to watch this node
353 * @return the stat of the node of the given path; return null if no such a
354 * node exists.
355 * @throws KeeperException If the server signals an error
356 * @throws InterruptedException If the server transaction is interrupted.
357 */
358 public Stat exists(String path, boolean watch) throws KeeperException,
359 InterruptedException {
360 int attempt = 0;
361 while (attempt < maxRetryAttempts) {
362 try {
363 return zooKeeper.exists(path, watch);
364 } catch (KeeperException.ConnectionLossException e) {
365 LOG.warn("exists: Connection loss on attempt " +
366 attempt + ", waiting " + retryWaitMsecs +
367 " msecs before retrying.", e);
368 }
369 ++attempt;
370 Thread.sleep(retryWaitMsecs);
371 }
372 throw new IllegalStateException("exists: Failed to check " + path +
373 " after " + attempt + " tries!");
374 }
375
376 /**
377 * Return the stat of the node of the given path. Return null if no such a
378 * node exists.
379 * <p>
380 * If the watch is non-null and the call is successful (no exception is
381 * thrown), a watch will be left on the node with the given path. The
382 * watch will be triggered by a successful operation that
383 * creates/delete the node or sets the data on the node.
384 *
385 * @param path the node path
386 * @param watcher explicit watcher
387 * @return the stat of the node of the given path; return null if no such a
388 * node exists.
389 * @throws KeeperException If the server signals an error
390 * @throws InterruptedException If the server transaction is interrupted.
391 * @throws IllegalArgumentException if an invalid path is specified
392 */
393 public Stat exists(final String path, Watcher watcher)
394 throws KeeperException, InterruptedException {
395 int attempt = 0;
396 while (attempt < maxRetryAttempts) {
397 try {
398 return zooKeeper.exists(path, watcher);
399 } catch (KeeperException.ConnectionLossException e) {
400 LOG.warn("exists: Connection loss on attempt " +
401 attempt + ", waiting " + retryWaitMsecs +
402 " msecs before retrying.", e);
403 }
404 ++attempt;
405 Thread.sleep(retryWaitMsecs);
406 }
407 throw new IllegalStateException("exists: Failed to check " + path +
408 " after " + attempt + " tries!");
409 }
410
411 /**
412 * Return the data and the stat of the node of the given path.
413 * <p>
414 * If the watch is non-null and the call is successful (no exception is
415 * thrown), a watch will be left on the node with the given path. The watch
416 * will be triggered by a successful operation that sets data on the node, or
417 * deletes the node.
418 * <p>
419 * A KeeperException with error code KeeperException.NoNode will be thrown
420 * if no node with the given path exists.
421 *
422 * @param path the given path
423 * @param watcher explicit watcher
424 * @param stat the stat of the node
425 * @return the data of the node
426 * @throws KeeperException If the server signals an error with a non-zero
427 * error code
428 * @throws InterruptedException If the server transaction is interrupted.
429 * @throws IllegalArgumentException if an invalid path is specified
430 */
431 public byte[] getData(final String path, Watcher watcher, Stat stat)
432 throws KeeperException, InterruptedException {
433 int attempt = 0;
434 while (attempt < maxRetryAttempts) {
435 try {
436 return zooKeeper.getData(path, watcher, stat);
437 } catch (KeeperException.ConnectionLossException e) {
438 LOG.warn("getData: Connection loss on attempt " +
439 attempt + ", waiting " + retryWaitMsecs +
440 " msecs before retrying.", e);
441 }
442 ++attempt;
443 Thread.sleep(retryWaitMsecs);
444 }
445 throw new IllegalStateException("getData: Failed to get " + path +
446 " after " + attempt + " tries!");
447 }
448
449 /**
450 * Return the data and the stat of the node of the given path.
451 * <p>
452 * If the watch is true and the call is successful (no exception is
453 * thrown), a watch will be left on the node with the given path. The watch
454 * will be triggered by a successful operation that sets data on the node, or
455 * deletes the node.
456 * <p>
457 * A KeeperException with error code KeeperException.NoNode will be thrown
458 * if no node with the given path exists.
459 *
460 * @param path the given path
461 * @param watch whether need to watch this node
462 * @param stat the stat of the node
463 * @return the data of the node
464 * @throws KeeperException If the server signals an error with a non-zero
465 * error code
466 * @throws InterruptedException If the server transaction is interrupted.
467 */
468 public byte[] getData(String path, boolean watch, Stat stat)
469 throws KeeperException, InterruptedException {
470 int attempt = 0;
471 while (attempt < maxRetryAttempts) {
472 try {
473 return zooKeeper.getData(path, watch, stat);
474 } catch (KeeperException.ConnectionLossException e) {
475 LOG.warn("getData: Connection loss on attempt " +
476 attempt + ", waiting " + retryWaitMsecs +
477 " msecs before retrying.", e);
478 }
479 ++attempt;
480 Thread.sleep(retryWaitMsecs);
481 }
482 throw new IllegalStateException("getData: Failed to get " + path +
483 " after " + attempt + " tries!");
484 }
485
486 /**
487 * Get the children of the path with extensions.
488 * Extension 1: Sort the children based on sequence number
489 * Extension 2: Get the full path instead of relative path
490 *
491 * @param path path to znode
492 * @param watch set the watch?
493 * @param sequenceSorted sort by the sequence number
494 * @param fullPath if true, get the fully znode path back
495 * @return list of children
496 * @throws InterruptedException
497 * @throws KeeperException
498 */
499 public List<String> getChildrenExt(
500 final String path,
501 boolean watch,
502 boolean sequenceSorted,
503 boolean fullPath) throws KeeperException, InterruptedException {
504 int attempt = 0;
505 while (attempt < maxRetryAttempts) {
506 try {
507 List<String> childList = zooKeeper.getChildren(path, watch);
508 /* Sort children according to the sequence number, if desired */
509 if (sequenceSorted) {
510 Collections.sort(childList, new Comparator<String>() {
511 public int compare(String s1, String s2) {
512 if ((s1.length() <= SEQUENCE_NUMBER_LENGTH) ||
513 (s2.length() <= SEQUENCE_NUMBER_LENGTH)) {
514 throw new RuntimeException(
515 "getChildrenExt: Invalid length for sequence " +
516 " sorting > " +
517 SEQUENCE_NUMBER_LENGTH +
518 " for s1 (" +
519 s1.length() + ") or s2 (" + s2.length() + ")");
520 }
521 int s1sequenceNumber = Integer.parseInt(
522 s1.substring(s1.length() -
523 SEQUENCE_NUMBER_LENGTH));
524 int s2sequenceNumber = Integer.parseInt(
525 s2.substring(s2.length() -
526 SEQUENCE_NUMBER_LENGTH));
527 return s1sequenceNumber - s2sequenceNumber;
528 }
529 });
530 }
531 if (fullPath) {
532 List<String> fullChildList = new ArrayList<String>();
533 for (String child : childList) {
534 fullChildList.add(path + "/" + child);
535 }
536 return fullChildList;
537 }
538 return childList;
539 } catch (KeeperException.ConnectionLossException e) {
540 LOG.warn("getChildrenExt: Connection loss on attempt " +
541 attempt + ", waiting " + retryWaitMsecs +
542 " msecs before retrying.", e);
543 }
544 ++attempt;
545 Thread.sleep(retryWaitMsecs);
546 }
547 throw new IllegalStateException("createExt: Failed to create " + path +
548 " after " + attempt + " tries!");
549 }
550
551 /**
552 * Close this client object. Once the client is closed, its session becomes
553 * invalid. All the ephemeral nodes in the ZooKeeper server associated with
554 * the session will be removed. The watches left on those nodes (and on
555 * their parents) will be triggered.
556 *
557 * @throws InterruptedException
558 */
559 public void close() throws InterruptedException {
560 zooKeeper.close();
561 }
562 }