View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.yarn;
19  
20  import com.google.common.collect.ImmutableList;
21  
22  import com.google.common.collect.Maps;
23  import org.apache.giraph.conf.GiraphConfiguration;
24  import org.apache.giraph.conf.GiraphConstants;
25  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.mapreduce.MRJobConfig;
28  import org.apache.hadoop.io.DataOutputBuffer;
29  import org.apache.hadoop.security.Credentials;
30  import org.apache.hadoop.security.UserGroupInformation;
31  import org.apache.hadoop.security.token.Token;
32  import org.apache.hadoop.yarn.api.ApplicationConstants;
33  import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
34  import org.apache.hadoop.yarn.api.protocolrecords
35    .RegisterApplicationMasterResponse;
36  import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
37  import org.apache.hadoop.yarn.api.records.Container;
38  import org.apache.hadoop.yarn.api.records.ContainerId;
39  import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
40  import org.apache.hadoop.yarn.api.records.ContainerStatus;
41  import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
42  import org.apache.hadoop.yarn.api.records.LocalResource;
43  import org.apache.hadoop.yarn.api.records.NodeReport;
44  import org.apache.hadoop.yarn.api.records.Priority;
45  import org.apache.hadoop.yarn.api.records.Resource;
46  import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
47  import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
48  import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
49  import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
50  import org.apache.hadoop.yarn.conf.YarnConfiguration;
51  import org.apache.hadoop.yarn.exceptions.YarnException;
52  import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
53  import org.apache.hadoop.yarn.util.ConverterUtils;
54  import org.apache.hadoop.yarn.util.Records;
55  
56  import org.apache.log4j.Logger;
57  
58  import java.io.IOException;
59  import java.nio.ByteBuffer;
60  import java.util.Iterator;
61  import java.util.List;
62  import java.util.Map;
63  import java.util.concurrent.ConcurrentHashMap;
64  import java.util.concurrent.ConcurrentMap;
65  import java.util.concurrent.ExecutorService;
66  import java.util.concurrent.Executors;
67  import java.util.concurrent.atomic.AtomicInteger;
68  
69  /**
70   * The YARN Application Master for Giraph is launched when the GiraphYarnClient
71   * successfully requests an execution container from the Resource Manager. The
72   * Application Master is provided by Giraph to manage all requests for resources
73   * (worker nodes, memory, jar files, job configuration metadata, etc.) that
74   * Giraph will need to perform the job. When Giraph runs in a non-YARN context,
75   * the role of the Application Master is played by Hadoop when it launches our
76   * GraphMappers (worker/master task nodes) to run the job.
77   */
78  public class GiraphApplicationMaster {
79    /** Logger */
80    private static final Logger LOG =
81      Logger.getLogger(GiraphApplicationMaster.class);
82    /** Exit code for YARN containers that were manually killed/aborted */
83    private static final int YARN_ABORT_EXIT_STATUS = -100;
84    /** Exit code for successfully run YARN containers */
85    private static final int YARN_SUCCESS_EXIT_STATUS = 0;
86    /** millis to sleep between heartbeats during long loops */
87    private static final int SLEEP_BETWEEN_HEARTBEATS_MSECS = 900;
88    /** A reusable map of resources already in HDFS for each task to copy-to-local
89     * env and use to launch each GiraphYarnTask. */
90    private static Map<String, LocalResource> LOCAL_RESOURCES;
91    /** Initialize the Configuration class with the resource file exported by
92     * the YarnClient. We will need to export this resource to the tasks also.
93     * Construct the HEARTBEAT to use to ping the RM about job progress/health.
94     */
95  //TODO
96    /** For status update for clients - yet to be implemented\\
97    * Hostname of the container
98    */
99    private String appMasterHostname = "";
100   /** Port on which the app master listens for status updates from clients*/
101   private int appMasterRpcPort = 0;
102   /** Tracking url to which app master publishes info for clients to monitor*/
103   private String appMasterTrackingUrl = "";
104 
105   static {
106     // pick up new conf XML file and populate it with stuff exported from client
107     Configuration.addDefaultResource(GiraphConstants.GIRAPH_YARN_CONF_FILE);
108   }
109 
110   /** GiraphApplicationMaster's application attempt id */
111   private final ApplicationAttemptId appAttemptId;
112   /** GiraphApplicationMaster container id. Leave me here, I'm very useful */
113   private final ContainerId containerId;
114   /** number of containers Giraph needs (conf.getMaxWorkers() + 1 master) */
115   private final int containersToLaunch;
116   /** MB of JVM heap per Giraph task container */
117   private final int heapPerContainer;
118   /** Giraph configuration for this job, transported here by YARN framework */
119   private final ImmutableClassesGiraphConfiguration giraphConf;
120   /** Yarn configuration for this job*/
121   private final YarnConfiguration yarnConf;
122   /** Completed Containers Counter */
123   private final AtomicInteger completedCount;
124   /** Failed Containers Counter */
125   private final AtomicInteger failedCount;
126   /** Number of containers requested (hopefully '-w' from our conf) */
127   private final AtomicInteger allocatedCount;
128   /** Number of successfully completed containers in this job run. */
129   private final AtomicInteger successfulCount;
130   /** the ACK #'s for AllocateRequests + heartbeats == last response # */
131   private AtomicInteger lastResponseId;
132   /** buffer tostore all tokens */
133   private ByteBuffer allTokens;
134   /** Executor to attempt asynchronous launches of Giraph containers */
135   private ExecutorService executor;
136   /** YARN progress is a <code>float</code> between 0.0f and 1.0f */
137   //Handle to communicate with the Resource Manager
138   @SuppressWarnings("rawtypes")
139   private AMRMClientAsync amRMClient;
140   /** Handle to communicate with the Node Manager */
141   private NMClientAsync nmClientAsync;
142   /** Listen to process the response from the Node Manager */
143   private NMCallbackHandler containerListener;
144   /** whether all containers finishe */
145   private volatile boolean done;
146 
147   /**
148    * Construct the GiraphAppMaster, populate fields using env vars
149    * set up by YARN framework in this execution container.
150    * @param cId the ContainerId
151    * @param aId the ApplicationAttemptId
152    */
153   protected GiraphApplicationMaster(ContainerId cId, ApplicationAttemptId aId)
154     throws IOException {
155     containerId = cId; // future good stuff will need me to operate.
156     appAttemptId = aId;
157     lastResponseId = new AtomicInteger(0);
158     giraphConf =
159       new ImmutableClassesGiraphConfiguration(new GiraphConfiguration());
160     yarnConf = new YarnConfiguration(giraphConf);
161     completedCount = new AtomicInteger(0);
162     failedCount = new AtomicInteger(0);
163     allocatedCount = new AtomicInteger(0);
164     successfulCount = new AtomicInteger(0);
165     containersToLaunch = giraphConf.getMaxWorkers() + 1;
166     executor = Executors.newFixedThreadPool(containersToLaunch);
167     heapPerContainer = giraphConf.getYarnTaskHeapMb();
168     LOG.info("GiraphAM  for ContainerId " + cId + " ApplicationAttemptId " +
169       aId);
170   }
171 
172   /**
173    * Coordinates all requests for Giraph's worker/master task containers, and
174    * manages application liveness heartbeat, completion status, teardown, etc.
175    * @return success or failure
176    */
177   private boolean run() throws YarnException, IOException {
178     boolean success = false;
179     try {
180       getAllTokens();
181       registerRMCallBackHandler();
182       registerNMCallbackHandler();
183       registerAMToRM();
184       madeAllContainerRequestToRM();
185       LOG.info("Wait to finish ..");
186       while (!done) {
187         try {
188           Thread.sleep(200);
189         } catch (InterruptedException ex) {
190           LOG.error(ex);
191           //TODO:
192         }
193       }
194       LOG.info("Done " + done);
195     } finally {
196       // if we get here w/o problems, the executor is already long finished.
197       if (null != executor && !executor.isTerminated()) {
198         LOG.info("Forcefully terminating executors with done =:" + done);
199         executor.shutdownNow(); // force kill, especially if got here by throw
200       }
201       success = finish();
202     }
203     return success;
204   }
205 
206   /**
207    * Call when the application is done
208    * @return if all containers succeed
209    */
210   private boolean finish() {
211     // When the application completes, it should stop all running containers
212     LOG.info("Application completed. Stopping running containers");
213     nmClientAsync.stop();
214 
215     // When the application completes, it should send a finish application
216     // signal to the RM
217     LOG.info("Application completed. Signalling finish to RM");
218     FinalApplicationStatus appStatus;
219     String appMessage = null;
220     boolean success = true;
221     if (failedCount.get() == 0 &&
222         completedCount.get() == containersToLaunch) {
223       appStatus = FinalApplicationStatus.SUCCEEDED;
224     } else {
225       appStatus = FinalApplicationStatus.FAILED;
226       appMessage = "Diagnostics." + ", total=" + containersToLaunch +
227         ", completed=" + completedCount.get() +  ", failed=" +
228         failedCount.get();
229       success = false;
230     }
231     try {
232       amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
233     } catch (YarnException ex) {
234       LOG.error("Failed to unregister application", ex);
235     } catch (IOException e) {
236       LOG.error("Failed to unregister application", e);
237     }
238 
239     amRMClient.stop();
240     return success;
241   }
242   /**
243    * Add all containers' request
244    * @return
245    */
246   private void madeAllContainerRequestToRM() {
247     // Setup ask for containers from RM
248     // Send request for containers to RM
249     // Until we get our fully allocated quota, we keep on polling RM for
250     // containers
251     // Keep looping until all the containers are launched and shell script
252     // executed on them ( regardless of success/failure).
253     for (int i = 0; i < containersToLaunch; ++i) {
254       ContainerRequest containerAsk = setupContainerAskForRM();
255       amRMClient.addContainerRequest(containerAsk);
256     }
257   }
258 
259    /**
260     * Setup the request that will be sent to the RM for the container ask.
261     *
262     * @return the setup ResourceRequest to be sent to RM
263     */
264   private ContainerRequest setupContainerAskForRM() {
265     // setup requirements for hosts
266     // using * as any host will do for the distributed shell app
267     // set the priority for the request
268     Priority pri = Records.newRecord(Priority.class);
269     // TODO - what is the range for priority? how to decide?
270     pri.setPriority(GiraphConstants.GIRAPH_YARN_PRIORITY);
271 
272     // Set up resource type requirements
273     // For now, only memory is supported so we set memory requirements
274     Resource capability = Records.newRecord(Resource.class);
275     capability.setMemory(heapPerContainer);
276 
277     ContainerRequest request = new ContainerRequest(capability, null, null,
278       pri);
279     LOG.info("Requested container ask: " + request.toString());
280     return request;
281   }
282 
283   /**
284    * Populate allTokens with the tokens received
285    * @return
286    */
287   private void getAllTokens() throws IOException {
288     Credentials credentials = UserGroupInformation.getCurrentUser()
289         .getCredentials();
290     DataOutputBuffer dob = new DataOutputBuffer();
291     credentials.writeTokenStorageToStream(dob);
292     // Now remove the AM->RM token so that containers cannot access it.
293     Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
294     while (iter.hasNext()) {
295       Token<?> token = iter.next();
296       if (LOG.isDebugEnabled()) {
297         LOG.debug("Token type :" + token.getKind());
298       }
299       if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
300         iter.remove();
301       }
302     }
303     allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
304   }
305 
306   /**
307    * Register RM callback and start listening
308    * @return
309    */
310   private void registerRMCallBackHandler() {
311     AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
312     amRMClient = AMRMClientAsync.createAMRMClientAsync(1000,
313       allocListener);
314     amRMClient.init(yarnConf);
315     amRMClient.start();
316   }
317 
318   /**
319    * Register NM callback and start listening
320    * @return
321    */
322   private void registerNMCallbackHandler() {
323     containerListener = new NMCallbackHandler();
324     nmClientAsync = new NMClientAsyncImpl(containerListener);
325     nmClientAsync.init(yarnConf);
326     nmClientAsync.start();
327   }
328   /**
329    * Register AM to RM
330    * @return AM register response
331    */
332   private RegisterApplicationMasterResponse registerAMToRM()
333     throws YarnException {
334     // register Application Master with the YARN Resource Manager so we can
335     // begin requesting resources.
336     try {
337       if (UserGroupInformation.isSecurityEnabled()) {
338         LOG.info("SECURITY ENABLED ");
339       }
340       // TODO: provide actual call back details
341       RegisterApplicationMasterResponse response = amRMClient
342         .registerApplicationMaster(appMasterHostname
343         , appMasterRpcPort, appMasterTrackingUrl);
344       return response;
345     } catch (IOException ioe) {
346       throw new IllegalStateException(
347         "GiraphApplicationMaster failed to register with RM.", ioe);
348     }
349   }
350 
351   /**
352    * For each container successfully allocated, attempt to set up and launch
353    * a Giraph worker/master task.
354    * @param allocatedContainers the containers we have currently allocated.
355    */
356   private void startContainerLaunchingThreads(final List<Container>
357     allocatedContainers) {
358     for (Container allocatedContainer : allocatedContainers) {
359       LOG.info("Launching command on a new container." +
360         ", containerId=" + allocatedContainer.getId() +
361         ", containerNode=" + allocatedContainer.getNodeId().getHost() +
362         ":" + allocatedContainer.getNodeId().getPort() +
363         ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() +
364         ", containerResourceMemory=" +
365         allocatedContainer.getResource().getMemory());
366       // Launch and start the container on a separate thread to keep the main
367       // thread unblocked as all containers may not be allocated at one go.
368       LaunchContainerRunnable runnableLaunchContainer =
369         new LaunchContainerRunnable(allocatedContainer, containerListener);
370       executor.execute(runnableLaunchContainer);
371     }
372   }
373 
374   /**
375    * Lazily compose the map of jar and file names to LocalResource records for
376    * inclusion in GiraphYarnTask container requests. Can re-use the same map
377    * as Giraph tasks need identical HDFS-based resources (jars etc.) to run.
378    * @return the resource map for a ContainerLaunchContext
379    */
380   private synchronized Map<String, LocalResource> getTaskResourceMap() {
381     // Set the local resources: just send the copies already in HDFS
382     if (null == LOCAL_RESOURCES) {
383       LOCAL_RESOURCES = Maps.newHashMap();
384       try {
385         // if you have to update the giraphConf for export to tasks, do it now
386         updateGiraphConfForExport();
387         YarnUtils.addFsResourcesToMap(LOCAL_RESOURCES, giraphConf,
388           appAttemptId.getApplicationId());
389       } catch (IOException ioe) {
390         // fail fast, this container will never launch.
391         throw new IllegalStateException("Could not configure the container" +
392           "launch context for GiraphYarnTasks.", ioe);
393       }
394     }
395     // else, return the prepopulated copy to reuse for each GiraphYarkTask
396     return LOCAL_RESOURCES;
397   }
398 
399   /**
400    * If you're going to make ANY CHANGES to your local GiraphConfiguration
401    * while running the GiraphApplicationMaster, put them here.
402    * This method replaces the current XML file GiraphConfiguration
403    * stored in HDFS with the copy you have modified locally in-memory.
404    */
405   private void updateGiraphConfForExport()
406     throws IOException {
407     // Giraph expects this MapReduce stuff
408     giraphConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
409       appAttemptId.getAttemptId());
410     // now republish the giraph-conf.xml in HDFS
411     YarnUtils.exportGiraphConfiguration(giraphConf,
412       appAttemptId.getApplicationId());
413   }
414 
415   /**
416    * Application entry point
417    * @param args command-line args (set by GiraphYarnClient, if any)
418    */
419   public static void main(final String[] args) {
420     boolean result = false;
421     LOG.info("Starting GitaphAM ");
422     String containerIdString =  System.getenv().get(
423       Environment.CONTAINER_ID.name());
424     if (containerIdString == null) {
425       // container id should always be set in the env by the framework
426       throw new IllegalArgumentException("ContainerId not found in env vars.");
427     }
428     ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
429     ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId();
430     try {
431       GiraphApplicationMaster giraphAppMaster =
432         new GiraphApplicationMaster(containerId, appAttemptId);
433       result = giraphAppMaster.run();
434       // CHECKSTYLE: stop IllegalCatch
435     } catch (Throwable t) {
436       // CHECKSTYLE: resume IllegalCatch
437       LOG.error("GiraphApplicationMaster caught a " +
438                   "top-level exception in main.", t);
439       System.exit(1);
440     }
441     if (result) {
442       LOG.info("Giraph Application Master completed successfully. exiting");
443       System.exit(0);
444     } else {
445       LOG.info("Giraph Application Master failed. exiting");
446       System.exit(2);
447     }
448   }
449 
450   /**
451    * Thread to connect to the {@link ContainerManager} and launch the container
452    * that will house one of our Giraph worker (or master) tasks.
453    */
454   private class LaunchContainerRunnable implements Runnable {
455     /** Allocated container */
456     private Container container;
457     /** NM listener */
458     private NMCallbackHandler containerListener;
459 
460     /**
461      * Constructor.
462      * @param newGiraphTaskContainer Allocated container
463      * @param containerListener container listener.
464      */
465     public LaunchContainerRunnable(final Container newGiraphTaskContainer,
466       NMCallbackHandler containerListener) {
467       this.container = newGiraphTaskContainer;
468       this.containerListener = containerListener;
469     }
470 
471     /**
472      * Connects to CM, sets up container launch context
473      * for shell command and eventually dispatches the container
474      * start request to the CM.
475      */
476     public void run() {
477       // Connect to ContainerManager
478       // configure the launcher for the Giraph task it will host
479       ContainerLaunchContext ctx = buildContainerLaunchContext();
480       // request CM to start this container as spec'd in ContainerLaunchContext
481       containerListener.addContainer(container.getId(), container);
482       nmClientAsync.startContainerAsync(container, ctx);
483     }
484 
485     /**
486      * Boilerplate to set up the ContainerLaunchContext to tell the Container
487      * Manager how to launch our Giraph task in the execution container we have
488      * already allocated.
489      * @return a populated ContainerLaunchContext object.
490      */
491     private ContainerLaunchContext buildContainerLaunchContext() {
492       LOG.info("Setting up container launch container for containerid=" +
493         container.getId());
494       ContainerLaunchContext launchContext = Records
495         .newRecord(ContainerLaunchContext.class);
496       // args inject the CLASSPATH, heap MB, and TaskAttemptID for launched task
497       final List<String> commands = generateShellExecCommand();
498       LOG.info("Conatain launch Commands :" + commands.get(0));
499       launchContext.setCommands(commands);
500       // Set up tokens for the container too. We are
501       // populating them mainly for NodeManagers to be able to download any
502       // files in the distributed file-system. The tokens are otherwise also
503       // useful in cases, for e.g., when one is running a
504       // "hadoop dfs" like command
505       launchContext.setTokens(allTokens.slice());
506 
507       // add user information to the job
508       String jobUserName = "ERROR_UNKNOWN_USER";
509       UserGroupInformation ugi = null;
510       try {
511         ugi = UserGroupInformation.getCurrentUser();
512         jobUserName = ugi.getUserName();
513       } catch (IOException ioe) {
514         jobUserName =
515           System.getenv(ApplicationConstants.Environment.USER.name());
516       }
517       //launchContext.setUser(jobUserName);
518       LOG.info("Setting username in ContainerLaunchContext to: " + jobUserName);
519       // Set the environment variables to inject into remote task's container
520       buildEnvironment(launchContext);
521       // Set the local resources: just send the copies already in HDFS
522       launchContext.setLocalResources(getTaskResourceMap());
523       return launchContext;
524     }
525 
526     /**
527      * Generates our command line string used to launch our Giraph tasks.
528      * @return the BASH shell commands to launch the job.
529      */
530     private List<String> generateShellExecCommand() {
531       return ImmutableList.of("java " +
532         "-Xmx" + heapPerContainer + "M " +
533         "-Xms" + heapPerContainer + "M " +
534         "-cp .:${CLASSPATH} " +
535         "org.apache.giraph.yarn.GiraphYarnTask " +
536         appAttemptId.getApplicationId().getClusterTimestamp() + " " +
537         appAttemptId.getApplicationId().getId() + " " +
538         container.getId().getId() + " " +
539         appAttemptId.getAttemptId() + " " +
540         "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
541         "/task-" + container.getId().getId() + "-stdout.log " +
542         "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
543         "/task-" + container.getId().getId() + "-stderr.log "
544       );
545     }
546 
547     /**
548      * Utility to populate the environment vars we wish to inject into the new
549      * containter's env when the Giraph BSP task is executed.
550      * @param launchContext the launch context which will set our environment
551      *                      vars in the app master's execution container.
552      */
553     private void buildEnvironment(final ContainerLaunchContext launchContext) {
554       Map<String, String> classPathForEnv = Maps.<String, String>newHashMap();
555       // pick up the local classpath so when we instantiate a Configuration
556       // remotely, we also get the "mapred-site.xml" and "yarn-site.xml"
557       YarnUtils.addLocalClasspathToEnv(classPathForEnv, giraphConf);
558       // set this map of env vars into the launch context.
559       launchContext.setEnvironment(classPathForEnv);
560     }
561   }
562 
563   /**
564    * CallbackHandler to process RM async calls
565    */
566   private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
567     @SuppressWarnings("unchecked")
568     @Override
569     public void onContainersCompleted(List<ContainerStatus>
570       completedContainers) {
571       LOG.info("Got response from RM for container ask, completedCnt=" +
572         completedContainers.size());
573       for (ContainerStatus containerStatus : completedContainers) {
574         LOG.info("Got container status for containerID=" +
575           containerStatus.getContainerId() + ", state=" +
576           containerStatus.getState() + ", exitStatus=" +
577           containerStatus.getExitStatus() + ", diagnostics=" +
578           containerStatus.getDiagnostics());
579         switch (containerStatus.getExitStatus()) {
580         case YARN_SUCCESS_EXIT_STATUS:
581           successfulCount.incrementAndGet();
582           break;
583         case YARN_ABORT_EXIT_STATUS:
584           break; // not success or fail
585         default:
586           failedCount.incrementAndGet();
587           break;
588         }
589         completedCount.incrementAndGet();
590       }
591 
592       if (completedCount.get() == containersToLaunch) {
593         done = true;
594         LOG.info("All container compeleted. done = " + done);
595       } else {
596         LOG.info("After completion of one conatiner. current status is:" +
597           " completedCount :" + completedCount.get() +
598           " containersToLaunch :" + containersToLaunch +
599           " successfulCount :" + successfulCount.get() +
600           " failedCount :" + failedCount.get());
601       }
602     }
603     @Override
604     public void onContainersAllocated(List<Container> allocatedContainers) {
605       LOG.info("Got response from RM for container ask, allocatedCnt=" +
606           allocatedContainers.size());
607       allocatedCount.addAndGet(allocatedContainers.size());
608       LOG.info("Total allocated # of container so far : " +
609         allocatedCount.get() +
610         " allocated out of " + containersToLaunch + " required.");
611       startContainerLaunchingThreads(allocatedContainers);
612     }
613 
614     @Override
615     public void onShutdownRequest() {
616       done = true;
617     }
618 
619     @Override
620     public void onNodesUpdated(List<NodeReport> updatedNodes) {
621     }
622 
623     @Override
624     public float getProgress() {
625       // set progress to deliver to RM on next heartbeat
626       float progress = (float) completedCount.get() /
627           containersToLaunch;
628       return progress;
629     }
630 
631     @Override
632     public void onError(Throwable e) {
633       done = true;
634       amRMClient.stop();
635     }
636   }
637 
638   /**
639    * CallbackHandler to process NM async calls
640    */
641   private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
642     /** List of containers */
643     private ConcurrentMap<ContainerId, Container> containers =
644           new ConcurrentHashMap<ContainerId, Container>();
645 
646     /**
647      * Add a container
648      * @param containerId id of container
649      * @param container container object
650      * @return
651      */
652     public void addContainer(ContainerId containerId, Container container) {
653       containers.putIfAbsent(containerId, container);
654     }
655 
656     @Override
657     public void onContainerStopped(ContainerId containerId) {
658       if (LOG.isDebugEnabled()) {
659         LOG.debug("Succeeded to stop Container " + containerId);
660       }
661       containers.remove(containerId);
662     }
663 
664     @Override
665     public void onContainerStatusReceived(ContainerId containerId,
666         ContainerStatus containerStatus) {
667       if (LOG.isDebugEnabled()) {
668         LOG.debug("Container Status: id=" + containerId + ", status=" +
669             containerStatus);
670       }
671     }
672 
673     @Override
674     public void onContainerStarted(ContainerId containerId,
675         Map<String, ByteBuffer> allServiceResponse) {
676       if (LOG.isDebugEnabled()) {
677         LOG.debug("Succeeded to start Container " + containerId);
678       }
679       Container container = containers.get(containerId);
680       if (container != null) {
681         nmClientAsync.getContainerStatusAsync(containerId,
682           container.getNodeId());
683       }
684     }
685 
686     @Override
687     public void onStartContainerError(ContainerId containerId, Throwable t) {
688       LOG.error("Failed to start Container " + containerId, t);
689       containers.remove(containerId);
690     }
691 
692     @Override
693     public void onGetContainerStatusError(
694       ContainerId containerId, Throwable t) {
695       LOG.error("Failed to query the status of Container " + containerId, t);
696     }
697 
698     @Override
699     public void onStopContainerError(ContainerId containerId, Throwable t) {
700       LOG.error("Failed to stop Container " + containerId);
701       containers.remove(containerId);
702     }
703   }
704 }