This project has retired. For details please refer to its
Attic page.
ZooKeeperManager xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.zk;
20
21 import com.google.common.util.concurrent.Uninterruptibles;
22 import org.apache.commons.io.FileUtils;
23 import org.apache.giraph.conf.GiraphConstants;
24 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
25 import org.apache.giraph.time.SystemTime;
26 import org.apache.giraph.time.Time;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.fs.FileStatus;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.mapreduce.Mapper;
32 import org.apache.log4j.Logger;
33
34 import java.io.File;
35 import java.io.IOException;
36 import java.net.ConnectException;
37 import java.net.InetSocketAddress;
38 import java.net.Socket;
39 import java.net.SocketTimeoutException;
40 import java.util.Arrays;
41 import java.util.concurrent.TimeUnit;
42
43 import static com.google.common.base.Preconditions.checkState;
44 import static org.apache.giraph.conf.GiraphConstants.BASE_ZNODE_KEY;
45 import static org.apache.giraph.conf.GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY;
46
47
48
49
50
51
52 public class ZooKeeperManager {
53
54 private static final Logger LOG = Logger.getLogger(ZooKeeperManager.class);
55
56 private static final String HOSTNAME_TASK_SEPARATOR = " ";
57
58 private static final String ZOOKEEPER_SERVER_LIST_FILE_PREFIX =
59 "zkServerList_";
60
61 private Mapper<?, ?, ?, ?>.Context context;
62
63 private final ImmutableClassesGiraphConfiguration conf;
64
65 private final int taskPartition;
66
67 private final Path baseDirectory;
68
69
70
71
72 private final Path taskDirectory;
73
74
75
76
77 private final Path serverDirectory;
78
79 private final Path myClosedPath;
80
81 private final int pollMsecs;
82
83 private final FileSystem fs;
84
85 private ZooKeeperRunner zkRunner;
86
87 private final String zkDir;
88
89 private final ZookeeperConfig config;
90
91 private String zkServerHost;
92
93 private int zkServerTask;
94
95 private int zkBasePort;
96
97 private String zkServerPortString;
98
99 private String myHostname = null;
100
101 private final String jobId;
102
103 private final Time time = SystemTime.get();
104
105
106 public enum State {
107
108 FAILED,
109
110 FINISHED
111 }
112
113
114
115
116
117
118
119
120 public ZooKeeperManager(Mapper<?, ?, ?, ?>.Context context,
121 ImmutableClassesGiraphConfiguration configuration)
122 throws IOException {
123 this.context = context;
124 this.conf = configuration;
125 taskPartition = conf.getTaskPartition();
126 jobId = conf.getJobId();
127 baseDirectory =
128 new Path(ZOOKEEPER_MANAGER_DIRECTORY.getWithDefault(conf,
129 getFinalZooKeeperPath()));
130 taskDirectory = new Path(baseDirectory,
131 "_task");
132 serverDirectory = new Path(baseDirectory,
133 "_zkServer");
134 myClosedPath = new Path(taskDirectory,
135 (new ComputationDoneName(taskPartition)).getName());
136 pollMsecs = GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.get(conf);
137 String jobLocalDir = conf.get("job.local.dir");
138 String zkDirDefault;
139 if (jobLocalDir != null) {
140 zkDirDefault = jobLocalDir +
141 "/_bspZooKeeper";
142 } else {
143 zkDirDefault = System.getProperty("user.dir") + "/" +
144 ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue();
145 }
146 zkDir = conf.get(GiraphConstants.ZOOKEEPER_DIR, zkDirDefault);
147 config = new ZookeeperConfig();
148 zkBasePort = GiraphConstants.ZOOKEEPER_SERVER_PORT.get(conf);
149
150 myHostname = conf.getLocalHostname();
151 fs = FileSystem.get(conf);
152 }
153
154
155
156
157
158
159 private String getFinalZooKeeperPath() {
160 return ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue() + "/" + jobId;
161 }
162
163
164
165
166
167
168
169
170
171 public static String getBasePath(Configuration conf) {
172 String result = conf.get(BASE_ZNODE_KEY, "");
173 if (!result.equals("") && !result.startsWith("/")) {
174 throw new IllegalArgumentException("Value for " +
175 BASE_ZNODE_KEY + " must start with /: " + result);
176 }
177
178 return result;
179 }
180
181
182
183
184
185
186
187
188 public void setup() throws IOException, InterruptedException {
189 createCandidateStamp();
190 getZooKeeperServerList();
191 }
192
193
194
195
196
197 public void createCandidateStamp() {
198 try {
199 fs.mkdirs(baseDirectory);
200 LOG.info("createCandidateStamp: Made the directory " +
201 baseDirectory);
202 } catch (IOException e) {
203 LOG.error("createCandidateStamp: Failed to mkdirs " +
204 baseDirectory);
205 }
206 try {
207 fs.mkdirs(serverDirectory);
208 LOG.info("createCandidateStamp: Made the directory " +
209 serverDirectory);
210 } catch (IOException e) {
211 LOG.error("createCandidateStamp: Failed to mkdirs " +
212 serverDirectory);
213 }
214
215 try {
216 if (!fs.getFileStatus(baseDirectory).isDir()) {
217 throw new IllegalArgumentException(
218 "createCandidateStamp: " + baseDirectory +
219 " is not a directory, but should be.");
220 }
221 } catch (IOException e) {
222 throw new IllegalArgumentException(
223 "createCandidateStamp: Couldn't get file status " +
224 "for base directory " + baseDirectory + ". If there is an " +
225 "issue with this directory, please set an accesible " +
226 "base directory with the Hadoop configuration option " +
227 ZOOKEEPER_MANAGER_DIRECTORY.getKey(), e);
228 }
229
230 Path myCandidacyPath = new Path(
231 taskDirectory, myHostname +
232 HOSTNAME_TASK_SEPARATOR + taskPartition);
233 try {
234 if (LOG.isInfoEnabled()) {
235 LOG.info("createCandidateStamp: Creating my filestamp " +
236 myCandidacyPath);
237 }
238 fs.createNewFile(myCandidacyPath);
239 } catch (IOException e) {
240 LOG.error("createCandidateStamp: Failed (maybe previous task " +
241 "failed) to create filestamp " + myCandidacyPath, e);
242 }
243 }
244
245
246
247
248
249
250
251
252
253 private static void createNewFileWithRetries(
254 FileSystem fs, Path path, int maxAttempts, int retryWaitMsecs) {
255 int attempt = 0;
256 while (attempt < maxAttempts) {
257 try {
258 fs.createNewFile(path);
259 return;
260 } catch (IOException e) {
261 LOG.warn("createNewFileWithRetries: Failed to create file at path " +
262 path + " on attempt " + attempt + " of " + maxAttempts + ".", e);
263 }
264 ++attempt;
265 Uninterruptibles.sleepUninterruptibly(
266 retryWaitMsecs, TimeUnit.MILLISECONDS);
267 }
268 throw new IllegalStateException(
269 "createNewFileWithRetries: Failed to create file at path " +
270 path + " after " + attempt + " attempts");
271 }
272
273
274
275
276
277
278 private void createZooKeeperClosedStamp() {
279 LOG.info("createZooKeeperClosedStamp: Creating my filestamp " +
280 myClosedPath);
281 createNewFileWithRetries(fs, myClosedPath,
282 conf.getHdfsFileCreationRetries(),
283 conf.getHdfsFileCreationRetryWaitMs());
284 }
285
286
287
288
289
290 public boolean computationDone() {
291 try {
292 return fs.exists(myClosedPath);
293 } catch (IOException e) {
294 throw new RuntimeException(e);
295 }
296 }
297
298
299
300
301
302
303
304
305 private void createZooKeeperServerList() throws IOException,
306 InterruptedException {
307 String host;
308 String task;
309 while (true) {
310 FileStatus [] fileStatusArray = fs.listStatus(taskDirectory);
311 if (fileStatusArray.length > 0) {
312 FileStatus fileStatus = fileStatusArray[0];
313 String[] hostnameTaskArray =
314 fileStatus.getPath().getName().split(
315 HOSTNAME_TASK_SEPARATOR);
316 checkState(hostnameTaskArray.length == 2,
317 "createZooKeeperServerList: Task 0 failed " +
318 "to parse " + fileStatus.getPath().getName());
319 host = hostnameTaskArray[0];
320 task = hostnameTaskArray[1];
321 break;
322 }
323 Thread.sleep(pollMsecs);
324 }
325 String serverListFile =
326 ZOOKEEPER_SERVER_LIST_FILE_PREFIX + host +
327 HOSTNAME_TASK_SEPARATOR + task;
328 Path serverListPath =
329 new Path(baseDirectory, serverListFile);
330 if (LOG.isInfoEnabled()) {
331 LOG.info("createZooKeeperServerList: Creating the final " +
332 "ZooKeeper file '" + serverListPath + "'");
333 }
334 fs.createNewFile(serverListPath);
335 }
336
337
338
339
340
341
342
343
344 private String getServerListFile() throws IOException {
345 String serverListFile = null;
346 FileStatus [] fileStatusArray = fs.listStatus(baseDirectory);
347 for (FileStatus fileStatus : fileStatusArray) {
348 if (fileStatus.getPath().getName().startsWith(
349 ZOOKEEPER_SERVER_LIST_FILE_PREFIX)) {
350 serverListFile = fileStatus.getPath().getName();
351 break;
352 }
353 }
354 return serverListFile;
355 }
356
357
358
359
360
361
362
363
364 private void getZooKeeperServerList() throws IOException,
365 InterruptedException {
366 String serverListFile;
367
368 if (taskPartition == 0) {
369 serverListFile = getServerListFile();
370 if (serverListFile == null) {
371 createZooKeeperServerList();
372 }
373 }
374
375 while (true) {
376 serverListFile = getServerListFile();
377 if (LOG.isInfoEnabled()) {
378 LOG.info("getZooKeeperServerList: For task " + taskPartition +
379 ", got file '" + serverListFile +
380 "' (polling period is " +
381 pollMsecs + ")");
382 }
383 if (serverListFile != null) {
384 break;
385 }
386 try {
387 Thread.sleep(pollMsecs);
388 } catch (InterruptedException e) {
389 LOG.warn("getZooKeeperServerList: Strange interrupted " +
390 "exception " + e.getMessage());
391 }
392
393 }
394
395 String[] serverHostList = serverListFile.substring(
396 ZOOKEEPER_SERVER_LIST_FILE_PREFIX.length()).split(
397 HOSTNAME_TASK_SEPARATOR);
398 if (LOG.isInfoEnabled()) {
399 LOG.info("getZooKeeperServerList: Found " +
400 Arrays.toString(serverHostList) +
401 " hosts in filename '" + serverListFile + "'");
402 }
403
404 zkServerHost = serverHostList[0];
405 zkServerTask = Integer.parseInt(serverHostList[1]);
406 updateZkPortString();
407 }
408
409
410
411
412 private void updateZkPortString() {
413 zkServerPortString = zkServerHost + ":" + zkBasePort;
414 }
415
416
417
418
419
420 public String getZooKeeperServerPortString() {
421 return zkServerPortString;
422 }
423
424
425
426
427
428
429 private void generateZooKeeperConfig() {
430 if (LOG.isInfoEnabled()) {
431 LOG.info("generateZooKeeperConfig: with base port " +
432 zkBasePort);
433 }
434 File zkDirFile = new File(this.zkDir);
435 boolean mkDirRet = zkDirFile.mkdirs();
436 if (LOG.isInfoEnabled()) {
437 LOG.info("generateZooKeeperConfigFile: Make directory of " +
438 zkDirFile.getName() + " = " + mkDirRet);
439 }
440
441 System.setProperty("zookeeper.snapCount",
442 Integer.toString(GiraphConstants.DEFAULT_ZOOKEEPER_SNAP_COUNT));
443 System.setProperty("zookeeper.forceSync",
444 GiraphConstants.ZOOKEEPER_FORCE_SYNC.get(conf) ? "yes" : "no");
445 System.setProperty("zookeeper.skipACL",
446 GiraphConstants.ZOOKEEPER_SKIP_ACL.get(conf) ? "yes" : "no");
447
448 config.setDataDir(zkDir);
449 config.setDataLogDir(zkDir);
450 config.setClientPortAddress(new InetSocketAddress(zkBasePort));
451 config.setMinSessionTimeout(conf.getZooKeeperMinSessionTimeout());
452 config.setMaxSessionTimeout(conf.getZooKeeperMaxSessionTimeout());
453
454 }
455
456
457
458
459
460
461
462 public void onlineZooKeeperServer() throws IOException {
463 if (zkServerTask == taskPartition) {
464 File zkDirFile = new File(this.zkDir);
465 try {
466 if (LOG.isInfoEnabled()) {
467 LOG.info("onlineZooKeeperServers: Trying to delete old " +
468 "directory " + this.zkDir);
469 }
470 FileUtils.deleteDirectory(zkDirFile);
471 } catch (IOException e) {
472 LOG.warn("onlineZooKeeperServers: Failed to delete " +
473 "directory " + this.zkDir, e);
474 }
475 generateZooKeeperConfig();
476 synchronized (this) {
477 zkRunner = createRunner();
478 int port = zkRunner.start(zkDir, config);
479 if (port > 0) {
480 zkBasePort = port;
481 updateZkPortString();
482 }
483 }
484
485
486
487 int connectAttempts = 0;
488 final int maxConnectAttempts =
489 conf.getZookeeperConnectionAttempts();
490 while (connectAttempts < maxConnectAttempts) {
491 try {
492 if (LOG.isInfoEnabled()) {
493 LOG.info("onlineZooKeeperServers: Connect attempt " +
494 connectAttempts + " of " +
495 maxConnectAttempts +
496 " max trying to connect to " +
497 myHostname + ":" + zkBasePort +
498 " with poll msecs = " + pollMsecs);
499 }
500 InetSocketAddress zkServerAddress =
501 new InetSocketAddress(myHostname, zkBasePort);
502 Socket testServerSock = new Socket();
503 testServerSock.connect(zkServerAddress, 5000);
504 if (LOG.isInfoEnabled()) {
505 LOG.info("onlineZooKeeperServers: Connected to " +
506 zkServerAddress + "!");
507 }
508 break;
509 } catch (SocketTimeoutException e) {
510 LOG.warn("onlineZooKeeperServers: Got " +
511 "SocketTimeoutException", e);
512 } catch (ConnectException e) {
513 LOG.warn("onlineZooKeeperServers: Got " +
514 "ConnectException", e);
515 } catch (IOException e) {
516 LOG.warn("onlineZooKeeperServers: Got " +
517 "IOException", e);
518 }
519
520 ++connectAttempts;
521 try {
522 Thread.sleep(pollMsecs);
523 } catch (InterruptedException e) {
524 LOG.warn("onlineZooKeeperServers: Sleep of " + pollMsecs +
525 " interrupted - " + e.getMessage());
526 }
527 }
528 if (connectAttempts == maxConnectAttempts) {
529 throw new IllegalStateException(
530 "onlineZooKeeperServers: Failed to connect in " +
531 connectAttempts + " tries!");
532 }
533 Path myReadyPath = new Path(
534 serverDirectory, myHostname +
535 HOSTNAME_TASK_SEPARATOR + taskPartition +
536 HOSTNAME_TASK_SEPARATOR + zkBasePort);
537 try {
538 if (LOG.isInfoEnabled()) {
539 LOG.info("onlineZooKeeperServers: Creating my filestamp " +
540 myReadyPath);
541 }
542 fs.createNewFile(myReadyPath);
543 } catch (IOException e) {
544 LOG.error("onlineZooKeeperServers: Failed (maybe previous " +
545 "task failed) to create filestamp " + myReadyPath, e);
546 }
547 } else {
548 int readyRetrievalAttempt = 0;
549 String foundServer = null;
550 while (true) {
551 try {
552 FileStatus [] fileStatusArray =
553 fs.listStatus(serverDirectory);
554 if ((fileStatusArray != null) &&
555 (fileStatusArray.length > 0)) {
556 for (int i = 0; i < fileStatusArray.length; ++i) {
557 String[] hostnameTaskArray =
558 fileStatusArray[i].getPath().getName().split(
559 HOSTNAME_TASK_SEPARATOR);
560 if (hostnameTaskArray.length != 3) {
561 throw new RuntimeException(
562 "getZooKeeperServerList: Task 0 failed " +
563 "to parse " +
564 fileStatusArray[i].getPath().getName());
565 }
566 foundServer = hostnameTaskArray[0];
567 zkBasePort = Integer.parseInt(hostnameTaskArray[2]);
568 updateZkPortString();
569 }
570 if (LOG.isInfoEnabled()) {
571 LOG.info("onlineZooKeeperServers: Got " +
572 foundServer + " on port " +
573 zkBasePort +
574 " (polling period is " +
575 pollMsecs + ") on attempt " +
576 readyRetrievalAttempt);
577 }
578 if (zkServerHost.equals(foundServer)) {
579 break;
580 }
581 } else {
582 if (LOG.isInfoEnabled()) {
583 LOG.info("onlineZooKeeperServers: Empty " +
584 "directory " + serverDirectory +
585 ", waiting " + pollMsecs + " msecs.");
586 }
587 }
588 Thread.sleep(pollMsecs);
589 ++readyRetrievalAttempt;
590 } catch (IOException e) {
591 throw new RuntimeException(e);
592 } catch (InterruptedException e) {
593 LOG.warn("onlineZooKeeperServers: Strange interrupt from " +
594 e.getMessage(), e);
595 }
596 }
597 }
598 }
599
600
601
602
603
604
605
606
607 private void waitUntilAllTasksDone(int totalWorkers) {
608 int attempt = 0;
609 long maxMs = time.getMilliseconds() +
610 conf.getWaitTaskDoneTimeoutMs();
611 while (true) {
612 boolean[] taskDoneArray = new boolean[totalWorkers];
613 try {
614 FileStatus [] fileStatusArray =
615 fs.listStatus(taskDirectory);
616 int totalDone = 0;
617 if (fileStatusArray.length > 0) {
618 for (FileStatus fileStatus : fileStatusArray) {
619 String name = fileStatus.getPath().getName();
620 if (ComputationDoneName.isName(name)) {
621 ++totalDone;
622 taskDoneArray[ComputationDoneName.fromName(
623 name).getWorkerId()] = true;
624 }
625 }
626 }
627 if (LOG.isInfoEnabled()) {
628 LOG.info("waitUntilAllTasksDone: Got " + totalDone +
629 " and " + totalWorkers +
630 " desired (polling period is " +
631 pollMsecs + ") on attempt " +
632 attempt);
633 }
634 if (totalDone >= totalWorkers) {
635 break;
636 } else {
637 StringBuilder sb = new StringBuilder();
638 for (int i = 0; i < taskDoneArray.length; ++i) {
639 if (!taskDoneArray[i]) {
640 sb.append(i).append(", ");
641 }
642 }
643 LOG.info("waitUntilAllTasksDone: Still waiting on tasks " +
644 sb.toString());
645 }
646 ++attempt;
647 Thread.sleep(pollMsecs);
648 context.progress();
649 } catch (IOException e) {
650 LOG.warn("waitUntilAllTasksDone: Got IOException.", e);
651 } catch (InterruptedException e) {
652 LOG.warn("waitUntilAllTasksDone: Got InterruptedException", e);
653 }
654
655 if (time.getMilliseconds() > maxMs) {
656 throw new IllegalStateException("waitUntilAllTasksDone: Tasks " +
657 "did not finish by the maximum time of " +
658 conf.getWaitTaskDoneTimeoutMs() + " milliseconds");
659 }
660 }
661 }
662
663
664
665
666
667
668
669
670
671 public void offlineZooKeeperServers(State state) {
672 if (state == State.FINISHED) {
673 createZooKeeperClosedStamp();
674 }
675 synchronized (this) {
676 if (zkRunner != null) {
677 boolean isYarnJob = GiraphConstants.IS_PURE_YARN_JOB.get(conf);
678 int totalWorkers = conf.getMapTasks();
679
680 if (isYarnJob) {
681 totalWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0) + 1;
682 }
683 LOG.info("offlineZooKeeperServers: Will wait for " +
684 totalWorkers + " tasks");
685 waitUntilAllTasksDone(totalWorkers);
686 zkRunner.stop();
687 File zkDirFile;
688 try {
689 zkDirFile = new File(zkDir);
690 FileUtils.deleteDirectory(zkDirFile);
691 } catch (IOException e) {
692 LOG.warn("offlineZooKeeperSevers: " +
693 "IOException, but continuing",
694 e);
695 }
696 if (LOG.isInfoEnabled()) {
697 LOG.info("offlineZooKeeperServers: deleted directory " + zkDir);
698 }
699 zkRunner = null;
700 }
701 }
702 }
703
704
705
706
707
708
709
710
711 private ZooKeeperRunner createRunner() {
712 ZooKeeperRunner runner = new InProcessZooKeeperRunner();
713 runner.setConf(conf);
714 return runner;
715 }
716
717
718
719
720
721
722
723 public boolean runsZooKeeper() {
724 synchronized (this) {
725 return zkRunner != null;
726 }
727 }
728
729
730
731
732
733 public void cleanupOnExit() {
734 try {
735 fs.deleteOnExit(baseDirectory);
736 } catch (IOException e) {
737 LOG.error("cleanupOnExit: Failed to delete on exit " + baseDirectory);
738 }
739 }
740
741
742
743
744 public void cleanup() {
745 synchronized (this) {
746 if (zkRunner != null) {
747 zkRunner.cleanup();
748 }
749 }
750 }
751 }