This project has retired. For details please refer to its Attic page.
YarnUtils xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.yarn;
19  
20  import com.google.common.collect.Sets;
21  import java.io.FileOutputStream;
22  import java.util.Set;
23  import org.apache.giraph.conf.GiraphConfiguration;
24  import org.apache.giraph.conf.GiraphConstants;
25  import org.apache.hadoop.fs.FileStatus;
26  import org.apache.hadoop.fs.FileSystem;
27  import org.apache.hadoop.fs.Path;
28  import org.apache.hadoop.mapreduce.MRJobConfig;
29  import org.apache.hadoop.yarn.api.records.ApplicationId;
30  import org.apache.hadoop.yarn.api.records.LocalResource;
31  import org.apache.hadoop.yarn.api.records.LocalResourceType;
32  import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
33  import org.apache.hadoop.yarn.conf.YarnConfiguration;
34  import org.apache.hadoop.yarn.util.ConverterUtils;
35  import org.apache.hadoop.yarn.util.Records;
36  import org.apache.hadoop.util.StringUtils;
37  import java.io.File;
38  import java.io.IOException;
39  import java.util.Map;
40  import org.apache.log4j.Logger;
41  
42  /**
43   * Utilities that can only compile with versions of Hadoop that support YARN,
44   * so they live here instead of o.a.g.utils package.
45   */
46  public class YarnUtils {
47    /** Class Logger */
48    private static final Logger LOG = Logger.getLogger(YarnUtils.class);
49    /** Default dir on HDFS (or equivalent) where LocalResources are stored */
50    private static final String HDFS_RESOURCE_DIR = "giraph_yarn_jar_cache";
51  
52    /** Private constructor, this is a utility class only */
53    private YarnUtils() { /* no-op */ }
54  
55    /**
56     * Populates the LocalResources list with the HDFS paths listed in
57     * the conf under GiraphConstants.GIRAPH_YARN_LIBJARS, and the
58     * GiraphConfiguration for this job. Also adds the Giraph default application
59     * jar as determined by GiraphYarnClient.GIRAPH_CLIENT_JAR constant.
60     * @param map the LocalResources list to populate.
61     * @param giraphConf the configuration to use to select jars to include.
62     * @param appId the ApplicationId, naming the the HDFS base dir for job jars.
63     */
64    public static void addFsResourcesToMap(Map<String, LocalResource> map,
65      GiraphConfiguration giraphConf, ApplicationId appId) throws IOException {
66      FileSystem fs = FileSystem.get(giraphConf);
67      Path baseDir = YarnUtils.getFsCachePath(fs, appId);
68      boolean coreJarFound = false;
69      for (String fileName : giraphConf.getYarnLibJars().split(",")) {
70        if (fileName.length() > 0) {
71          Path filePath = new Path(baseDir, fileName);
72          LOG.info("Adding " + fileName + " to LocalResources for export.to " +
73            filePath);
74          if (fileName.contains("giraph-core")) {
75            coreJarFound = true;
76          }
77          addFileToResourceMap(map, fs, filePath);
78        }
79      }
80      if (!coreJarFound) { // OK if you are running giraph-examples-jar-with-deps
81        LOG.warn("Job jars (-yj option) didn't include giraph-core.");
82      }
83      Path confPath = new Path(baseDir, GiraphConstants.GIRAPH_YARN_CONF_FILE);
84      addFileToResourceMap(map, fs, confPath);
85    }
86  
87    /**
88     * Utility function to locate local JAR files and other resources
89     * recursively in the dirs on the local CLASSPATH. Once all the files
90     * named in <code>fileNames</code> are found, we stop and return the results.
91     * @param fileNames the file name of the jars, without path information.
92     * @return a set of Paths to the jar files requested in fileNames.
93     */
94    public static Set<Path> getLocalFiles(final Set<String> fileNames) {
95      Set<Path> jarPaths = Sets.newHashSet();
96      String classPath = ".:" + System.getenv("HADOOP_HOME");
97      if (classPath.length() > 2) {
98        classPath += ":";
99      }
100     classPath += System.getenv("CLASSPATH");
101     for (String baseDir : classPath.split(":")) {
102       LOG.info("Class path name " + baseDir);
103       if (baseDir.length() > 0) {
104         // lose the globbing chars that will fail in File#listFiles
105         final int lastFileSep = baseDir.lastIndexOf("/");
106         if (lastFileSep > 0) {
107           String test = baseDir.substring(lastFileSep);
108           if (test.contains("*")) {
109             baseDir = baseDir.substring(0, lastFileSep);
110           }
111         }
112         LOG.info("base path checking " + baseDir);
113         populateJarList(new File(baseDir), jarPaths, fileNames);
114       }
115       if (jarPaths.size() >= fileNames.size()) {
116         break; // found a resource for each name in the input set, all done
117       }
118     }
119     return jarPaths;
120   }
121 
122   /**
123    * Start in the working directory and recursively locate all jars.
124    * @param dir current directory to explore.
125    * @param fileSet the list to populate.
126    * @param fileNames file names to locate.
127    */
128   private static void populateJarList(final File dir,
129     final Set<Path> fileSet, final Set<String> fileNames) {
130     File[] filesInThisDir = dir.listFiles();
131     if (null == filesInThisDir) {
132       return;
133     }
134     for (File f : dir.listFiles()) {
135       if (f.isDirectory()) {
136         populateJarList(f, fileSet, fileNames);
137       } else if (f.isFile() && fileNames.contains(f.getName())) {
138         fileSet.add(new Path(f.getAbsolutePath()));
139       }
140     }
141   }
142 
143   /**
144    * Boilerplate to add a file to the local resources..
145    * @param localResources the LocalResources map to populate.
146    * @param fs handle to the HDFS file system.
147    * @param target the file to send to the remote container.
148    */
149   public static void addFileToResourceMap(Map<String, LocalResource>
150     localResources, FileSystem fs, Path target)
151     throws IOException {
152     LocalResource resource = Records.newRecord(LocalResource.class);
153     FileStatus destStatus = fs.getFileStatus(target);
154     resource.setResource(ConverterUtils.getYarnUrlFromURI(target.toUri()));
155     resource.setSize(destStatus.getLen());
156     resource.setTimestamp(destStatus.getModificationTime());
157     resource.setType(LocalResourceType.FILE); // use FILE, even for jars!
158     resource.setVisibility(LocalResourceVisibility.APPLICATION);
159     localResources.put(target.getName(), resource);
160     LOG.info("Registered file in LocalResources :: " + target);
161   }
162 
163   /**
164    * Get the base HDFS dir we will be storing our LocalResources in.
165    * @param fs the file system.
166    * @param appId the ApplicationId under which our resources will be stored.
167    * @return the path
168    */
169   public static Path getFsCachePath(final FileSystem fs,
170     final ApplicationId appId) {
171     return new Path(fs.getHomeDirectory(), HDFS_RESOURCE_DIR + "/" + appId);
172   }
173 
174   /**
175    * Popuate the environment string map to be added to the environment vars
176    * in a remote execution container. Adds the local classpath to pick up
177    * "yarn-site.xml" and "mapred-site.xml" stuff.
178    * @param env the map of env var values.
179    * @param giraphConf the GiraphConfiguration to pull values from.
180    */
181   public static void addLocalClasspathToEnv(final Map<String, String> env,
182     final GiraphConfiguration giraphConf) {
183     StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");
184     for (String cpEntry : giraphConf.getStrings(
185       YarnConfiguration.YARN_APPLICATION_CLASSPATH,
186       YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
187       classPathEnv.append(':').append(cpEntry.trim()); //TODO: Separator
188     }
189     for (String cpEntry : giraphConf.getStrings(
190       MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
191       StringUtils.getStrings(
192         MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH))) {
193       classPathEnv.append(':').append(cpEntry.trim());
194     }
195     // add the runtime classpath needed for tests to work
196     if (giraphConf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
197       classPathEnv.append(':').append(System.getenv("CLASSPATH"));
198     }
199     env.put("CLASSPATH", classPathEnv.toString());
200   }
201 
202   /**
203    * Populate the LocalResources list with the GiraphConf XML file's HDFS path.
204    * @param giraphConf the GiraphConfifuration to export for worker tasks.
205    * @param appId the ApplicationId for this YARN app.
206    * @param localResourceMap the LocalResource map of files to export to tasks.
207    */
208   public static void addGiraphConfToLocalResourceMap(GiraphConfiguration
209     giraphConf, ApplicationId appId, Map<String, LocalResource>
210     localResourceMap) throws IOException {
211     FileSystem fs = FileSystem.get(giraphConf);
212     Path hdfsConfPath = new Path(YarnUtils.getFsCachePath(fs, appId),
213       GiraphConstants.GIRAPH_YARN_CONF_FILE);
214     YarnUtils.addFileToResourceMap(localResourceMap, fs, hdfsConfPath);
215   }
216 
217   /**
218    * Export our populated GiraphConfiguration as an XML file to be used by the
219    * ApplicationMaster's exec container, and register it with LocalResources.
220    * @param giraphConf the current Configuration object to be published.
221    * @param appId the ApplicationId to stamp this app's base HDFS resources dir.
222    */
223   public static void exportGiraphConfiguration(GiraphConfiguration giraphConf,
224     ApplicationId appId) throws IOException {
225     File confFile = new File(System.getProperty("java.io.tmpdir"),
226       GiraphConstants.GIRAPH_YARN_CONF_FILE);
227     if (confFile.exists()) {
228       if (!confFile.delete()) {
229         LOG.warn("Unable to delete file " + confFile);
230       }
231     }
232     String localConfPath = confFile.getAbsolutePath();
233     FileOutputStream fos = null;
234     try {
235       fos = new FileOutputStream(localConfPath);
236       giraphConf.writeXml(fos);
237       FileSystem fs = FileSystem.get(giraphConf);
238       Path hdfsConfPath = new Path(YarnUtils.getFsCachePath(fs, appId),
239         GiraphConstants.GIRAPH_YARN_CONF_FILE);
240       fos.flush();
241       fs.copyFromLocalFile(false, true, new Path(localConfPath), hdfsConfPath);
242     } finally {
243       if (null != fos) {
244         fos.close();
245       }
246     }
247   }
248 }