This project has retired. For details please refer to its Attic page.
ScriptLoader xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.scripting;
19  
20  import org.apache.giraph.conf.JsonStringConfOption;
21  import org.apache.giraph.graph.Language;
22  import org.apache.giraph.jython.JythonUtils;
23  import org.apache.hadoop.conf.Configuration;
24  import org.apache.hadoop.fs.Path;
25  import org.apache.log4j.Logger;
26  import org.codehaus.jackson.type.TypeReference;
27  
28  import com.google.common.base.Optional;
29  import com.google.common.collect.Lists;
30  import com.google.common.io.Closeables;
31  
32  import java.io.BufferedInputStream;
33  import java.io.FileInputStream;
34  import java.io.IOException;
35  import java.io.InputStream;
36  import java.util.List;
37  
38  import static org.apache.giraph.utils.DistributedCacheUtils.getLocalCacheFile;
39  
40  /**
41   * Loads scripts written by user in other languages, for example Jython.
42   */
43  public class ScriptLoader {
44    /** Option for scripts to load on workers */
45    public static final JsonStringConfOption SCRIPTS_TO_LOAD =
46        new JsonStringConfOption("giraph.scripts.to.load",
47            "Scripts to load on workers");
48  
49    /** Scripts that were loaded */
50    private static final List<DeployedScript> LOADED_SCRIPTS =
51        Lists.newArrayList();
52  
53    /** Logger */
54    private static final Logger LOG = Logger.getLogger(ScriptLoader.class);
55  
56    /** Don't construct */
57    private ScriptLoader() { }
58  
59    /**
60     * Deploy a script
61     *
62     * @param conf {@link Configuration}
63     * @param scriptPath Path to script
64     * @param deployType type of deployment
65     * @param language programming language
66     */
67    public static void setScriptsToLoad(Configuration conf,
68        String scriptPath, DeployType deployType, Language language) {
69      DeployedScript deployedScript = new DeployedScript(scriptPath,
70          deployType, language);
71      setScriptsToLoad(conf, deployedScript);
72    }
73  
74    /**
75     * Deploy pair of scripts
76     *
77     * @param conf {@link Configuration}
78     * @param script1 Path to script
79     * @param deployType1 type of deployment
80     * @param language1 programming language
81     * @param script2 Path to script
82     * @param deployType2 type of deployment
83     * @param language2 programming language
84     */
85    public static void setScriptsToLoad(Configuration conf,
86        String script1, DeployType deployType1, Language language1,
87        String script2, DeployType deployType2, Language language2) {
88      DeployedScript deployedScript1 = new DeployedScript(script1,
89          deployType1, language1);
90      DeployedScript deployedScript2 = new DeployedScript(script2,
91          deployType2, language2);
92      setScriptsToLoad(conf, deployedScript1, deployedScript2);
93    }
94  
95    /**
96     * Deploy scripts
97     *
98     * @param conf Configuration
99     * @param scripts the scripts to deploy
100    */
101   public static void setScriptsToLoad(Configuration conf,
102       DeployedScript... scripts) {
103     List<DeployedScript> scriptsToLoad = Lists.newArrayList(scripts);
104     SCRIPTS_TO_LOAD.set(conf, scriptsToLoad);
105   }
106 
107   /**
108    * Add a script to load on workers
109    *
110    * @param conf {@link Configuration}
111    * @param script  Path to script
112    * @param deployType type of deployment
113    * @param language programming language
114    */
115   public static void addScriptToLoad(Configuration conf,
116       String script, DeployType deployType, Language language) {
117     addScriptToLoad(conf, new DeployedScript(script, deployType, language));
118   }
119 
120   /**
121    * Add a script to load on workers
122    *
123    * @param conf {@link Configuration}
124    * @param script the script to load
125    */
126   public static void addScriptToLoad(Configuration conf,
127       DeployedScript script) {
128     List<DeployedScript> scriptsToLoad = getScriptsToLoad(conf);
129     if (scriptsToLoad == null) {
130       scriptsToLoad = Lists.<DeployedScript>newArrayList();
131     }
132     scriptsToLoad.add(script);
133     SCRIPTS_TO_LOAD.set(conf, scriptsToLoad);
134   }
135 
136   /**
137    * Get the list of scripts to load on workers
138    *
139    * @param conf {@link Configuration}
140    * @return list of {@link DeployedScript}s
141    */
142   public static List<DeployedScript> getScriptsToLoad(Configuration conf) {
143     TypeReference<List<DeployedScript>> jsonType =
144         new TypeReference<List<DeployedScript>>() { };
145     return SCRIPTS_TO_LOAD.get(conf, jsonType);
146   }
147 
148   /**
149    * Load all the scripts deployed in Configuration
150    *
151    * @param conf Configuration
152    * @throws IOException
153    */
154   public static void loadScripts(Configuration conf) throws IOException {
155     List<DeployedScript> deployedScripts = getScriptsToLoad(conf);
156     if (deployedScripts == null) {
157       return;
158     }
159     for (DeployedScript deployedScript : deployedScripts) {
160       loadScript(conf, deployedScript);
161     }
162   }
163 
164   /**
165    * Load a single deployed script
166    *
167    * @param conf Configuration
168    * @param deployedScript the deployed script
169    * @throws IOException
170    */
171   public static void loadScript(Configuration conf,
172       DeployedScript deployedScript) throws IOException {
173     InputStream stream = openScriptInputStream(conf, deployedScript);
174     switch (deployedScript.getLanguage()) {
175     case JYTHON:
176       loadJythonScript(stream);
177       break;
178     default:
179       LOG.fatal("Don't know how to load script " + deployedScript);
180       throw new IllegalStateException("Don't know how to load script " +
181           deployedScript);
182     }
183 
184     LOADED_SCRIPTS.add(deployedScript);
185     Closeables.close(stream, true);
186   }
187 
188   /**
189    * Load a Jython deployed script
190    *
191    * @param stream InputStream with Jython code to load
192    */
193   private static void loadJythonScript(InputStream stream) {
194     JythonUtils.getInterpreter().execfile(stream);
195   }
196 
197   /**
198    * Get list of scripts already loaded.
199    *
200    * @return list of loaded scripts
201    */
202   public static List<DeployedScript> getLoadedScripts() {
203     return LOADED_SCRIPTS;
204   }
205 
206   /**
207    * Get an {@link java.io.InputStream} for the deployed script.
208    *
209    * @param conf Configuration
210    * @param deployedScript the deployed script
211    * @return {@link java.io.InputStream} for reading script
212    */
213   private static InputStream openScriptInputStream(Configuration conf,
214       DeployedScript deployedScript) {
215     DeployType deployType = deployedScript.getDeployType();
216     String path = deployedScript.getPath();
217 
218     InputStream stream;
219     switch (deployType) {
220     case RESOURCE:
221       if (LOG.isInfoEnabled()) {
222         LOG.info("getScriptStream: Reading script from resource at " +
223             deployedScript.getPath());
224       }
225       stream = ScriptLoader.class.getClassLoader().getResourceAsStream(path);
226       if (stream == null) {
227         throw new IllegalStateException("getScriptStream: Failed to " +
228             "open script from resource at " + path);
229       }
230       break;
231     case DISTRIBUTED_CACHE:
232       if (LOG.isInfoEnabled()) {
233         LOG.info("getScriptStream: Reading script from DistributedCache at " +
234             path);
235       }
236       Optional<Path> localPath = getLocalCacheFile(conf, path);
237       if (!localPath.isPresent()) {
238         throw new IllegalStateException("getScriptStream: Failed to " +
239             "find script in local DistributedCache matching " + path);
240       }
241       String pathStr = localPath.get().toString();
242       try {
243         stream = new BufferedInputStream(new FileInputStream(pathStr));
244       } catch (IOException e) {
245         throw new IllegalStateException("getScriptStream: Failed open " +
246             "script from DistributedCache at " + localPath);
247       }
248       break;
249     default:
250       throw new IllegalArgumentException("getScriptStream: Unknown " +
251           "script deployment type: " + deployType);
252     }
253     return stream;
254   }
255 }