View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.scripting;
19  
20  import org.apache.giraph.conf.JsonStringConfOption;
21  import org.apache.giraph.graph.Language;
22  import org.apache.giraph.jython.JythonUtils;
23  import org.apache.hadoop.conf.Configuration;
24  import org.apache.hadoop.fs.Path;
25  import org.apache.log4j.Logger;
26  import org.codehaus.jackson.type.TypeReference;
27  
28  import com.google.common.base.Optional;
29  import com.google.common.collect.Lists;
30  import com.google.common.io.Closeables;
31  
32  import java.io.BufferedInputStream;
33  import java.io.FileInputStream;
34  import java.io.IOException;
35  import java.io.InputStream;
36  import java.util.List;
37  
38  import static org.apache.giraph.utils.DistributedCacheUtils.getLocalCacheFile;
39  
40  /**
41   * Loads scripts written by user in other languages, for example Jython.
42   */
43  public class ScriptLoader {
44    /** Option for scripts to load on workers */
45    public static final JsonStringConfOption SCRIPTS_TO_LOAD =
46        new JsonStringConfOption("giraph.scripts.to.load",
47            "Scripts to load on workers");
48  
49    /** Scripts that were loaded */
50    private static final List<DeployedScript> LOADED_SCRIPTS =
51        Lists.newArrayList();
52  
53    /** Logger */
54    private static final Logger LOG = Logger.getLogger(ScriptLoader.class);
55  
56    /** Don't construct */
57    private ScriptLoader() { }
58  
59    /**
60     * Deploy a script
61     *
62     * @param conf {@link Configuration}
63     * @param scriptPath Path to script
64     * @param deployType type of deployment
65     * @param language programming language
66     */
67    public static void setScriptsToLoad(Configuration conf,
68        String scriptPath, DeployType deployType, Language language) {
69      DeployedScript deployedScript = new DeployedScript(scriptPath,
70          deployType, language);
71      setScriptsToLoad(conf, deployedScript);
72    }
73  
74    /**
75     * Deploy pair of scripts
76     *
77     * @param conf {@link Configuration}
78     * @param script1 Path to script
79     * @param deployType1 type of deployment
80     * @param language1 programming language
81     * @param script2 Path to script
82     * @param deployType2 type of deployment
83     * @param language2 programming language
84     */
85    public static void setScriptsToLoad(Configuration conf,
86        String script1, DeployType deployType1, Language language1,
87        String script2, DeployType deployType2, Language language2) {
88      DeployedScript deployedScript1 = new DeployedScript(script1,
89          deployType1, language1);
90      DeployedScript deployedScript2 = new DeployedScript(script2,
91          deployType2, language2);
92      setScriptsToLoad(conf, deployedScript1, deployedScript2);
93    }
94  
95    /**
96     * Deploy scripts
97     *
98     * @param conf Configuration
99     * @param scripts the scripts to deploy
100    */
101   public static void setScriptsToLoad(Configuration conf,
102       DeployedScript... scripts) {
103     List<DeployedScript> scriptsToLoad = Lists.newArrayList(scripts);
104     SCRIPTS_TO_LOAD.set(conf, scriptsToLoad);
105   }
106 
107   /**
108    * Add a script to load on workers
109    *
110    * @param conf {@link Configuration}
111    * @param script  Path to script
112    * @param deployType type of deployment
113    * @param language programming language
114    */
115   public static void addScriptToLoad(Configuration conf,
116       String script, DeployType deployType, Language language) {
117     addScriptToLoad(conf, new DeployedScript(script, deployType, language));
118   }
119 
120   /**
121    * Add a script to load on workers
122    *
123    * @param conf {@link Configuration}
124    * @param script the script to load
125    */
126   public static void addScriptToLoad(Configuration conf,
127       DeployedScript script) {
128     List<DeployedScript> scriptsToLoad = getScriptsToLoad(conf);
129     if (scriptsToLoad == null) {
130       scriptsToLoad = Lists.<DeployedScript>newArrayList();
131     }
132     scriptsToLoad.add(script);
133     SCRIPTS_TO_LOAD.set(conf, scriptsToLoad);
134   }
135 
136   /**
137    * Get the list of scripts to load on workers
138    *
139    * @param conf {@link Configuration}
140    * @return list of {@link DeployedScript}s
141    */
142   public static List<DeployedScript> getScriptsToLoad(Configuration conf) {
143     TypeReference<List<DeployedScript>> jsonType =
144         new TypeReference<List<DeployedScript>>() { };
145     return SCRIPTS_TO_LOAD.get(conf, jsonType);
146   }
147 
148   /**
149    * Load all the scripts deployed in Configuration
150    *
151    * @param conf Configuration
152    */
153   public static void loadScripts(Configuration conf) throws IOException {
154     List<DeployedScript> deployedScripts = getScriptsToLoad(conf);
155     if (deployedScripts == null) {
156       return;
157     }
158     for (DeployedScript deployedScript : deployedScripts) {
159       loadScript(conf, deployedScript);
160     }
161   }
162 
163   /**
164    * Load a single deployed script
165    *
166    * @param conf Configuration
167    * @param deployedScript the deployed script
168    */
169   public static void loadScript(Configuration conf,
170       DeployedScript deployedScript) throws IOException {
171     InputStream stream = openScriptInputStream(conf, deployedScript);
172     switch (deployedScript.getLanguage()) {
173     case JYTHON:
174       loadJythonScript(stream);
175       break;
176     default:
177       LOG.fatal("Don't know how to load script " + deployedScript);
178       throw new IllegalStateException("Don't know how to load script " +
179           deployedScript);
180     }
181 
182     LOADED_SCRIPTS.add(deployedScript);
183     Closeables.close(stream, true);
184   }
185 
186   /**
187    * Load a Jython deployed script
188    *
189    * @param stream InputStream with Jython code to load
190    */
191   private static void loadJythonScript(InputStream stream) {
192     JythonUtils.getInterpreter().execfile(stream);
193   }
194 
195   /**
196    * Get list of scripts already loaded.
197    *
198    * @return list of loaded scripts
199    */
200   public static List<DeployedScript> getLoadedScripts() {
201     return LOADED_SCRIPTS;
202   }
203 
204   /**
205    * Get an {@link java.io.InputStream} for the deployed script.
206    *
207    * @param conf Configuration
208    * @param deployedScript the deployed script
209    * @return {@link java.io.InputStream} for reading script
210    */
211   private static InputStream openScriptInputStream(Configuration conf,
212       DeployedScript deployedScript) {
213     DeployType deployType = deployedScript.getDeployType();
214     String path = deployedScript.getPath();
215 
216     InputStream stream;
217     switch (deployType) {
218     case RESOURCE:
219       if (LOG.isInfoEnabled()) {
220         LOG.info("getScriptStream: Reading script from resource at " +
221             deployedScript.getPath());
222       }
223       stream = ScriptLoader.class.getClassLoader().getResourceAsStream(path);
224       if (stream == null) {
225         throw new IllegalStateException("getScriptStream: Failed to " +
226             "open script from resource at " + path);
227       }
228       break;
229     case DISTRIBUTED_CACHE:
230       if (LOG.isInfoEnabled()) {
231         LOG.info("getScriptStream: Reading script from DistributedCache at " +
232             path);
233       }
234       Optional<Path> localPath = getLocalCacheFile(conf, path);
235       if (!localPath.isPresent()) {
236         throw new IllegalStateException("getScriptStream: Failed to " +
237             "find script in local DistributedCache matching " + path);
238       }
239       String pathStr = localPath.get().toString();
240       try {
241         stream = new BufferedInputStream(new FileInputStream(pathStr));
242       } catch (IOException e) {
243         throw new IllegalStateException("getScriptStream: Failed open " +
244             "script from DistributedCache at " + localPath);
245       }
246       break;
247     default:
248       throw new IllegalArgumentException("getScriptStream: Unknown " +
249           "script deployment type: " + deployType);
250     }
251     return stream;
252   }
253 }