View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.yarn;
19  
20  import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
21  
22  import com.google.common.collect.ImmutableList;
23  import com.google.common.collect.Maps;
24  
25  import com.google.common.collect.Sets;
26  import java.util.Set;
27  import org.apache.giraph.conf.GiraphConfiguration;
28  import org.apache.giraph.conf.GiraphConstants;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.fs.FileStatus;
31  import org.apache.hadoop.fs.FileSystem;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.io.DataOutputBuffer;
34  import org.apache.hadoop.security.Credentials;
35  import org.apache.hadoop.security.UserGroupInformation;
36  import org.apache.hadoop.security.token.Token;
37  import org.apache.hadoop.yarn.api.ApplicationConstants;
38  import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
39  import org.apache.hadoop.yarn.api.records.ApplicationId;
40  import org.apache.hadoop.yarn.api.records.ApplicationReport;
41  import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
42  import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
43  import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
44  import org.apache.hadoop.yarn.api.records.LocalResource;
45  import org.apache.hadoop.yarn.api.records.NodeReport;
46  import org.apache.hadoop.yarn.api.records.NodeState;
47  import org.apache.hadoop.yarn.api.records.Resource;
48  import org.apache.hadoop.yarn.api.records.YarnApplicationState;
49  import org.apache.hadoop.yarn.client.api.YarnClient;
50  import org.apache.hadoop.yarn.client.api.YarnClientApplication;
51  import org.apache.hadoop.yarn.conf.YarnConfiguration;
52  import org.apache.hadoop.yarn.exceptions.YarnException;
53  import org.apache.hadoop.yarn.util.Records;
54  
55  import org.apache.log4j.Logger;
56  
57  import java.io.IOException;
58  import java.util.List;
59  import java.util.Map;
60  import java.nio.ByteBuffer;
61  
62  /**
63   * The initial launcher for a YARN-based Giraph job. This class attempts to
64   * configure and send a request to the ResourceManager for a single
65   * application container to host GiraphApplicationMaster. The RPC connection
66   * between the RM and GiraphYarnClient is the YARN ApplicationManager.
67   */
68  public class GiraphYarnClient {
69    static {
70      Configuration.addDefaultResource("giraph-site.xml");
71    }
72    /** Class logger */
73    private static final Logger LOG = Logger.getLogger(GiraphYarnClient.class);
74    /** Sleep time between silent progress checks */
75    private static final int JOB_STATUS_INTERVAL_MSECS = 800;
76    /** Memory (in MB) to allocate for our ApplicationMaster container */
77    private static final int YARN_APP_MASTER_MEMORY_MB = 512;
78  
79    /** human-readable job name */
80    private final String jobName;
81    /** Helper configuration from the job */
82    private final GiraphConfiguration giraphConf;
83    /** ApplicationId object (needed for RPC to ResourceManager) */
84    private ApplicationId appId;
85    /** # of sleeps between progress reports to client */
86    private int reportCounter;
87    /** Yarn client object */
88    private YarnClient yarnClient;
89  
90    /**
91     * Constructor. Requires caller to hand us a GiraphConfiguration.
92     *
93     * @param giraphConf User-defined configuration
94     * @param jobName User-defined job name
95     */
96    public GiraphYarnClient(GiraphConfiguration giraphConf, String jobName)
97      throws IOException {
98      this.reportCounter = 0;
99      this.jobName = jobName;
100     this.appId = null; // can't set this until after start()
101     this.giraphConf = giraphConf;
102     verifyOutputDirDoesNotExist();
103     yarnClient = YarnClient.createYarnClient();
104     yarnClient.init(giraphConf);
105   }
106 
107   /**
108    * Submit a request to the Hadoop YARN cluster's ResourceManager
109    * to obtain an application container. This will run our ApplicationMaster,
110    * which will in turn request app containers for Giraphs' master and all
111    * worker tasks.
112    * @param verbose Not implemented yet, to provide compatibility w/GiraphJob
113    * @return true if job is successful
114    */
115   public boolean run(final boolean verbose) throws YarnException, IOException {
116     // init our connection to YARN ResourceManager RPC
117     LOG.info("Running Client");
118     yarnClient.start();
119     // request an application id from the RM
120  // Get a new application id
121     YarnClientApplication app = yarnClient.createApplication();
122     GetNewApplicationResponse getNewAppResponse = app.
123       getNewApplicationResponse();
124     checkPerNodeResourcesAvailable(getNewAppResponse);
125     // configure our request for an exec container for GiraphApplicationMaster
126     ApplicationSubmissionContext appContext = app.
127       getApplicationSubmissionContext();
128     appId = appContext.getApplicationId();
129     //createAppSubmissionContext(appContext);
130     appContext.setApplicationId(appId);
131     appContext.setApplicationName(jobName);
132     LOG.info("Obtained new Application ID: " + appId);
133     // sanity check
134     applyConfigsForYarnGiraphJob();
135 
136     ContainerLaunchContext containerContext = buildContainerLaunchContext();
137     appContext.setResource(buildContainerMemory());
138     appContext.setAMContainerSpec(containerContext);
139     LOG.info("ApplicationSumbissionContext for GiraphApplicationMaster " +
140       "launch container is populated.");
141     //TODO: priority and queue
142     // Set the priority for the application master
143     //Priority pri = Records.newRecord(Priority.class);
144     // TODO - what is the range for priority? how to decide?
145     //pri.setPriority(amPriority);
146     //appContext.setPriority(pri);
147 
148     // Set the queue to which this application is to be submitted in the RM
149     //appContext.setQueue(amQueue);
150 
151    // make the request, blow up if fail, loop and report job progress if not
152     try {
153       LOG.info("Submitting application to ASM");
154       // obtain an "updated copy" of the appId for status checks/job kill later
155       appId = yarnClient.submitApplication(appContext);
156       LOG.info("Got new appId after submission :" + appId);
157     } catch (YarnException yre) {
158       // TODO
159       // Try submitting the same request again
160       // app submission failure?
161       throw new RuntimeException("submitApplication(appContext) FAILED.", yre);
162     }
163     LOG.info("GiraphApplicationMaster container request was submitted to " +
164       "ResourceManager for job: " + jobName);
165     return awaitGiraphJobCompletion();
166   }
167 
168   /**
169    * Without Hadoop MR to check for us, make sure the output dir doesn't exist!
170    */
171   private void verifyOutputDirDoesNotExist() {
172     Path outDir = null;
173     try {
174       FileSystem fs = FileSystem.get(giraphConf);
175       String errorMsg = "__ERROR_NO_OUTPUT_DIR_SET__";
176       outDir =
177         new Path(fs.getHomeDirectory(), giraphConf.get(OUTDIR, errorMsg));
178       FileStatus outStatus = fs.getFileStatus(outDir);
179       if (outStatus.isDirectory() || outStatus.isFile() ||
180         outStatus.isSymlink()) {
181         throw new IllegalStateException("Path " + outDir + " already exists.");
182       }
183     } catch (IOException ioe) {
184       LOG.info("Final output path is: " + outDir);
185     }
186   }
187 
188   /**
189    * Configuration settings we need to customize for a Giraph on YARN
190    * job. We need to call this EARLY in the job, before the GiraphConfiguration
191    * is exported to HDFS for localization in each task container.
192    */
193   private void applyConfigsForYarnGiraphJob() {
194     GiraphConstants.IS_PURE_YARN_JOB.set(giraphConf, true);
195     GiraphConstants.SPLIT_MASTER_WORKER.set(giraphConf, true);
196     giraphConf.set("mapred.job.id", "giraph_yarn_" + appId); // ZK app base path
197   }
198 
199   /**
200    * Utility to make sure we have the cluster resources we need to run this
201    * job. If they are not available, we should die here before too much setup.
202    * @param cluster the GetNewApplicationResponse from the YARN RM.
203    */
204   private void checkPerNodeResourcesAvailable(
205     final GetNewApplicationResponse cluster) throws YarnException, IOException {
206     // are there enough containers to go around for our Giraph job?
207     List<NodeReport> nodes = null;
208     long totalAvailable = 0;
209     try {
210       nodes = yarnClient.getNodeReports(NodeState.RUNNING);
211     } catch (YarnException yre) {
212       throw new RuntimeException("GiraphYarnClient could not connect with " +
213         "the YARN ResourceManager to determine the number of available " +
214         "application containers.", yre);
215     }
216     for (NodeReport node : nodes) {
217       LOG.info("Got node report from ASM for" +
218         ", nodeId=" + node.getNodeId() +
219         ", nodeAddress " + node.getHttpAddress() +
220         ", nodeRackName " + node.getRackName() +
221         ", nodeNumContainers " + node.getNumContainers());
222       totalAvailable += node.getCapability().getMemory();
223     }
224     // 1 master + all workers in -w command line arg
225     final int workers = giraphConf.getMaxWorkers() + 1;
226     checkAndAdjustPerTaskHeapSize(cluster);
227     final long totalAsk =
228       giraphConf.getYarnTaskHeapMb() * workers;
229     if (totalAsk > totalAvailable) {
230       throw new IllegalStateException("Giraph's estimated cluster heap " +
231         totalAsk + "MB ask is greater than the current available cluster " +
232         "heap of " + totalAvailable + "MB. Aborting Job.");
233     }
234   }
235 
236   /**
237    * Adjust the user-supplied <code>-yh</code> and <code>-w</code>
238    * settings if they are too small or large for the current cluster,
239    * and re-record the new settings in the GiraphConfiguration for export.
240    * @param gnar the GetNewAppResponse from the YARN ResourceManager.
241    */
242   private void checkAndAdjustPerTaskHeapSize(
243     final GetNewApplicationResponse gnar) {
244     // do we have the right heap size on these cluster nodes to run our job?
245     //TODO:
246     //final int minCapacity = gnar.getMinimumResourceCapability().getMemory();
247     final int maxCapacity = gnar.getMaximumResourceCapability().getMemory();
248     // make sure heap size is OK for this cluster's available containers
249     int giraphMem = giraphConf.getYarnTaskHeapMb();
250     if (giraphMem == GiraphConstants.GIRAPH_YARN_TASK_HEAP_MB_DEFAULT) {
251       LOG.info("Defaulting per-task heap size to " + giraphMem + "MB.");
252     }
253     if (giraphMem > maxCapacity) {
254       LOG.info("Giraph's request of heap MB per-task is more than the " +
255         "minimum; downgrading Giraph to" + maxCapacity + "MB.");
256       giraphMem = maxCapacity;
257     }
258     /*if (giraphMem < minCapacity) { //TODO:
259       LOG.info("Giraph's request of heap MB per-task is less than the " +
260         "minimum; upgrading Giraph to " + minCapacity + "MB.");
261       giraphMem = minCapacity;
262     }*/
263     giraphConf.setYarnTaskHeapMb(giraphMem); // record any changes made
264   }
265 
266   /**
267    * Kill time for the client, report progress occasionally, and otherwise
268    * just sleep and wait for the job to finish. If no AM response, kill the app.
269    * @return true if job run is successful.
270    */
271   private boolean awaitGiraphJobCompletion() throws YarnException, IOException {
272     boolean done;
273     ApplicationReport report = null;
274     try {
275       do {
276         try {
277           Thread.sleep(JOB_STATUS_INTERVAL_MSECS);
278         } catch (InterruptedException ir) {
279           LOG.info("Progress reporter's sleep was interrupted!", ir);
280         }
281         report = yarnClient.getApplicationReport(appId);
282         done = checkProgress(report);
283       } while (!done);
284       if (!giraphConf.metricsEnabled()) {
285         cleanupJarCache();
286       }
287     } catch (IOException ex) {
288       final String diagnostics = (null == report) ? "" :
289         "Diagnostics: " + report.getDiagnostics();
290       LOG.error("Fatal fault encountered, failing " + jobName + ". " +
291         diagnostics, ex);
292       try {
293         LOG.error("FORCIBLY KILLING Application from AppMaster.");
294         yarnClient.killApplication(appId);
295       } catch (YarnException yre) {
296         LOG.error("Exception raised in attempt to kill application.", yre);
297       }
298       return false;
299     }
300     return printFinalJobReport();
301   }
302 
303   /**
304    * Deletes the HDFS cache in YARN, which replaces DistributedCache of Hadoop.
305    * If metrics are enabled this will not get called (so you can examine cache.)
306    * @throws IOException if bad things happen.
307    */
308   private void cleanupJarCache() throws IOException {
309     FileSystem fs = FileSystem.get(giraphConf);
310     Path baseCacheDir = YarnUtils.getFsCachePath(fs, appId);
311     if (fs.exists(baseCacheDir)) {
312       LOG.info("Cleaning up HDFS distributed cache directory for Giraph job.");
313       fs.delete(baseCacheDir, true); // stuff inside
314       fs.delete(baseCacheDir, false); // dir itself
315     }
316   }
317 
318   /**
319    * Print final formatted job report for local client that initiated this run.
320    * @return true for app success, false for failure.
321    */
322   private boolean printFinalJobReport() throws YarnException, IOException {
323     ApplicationReport report;
324     try {
325       report = yarnClient.getApplicationReport(appId);
326       FinalApplicationStatus finalAppStatus =
327         report.getFinalApplicationStatus();
328       final long secs =
329         (report.getFinishTime() - report.getStartTime()) / 1000L;
330       final String time = String.format("%d minutes, %d seconds.",
331         secs / 60L, secs % 60L);
332       LOG.info("Completed " + jobName + ": " +
333         finalAppStatus.name() + ", total running time: " + time);
334     } catch (YarnException yre) {
335       LOG.error("Exception encountered while attempting to request " +
336         "a final job report for " + jobName , yre);
337       return false;
338     }
339     return true;
340   }
341 
342   /**
343    * Compose the ContainerLaunchContext for the Application Master.
344    * @return the CLC object populated and configured.
345    */
346   private ContainerLaunchContext buildContainerLaunchContext()
347     throws IOException {
348     ContainerLaunchContext appMasterContainer =
349       Records.newRecord(ContainerLaunchContext.class);
350     appMasterContainer.setEnvironment(buildEnvironment());
351     appMasterContainer.setLocalResources(buildLocalResourceMap());
352     appMasterContainer.setCommands(buildAppMasterExecCommand());
353     //appMasterContainer.setResource(buildContainerMemory());
354     //appMasterContainer.setUser(ApplicationConstants.Environment.USER.name());
355     setToken(appMasterContainer);
356     return appMasterContainer;
357   }
358 
359   /**
360    * Set delegation tokens for AM container
361    * @param amContainer AM container
362    * @return
363    */
364   private void setToken(ContainerLaunchContext amContainer) throws IOException {
365     // Setup security tokens
366     if (UserGroupInformation.isSecurityEnabled()) {
367       Credentials credentials = new Credentials();
368       String tokenRenewer = giraphConf.get(YarnConfiguration.RM_PRINCIPAL);
369       if (tokenRenewer == null || tokenRenewer.length() == 0) {
370         throw new IOException(
371           "Can't get Master Kerberos principal for the RM to use as renewer");
372       }
373       FileSystem fs = FileSystem.get(giraphConf);
374       // For now, only getting tokens for the default file-system.
375       final Token<?> [] tokens =
376         fs.addDelegationTokens(tokenRenewer, credentials);
377       if (tokens != null) {
378         for (Token<?> token : tokens) {
379           LOG.info("Got dt for " + fs.getUri() + "; " + token);
380         }
381       }
382       DataOutputBuffer dob = new DataOutputBuffer();
383       credentials.writeTokenStorageToStream(dob);
384       ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
385       amContainer.setTokens(fsTokens);
386     }
387   }
388 
389   /**
390    * Assess whether job is already finished/failed and 'done' flag needs to be
391    * set, prints progress display for client if all is going well.
392    * @param report the application report to assess.
393    * @return true if job report indicates the job run is over.
394    */
395   private boolean checkProgress(final ApplicationReport report) {
396     YarnApplicationState jobState = report.getYarnApplicationState();
397     if (jobState == YarnApplicationState.FINISHED ||
398       jobState == YarnApplicationState.KILLED) {
399       return true;
400     } else if (jobState == YarnApplicationState.FAILED) {
401       LOG.error(jobName + " reports FAILED state, diagnostics show: " +
402         report.getDiagnostics());
403       return true;
404     } else {
405       if (reportCounter++ % 5 == 0) {
406         displayJobReport(report);
407       }
408     }
409     return false;
410   }
411 
412   /**
413    * Display a formatted summary of the job progress report from the AM.
414    * @param report the report to display.
415    */
416   private void displayJobReport(final ApplicationReport report) {
417     if (null == report) {
418       throw new IllegalStateException("[*] Latest ApplicationReport for job " +
419         jobName + " was not received by the local client.");
420     }
421     final float elapsed =
422       (System.currentTimeMillis() - report.getStartTime()) / 1000.0f;
423     LOG.info(jobName + ", Elapsed: " + String.format("%.2f secs", elapsed));
424     LOG.info(report.getCurrentApplicationAttemptId() + ", State: " +
425       report.getYarnApplicationState().name() + ", Containers used: " +
426       report.getApplicationResourceUsageReport().getNumUsedContainers());
427   }
428 
429   /**
430    * Utility to produce the command line to activate the AM from the shell.
431    * @return A <code>List<String></code> of shell commands to execute in
432    *         the container allocated to us by the RM to host our App Master.
433    */
434   private List<String> buildAppMasterExecCommand() {
435     // 'gam-' prefix is for GiraphApplicationMaster in log file names
436     return ImmutableList.of("${JAVA_HOME}/bin/java " +
437       "-Xmx" + YARN_APP_MASTER_MEMORY_MB + "M " +
438       "-Xms" + YARN_APP_MASTER_MEMORY_MB + "M " + // TODO: REMOVE examples jar!
439       //TODO: Make constant
440       "-cp .:${CLASSPATH} org.apache.giraph.yarn.GiraphApplicationMaster " +
441       "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/gam-stdout.log " +
442       "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/gam-stderr.log "
443     );
444   }
445 
446   /**
447    * Register all local jar files from GiraphConstants.GIRAPH_YARN_LIBJARS
448    * in the LocalResources map, copy to HDFS on that same registered path.
449    * @param map the LocalResources list to populate.
450    */
451   private void addLocalJarsToResourceMap(Map<String, LocalResource> map)
452     throws IOException {
453     Set<String> jars = Sets.newHashSet();
454     LOG.info("LIB JARS :" + giraphConf.getYarnLibJars());
455     String[] libJars = giraphConf.getYarnLibJars().split(",");
456     for (String libJar : libJars) {
457       jars.add(libJar);
458     }
459     FileSystem fs = FileSystem.get(giraphConf);
460     Path baseDir = YarnUtils.getFsCachePath(fs, appId);
461     for (Path jar : YarnUtils.getLocalFiles(jars)) {
462       Path dest = new Path(baseDir, jar.getName());
463       LOG.info("Made local resource for :" + jar + " to " +  dest);
464       fs.copyFromLocalFile(false, true, jar, dest);
465       YarnUtils.addFileToResourceMap(map, fs, dest);
466     }
467   }
468 
469   /**
470    * Construct the memory requirements for the AppMaster's container request.
471    * @return A Resource that wraps the memory request.
472    */
473   private Resource buildContainerMemory() {
474     Resource capability = Records.newRecord(Resource.class);
475     capability.setMemory(YARN_APP_MASTER_MEMORY_MB); //Configurable thru CLI?
476     return capability;
477   }
478 
479   /**
480    * Create the mapping of environment vars that will be visible to the
481    * ApplicationMaster in its remote app container.
482    * @return a map of environment vars to set up for the AppMaster.
483    */
484   private Map<String, String> buildEnvironment() {
485     Map<String, String> environment =
486       Maps.<String, String>newHashMap();
487     LOG.info("Set the environment for the application master");
488     YarnUtils.addLocalClasspathToEnv(environment, giraphConf);
489     //TODO: add the runtime classpath needed for tests to work
490     LOG.info("Environment for AM :" + environment);
491     return environment;
492   }
493 
494   /**
495    * Create the mapping of files and JARs to send to the GiraphApplicationMaster
496    * and from there on to the Giraph tasks.
497    * @return the map of jars to local resource paths for transport
498    *   to the host container that will run our AppMaster.
499    */
500   private Map<String, LocalResource> buildLocalResourceMap() {
501     // set local resources for the application master
502     // local files or archives as needed
503     // In this scenario, the jar file for the application master
504     //is part of the local resources
505     Map<String, LocalResource> localResources =
506         Maps.<String, LocalResource>newHashMap();
507     LOG.info("buildLocalResourceMap ....");
508     try {
509       // export the GiraphConfiguration to HDFS for localization to remote tasks
510       //Ques: Merge the following two method
511       YarnUtils.exportGiraphConfiguration(giraphConf, appId);
512       YarnUtils.addGiraphConfToLocalResourceMap(
513         giraphConf, appId, localResources);
514       // add jars from '-yj' cmd-line arg to resource map for localization
515       addLocalJarsToResourceMap(localResources);
516       //TODO: log4j?
517       return localResources;
518     } catch (IOException ioe) {
519       throw new IllegalStateException("Failed to build LocalResouce map.", ioe);
520     }
521   }
522 
523 }