This project has been retired. For details, please refer to its
Attic page.
ScriptLoader xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.scripting;
19
20 import org.apache.giraph.conf.JsonStringConfOption;
21 import org.apache.giraph.graph.Language;
22 import org.apache.giraph.jython.JythonUtils;
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.fs.Path;
25 import org.apache.log4j.Logger;
26 import org.codehaus.jackson.type.TypeReference;
27
28 import com.google.common.base.Optional;
29 import com.google.common.collect.Lists;
30 import com.google.common.io.Closeables;
31
32 import java.io.BufferedInputStream;
33 import java.io.FileInputStream;
34 import java.io.IOException;
35 import java.io.InputStream;
36 import java.util.List;
37
38 import static org.apache.giraph.utils.DistributedCacheUtils.getLocalCacheFile;
39
40
41
42
43 public class ScriptLoader {
44
45 public static final JsonStringConfOption SCRIPTS_TO_LOAD =
46 new JsonStringConfOption("giraph.scripts.to.load",
47 "Scripts to load on workers");
48
49
50 private static final List<DeployedScript> LOADED_SCRIPTS =
51 Lists.newArrayList();
52
53
54 private static final Logger LOG = Logger.getLogger(ScriptLoader.class);
55
56
57 private ScriptLoader() { }
58
59
60
61
62
63
64
65
66
67 public static void setScriptsToLoad(Configuration conf,
68 String scriptPath, DeployType deployType, Language language) {
69 DeployedScript deployedScript = new DeployedScript(scriptPath,
70 deployType, language);
71 setScriptsToLoad(conf, deployedScript);
72 }
73
74
75
76
77
78
79
80
81
82
83
84
85 public static void setScriptsToLoad(Configuration conf,
86 String script1, DeployType deployType1, Language language1,
87 String script2, DeployType deployType2, Language language2) {
88 DeployedScript deployedScript1 = new DeployedScript(script1,
89 deployType1, language1);
90 DeployedScript deployedScript2 = new DeployedScript(script2,
91 deployType2, language2);
92 setScriptsToLoad(conf, deployedScript1, deployedScript2);
93 }
94
95
96
97
98
99
100
101 public static void setScriptsToLoad(Configuration conf,
102 DeployedScript... scripts) {
103 List<DeployedScript> scriptsToLoad = Lists.newArrayList(scripts);
104 SCRIPTS_TO_LOAD.set(conf, scriptsToLoad);
105 }
106
107
108
109
110
111
112
113
114
115 public static void addScriptToLoad(Configuration conf,
116 String script, DeployType deployType, Language language) {
117 addScriptToLoad(conf, new DeployedScript(script, deployType, language));
118 }
119
120
121
122
123
124
125
126 public static void addScriptToLoad(Configuration conf,
127 DeployedScript script) {
128 List<DeployedScript> scriptsToLoad = getScriptsToLoad(conf);
129 if (scriptsToLoad == null) {
130 scriptsToLoad = Lists.<DeployedScript>newArrayList();
131 }
132 scriptsToLoad.add(script);
133 SCRIPTS_TO_LOAD.set(conf, scriptsToLoad);
134 }
135
136
137
138
139
140
141
142 public static List<DeployedScript> getScriptsToLoad(Configuration conf) {
143 TypeReference<List<DeployedScript>> jsonType =
144 new TypeReference<List<DeployedScript>>() { };
145 return SCRIPTS_TO_LOAD.get(conf, jsonType);
146 }
147
148
149
150
151
152
153
154 public static void loadScripts(Configuration conf) throws IOException {
155 List<DeployedScript> deployedScripts = getScriptsToLoad(conf);
156 if (deployedScripts == null) {
157 return;
158 }
159 for (DeployedScript deployedScript : deployedScripts) {
160 loadScript(conf, deployedScript);
161 }
162 }
163
164
165
166
167
168
169
170
171 public static void loadScript(Configuration conf,
172 DeployedScript deployedScript) throws IOException {
173 InputStream stream = openScriptInputStream(conf, deployedScript);
174 switch (deployedScript.getLanguage()) {
175 case JYTHON:
176 loadJythonScript(stream);
177 break;
178 default:
179 LOG.fatal("Don't know how to load script " + deployedScript);
180 throw new IllegalStateException("Don't know how to load script " +
181 deployedScript);
182 }
183
184 LOADED_SCRIPTS.add(deployedScript);
185 Closeables.close(stream, true);
186 }
187
188
189
190
191
192
193 private static void loadJythonScript(InputStream stream) {
194 JythonUtils.getInterpreter().execfile(stream);
195 }
196
197
198
199
200
201
202 public static List<DeployedScript> getLoadedScripts() {
203 return LOADED_SCRIPTS;
204 }
205
206
207
208
209
210
211
212
213 private static InputStream openScriptInputStream(Configuration conf,
214 DeployedScript deployedScript) {
215 DeployType deployType = deployedScript.getDeployType();
216 String path = deployedScript.getPath();
217
218 InputStream stream;
219 switch (deployType) {
220 case RESOURCE:
221 if (LOG.isInfoEnabled()) {
222 LOG.info("getScriptStream: Reading script from resource at " +
223 deployedScript.getPath());
224 }
225 stream = ScriptLoader.class.getClassLoader().getResourceAsStream(path);
226 if (stream == null) {
227 throw new IllegalStateException("getScriptStream: Failed to " +
228 "open script from resource at " + path);
229 }
230 break;
231 case DISTRIBUTED_CACHE:
232 if (LOG.isInfoEnabled()) {
233 LOG.info("getScriptStream: Reading script from DistributedCache at " +
234 path);
235 }
236 Optional<Path> localPath = getLocalCacheFile(conf, path);
237 if (!localPath.isPresent()) {
238 throw new IllegalStateException("getScriptStream: Failed to " +
239 "find script in local DistributedCache matching " + path);
240 }
241 String pathStr = localPath.get().toString();
242 try {
243 stream = new BufferedInputStream(new FileInputStream(pathStr));
244 } catch (IOException e) {
245 throw new IllegalStateException("getScriptStream: Failed open " +
246 "script from DistributedCache at " + localPath);
247 }
248 break;
249 default:
250 throw new IllegalArgumentException("getScriptStream: Unknown " +
251 "script deployment type: " + deployType);
252 }
253 return stream;
254 }
255 }