This project has retired. For details please refer to its Attic page.
InternalVertexRunner xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.utils;
20  
21  import com.google.common.base.Charsets;
22  import com.google.common.collect.ImmutableList;
23  import com.google.common.io.Files;
24  import org.apache.giraph.conf.GiraphConfiguration;
25  import org.apache.giraph.conf.GiraphConstants;
26  import org.apache.giraph.io.formats.FileOutputFormatUtil;
27  import org.apache.giraph.io.formats.GiraphFileInputFormat;
28  import org.apache.giraph.io.formats.InMemoryVertexOutputFormat;
29  import org.apache.giraph.job.GiraphJob;
30  import org.apache.giraph.zk.InProcessZooKeeperRunner;
31  import org.apache.giraph.zk.ZookeeperConfig;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.io.Writable;
34  import org.apache.hadoop.io.WritableComparable;
35  import org.apache.hadoop.mapreduce.Job;
36  import org.apache.log4j.Logger;
37  
38  import java.io.File;
39  import java.io.IOException;
40  import java.net.InetSocketAddress;
41  
42  /**
43   * A base class for running internal tests on a vertex
44   *
45   * Extending classes only have to invoke the run() method to test their vertex.
46   * All data is written to a local tmp directory that is removed afterwards.
47   * A local zookeeper instance is started in an extra thread and
48   * shutdown at the end.
49   *
50   * Heavily inspired from Apache Mahout's MahoutTestCase
51   */
52  @SuppressWarnings("unchecked")
53  public class InternalVertexRunner {
54  
55    /** Logger */
56    private static final Logger LOG =
57        Logger.getLogger(InternalVertexRunner.class);
58  
59    /** Don't construct */
60    private InternalVertexRunner() { }
61  
62    /**
63     * Attempts to run the vertex internally in the current JVM, reading from and
64     * writing to a temporary folder on local disk. Will start its own zookeeper
65     * instance.
66     *
67     * @param conf GiraphClasses specifying which types to use
68     * @param vertexInputData linewise vertex input data
69     * @return linewise output data, or null if job fails
70     * @throws Exception if anything goes wrong
71     */
72    public static Iterable<String> run(
73        GiraphConfiguration conf,
74        String[] vertexInputData) throws Exception {
75      return run(conf, vertexInputData, null);
76    }
77  
78    /**
79     * Run the ZooKeeper in-process and the job.
80     *
81     * @param zookeeperConfig Quorum peer configuration
82     * @param giraphJob Giraph job to run
83     * @return True if successful, false otherwise
84     */
85    private static boolean runZooKeeperAndJob(
86        final ZookeeperConfig zookeeperConfig,
87        GiraphJob giraphJob) throws IOException {
88      final InProcessZooKeeperRunner.ZooKeeperServerRunner zookeeper =
89          new InProcessZooKeeperRunner.ZooKeeperServerRunner();
90  
91      int port = zookeeper.start(zookeeperConfig);
92  
93      LOG.info("Started test zookeeper on port " + port);
94      GiraphConstants.ZOOKEEPER_LIST.set(giraphJob.getConfiguration(),
95          "localhost:" + port);
96      try {
97        return giraphJob.run(true);
98      } catch (InterruptedException |
99          ClassNotFoundException | IOException e) {
100       LOG.error("runZooKeeperAndJob: Got exception on running", e);
101     } finally {
102       zookeeper.stop();
103     }
104 
105     return false;
106   }
107 
108   /**
109    * Attempts to run the vertex internally in the current JVM, reading from and
110    * writing to a temporary folder on local disk. Will start its own zookeeper
111    * instance.
112    *
113    *
114    * @param conf GiraphClasses specifying which types to use
115    * @param vertexInputData linewise vertex input data
116    * @param edgeInputData linewise edge input data
117    * @return linewise output data, or null if job fails
118    * @throws Exception if anything goes wrong
119    */
120   public static Iterable<String> run(
121       GiraphConfiguration conf,
122       String[] vertexInputData,
123       String[] edgeInputData) throws Exception {
124     // Prepare input file, output folder and temporary folders
125     File tmpDir = FileUtils.createTestDir(conf.getComputationName());
126     try {
127       return run(conf, vertexInputData, edgeInputData, null, tmpDir);
128     } finally {
129       FileUtils.delete(tmpDir);
130     }
131   }
132 
133   /**
134    * Attempts to run the vertex internally in the current JVM, reading from and
135    * writing to a temporary folder on local disk. Will start its own zookeeper
136    * instance.
137    *
138    *
139    * @param conf GiraphClasses specifying which types to use
140    * @param vertexInputData linewise vertex input data
141    * @param edgeInputData linewise edge input data
142    * @param checkpointsDir if set, will use this folder
143    *                          for storing checkpoints.
144    * @param tmpDir file path for storing temporary files.
145    * @return linewise output data, or null if job fails
146    * @throws Exception if anything goes wrong
147    */
148   public static Iterable<String> run(
149       GiraphConfiguration conf,
150       String[] vertexInputData,
151       String[] edgeInputData,
152       String checkpointsDir,
153       File tmpDir) throws Exception {
154     File vertexInputFile = null;
155     File edgeInputFile = null;
156     if (conf.hasVertexInputFormat()) {
157       vertexInputFile = FileUtils.createTempFile(tmpDir, "vertices.txt");
158     }
159     if (conf.hasEdgeInputFormat()) {
160       edgeInputFile = FileUtils.createTempFile(tmpDir, "edges.txt");
161     }
162 
163     File outputDir = FileUtils.createTempDir(tmpDir, "output");
164     File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
165     File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
166     File mrLocalDir = FileUtils.createTempDir(tmpDir, "_mapred");
167     // Write input data to disk
168     if (conf.hasVertexInputFormat()) {
169       FileUtils.writeLines(vertexInputFile, vertexInputData);
170     }
171     if (conf.hasEdgeInputFormat()) {
172       FileUtils.writeLines(edgeInputFile, edgeInputData);
173     }
174 
175     conf.setWorkerConfiguration(1, 1, 100.0f);
176     GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
177     GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
178     conf.setIfUnset("mapred.job.tracker", "local");
179     conf.setIfUnset("mapred.local.dir", mrLocalDir.toString());
180 
181     conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
182     GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
183         zkMgrDir.toString());
184 
185     if (checkpointsDir == null) {
186       checkpointsDir = FileUtils.createTempDir(
187           tmpDir, "_checkpoints").toString();
188     }
189     GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir);
190 
191     // Create and configure the job to run the vertex
192     GiraphJob job = new GiraphJob(conf, conf.getComputationName());
193 
194     Job internalJob = job.getInternalJob();
195     if (conf.hasVertexInputFormat()) {
196       GiraphFileInputFormat.setVertexInputPath(internalJob.getConfiguration(),
197           new Path(vertexInputFile.toString()));
198     }
199     if (conf.hasEdgeInputFormat()) {
200       GiraphFileInputFormat.setEdgeInputPath(internalJob.getConfiguration(),
201           new Path(edgeInputFile.toString()));
202     }
203     FileOutputFormatUtil.setOutputPath(job.getInternalJob(),
204         new Path(outputDir.toString()));
205 
206     // Configure a local zookeeper instance
207     ZookeeperConfig qpConfig = configLocalZooKeeper(zkDir);
208 
209     boolean success = runZooKeeperAndJob(qpConfig, job);
210     if (!success) {
211       return null;
212     }
213 
214     File outFile = new File(outputDir, "part-m-00000");
215     if (conf.hasVertexOutputFormat() && outFile.canRead()) {
216       return Files.readLines(outFile, Charsets.UTF_8);
217     } else {
218       return ImmutableList.of();
219     }
220 
221   }
222 
223   /**
224    * Attempts to run the vertex internally in the current JVM,
225    * reading from an in-memory graph. Will start its own zookeeper
226    * instance.
227    *
228    * @param <I> Vertex ID
229    * @param <V> Vertex Value
230    * @param <E> Edge Value
231    * @param conf GiraphClasses specifying which types to use
232    * @param graph input graph
233    * @throws Exception if anything goes wrong
234    */
235   public static <I extends WritableComparable,
236       V extends Writable,
237       E extends Writable> void run(
238       GiraphConfiguration conf,
239       TestGraph<I, V, E> graph) throws Exception {
240     // Prepare temporary folders
241     File tmpDir = FileUtils.createTestDir(conf.getComputationName());
242     try {
243       run(conf, graph, tmpDir, null);
244     } finally {
245       FileUtils.delete(tmpDir);
246     }
247   }
248 
249   /**
250    * Attempts to run the vertex internally in the current JVM,
251    * reading from an in-memory graph. Will start its own zookeeper
252    * instance.
253    *
254    * @param <I> Vertex ID
255    * @param <V> Vertex Value
256    * @param <E> Edge Value
257    * @param conf GiraphClasses specifying which types to use
258    * @param graph input graph
259    * @param tmpDir file path for storing temporary files.
260    * @param checkpointsDir if set, will use this folder
261    *                          for storing checkpoints.
262    * @throws Exception if anything goes wrong
263    */
264   public static <I extends WritableComparable,
265       V extends Writable,
266       E extends Writable> void run(
267       GiraphConfiguration conf,
268       TestGraph<I, V, E> graph,
269       File tmpDir,
270       String checkpointsDir) throws Exception {
271     File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
272     File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
273     File mrLocalDir = FileUtils.createTempDir(tmpDir, "_mapred");
274 
275     if (checkpointsDir == null) {
276       checkpointsDir = FileUtils.
277           createTempDir(tmpDir, "_checkpoints").toString();
278     }
279 
280     conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class);
281 
282     // Create and configure the job to run the vertex
283     GiraphJob job = new GiraphJob(conf, conf.getComputationName());
284 
285     InMemoryVertexInputFormat.setGraph(graph);
286 
287     conf.setWorkerConfiguration(1, 1, 100.0f);
288     GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
289     GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
290     GiraphConstants.ZOOKEEPER_SERVER_PORT.set(conf, 0);
291     conf.setIfUnset("mapred.job.tracker", "local");
292     conf.setIfUnset("mapred.local.dir", mrLocalDir.toString());
293 
294     conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
295     GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
296         zkMgrDir.toString());
297     GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir);
298 
299     runZooKeeperAndJob(configLocalZooKeeper(zkDir), job);
300   }
301 
302   /**
303    * Attempts to run the vertex internally in the current JVM, reading and
304    * writing to an in-memory graph. Will start its own zookeeper
305    * instance.
306    *
307    * @param <I> Vertex ID
308    * @param <V> Vertex Value
309    * @param <E> Edge Value
310    * @param conf GiraphClasses specifying which types to use
311    * @param graph input graph
312    * @return Output graph
313    * @throws Exception if anything goes wrong
314    */
315   public static <I extends WritableComparable,
316       V extends Writable,
317       E extends Writable> TestGraph<I, V, E> runWithInMemoryOutput(
318       GiraphConfiguration conf,
319       TestGraph<I, V, E> graph) throws Exception {
320     // Prepare temporary folders
321     File tmpDir = FileUtils.createTestDir(conf.getComputationName());
322     try {
323       return runWithInMemoryOutput(conf, graph, tmpDir, null);
324     } finally {
325       FileUtils.delete(tmpDir);
326     }
327   }
328 
329   /**
330    * Attempts to run the vertex internally in the current JVM, reading and
331    * writing to an in-memory graph. Will start its own zookeeper
332    * instance.
333    *
334    * @param <I> Vertex ID
335    * @param <V> Vertex Value
336    * @param <E> Edge Value
337    * @param conf GiraphClasses specifying which types to use
338    * @param graph input graph
339    * @param tmpDir file path for storing temporary files.
340    * @param checkpointsDir if set, will use this folder
341    *                       for storing checkpoints.
342    * @return Output graph
343    * @throws Exception if anything goes wrong
344    */
345   public static <I extends WritableComparable,
346       V extends Writable,
347       E extends Writable> TestGraph<I, V, E> runWithInMemoryOutput(
348       GiraphConfiguration conf,
349       TestGraph<I, V, E> graph,
350       File tmpDir,
351       String checkpointsDir) throws Exception {
352     conf.setVertexOutputFormatClass(InMemoryVertexOutputFormat.class);
353     InMemoryVertexOutputFormat.initializeOutputGraph(conf);
354     InternalVertexRunner.run(conf, graph, tmpDir, checkpointsDir);
355     return InMemoryVertexOutputFormat.getOutputGraph();
356   }
357 
358   /**
359    * Configuration options for running local ZK.
360    *
361    * @param zkDir directory for ZK to hold files in.
362    * @return zookeeper configuration object
363    */
364   private static ZookeeperConfig configLocalZooKeeper(File zkDir) {
365     ZookeeperConfig config = new ZookeeperConfig();
366     config.setMaxSessionTimeout(100000);
367     config.setMinSessionTimeout(10000);
368     config.setClientPortAddress(new InetSocketAddress("localhost", 0));
369     config.setDataDir(zkDir.getAbsolutePath());
370     return config;
371   }
372 
373 }