View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.zk;
20  
21  import com.google.common.util.concurrent.Uninterruptibles;
22  import org.apache.commons.io.FileUtils;
23  import org.apache.giraph.conf.GiraphConstants;
24  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
25  import org.apache.giraph.time.SystemTime;
26  import org.apache.giraph.time.Time;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.fs.FileStatus;
29  import org.apache.hadoop.fs.FileSystem;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.mapreduce.Mapper;
32  import org.apache.log4j.Logger;
33  
34  import java.io.File;
35  import java.io.IOException;
36  import java.net.ConnectException;
37  import java.net.InetSocketAddress;
38  import java.net.Socket;
39  import java.net.SocketTimeoutException;
40  import java.util.Arrays;
41  import java.util.concurrent.TimeUnit;
42  
43  import static com.google.common.base.Preconditions.checkState;
44  import static org.apache.giraph.conf.GiraphConstants.BASE_ZNODE_KEY;
45  import static org.apache.giraph.conf.GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY;
46  
47  
48  /**
49   * Manages the election of ZooKeeper servers, starting/stopping the services,
50   * etc.
51   */
52  public class ZooKeeperManager {
53    /** Class logger */
54    private static final Logger LOG = Logger.getLogger(ZooKeeperManager.class);
55    /** Separates the hostname and task in the candidate stamp */
56    private static final String HOSTNAME_TASK_SEPARATOR = " ";
57    /** The ZooKeeperString filename prefix */
58    private static final String ZOOKEEPER_SERVER_LIST_FILE_PREFIX =
59        "zkServerList_";
60    /** Job context (mainly for progress) */
61    private Mapper<?, ?, ?, ?>.Context context;
62    /** Hadoop configuration */
63    private final ImmutableClassesGiraphConfiguration conf;
64    /** Task partition, to ensure uniqueness */
65    private final int taskPartition;
66    /** HDFS base directory for all file-based coordination */
67    private final Path baseDirectory;
68    /**
69     * HDFS task ZooKeeper candidate/completed
70     * directory for all file-based coordination
71     */
72    private final Path taskDirectory;
73    /**
74     * HDFS ZooKeeper server ready/done directory
75     * for all file-based coordination
76     */
77    private final Path serverDirectory;
78    /** HDFS path to whether the task is done */
79    private final Path myClosedPath;
80    /** Polling msecs timeout */
81    private final int pollMsecs;
82    /** File system */
83    private final FileSystem fs;
84    /** Zookeeper wrapper */
85    private ZooKeeperRunner zkRunner;
86    /** ZooKeeper local file system directory */
87    private final String zkDir;
88    /** ZooKeeper config file path */
89    private final ZookeeperConfig config;
90    /** ZooKeeper server host */
91    private String zkServerHost;
92    /** ZooKeeper server task */
93    private int zkServerTask;
94    /** ZooKeeper base port */
95    private int zkBasePort;
96    /** Final ZooKeeper server port list (for clients) */
97    private String zkServerPortString;
98    /** My hostname */
99    private String myHostname = null;
100   /** Job id, to ensure uniqueness */
101   private final String jobId;
102   /** Time object for tracking timeouts */
103   private final Time time = SystemTime.get();
104 
105   /** State of the application */
106   public enum State {
107     /** Failure occurred */
108     FAILED,
109     /** Application finished */
110     FINISHED
111   }
112 
113   /**
114    * Constructor with context.
115    *
116    * @param context Context to be stored internally
117    * @param configuration Configuration
118    * @throws IOException
119    */
120   public ZooKeeperManager(Mapper<?, ?, ?, ?>.Context context,
121                           ImmutableClassesGiraphConfiguration configuration)
122     throws IOException {
123     this.context = context;
124     this.conf = configuration;
125     taskPartition = conf.getTaskPartition();
126     jobId = conf.getJobId();
127     baseDirectory =
128         new Path(ZOOKEEPER_MANAGER_DIRECTORY.getWithDefault(conf,
129             getFinalZooKeeperPath()));
130     taskDirectory = new Path(baseDirectory,
131         "_task");
132     serverDirectory = new Path(baseDirectory,
133         "_zkServer");
134     myClosedPath = new Path(taskDirectory,
135         (new ComputationDoneName(taskPartition)).getName());
136     pollMsecs = GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.get(conf);
137     String jobLocalDir = conf.get("job.local.dir");
138     String zkDirDefault;
139     if (jobLocalDir != null) { // for non-local jobs
140       zkDirDefault = jobLocalDir +
141           "/_bspZooKeeper";
142     } else {
143       zkDirDefault = System.getProperty("user.dir") + "/" +
144               ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue();
145     }
146     zkDir = conf.get(GiraphConstants.ZOOKEEPER_DIR, zkDirDefault);
147     config = new ZookeeperConfig();
148     zkBasePort = GiraphConstants.ZOOKEEPER_SERVER_PORT.get(conf);
149 
150     myHostname = conf.getLocalHostname();
151     fs = FileSystem.get(conf);
152   }
153 
154   /**
155    * Generate the final ZooKeeper coordination directory on HDFS
156    *
157    * @return directory path with job id
158    */
159   private String getFinalZooKeeperPath() {
160     return ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue() + "/" + jobId;
161   }
162 
163   /**
164    * Return the base ZooKeeper ZNode from which all other ZNodes Giraph creates
165    * should be sited, for instance in a multi-tenant ZooKeeper, the znode
166    * reserved for Giraph
167    *
168    * @param conf  Necessary to access user-provided values
169    * @return  String of path without trailing slash
170    */
171   public static String getBasePath(Configuration conf) {
172     String result = conf.get(BASE_ZNODE_KEY, "");
173     if (!result.equals("") && !result.startsWith("/")) {
174       throw new IllegalArgumentException("Value for " +
175           BASE_ZNODE_KEY + " must start with /: " + result);
176     }
177 
178     return result;
179   }
180 
181   /**
182    * Create the candidate stamps and decide on the servers to start if
183    * you are partition 0.
184    *
185    * @throws IOException
186    * @throws InterruptedException
187    */
188   public void setup() throws IOException, InterruptedException {
189     createCandidateStamp();
190     getZooKeeperServerList();
191   }
192 
193   /**
194    * Create a HDFS stamp for this task.  If another task already
195    * created it, then this one will fail, which is fine.
196    */
197   public void createCandidateStamp() {
198     try {
199       fs.mkdirs(baseDirectory);
200       LOG.info("createCandidateStamp: Made the directory " +
201           baseDirectory);
202     } catch (IOException e) {
203       LOG.error("createCandidateStamp: Failed to mkdirs " +
204           baseDirectory);
205     }
206     try {
207       fs.mkdirs(serverDirectory);
208       LOG.info("createCandidateStamp: Made the directory " +
209           serverDirectory);
210     } catch (IOException e) {
211       LOG.error("createCandidateStamp: Failed to mkdirs " +
212           serverDirectory);
213     }
214     // Check that the base directory exists and is a directory
215     try {
216       if (!fs.getFileStatus(baseDirectory).isDir()) {
217         throw new IllegalArgumentException(
218             "createCandidateStamp: " + baseDirectory +
219             " is not a directory, but should be.");
220       }
221     } catch (IOException e) {
222       throw new IllegalArgumentException(
223           "createCandidateStamp: Couldn't get file status " +
224               "for base directory " + baseDirectory + ".  If there is an " +
225               "issue with this directory, please set an accesible " +
226               "base directory with the Hadoop configuration option " +
227               ZOOKEEPER_MANAGER_DIRECTORY.getKey(), e);
228     }
229 
230     Path myCandidacyPath = new Path(
231         taskDirectory, myHostname +
232         HOSTNAME_TASK_SEPARATOR + taskPartition);
233     try {
234       if (LOG.isInfoEnabled()) {
235         LOG.info("createCandidateStamp: Creating my filestamp " +
236             myCandidacyPath);
237       }
238       fs.createNewFile(myCandidacyPath);
239     } catch (IOException e) {
240       LOG.error("createCandidateStamp: Failed (maybe previous task " +
241           "failed) to create filestamp " + myCandidacyPath, e);
242     }
243   }
244 
245   /**
246    * Create a new file with retries if it fails.
247    *
248    * @param fs File system where the new file is created
249    * @param path Path of the new file
250    * @param maxAttempts Maximum number of attempts
251    * @param retryWaitMsecs Milliseconds to wait before retrying
252    */
253   private static void createNewFileWithRetries(
254       FileSystem fs, Path path, int maxAttempts, int retryWaitMsecs) {
255     int attempt = 0;
256     while (attempt < maxAttempts) {
257       try {
258         fs.createNewFile(path);
259         return;
260       } catch (IOException e) {
261         LOG.warn("createNewFileWithRetries: Failed to create file at path " +
262             path + " on attempt " + attempt + " of " + maxAttempts + ".", e);
263       }
264       ++attempt;
265       Uninterruptibles.sleepUninterruptibly(
266           retryWaitMsecs, TimeUnit.MILLISECONDS);
267     }
268     throw new IllegalStateException(
269         "createNewFileWithRetries: Failed to create file at path " +
270             path + " after " + attempt + " attempts");
271   }
272 
273   /**
274    * Every task must create a stamp to let the ZooKeeper servers know that
275    * they can shutdown.  This also lets the task know that it was already
276    * completed.
277    */
278   private void createZooKeeperClosedStamp() {
279     LOG.info("createZooKeeperClosedStamp: Creating my filestamp " +
280         myClosedPath);
281     createNewFileWithRetries(fs, myClosedPath,
282         conf.getHdfsFileCreationRetries(),
283         conf.getHdfsFileCreationRetryWaitMs());
284   }
285 
286   /**
287    * Check if all the computation is done.
288    * @return true if all computation is done.
289    */
290   public boolean computationDone() {
291     try {
292       return fs.exists(myClosedPath);
293     } catch (IOException e) {
294       throw new RuntimeException(e);
295     }
296   }
297 
298   /**
299    * Task 0 will call this to create the ZooKeeper server list.  The result is
300    * a file that describes the ZooKeeper servers through the filename.
301    *
302    * @throws IOException
303    * @throws InterruptedException
304    */
305   private void createZooKeeperServerList() throws IOException,
306       InterruptedException {
307     String host;
308     String task;
309     while (true) {
310       FileStatus [] fileStatusArray = fs.listStatus(taskDirectory);
311       if (fileStatusArray.length > 0) {
312         FileStatus fileStatus = fileStatusArray[0];
313         String[] hostnameTaskArray =
314             fileStatus.getPath().getName().split(
315                 HOSTNAME_TASK_SEPARATOR);
316         checkState(hostnameTaskArray.length == 2,
317             "createZooKeeperServerList: Task 0 failed " +
318             "to parse " + fileStatus.getPath().getName());
319         host = hostnameTaskArray[0];
320         task = hostnameTaskArray[1];
321         break;
322       }
323       Thread.sleep(pollMsecs);
324     }
325     String serverListFile =
326         ZOOKEEPER_SERVER_LIST_FILE_PREFIX + host +
327         HOSTNAME_TASK_SEPARATOR + task;
328     Path serverListPath =
329         new Path(baseDirectory, serverListFile);
330     if (LOG.isInfoEnabled()) {
331       LOG.info("createZooKeeperServerList: Creating the final " +
332           "ZooKeeper file '" + serverListPath + "'");
333     }
334     fs.createNewFile(serverListPath);
335   }
336 
337   /**
338    * Make an attempt to get the server list file by looking for a file in
339    * the appropriate directory with the prefix
340    * ZOOKEEPER_SERVER_LIST_FILE_PREFIX.
341    * @return null if not found or the filename if found
342    * @throws IOException
343    */
344   private String getServerListFile() throws IOException {
345     String serverListFile = null;
346     FileStatus [] fileStatusArray = fs.listStatus(baseDirectory);
347     for (FileStatus fileStatus : fileStatusArray) {
348       if (fileStatus.getPath().getName().startsWith(
349           ZOOKEEPER_SERVER_LIST_FILE_PREFIX)) {
350         serverListFile = fileStatus.getPath().getName();
351         break;
352       }
353     }
354     return serverListFile;
355   }
356 
357   /**
358    * Task 0 is the designated master and will generate the server list
359    * (unless it has already done so).  Other
360    * tasks will consume the file after it is created (just the filename).
361    * @throws IOException
362    * @throws InterruptedException
363    */
364   private void getZooKeeperServerList() throws IOException,
365       InterruptedException {
366     String serverListFile;
367 
368     if (taskPartition == 0) {
369       serverListFile = getServerListFile();
370       if (serverListFile == null) {
371         createZooKeeperServerList();
372       }
373     }
374 
375     while (true) {
376       serverListFile = getServerListFile();
377       if (LOG.isInfoEnabled()) {
378         LOG.info("getZooKeeperServerList: For task " + taskPartition +
379             ", got file '" + serverListFile +
380             "' (polling period is " +
381             pollMsecs + ")");
382       }
383       if (serverListFile != null) {
384         break;
385       }
386       try {
387         Thread.sleep(pollMsecs);
388       } catch (InterruptedException e) {
389         LOG.warn("getZooKeeperServerList: Strange interrupted " +
390             "exception " + e.getMessage());
391       }
392 
393     }
394 
395     String[] serverHostList = serverListFile.substring(
396         ZOOKEEPER_SERVER_LIST_FILE_PREFIX.length()).split(
397             HOSTNAME_TASK_SEPARATOR);
398     if (LOG.isInfoEnabled()) {
399       LOG.info("getZooKeeperServerList: Found " +
400           Arrays.toString(serverHostList) +
401           " hosts in filename '" + serverListFile + "'");
402     }
403 
404     zkServerHost = serverHostList[0];
405     zkServerTask = Integer.parseInt(serverHostList[1]);
406     updateZkPortString();
407   }
408 
409   /**
410    * Update zookeeper host:port string.
411    */
412   private void updateZkPortString() {
413     zkServerPortString = zkServerHost + ":" + zkBasePort;
414   }
415 
416   /**
417    * Users can get the server port string to connect to ZooKeeper
418    * @return server port string - comma separated
419    */
420   public String getZooKeeperServerPortString() {
421     return zkServerPortString;
422   }
423 
424   /**
425    * Whoever is elected to be a ZooKeeper server must generate a config file
426    * locally.
427    *
428    */
429   private void generateZooKeeperConfig() {
430     if (LOG.isInfoEnabled()) {
431       LOG.info("generateZooKeeperConfig: with base port " +
432           zkBasePort);
433     }
434     File zkDirFile = new File(this.zkDir);
435     boolean mkDirRet = zkDirFile.mkdirs();
436     if (LOG.isInfoEnabled()) {
437       LOG.info("generateZooKeeperConfigFile: Make directory of " +
438           zkDirFile.getName() + " = " + mkDirRet);
439     }
440     /** Set zookeeper system properties */
441     System.setProperty("zookeeper.snapCount",
442         Integer.toString(GiraphConstants.DEFAULT_ZOOKEEPER_SNAP_COUNT));
443     System.setProperty("zookeeper.forceSync",
444         GiraphConstants.ZOOKEEPER_FORCE_SYNC.get(conf) ? "yes" : "no");
445     System.setProperty("zookeeper.skipACL",
446         GiraphConstants.ZOOKEEPER_SKIP_ACL.get(conf) ? "yes" : "no");
447 
448     config.setDataDir(zkDir);
449     config.setDataLogDir(zkDir);
450     config.setClientPortAddress(new InetSocketAddress(zkBasePort));
451     config.setMinSessionTimeout(conf.getZooKeeperMinSessionTimeout());
452     config.setMaxSessionTimeout(conf.getZooKeeperMaxSessionTimeout());
453 
454   }
455 
456   /**
457    * If this task has been selected, online a ZooKeeper server.  Otherwise,
458    * wait until this task knows that the ZooKeeper servers have been onlined.
459    */
460   public void onlineZooKeeperServer() throws IOException {
461     if (zkServerTask == taskPartition) {
462       File zkDirFile = new File(this.zkDir);
463       try {
464         if (LOG.isInfoEnabled()) {
465           LOG.info("onlineZooKeeperServers: Trying to delete old " +
466               "directory " + this.zkDir);
467         }
468         FileUtils.deleteDirectory(zkDirFile);
469       } catch (IOException e) {
470         LOG.warn("onlineZooKeeperServers: Failed to delete " +
471             "directory " + this.zkDir, e);
472       }
473       generateZooKeeperConfig();
474       synchronized (this) {
475         zkRunner = createRunner();
476         int port = zkRunner.start(zkDir, config);
477         if (port > 0) {
478           zkBasePort = port;
479           updateZkPortString();
480         }
481       }
482 
483       // Once the server is up and running, notify that this server is up
484       // and running by dropping a ready stamp.
485       int connectAttempts = 0;
486       final int maxConnectAttempts =
487           conf.getZookeeperConnectionAttempts();
488       while (connectAttempts < maxConnectAttempts) {
489         try {
490           if (LOG.isInfoEnabled()) {
491             LOG.info("onlineZooKeeperServers: Connect attempt " +
492                 connectAttempts + " of " +
493                 maxConnectAttempts +
494                 " max trying to connect to " +
495                 myHostname + ":" + zkBasePort +
496                 " with poll msecs = " + pollMsecs);
497           }
498           InetSocketAddress zkServerAddress =
499               new InetSocketAddress(myHostname, zkBasePort);
500           Socket testServerSock = new Socket();
501           testServerSock.connect(zkServerAddress, 5000);
502           if (LOG.isInfoEnabled()) {
503             LOG.info("onlineZooKeeperServers: Connected to " +
504                 zkServerAddress + "!");
505           }
506           break;
507         } catch (SocketTimeoutException e) {
508           LOG.warn("onlineZooKeeperServers: Got " +
509               "SocketTimeoutException", e);
510         } catch (ConnectException e) {
511           LOG.warn("onlineZooKeeperServers: Got " +
512               "ConnectException", e);
513         } catch (IOException e) {
514           LOG.warn("onlineZooKeeperServers: Got " +
515               "IOException", e);
516         }
517 
518         ++connectAttempts;
519         try {
520           Thread.sleep(pollMsecs);
521         } catch (InterruptedException e) {
522           LOG.warn("onlineZooKeeperServers: Sleep of " + pollMsecs +
523               " interrupted - " + e.getMessage());
524         }
525       }
526       if (connectAttempts == maxConnectAttempts) {
527         throw new IllegalStateException(
528             "onlineZooKeeperServers: Failed to connect in " +
529                 connectAttempts + " tries!");
530       }
531       Path myReadyPath = new Path(
532           serverDirectory, myHostname +
533           HOSTNAME_TASK_SEPARATOR + taskPartition +
534           HOSTNAME_TASK_SEPARATOR + zkBasePort);
535       try {
536         if (LOG.isInfoEnabled()) {
537           LOG.info("onlineZooKeeperServers: Creating my filestamp " +
538               myReadyPath);
539         }
540         fs.createNewFile(myReadyPath);
541       } catch (IOException e) {
542         LOG.error("onlineZooKeeperServers: Failed (maybe previous " +
543             "task failed) to create filestamp " + myReadyPath, e);
544       }
545     } else {
546       int readyRetrievalAttempt = 0;
547       String foundServer = null;
548       while (true) {
549         try {
550           FileStatus [] fileStatusArray =
551               fs.listStatus(serverDirectory);
552           if ((fileStatusArray != null) &&
553               (fileStatusArray.length > 0)) {
554             for (int i = 0; i < fileStatusArray.length; ++i) {
555               String[] hostnameTaskArray =
556                   fileStatusArray[i].getPath().getName().split(
557                       HOSTNAME_TASK_SEPARATOR);
558               if (hostnameTaskArray.length != 3) {
559                 throw new RuntimeException(
560                     "getZooKeeperServerList: Task 0 failed " +
561                         "to parse " +
562                         fileStatusArray[i].getPath().getName());
563               }
564               foundServer = hostnameTaskArray[0];
565               zkBasePort = Integer.parseInt(hostnameTaskArray[2]);
566               updateZkPortString();
567             }
568             if (LOG.isInfoEnabled()) {
569               LOG.info("onlineZooKeeperServers: Got " +
570                   foundServer + " on port " +
571                   zkBasePort +
572                   " (polling period is " +
573                   pollMsecs + ") on attempt " +
574                   readyRetrievalAttempt);
575             }
576             if (zkServerHost.equals(foundServer)) {
577               break;
578             }
579           } else {
580             if (LOG.isInfoEnabled()) {
581               LOG.info("onlineZooKeeperServers: Empty " +
582                   "directory " + serverDirectory +
583                   ", waiting " + pollMsecs + " msecs.");
584             }
585           }
586           Thread.sleep(pollMsecs);
587           ++readyRetrievalAttempt;
588         } catch (IOException e) {
589           throw new RuntimeException(e);
590         } catch (InterruptedException e) {
591           LOG.warn("onlineZooKeeperServers: Strange interrupt from " +
592               e.getMessage(), e);
593         }
594       }
595     }
596   }
597 
598   /**
599    * Wait for all workers to signal completion.  Will wait up to
600    * WAIT_TASK_DONE_TIMEOUT_MS milliseconds for this to complete before
601    * reporting an error.
602    *
603    * @param totalWorkers Number of workers to wait for
604    */
605   private void waitUntilAllTasksDone(int totalWorkers) {
606     int attempt = 0;
607     long maxMs = time.getMilliseconds() +
608         conf.getWaitTaskDoneTimeoutMs();
609     while (true) {
610       boolean[] taskDoneArray = new boolean[totalWorkers];
611       try {
612         FileStatus [] fileStatusArray =
613             fs.listStatus(taskDirectory);
614         int totalDone = 0;
615         if (fileStatusArray.length > 0) {
616           for (FileStatus fileStatus : fileStatusArray) {
617             String name = fileStatus.getPath().getName();
618             if (ComputationDoneName.isName(name)) {
619               ++totalDone;
620               taskDoneArray[ComputationDoneName.fromName(
621                   name).getWorkerId()] = true;
622             }
623           }
624         }
625         if (LOG.isInfoEnabled()) {
626           LOG.info("waitUntilAllTasksDone: Got " + totalDone +
627               " and " + totalWorkers +
628               " desired (polling period is " +
629               pollMsecs + ") on attempt " +
630               attempt);
631         }
632         if (totalDone >= totalWorkers) {
633           break;
634         } else {
635           StringBuilder sb = new StringBuilder();
636           for (int i = 0; i < taskDoneArray.length; ++i) {
637             if (!taskDoneArray[i]) {
638               sb.append(i).append(", ");
639             }
640           }
641           LOG.info("waitUntilAllTasksDone: Still waiting on tasks " +
642               sb.toString());
643         }
644         ++attempt;
645         Thread.sleep(pollMsecs);
646         context.progress();
647       } catch (IOException e) {
648         LOG.warn("waitUntilAllTasksDone: Got IOException.", e);
649       } catch (InterruptedException e) {
650         LOG.warn("waitUntilAllTasksDone: Got InterruptedException", e);
651       }
652 
653       if (time.getMilliseconds() > maxMs) {
654         throw new IllegalStateException("waitUntilAllTasksDone: Tasks " +
655             "did not finish by the maximum time of " +
656             conf.getWaitTaskDoneTimeoutMs() + " milliseconds");
657       }
658     }
659   }
660 
661   /**
662    * Notify the ZooKeeper servers that this partition is done with all
663    * ZooKeeper communication.  If this task is running a ZooKeeper server,
664    * kill it when all partitions are done and wait for
665    * completion.  Clean up the ZooKeeper local directory as well.
666    *
667    * @param state State of the application
668    */
669   public void offlineZooKeeperServers(State state) {
670     if (state == State.FINISHED) {
671       createZooKeeperClosedStamp();
672     }
673     synchronized (this) {
674       if (zkRunner != null) {
675         boolean isYarnJob = GiraphConstants.IS_PURE_YARN_JOB.get(conf);
676         int totalWorkers = conf.getMapTasks();
677         // A Yarn job always spawns MAX_WORKERS + 1 containers
678         if (isYarnJob) {
679           totalWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0) + 1;
680         }
681         LOG.info("offlineZooKeeperServers: Will wait for " +
682             totalWorkers + " tasks");
683         waitUntilAllTasksDone(totalWorkers);
684         zkRunner.stop();
685         File zkDirFile;
686         try {
687           zkDirFile = new File(zkDir);
688           FileUtils.deleteDirectory(zkDirFile);
689         } catch (IOException e) {
690           LOG.warn("offlineZooKeeperSevers: " +
691                   "IOException, but continuing",
692               e);
693         }
694         if (LOG.isInfoEnabled()) {
695           LOG.info("offlineZooKeeperServers: deleted directory " + zkDir);
696         }
697         zkRunner = null;
698       }
699     }
700   }
701 
702   /**
703    * Create appropriate zookeeper wrapper depending on configuration.
704    * Zookeeper can run in master process or outside as a separate
705    * java process.
706    *
707    * @return either in process or out of process wrapper.
708    */
709   private ZooKeeperRunner createRunner() {
710     ZooKeeperRunner runner = new InProcessZooKeeperRunner();
711     runner.setConf(conf);
712     return runner;
713   }
714 
715   /**
716    *  Is this task running a ZooKeeper server?  Only could be true if called
717    *  after onlineZooKeeperServers().
718    *
719    *  @return true if running a ZooKeeper server, false otherwise
720    */
721   public boolean runsZooKeeper() {
722     synchronized (this) {
723       return zkRunner != null;
724     }
725   }
726 
727   /**
728    * Do necessary cleanup in zookeeper wrapper.
729    */
730   public void cleanup() {
731     synchronized (this) {
732       if (zkRunner != null) {
733         zkRunner.cleanup();
734       }
735     }
736   }
737 }