This project has been retired. For details, please refer to its
Attic page.
InternalVertexRunner xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.utils;
20
21 import com.google.common.base.Charsets;
22 import com.google.common.collect.ImmutableList;
23 import com.google.common.io.Files;
24 import org.apache.giraph.conf.GiraphConfiguration;
25 import org.apache.giraph.conf.GiraphConstants;
26 import org.apache.giraph.io.formats.FileOutputFormatUtil;
27 import org.apache.giraph.io.formats.GiraphFileInputFormat;
28 import org.apache.giraph.io.formats.InMemoryVertexOutputFormat;
29 import org.apache.giraph.job.GiraphJob;
30 import org.apache.giraph.zk.InProcessZooKeeperRunner;
31 import org.apache.giraph.zk.ZookeeperConfig;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.io.Writable;
34 import org.apache.hadoop.io.WritableComparable;
35 import org.apache.hadoop.mapreduce.Job;
36 import org.apache.log4j.Logger;
37
38 import java.io.File;
39 import java.io.IOException;
40 import java.net.InetSocketAddress;
41
42
43
44
45
46
47
48
49
50
51
52 @SuppressWarnings("unchecked")
53 public class InternalVertexRunner {
54
55
56 private static final Logger LOG =
57 Logger.getLogger(InternalVertexRunner.class);
58
59
60 private InternalVertexRunner() { }
61
62
63
64
65
66
67
68
69
70
71
72 public static Iterable<String> run(
73 GiraphConfiguration conf,
74 String[] vertexInputData) throws Exception {
75 return run(conf, vertexInputData, null);
76 }
77
78
79
80
81
82
83
84
85 private static boolean runZooKeeperAndJob(
86 final ZookeeperConfig zookeeperConfig,
87 GiraphJob giraphJob) throws IOException {
88 final InProcessZooKeeperRunner.ZooKeeperServerRunner zookeeper =
89 new InProcessZooKeeperRunner.ZooKeeperServerRunner();
90
91 int port = zookeeper.start(zookeeperConfig);
92
93 LOG.info("Started test zookeeper on port " + port);
94 GiraphConstants.ZOOKEEPER_LIST.set(giraphJob.getConfiguration(),
95 "localhost:" + port);
96 try {
97 return giraphJob.run(true);
98 } catch (InterruptedException |
99 ClassNotFoundException | IOException e) {
100 LOG.error("runZooKeeperAndJob: Got exception on running", e);
101 } finally {
102 zookeeper.stop();
103 }
104
105 return false;
106 }
107
108
109
110
111
112
113
114
115
116
117
118
119
120 public static Iterable<String> run(
121 GiraphConfiguration conf,
122 String[] vertexInputData,
123 String[] edgeInputData) throws Exception {
124
125 File tmpDir = FileUtils.createTestDir(conf.getComputationName());
126 try {
127 return run(conf, vertexInputData, edgeInputData, null, tmpDir);
128 } finally {
129 FileUtils.delete(tmpDir);
130 }
131 }
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148 public static Iterable<String> run(
149 GiraphConfiguration conf,
150 String[] vertexInputData,
151 String[] edgeInputData,
152 String checkpointsDir,
153 File tmpDir) throws Exception {
154 File vertexInputFile = null;
155 File edgeInputFile = null;
156 if (conf.hasVertexInputFormat()) {
157 vertexInputFile = FileUtils.createTempFile(tmpDir, "vertices.txt");
158 }
159 if (conf.hasEdgeInputFormat()) {
160 edgeInputFile = FileUtils.createTempFile(tmpDir, "edges.txt");
161 }
162
163 File outputDir = FileUtils.createTempDir(tmpDir, "output");
164 File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
165 File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
166 File mrLocalDir = FileUtils.createTempDir(tmpDir, "_mapred");
167
168 if (conf.hasVertexInputFormat()) {
169 FileUtils.writeLines(vertexInputFile, vertexInputData);
170 }
171 if (conf.hasEdgeInputFormat()) {
172 FileUtils.writeLines(edgeInputFile, edgeInputData);
173 }
174
175 conf.setWorkerConfiguration(1, 1, 100.0f);
176 GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
177 GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
178 conf.setIfUnset("mapred.job.tracker", "local");
179 conf.setIfUnset("mapred.local.dir", mrLocalDir.toString());
180
181 conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
182 GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
183 zkMgrDir.toString());
184
185 if (checkpointsDir == null) {
186 checkpointsDir = FileUtils.createTempDir(
187 tmpDir, "_checkpoints").toString();
188 }
189 GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir);
190
191
192 GiraphJob job = new GiraphJob(conf, conf.getComputationName());
193
194 Job internalJob = job.getInternalJob();
195 if (conf.hasVertexInputFormat()) {
196 GiraphFileInputFormat.setVertexInputPath(internalJob.getConfiguration(),
197 new Path(vertexInputFile.toString()));
198 }
199 if (conf.hasEdgeInputFormat()) {
200 GiraphFileInputFormat.setEdgeInputPath(internalJob.getConfiguration(),
201 new Path(edgeInputFile.toString()));
202 }
203 FileOutputFormatUtil.setOutputPath(job.getInternalJob(),
204 new Path(outputDir.toString()));
205
206
207 ZookeeperConfig qpConfig = configLocalZooKeeper(zkDir);
208
209 boolean success = runZooKeeperAndJob(qpConfig, job);
210 if (!success) {
211 return null;
212 }
213
214 File outFile = new File(outputDir, "part-m-00000");
215 if (conf.hasVertexOutputFormat() && outFile.canRead()) {
216 return Files.readLines(outFile, Charsets.UTF_8);
217 } else {
218 return ImmutableList.of();
219 }
220
221 }
222
223
224
225
226
227
228
229
230
231
232
233
234
235 public static <I extends WritableComparable,
236 V extends Writable,
237 E extends Writable> void run(
238 GiraphConfiguration conf,
239 TestGraph<I, V, E> graph) throws Exception {
240
241 File tmpDir = FileUtils.createTestDir(conf.getComputationName());
242 try {
243 run(conf, graph, tmpDir, null);
244 } finally {
245 FileUtils.delete(tmpDir);
246 }
247 }
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264 public static <I extends WritableComparable,
265 V extends Writable,
266 E extends Writable> void run(
267 GiraphConfiguration conf,
268 TestGraph<I, V, E> graph,
269 File tmpDir,
270 String checkpointsDir) throws Exception {
271 File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
272 File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
273 File mrLocalDir = FileUtils.createTempDir(tmpDir, "_mapred");
274
275 if (checkpointsDir == null) {
276 checkpointsDir = FileUtils.
277 createTempDir(tmpDir, "_checkpoints").toString();
278 }
279
280 conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class);
281
282
283 GiraphJob job = new GiraphJob(conf, conf.getComputationName());
284
285 InMemoryVertexInputFormat.setGraph(graph);
286
287 conf.setWorkerConfiguration(1, 1, 100.0f);
288 GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
289 GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
290 GiraphConstants.ZOOKEEPER_SERVER_PORT.set(conf, 0);
291 conf.setIfUnset("mapred.job.tracker", "local");
292 conf.setIfUnset("mapred.local.dir", mrLocalDir.toString());
293
294 conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
295 GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
296 zkMgrDir.toString());
297 GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir);
298
299 runZooKeeperAndJob(configLocalZooKeeper(zkDir), job);
300 }
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315 public static <I extends WritableComparable,
316 V extends Writable,
317 E extends Writable> TestGraph<I, V, E> runWithInMemoryOutput(
318 GiraphConfiguration conf,
319 TestGraph<I, V, E> graph) throws Exception {
320
321 File tmpDir = FileUtils.createTestDir(conf.getComputationName());
322 try {
323 return runWithInMemoryOutput(conf, graph, tmpDir, null);
324 } finally {
325 FileUtils.delete(tmpDir);
326 }
327 }
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345 public static <I extends WritableComparable,
346 V extends Writable,
347 E extends Writable> TestGraph<I, V, E> runWithInMemoryOutput(
348 GiraphConfiguration conf,
349 TestGraph<I, V, E> graph,
350 File tmpDir,
351 String checkpointsDir) throws Exception {
352 conf.setVertexOutputFormatClass(InMemoryVertexOutputFormat.class);
353 InMemoryVertexOutputFormat.initializeOutputGraph(conf);
354 InternalVertexRunner.run(conf, graph, tmpDir, checkpointsDir);
355 return InMemoryVertexOutputFormat.getOutputGraph();
356 }
357
358
359
360
361
362
363
364 private static ZookeeperConfig configLocalZooKeeper(File zkDir) {
365 ZookeeperConfig config = new ZookeeperConfig();
366 config.setMaxSessionTimeout(100000);
367 config.setMinSessionTimeout(10000);
368 config.setClientPortAddress(new InetSocketAddress("localhost", 0));
369 config.setDataDir(zkDir.getAbsolutePath());
370 return config;
371 }
372
373 }