View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.utils;
20  
21  import com.google.common.base.Charsets;
22  import com.google.common.collect.ImmutableList;
23  import com.google.common.io.Files;
24  import org.apache.giraph.conf.GiraphConfiguration;
25  import org.apache.giraph.conf.GiraphConstants;
26  import org.apache.giraph.io.formats.GiraphFileInputFormat;
27  import org.apache.giraph.io.formats.InMemoryVertexOutputFormat;
28  import org.apache.giraph.job.GiraphJob;
29  import org.apache.giraph.zk.InProcessZooKeeperRunner;
30  import org.apache.giraph.zk.ZookeeperConfig;
31  import org.apache.hadoop.fs.Path;
32  import org.apache.hadoop.io.Writable;
33  import org.apache.hadoop.io.WritableComparable;
34  import org.apache.hadoop.mapreduce.Job;
35  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
36  import org.apache.log4j.Logger;
37  
38  import java.io.File;
39  import java.io.IOException;
40  import java.net.InetSocketAddress;
41  
42  /**
43   * A base class for running internal tests on a vertex
44   *
45   * Extending classes only have to invoke the run() method to test their vertex.
46   * All data is written to a local tmp directory that is removed afterwards.
47   * A local zookeeper instance is started in an extra thread and
48   * shutdown at the end.
49   *
50   * Heavily inspired from Apache Mahout's MahoutTestCase
51   */
52  @SuppressWarnings("unchecked")
53  public class InternalVertexRunner {
54  
55    /** Logger */
56    private static final Logger LOG =
57        Logger.getLogger(InternalVertexRunner.class);
58  
59    /** Don't construct */
60    private InternalVertexRunner() { }
61  
62    /**
63     * Attempts to run the vertex internally in the current JVM, reading from and
64     * writing to a temporary folder on local disk. Will start its own zookeeper
65     * instance.
66     *
67     * @param conf GiraphClasses specifying which types to use
68     * @param vertexInputData linewise vertex input data
69     * @return linewise output data, or null if job fails
70     * @throws Exception if anything goes wrong
71     */
72    public static Iterable<String> run(
73        GiraphConfiguration conf,
74        String[] vertexInputData) throws Exception {
75      return run(conf, vertexInputData, null);
76    }
77  
78    /**
79     * Run the ZooKeeper in-process and the job.
80     *
81     * @param zookeeperConfig Quorum peer configuration
82     * @param giraphJob Giraph job to run
83     * @return True if successful, false otherwise
84     */
85    private static boolean runZooKeeperAndJob(
86        final ZookeeperConfig zookeeperConfig,
87        GiraphJob giraphJob) throws IOException {
88      final InProcessZooKeeperRunner.ZooKeeperServerRunner zookeeper =
89          new InProcessZooKeeperRunner.ZooKeeperServerRunner();
90  
91      int port = zookeeper.start(zookeeperConfig);
92  
93      LOG.info("Started test zookeeper on port " + port);
94      GiraphConstants.ZOOKEEPER_LIST.set(giraphJob.getConfiguration(),
95          "localhost:" + port);
96      try {
97        return giraphJob.run(true);
98      } catch (InterruptedException |
99          ClassNotFoundException | IOException e) {
100       LOG.error("runZooKeeperAndJob: Got exception on running", e);
101     } finally {
102       zookeeper.stop();
103     }
104 
105     return false;
106   }
107 
108   /**
109    * Attempts to run the vertex internally in the current JVM, reading from and
110    * writing to a temporary folder on local disk. Will start its own zookeeper
111    * instance.
112    *
113    *
114    * @param conf GiraphClasses specifying which types to use
115    * @param vertexInputData linewise vertex input data
116    * @param edgeInputData linewise edge input data
117    * @return linewise output data, or null if job fails
118    * @throws Exception if anything goes wrong
119    */
120   public static Iterable<String> run(
121       GiraphConfiguration conf,
122       String[] vertexInputData,
123       String[] edgeInputData) throws Exception {
124     // Prepare input file, output folder and temporary folders
125     File tmpDir = FileUtils.createTestDir(conf.getComputationName());
126     try {
127       return run(conf, vertexInputData, edgeInputData, null, tmpDir);
128     } finally {
129       FileUtils.delete(tmpDir);
130     }
131   }
132 
133   /**
134    * Attempts to run the vertex internally in the current JVM, reading from and
135    * writing to a temporary folder on local disk. Will start its own zookeeper
136    * instance.
137    *
138    *
139    * @param conf GiraphClasses specifying which types to use
140    * @param vertexInputData linewise vertex input data
141    * @param edgeInputData linewise edge input data
142    * @param checkpointsDir if set, will use this folder
143    *                          for storing checkpoints.
144    * @param tmpDir file path for storing temporary files.
145    * @return linewise output data, or null if job fails
146    * @throws Exception if anything goes wrong
147    */
148   public static Iterable<String> run(
149       GiraphConfiguration conf,
150       String[] vertexInputData,
151       String[] edgeInputData,
152       String checkpointsDir,
153       File tmpDir) throws Exception {
154     File vertexInputFile = null;
155     File edgeInputFile = null;
156     if (conf.hasVertexInputFormat()) {
157       vertexInputFile = FileUtils.createTempFile(tmpDir, "vertices.txt");
158     }
159     if (conf.hasEdgeInputFormat()) {
160       edgeInputFile = FileUtils.createTempFile(tmpDir, "edges.txt");
161     }
162 
163     File outputDir = FileUtils.createTempDir(tmpDir, "output");
164     File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
165     File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
166     // Write input data to disk
167     if (conf.hasVertexInputFormat()) {
168       FileUtils.writeLines(vertexInputFile, vertexInputData);
169     }
170     if (conf.hasEdgeInputFormat()) {
171       FileUtils.writeLines(edgeInputFile, edgeInputData);
172     }
173 
174     conf.setWorkerConfiguration(1, 1, 100.0f);
175     GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
176     GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
177 
178     conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
179     GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
180         zkMgrDir.toString());
181 
182     if (checkpointsDir == null) {
183       checkpointsDir = FileUtils.createTempDir(
184           tmpDir, "_checkpoints").toString();
185     }
186     GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir);
187 
188     // Create and configure the job to run the vertex
189     GiraphJob job = new GiraphJob(conf, conf.getComputationName());
190 
191     Job internalJob = job.getInternalJob();
192     if (conf.hasVertexInputFormat()) {
193       GiraphFileInputFormat.setVertexInputPath(internalJob.getConfiguration(),
194           new Path(vertexInputFile.toString()));
195     }
196     if (conf.hasEdgeInputFormat()) {
197       GiraphFileInputFormat.setEdgeInputPath(internalJob.getConfiguration(),
198           new Path(edgeInputFile.toString()));
199     }
200     FileOutputFormat.setOutputPath(job.getInternalJob(),
201         new Path(outputDir.toString()));
202 
203     // Configure a local zookeeper instance
204     ZookeeperConfig qpConfig = configLocalZooKeeper(zkDir);
205 
206     boolean success = runZooKeeperAndJob(qpConfig, job);
207     if (!success) {
208       return null;
209     }
210 
211     File outFile = new File(outputDir, "part-m-00000");
212     if (conf.hasVertexOutputFormat() && outFile.canRead()) {
213       return Files.readLines(outFile, Charsets.UTF_8);
214     } else {
215       return ImmutableList.of();
216     }
217 
218   }
219 
220   /**
221    * Attempts to run the vertex internally in the current JVM,
222    * reading from an in-memory graph. Will start its own zookeeper
223    * instance.
224    *
225    * @param <I> Vertex ID
226    * @param <V> Vertex Value
227    * @param <E> Edge Value
228    * @param conf GiraphClasses specifying which types to use
229    * @param graph input graph
230    * @throws Exception if anything goes wrong
231    */
232   public static <I extends WritableComparable,
233       V extends Writable,
234       E extends Writable> void run(
235       GiraphConfiguration conf,
236       TestGraph<I, V, E> graph) throws Exception {
237     // Prepare temporary folders
238     File tmpDir = FileUtils.createTestDir(conf.getComputationName());
239     try {
240       run(conf, graph, tmpDir, null);
241     } finally {
242       FileUtils.delete(tmpDir);
243     }
244   }
245 
246   /**
247    * Attempts to run the vertex internally in the current JVM,
248    * reading from an in-memory graph. Will start its own zookeeper
249    * instance.
250    *
251    * @param <I> Vertex ID
252    * @param <V> Vertex Value
253    * @param <E> Edge Value
254    * @param conf GiraphClasses specifying which types to use
255    * @param graph input graph
256    * @param tmpDir file path for storing temporary files.
257    * @param checkpointsDir if set, will use this folder
258    *                          for storing checkpoints.
259    * @throws Exception if anything goes wrong
260    */
261   public static <I extends WritableComparable,
262       V extends Writable,
263       E extends Writable> void run(
264       GiraphConfiguration conf,
265       TestGraph<I, V, E> graph,
266       File tmpDir,
267       String checkpointsDir) throws Exception {
268     File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
269     File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
270 
271     if (checkpointsDir == null) {
272       checkpointsDir = FileUtils.
273           createTempDir(tmpDir, "_checkpoints").toString();
274     }
275 
276     conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class);
277 
278     // Create and configure the job to run the vertex
279     GiraphJob job = new GiraphJob(conf, conf.getComputationName());
280 
281     InMemoryVertexInputFormat.setGraph(graph);
282 
283     conf.setWorkerConfiguration(1, 1, 100.0f);
284     GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
285     GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
286     GiraphConstants.ZOOKEEPER_SERVER_PORT.set(conf, 0);
287 
288     conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
289     GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
290         zkMgrDir.toString());
291     GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir);
292 
293     runZooKeeperAndJob(configLocalZooKeeper(zkDir), job);
294   }
295 
296   /**
297    * Attempts to run the vertex internally in the current JVM, reading and
298    * writing to an in-memory graph. Will start its own zookeeper
299    * instance.
300    *
301    * @param <I> Vertex ID
302    * @param <V> Vertex Value
303    * @param <E> Edge Value
304    * @param conf GiraphClasses specifying which types to use
305    * @param graph input graph
306    * @return Output graph
307    * @throws Exception if anything goes wrong
308    */
309   public static <I extends WritableComparable,
310       V extends Writable,
311       E extends Writable> TestGraph<I, V, E> runWithInMemoryOutput(
312       GiraphConfiguration conf,
313       TestGraph<I, V, E> graph) throws Exception {
314     // Prepare temporary folders
315     File tmpDir = FileUtils.createTestDir(conf.getComputationName());
316     try {
317       return runWithInMemoryOutput(conf, graph, tmpDir, null);
318     } finally {
319       FileUtils.delete(tmpDir);
320     }
321   }
322 
323   /**
324    * Attempts to run the vertex internally in the current JVM, reading and
325    * writing to an in-memory graph. Will start its own zookeeper
326    * instance.
327    *
328    * @param <I> Vertex ID
329    * @param <V> Vertex Value
330    * @param <E> Edge Value
331    * @param conf GiraphClasses specifying which types to use
332    * @param graph input graph
333    * @param tmpDir file path for storing temporary files.
334    * @param checkpointsDir if set, will use this folder
335    *                       for storing checkpoints.
336    * @return Output graph
337    * @throws Exception if anything goes wrong
338    */
339   public static <I extends WritableComparable,
340       V extends Writable,
341       E extends Writable> TestGraph<I, V, E> runWithInMemoryOutput(
342       GiraphConfiguration conf,
343       TestGraph<I, V, E> graph,
344       File tmpDir,
345       String checkpointsDir) throws Exception {
346     conf.setVertexOutputFormatClass(InMemoryVertexOutputFormat.class);
347     InMemoryVertexOutputFormat.initializeOutputGraph(conf);
348     InternalVertexRunner.run(conf, graph, tmpDir, checkpointsDir);
349     return InMemoryVertexOutputFormat.getOutputGraph();
350   }
351 
352   /**
353    * Configuration options for running local ZK.
354    *
355    * @param zkDir directory for ZK to hold files in.
356    * @return zookeeper configuration object
357    */
358   private static ZookeeperConfig configLocalZooKeeper(File zkDir) {
359     ZookeeperConfig config = new ZookeeperConfig();
360     config.setMaxSessionTimeout(100000);
361     config.setMinSessionTimeout(10000);
362     config.setClientPortAddress(new InetSocketAddress("localhost", 0));
363     config.setDataDir(zkDir.getAbsolutePath());
364     return config;
365   }
366 
367 }