View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.zk;
20  
21  import com.google.common.util.concurrent.Uninterruptibles;
22  import org.apache.commons.io.FileUtils;
23  import org.apache.giraph.conf.GiraphConstants;
24  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
25  import org.apache.giraph.time.SystemTime;
26  import org.apache.giraph.time.Time;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.fs.FileStatus;
29  import org.apache.hadoop.fs.FileSystem;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.mapreduce.Mapper;
32  import org.apache.log4j.Logger;
33  
34  import java.io.File;
35  import java.io.IOException;
36  import java.net.ConnectException;
37  import java.net.InetSocketAddress;
38  import java.net.Socket;
39  import java.net.SocketTimeoutException;
40  import java.util.Arrays;
41  import java.util.concurrent.TimeUnit;
42  
43  import static com.google.common.base.Preconditions.checkState;
44  import static org.apache.giraph.conf.GiraphConstants.BASE_ZNODE_KEY;
45  import static org.apache.giraph.conf.GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY;
46  
47  
48  /**
49   * Manages the election of ZooKeeper servers, starting/stopping the services,
50   * etc.
51   */
52  public class ZooKeeperManager {
53    /** Class logger */
54    private static final Logger LOG = Logger.getLogger(ZooKeeperManager.class);
55    /** Separates the hostname and task in the candidate stamp */
56    private static final String HOSTNAME_TASK_SEPARATOR = " ";
57    /** The ZooKeeperString filename prefix */
58    private static final String ZOOKEEPER_SERVER_LIST_FILE_PREFIX =
59        "zkServerList_";
60    /** Job context (mainly for progress) */
61    private Mapper<?, ?, ?, ?>.Context context;
62    /** Hadoop configuration */
63    private final ImmutableClassesGiraphConfiguration conf;
64    /** Task partition, to ensure uniqueness */
65    private final int taskPartition;
66    /** HDFS base directory for all file-based coordination */
67    private final Path baseDirectory;
68    /**
69     * HDFS task ZooKeeper candidate/completed
70     * directory for all file-based coordination
71     */
72    private final Path taskDirectory;
73    /**
74     * HDFS ZooKeeper server ready/done directory
75     * for all file-based coordination
76     */
77    private final Path serverDirectory;
78    /** HDFS path to whether the task is done */
79    private final Path myClosedPath;
80    /** Polling msecs timeout */
81    private final int pollMsecs;
82    /** File system */
83    private final FileSystem fs;
84    /** Zookeeper wrapper */
85    private ZooKeeperRunner zkRunner;
86    /** ZooKeeper local file system directory */
87    private final String zkDir;
88    /** ZooKeeper config file path */
89    private final ZookeeperConfig config;
90    /** ZooKeeper server host */
91    private String zkServerHost;
92    /** ZooKeeper server task */
93    private int zkServerTask;
94    /** ZooKeeper base port */
95    private int zkBasePort;
96    /** Final ZooKeeper server port list (for clients) */
97    private String zkServerPortString;
98    /** My hostname */
99    private String myHostname = null;
100   /** Job id, to ensure uniqueness */
101   private final String jobId;
102   /** Time object for tracking timeouts */
103   private final Time time = SystemTime.get();
104 
105   /** State of the application */
106   public enum State {
107     /** Failure occurred */
108     FAILED,
109     /** Application finished */
110     FINISHED
111   }
112 
113   /**
114    * Constructor with context.
115    *
116    * @param context Context to be stored internally
117    * @param configuration Configuration
118    * @throws IOException
119    */
120   public ZooKeeperManager(Mapper<?, ?, ?, ?>.Context context,
121                           ImmutableClassesGiraphConfiguration configuration)
122     throws IOException {
123     this.context = context;
124     this.conf = configuration;
125     taskPartition = conf.getTaskPartition();
126     jobId = conf.getJobId();
127     baseDirectory =
128         new Path(ZOOKEEPER_MANAGER_DIRECTORY.getWithDefault(conf,
129             getFinalZooKeeperPath()));
130     taskDirectory = new Path(baseDirectory,
131         "_task");
132     serverDirectory = new Path(baseDirectory,
133         "_zkServer");
134     myClosedPath = new Path(taskDirectory,
135         (new ComputationDoneName(taskPartition)).getName());
136     pollMsecs = GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.get(conf);
137     String jobLocalDir = conf.get("job.local.dir");
138     String zkDirDefault;
139     if (jobLocalDir != null) { // for non-local jobs
140       zkDirDefault = jobLocalDir +
141           "/_bspZooKeeper";
142     } else {
143       zkDirDefault = System.getProperty("user.dir") + "/" +
144               ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue();
145     }
146     zkDir = conf.get(GiraphConstants.ZOOKEEPER_DIR, zkDirDefault);
147     config = new ZookeeperConfig();
148     zkBasePort = GiraphConstants.ZOOKEEPER_SERVER_PORT.get(conf);
149 
150     myHostname = conf.getLocalHostname();
151     fs = FileSystem.get(conf);
152   }
153 
154   /**
155    * Generate the final ZooKeeper coordination directory on HDFS
156    *
157    * @return directory path with job id
158    */
159   private String getFinalZooKeeperPath() {
160     return ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue() + "/" + jobId;
161   }
162 
163   /**
164    * Return the base ZooKeeper ZNode from which all other ZNodes Giraph creates
165    * should be sited, for instance in a multi-tenant ZooKeeper, the znode
166    * reserved for Giraph
167    *
168    * @param conf  Necessary to access user-provided values
169    * @return  String of path without trailing slash
170    */
171   public static String getBasePath(Configuration conf) {
172     String result = conf.get(BASE_ZNODE_KEY, "");
173     if (!result.equals("") && !result.startsWith("/")) {
174       throw new IllegalArgumentException("Value for " +
175           BASE_ZNODE_KEY + " must start with /: " + result);
176     }
177 
178     return result;
179   }
180 
181   /**
182    * Create the candidate stamps and decide on the servers to start if
183    * you are partition 0.
184    *
185    * @throws IOException
186    * @throws InterruptedException
187    */
188   public void setup() throws IOException, InterruptedException {
189     createCandidateStamp();
190     getZooKeeperServerList();
191   }
192 
193   /**
194    * Create a HDFS stamp for this task.  If another task already
195    * created it, then this one will fail, which is fine.
196    */
197   public void createCandidateStamp() {
198     try {
199       fs.mkdirs(baseDirectory);
200       LOG.info("createCandidateStamp: Made the directory " +
201           baseDirectory);
202     } catch (IOException e) {
203       LOG.error("createCandidateStamp: Failed to mkdirs " +
204           baseDirectory);
205     }
206     try {
207       fs.mkdirs(serverDirectory);
208       LOG.info("createCandidateStamp: Made the directory " +
209           serverDirectory);
210     } catch (IOException e) {
211       LOG.error("createCandidateStamp: Failed to mkdirs " +
212           serverDirectory);
213     }
214     // Check that the base directory exists and is a directory
215     try {
216       if (!fs.getFileStatus(baseDirectory).isDir()) {
217         throw new IllegalArgumentException(
218             "createCandidateStamp: " + baseDirectory +
219             " is not a directory, but should be.");
220       }
221     } catch (IOException e) {
222       throw new IllegalArgumentException(
223           "createCandidateStamp: Couldn't get file status " +
224               "for base directory " + baseDirectory + ".  If there is an " +
225               "issue with this directory, please set an accesible " +
226               "base directory with the Hadoop configuration option " +
227               ZOOKEEPER_MANAGER_DIRECTORY.getKey(), e);
228     }
229 
230     Path myCandidacyPath = new Path(
231         taskDirectory, myHostname +
232         HOSTNAME_TASK_SEPARATOR + taskPartition);
233     try {
234       if (LOG.isInfoEnabled()) {
235         LOG.info("createCandidateStamp: Creating my filestamp " +
236             myCandidacyPath);
237       }
238       fs.createNewFile(myCandidacyPath);
239     } catch (IOException e) {
240       LOG.error("createCandidateStamp: Failed (maybe previous task " +
241           "failed) to create filestamp " + myCandidacyPath, e);
242     }
243   }
244 
245   /**
246    * Create a new file with retries if it fails.
247    *
248    * @param fs File system where the new file is created
249    * @param path Path of the new file
250    * @param maxAttempts Maximum number of attempts
251    * @param retryWaitMsecs Milliseconds to wait before retrying
252    */
253   private static void createNewFileWithRetries(
254       FileSystem fs, Path path, int maxAttempts, int retryWaitMsecs) {
255     int attempt = 0;
256     while (attempt < maxAttempts) {
257       try {
258         fs.createNewFile(path);
259         return;
260       } catch (IOException e) {
261         LOG.warn("createNewFileWithRetries: Failed to create file at path " +
262             path + " on attempt " + attempt + " of " + maxAttempts + ".", e);
263       }
264       ++attempt;
265       Uninterruptibles.sleepUninterruptibly(
266           retryWaitMsecs, TimeUnit.MILLISECONDS);
267     }
268     throw new IllegalStateException(
269         "createNewFileWithRetries: Failed to create file at path " +
270             path + " after " + attempt + " attempts");
271   }
272 
273   /**
274    * Every task must create a stamp to let the ZooKeeper servers know that
275    * they can shutdown.  This also lets the task know that it was already
276    * completed.
277    */
278   private void createZooKeeperClosedStamp() {
279     LOG.info("createZooKeeperClosedStamp: Creating my filestamp " +
280         myClosedPath);
281     createNewFileWithRetries(fs, myClosedPath,
282         conf.getHdfsFileCreationRetries(),
283         conf.getHdfsFileCreationRetryWaitMs());
284   }
285 
286   /**
287    * Check if all the computation is done.
288    * @return true if all computation is done.
289    */
290   public boolean computationDone() {
291     try {
292       return fs.exists(myClosedPath);
293     } catch (IOException e) {
294       throw new RuntimeException(e);
295     }
296   }
297 
298   /**
299    * Task 0 will call this to create the ZooKeeper server list.  The result is
300    * a file that describes the ZooKeeper servers through the filename.
301    *
302    * @throws IOException
303    * @throws InterruptedException
304    */
305   private void createZooKeeperServerList() throws IOException,
306       InterruptedException {
307     String host;
308     String task;
309     while (true) {
310       FileStatus [] fileStatusArray = fs.listStatus(taskDirectory);
311       if (fileStatusArray.length > 0) {
312         FileStatus fileStatus = fileStatusArray[0];
313         String[] hostnameTaskArray =
314             fileStatus.getPath().getName().split(
315                 HOSTNAME_TASK_SEPARATOR);
316         checkState(hostnameTaskArray.length == 2,
317             "createZooKeeperServerList: Task 0 failed " +
318             "to parse " + fileStatus.getPath().getName());
319         host = hostnameTaskArray[0];
320         task = hostnameTaskArray[1];
321         break;
322       }
323       Thread.sleep(pollMsecs);
324     }
325     String serverListFile =
326         ZOOKEEPER_SERVER_LIST_FILE_PREFIX + host +
327         HOSTNAME_TASK_SEPARATOR + task;
328     Path serverListPath =
329         new Path(baseDirectory, serverListFile);
330     if (LOG.isInfoEnabled()) {
331       LOG.info("createZooKeeperServerList: Creating the final " +
332           "ZooKeeper file '" + serverListPath + "'");
333     }
334     fs.createNewFile(serverListPath);
335   }
336 
337   /**
338    * Make an attempt to get the server list file by looking for a file in
339    * the appropriate directory with the prefix
340    * ZOOKEEPER_SERVER_LIST_FILE_PREFIX.
341    * @return null if not found or the filename if found
342    * @throws IOException
343    */
344   private String getServerListFile() throws IOException {
345     String serverListFile = null;
346     FileStatus [] fileStatusArray = fs.listStatus(baseDirectory);
347     for (FileStatus fileStatus : fileStatusArray) {
348       if (fileStatus.getPath().getName().startsWith(
349           ZOOKEEPER_SERVER_LIST_FILE_PREFIX)) {
350         serverListFile = fileStatus.getPath().getName();
351         break;
352       }
353     }
354     return serverListFile;
355   }
356 
357   /**
358    * Task 0 is the designated master and will generate the server list
359    * (unless it has already done so).  Other
360    * tasks will consume the file after it is created (just the filename).
361    * @throws IOException
362    * @throws InterruptedException
363    */
364   private void getZooKeeperServerList() throws IOException,
365       InterruptedException {
366     String serverListFile;
367 
368     if (taskPartition == 0) {
369       serverListFile = getServerListFile();
370       if (serverListFile == null) {
371         createZooKeeperServerList();
372       }
373     }
374 
375     while (true) {
376       serverListFile = getServerListFile();
377       if (LOG.isInfoEnabled()) {
378         LOG.info("getZooKeeperServerList: For task " + taskPartition +
379             ", got file '" + serverListFile +
380             "' (polling period is " +
381             pollMsecs + ")");
382       }
383       if (serverListFile != null) {
384         break;
385       }
386       try {
387         Thread.sleep(pollMsecs);
388       } catch (InterruptedException e) {
389         LOG.warn("getZooKeeperServerList: Strange interrupted " +
390             "exception " + e.getMessage());
391       }
392 
393     }
394 
395     String[] serverHostList = serverListFile.substring(
396         ZOOKEEPER_SERVER_LIST_FILE_PREFIX.length()).split(
397             HOSTNAME_TASK_SEPARATOR);
398     if (LOG.isInfoEnabled()) {
399       LOG.info("getZooKeeperServerList: Found " +
400           Arrays.toString(serverHostList) +
401           " hosts in filename '" + serverListFile + "'");
402     }
403 
404     zkServerHost = serverHostList[0];
405     zkServerTask = Integer.parseInt(serverHostList[1]);
406     updateZkPortString();
407   }
408 
409   /**
410    * Update zookeeper host:port string.
411    */
412   private void updateZkPortString() {
413     zkServerPortString = zkServerHost + ":" + zkBasePort;
414   }
415 
416   /**
417    * Users can get the server port string to connect to ZooKeeper
418    * @return server port string - comma separated
419    */
420   public String getZooKeeperServerPortString() {
421     return zkServerPortString;
422   }
423 
424   /**
425    * Whoever is elected to be a ZooKeeper server must generate a config file
426    * locally.
427    *
428    */
429   private void generateZooKeeperConfig() {
430     if (LOG.isInfoEnabled()) {
431       LOG.info("generateZooKeeperConfig: with base port " +
432           zkBasePort);
433     }
434     File zkDirFile = new File(this.zkDir);
435     boolean mkDirRet = zkDirFile.mkdirs();
436     if (LOG.isInfoEnabled()) {
437       LOG.info("generateZooKeeperConfigFile: Make directory of " +
438           zkDirFile.getName() + " = " + mkDirRet);
439     }
440     /** Set zookeeper system properties */
441     System.setProperty("zookeeper.snapCount",
442         Integer.toString(GiraphConstants.DEFAULT_ZOOKEEPER_SNAP_COUNT));
443     System.setProperty("zookeeper.forceSync",
444         GiraphConstants.ZOOKEEPER_FORCE_SYNC.get(conf) ? "yes" : "no");
445     System.setProperty("zookeeper.skipACL",
446         GiraphConstants.ZOOKEEPER_SKIP_ACL.get(conf) ? "yes" : "no");
447 
448     config.setDataDir(zkDir);
449     config.setDataLogDir(zkDir);
450     config.setClientPortAddress(new InetSocketAddress(zkBasePort));
451     config.setMinSessionTimeout(conf.getZooKeeperMinSessionTimeout());
452     config.setMaxSessionTimeout(conf.getZooKeeperMaxSessionTimeout());
453 
454   }
455 
456   /**
457    * If this task has been selected, online a ZooKeeper server.  Otherwise,
458    * wait until this task knows that the ZooKeeper servers have been onlined.
459    *
460    * @throws IOException
461    */
462   public void onlineZooKeeperServer() throws IOException {
463     if (zkServerTask == taskPartition) {
464       File zkDirFile = new File(this.zkDir);
465       try {
466         if (LOG.isInfoEnabled()) {
467           LOG.info("onlineZooKeeperServers: Trying to delete old " +
468               "directory " + this.zkDir);
469         }
470         FileUtils.deleteDirectory(zkDirFile);
471       } catch (IOException e) {
472         LOG.warn("onlineZooKeeperServers: Failed to delete " +
473             "directory " + this.zkDir, e);
474       }
475       generateZooKeeperConfig();
476       synchronized (this) {
477         zkRunner = createRunner();
478         int port = zkRunner.start(zkDir, config);
479         if (port > 0) {
480           zkBasePort = port;
481           updateZkPortString();
482         }
483       }
484 
485       // Once the server is up and running, notify that this server is up
486       // and running by dropping a ready stamp.
487       int connectAttempts = 0;
488       final int maxConnectAttempts =
489           conf.getZookeeperConnectionAttempts();
490       while (connectAttempts < maxConnectAttempts) {
491         try {
492           if (LOG.isInfoEnabled()) {
493             LOG.info("onlineZooKeeperServers: Connect attempt " +
494                 connectAttempts + " of " +
495                 maxConnectAttempts +
496                 " max trying to connect to " +
497                 myHostname + ":" + zkBasePort +
498                 " with poll msecs = " + pollMsecs);
499           }
500           InetSocketAddress zkServerAddress =
501               new InetSocketAddress(myHostname, zkBasePort);
502           Socket testServerSock = new Socket();
503           testServerSock.connect(zkServerAddress, 5000);
504           if (LOG.isInfoEnabled()) {
505             LOG.info("onlineZooKeeperServers: Connected to " +
506                 zkServerAddress + "!");
507           }
508           break;
509         } catch (SocketTimeoutException e) {
510           LOG.warn("onlineZooKeeperServers: Got " +
511               "SocketTimeoutException", e);
512         } catch (ConnectException e) {
513           LOG.warn("onlineZooKeeperServers: Got " +
514               "ConnectException", e);
515         } catch (IOException e) {
516           LOG.warn("onlineZooKeeperServers: Got " +
517               "IOException", e);
518         }
519 
520         ++connectAttempts;
521         try {
522           Thread.sleep(pollMsecs);
523         } catch (InterruptedException e) {
524           LOG.warn("onlineZooKeeperServers: Sleep of " + pollMsecs +
525               " interrupted - " + e.getMessage());
526         }
527       }
528       if (connectAttempts == maxConnectAttempts) {
529         throw new IllegalStateException(
530             "onlineZooKeeperServers: Failed to connect in " +
531                 connectAttempts + " tries!");
532       }
533       Path myReadyPath = new Path(
534           serverDirectory, myHostname +
535           HOSTNAME_TASK_SEPARATOR + taskPartition +
536           HOSTNAME_TASK_SEPARATOR + zkBasePort);
537       try {
538         if (LOG.isInfoEnabled()) {
539           LOG.info("onlineZooKeeperServers: Creating my filestamp " +
540               myReadyPath);
541         }
542         fs.createNewFile(myReadyPath);
543       } catch (IOException e) {
544         LOG.error("onlineZooKeeperServers: Failed (maybe previous " +
545             "task failed) to create filestamp " + myReadyPath, e);
546       }
547     } else {
548       int readyRetrievalAttempt = 0;
549       String foundServer = null;
550       while (true) {
551         try {
552           FileStatus [] fileStatusArray =
553               fs.listStatus(serverDirectory);
554           if ((fileStatusArray != null) &&
555               (fileStatusArray.length > 0)) {
556             for (int i = 0; i < fileStatusArray.length; ++i) {
557               String[] hostnameTaskArray =
558                   fileStatusArray[i].getPath().getName().split(
559                       HOSTNAME_TASK_SEPARATOR);
560               if (hostnameTaskArray.length != 3) {
561                 throw new RuntimeException(
562                     "getZooKeeperServerList: Task 0 failed " +
563                         "to parse " +
564                         fileStatusArray[i].getPath().getName());
565               }
566               foundServer = hostnameTaskArray[0];
567               zkBasePort = Integer.parseInt(hostnameTaskArray[2]);
568               updateZkPortString();
569             }
570             if (LOG.isInfoEnabled()) {
571               LOG.info("onlineZooKeeperServers: Got " +
572                   foundServer + " on port " +
573                   zkBasePort +
574                   " (polling period is " +
575                   pollMsecs + ") on attempt " +
576                   readyRetrievalAttempt);
577             }
578             if (zkServerHost.equals(foundServer)) {
579               break;
580             }
581           } else {
582             if (LOG.isInfoEnabled()) {
583               LOG.info("onlineZooKeeperServers: Empty " +
584                   "directory " + serverDirectory +
585                   ", waiting " + pollMsecs + " msecs.");
586             }
587           }
588           Thread.sleep(pollMsecs);
589           ++readyRetrievalAttempt;
590         } catch (IOException e) {
591           throw new RuntimeException(e);
592         } catch (InterruptedException e) {
593           LOG.warn("onlineZooKeeperServers: Strange interrupt from " +
594               e.getMessage(), e);
595         }
596       }
597     }
598   }
599 
600   /**
601    * Wait for all workers to signal completion.  Will wait up to
602    * WAIT_TASK_DONE_TIMEOUT_MS milliseconds for this to complete before
603    * reporting an error.
604    *
605    * @param totalWorkers Number of workers to wait for
606    */
607   private void waitUntilAllTasksDone(int totalWorkers) {
608     int attempt = 0;
609     long maxMs = time.getMilliseconds() +
610         conf.getWaitTaskDoneTimeoutMs();
611     while (true) {
612       boolean[] taskDoneArray = new boolean[totalWorkers];
613       try {
614         FileStatus [] fileStatusArray =
615             fs.listStatus(taskDirectory);
616         int totalDone = 0;
617         if (fileStatusArray.length > 0) {
618           for (FileStatus fileStatus : fileStatusArray) {
619             String name = fileStatus.getPath().getName();
620             if (ComputationDoneName.isName(name)) {
621               ++totalDone;
622               taskDoneArray[ComputationDoneName.fromName(
623                   name).getWorkerId()] = true;
624             }
625           }
626         }
627         if (LOG.isInfoEnabled()) {
628           LOG.info("waitUntilAllTasksDone: Got " + totalDone +
629               " and " + totalWorkers +
630               " desired (polling period is " +
631               pollMsecs + ") on attempt " +
632               attempt);
633         }
634         if (totalDone >= totalWorkers) {
635           break;
636         } else {
637           StringBuilder sb = new StringBuilder();
638           for (int i = 0; i < taskDoneArray.length; ++i) {
639             if (!taskDoneArray[i]) {
640               sb.append(i).append(", ");
641             }
642           }
643           LOG.info("waitUntilAllTasksDone: Still waiting on tasks " +
644               sb.toString());
645         }
646         ++attempt;
647         Thread.sleep(pollMsecs);
648         context.progress();
649       } catch (IOException e) {
650         LOG.warn("waitUntilAllTasksDone: Got IOException.", e);
651       } catch (InterruptedException e) {
652         LOG.warn("waitUntilAllTasksDone: Got InterruptedException", e);
653       }
654 
655       if (time.getMilliseconds() > maxMs) {
656         throw new IllegalStateException("waitUntilAllTasksDone: Tasks " +
657             "did not finish by the maximum time of " +
658             conf.getWaitTaskDoneTimeoutMs() + " milliseconds");
659       }
660     }
661   }
662 
663   /**
664    * Notify the ZooKeeper servers that this partition is done with all
665    * ZooKeeper communication.  If this task is running a ZooKeeper server,
666    * kill it when all partitions are done and wait for
667    * completion.  Clean up the ZooKeeper local directory as well.
668    *
669    * @param state State of the application
670    */
671   public void offlineZooKeeperServers(State state) {
672     if (state == State.FINISHED) {
673       createZooKeeperClosedStamp();
674     }
675     synchronized (this) {
676       if (zkRunner != null) {
677         boolean isYarnJob = GiraphConstants.IS_PURE_YARN_JOB.get(conf);
678         int totalWorkers = conf.getMapTasks();
679         // A Yarn job always spawns MAX_WORKERS + 1 containers
680         if (isYarnJob) {
681           totalWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0) + 1;
682         }
683         LOG.info("offlineZooKeeperServers: Will wait for " +
684             totalWorkers + " tasks");
685         waitUntilAllTasksDone(totalWorkers);
686         zkRunner.stop();
687         File zkDirFile;
688         try {
689           zkDirFile = new File(zkDir);
690           FileUtils.deleteDirectory(zkDirFile);
691         } catch (IOException e) {
692           LOG.warn("offlineZooKeeperSevers: " +
693                   "IOException, but continuing",
694               e);
695         }
696         if (LOG.isInfoEnabled()) {
697           LOG.info("offlineZooKeeperServers: deleted directory " + zkDir);
698         }
699         zkRunner = null;
700       }
701     }
702   }
703 
704   /**
705    * Create appropriate zookeeper wrapper depending on configuration.
706    * Zookeeper can run in master process or outside as a separate
707    * java process.
708    *
709    * @return either in process or out of process wrapper.
710    */
711   private ZooKeeperRunner createRunner() {
712     ZooKeeperRunner runner = new InProcessZooKeeperRunner();
713     runner.setConf(conf);
714     return runner;
715   }
716 
717   /**
718    *  Is this task running a ZooKeeper server?  Only could be true if called
719    *  after onlineZooKeeperServers().
720    *
721    *  @return true if running a ZooKeeper server, false otherwise
722    */
723   public boolean runsZooKeeper() {
724     synchronized (this) {
725       return zkRunner != null;
726     }
727   }
728 
729   /**
730    * Mark files zookeeper creates in hdfs to be deleted on exit.
731    * To be called on master, since it's the last one who finishes.
732    */
733   public void cleanupOnExit() {
734     try {
735       fs.deleteOnExit(baseDirectory);
736     } catch (IOException e) {
737       LOG.error("cleanupOnExit: Failed to delete on exit " + baseDirectory);
738     }
739   }
740 
741   /**
742    * Do necessary cleanup in zookeeper wrapper.
743    */
744   public void cleanup() {
745     synchronized (this) {
746       if (zkRunner != null) {
747         zkRunner.cleanup();
748       }
749     }
750   }
751 }