This project has retired. For details please refer to its Attic page.
BspCase xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph;
20  
21  import org.apache.giraph.conf.GiraphConfiguration;
22  import org.apache.giraph.conf.GiraphConstants;
23  import org.apache.giraph.io.formats.FileOutputFormatUtil;
24  import org.apache.giraph.job.GiraphJob;
25  import org.apache.giraph.utils.FileUtils;
26  import org.apache.giraph.zk.ZooKeeperExt;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.fs.FSDataInputStream;
29  import org.apache.hadoop.fs.FileStatus;
30  import org.apache.hadoop.fs.FileSystem;
31  import org.apache.hadoop.fs.Path;
32  import org.apache.hadoop.fs.PathFilter;
33  import org.apache.zookeeper.WatchedEvent;
34  import org.apache.zookeeper.Watcher;
35  import org.junit.After;
36  import org.junit.Before;
37  
38  import com.google.common.base.Charsets;
39  import com.google.common.base.Preconditions;
40  import com.google.common.io.Closeables;
41  
42  import java.io.BufferedReader;
43  import java.io.IOException;
44  import java.io.InputStreamReader;
45  import java.util.List;
46  
47  /**
48   * Extended TestCase for making setting up Bsp testing.
49   */
50  @SuppressWarnings("unchecked")
51  public class BspCase implements Watcher {
52    /** JobTracker system property */
53    private final String jobTracker =
54        System.getProperty("prop.mapred.job.tracker");
55    /** Jar location system property */
56    private final String jarLocation =
57        System.getProperty("prop.jarLocation", "");
58    /** Number of actual processes for the BSP application */
59    private int numWorkers = 1;
60    /** ZooKeeper list system property */
61    private final String zkList = System.getProperty("prop.zookeeper.list");
62    private String testName;
63  
64    /** Default path for temporary files */
65    static final Path DEFAULT_TEMP_DIR =
66        new Path(System.getProperty("java.io.tmpdir"), "_giraphTests");
67  
68    public static final String READER_VERTICES_OPT =
69  		  		    "GeneratedVertexReader.reader_vertices";
70  
71    /** A filter for listing parts files */
72    static final PathFilter PARTS_FILTER = new PathFilter() {
73      @Override
74      public boolean accept(Path path) {
75        return path.getName().startsWith("part-");
76      }
77    };
78  
79    /**
80     * Adjust the configuration to the basic test case
81     */
82    public final Configuration setupConfiguration(GiraphJob job)
83        throws IOException {
84      GiraphConfiguration conf = job.getConfiguration();
85      conf.set("mapred.jar", getJarLocation());
86  
87      // Allow this test to be run on a real Hadoop setup
88      if (runningInDistributedMode()) {
89        System.out.println("setupConfiguration: Sending job to job tracker " +
90            jobTracker + " with jar path " + getJarLocation()
91            + " for " + getName());
92        conf.set("mapred.job.tracker", jobTracker);
93        conf.setWorkerConfiguration(getNumWorkers(), getNumWorkers(), 100.0f);
94      }
95      else {
96        System.out.println("setupConfiguration: Using local job runner with " +
97            "location " + getJarLocation() + " for " + getName());
98        conf.setWorkerConfiguration(1, 1, 100.0f);
99        // Single node testing
100       GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
101       GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
102     }
103     conf.setMaxMasterSuperstepWaitMsecs(30 * 1000);
104     conf.setEventWaitMsecs(3 * 1000);
105     GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.set(conf, 500);
106     if (getZooKeeperList() != null) {
107       conf.setZooKeeperConfiguration(getZooKeeperList());
108     }
109     // GeneratedInputSplit will generate 5 vertices
110     conf.setLong(READER_VERTICES_OPT, 5);
111 
112     // Setup pathes for temporary files
113     Path zookeeperDir = getTempPath("_bspZooKeeper");
114     Path zkManagerDir = getTempPath("_defaultZkManagerDir");
115     Path checkPointDir = getTempPath("_checkpoints");
116 
117     // We might start several jobs per test, so we need to clean up here
118     FileUtils.deletePath(conf, zookeeperDir);
119     FileUtils.deletePath(conf, zkManagerDir);
120     FileUtils.deletePath(conf, checkPointDir);
121 
122     conf.set(GiraphConstants.ZOOKEEPER_DIR, zookeeperDir.toString());
123     GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
124         zkManagerDir.toString());
125     GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkPointDir.toString());
126 
127     return conf;
128   }
129 
130   /**
131    * Create a temporary path
132    *
133    * @param name  name of the file to create in the temporary folder
134    * @return  newly created temporary path
135    */
136   protected Path getTempPath(String name) {
137     return new Path(DEFAULT_TEMP_DIR, name);
138   }
139 
140   /**
141    * Prepare a GiraphJob for test purposes
142    *
143    * @param name identifying name for job
144    * @param conf GiraphConfiguration describing which classes to use
145    * @return GiraphJob configured for testing
146    * @throws IOException if anything goes wrong
147    */
148   protected GiraphJob prepareJob(String name, GiraphConfiguration conf)
149       throws IOException {
150     return prepareJob(name, conf, null);
151   }
152 
153   /**
154    * Prepare a GiraphJob for test purposes
155    *
156    * @param name identifying name for job
157    * @param conf GiraphConfiguration describing which classes to use
158    * @param outputPath Where to right output to
159    * @return GiraphJob configured for testing
160    * @throws IOException if anything goes wrong
161    */
162   protected GiraphJob prepareJob(String name, GiraphConfiguration conf,
163                                  Path outputPath)
164       throws IOException {
165     GiraphJob job = new GiraphJob(conf, name);
166     setupConfiguration(job);
167     if (outputPath != null) {
168       removeAndSetOutput(job, outputPath);
169     }
170     return job;
171   }
172 
173   private String getName() {
174     return testName;
175   }
176 
177   /**
178    * Create the test case
179    *
180    * @param testName name of the test case
181    */
182   public BspCase(String testName) {
183     this.testName = testName;
184   }
185 
186   /**
187    * Get the number of workers used in the BSP application
188    *
189    * @return number of workers
190    */
191   public int getNumWorkers() {
192     return numWorkers;
193   }
194 
195   /**
196    * Get the ZooKeeper list
197    */
198   public String getZooKeeperList() {
199     return zkList;
200   }
201 
202   /**
203    * Get the jar location
204    *
205    * @return location of the jar file
206    */
207   String getJarLocation() {
208     return jarLocation;
209   }
210 
211   /**
212    *  Are the tests executed on a real hadoop instance?
213    *
214    *  @return whether we use a real hadoop instance or not
215    */
216   public boolean runningInDistributedMode() {
217     return jobTracker != null;
218   }
219 
220   /**
221    * Get the single part file status and make sure there is only one part
222    *
223    * @param conf Configuration to get the file system from
224    * @param partDirPath Directory where the single part file should exist
225    * @return Single part file status
226    * @throws IOException
227    */
228   public static FileStatus getSinglePartFileStatus(Configuration conf,
229       Path partDirPath) throws IOException {
230     FileSystem fs = FileSystem.get(conf);
231     FileStatus singlePartFileStatus = null;
232     int partFiles = 0;
233     for (FileStatus fileStatus : fs.listStatus(partDirPath)) {
234       if (fileStatus.getPath().getName().equals("part-m-00000")) {
235         singlePartFileStatus = fileStatus;
236       }
237       if (fileStatus.getPath().getName().startsWith("part-m-")) {
238         ++partFiles;
239       }
240     }
241 
242     Preconditions.checkState(partFiles == 1, "getSinglePartFile: Part file " +
243         "count should be 1, but is " + partFiles);
244 
245     return singlePartFileStatus;
246   }
247 
248   /**
249    * Read all parts- files in the output and count their lines.
250    * This works only for textual output!
251    *
252    * @param conf Configuration
253    * @param outputPath Output path
254    * @return Number of output lines
255    * @throws IOException
256    */
257   public int getNumResults(Configuration conf, Path outputPath)
258       throws IOException {
259     FileSystem fs = FileSystem.get(conf);
260     int numResults = 0;
261     for (FileStatus status : fs.listStatus(outputPath, PARTS_FILTER)) {
262       FSDataInputStream in = null;
263       BufferedReader reader = null;
264       try {
265         in = fs.open(status.getPath());
266         reader = new BufferedReader(new InputStreamReader(in, Charsets.UTF_8));
267         while (reader.readLine() != null) {
268           numResults++;
269         }
270       } finally {
271         Closeables.close(in, true);
272         Closeables.close(reader, true);
273       }
274     }
275     return numResults;
276   }
277 
278   @Before
279   public void setUp() {
280     if (runningInDistributedMode()) {
281       System.out.println("setUp: Setting tasks to 3 for " + getName() +
282           " since JobTracker exists...");
283       numWorkers = 3;
284     }
285     try {
286       cleanupTemporaryFiles();
287 
288       if (zkList == null) {
289         return;
290       }
291       ZooKeeperExt zooKeeperExt =
292           new ZooKeeperExt(zkList, 30 * 1000, 0, 0, this);
293       List<String> rootChildren =
294           zooKeeperExt.getChildrenExt("/", false, false, true);
295       for (String rootChild : rootChildren) {
296         if (rootChild.startsWith("/_hadoopBsp")) {
297           List<String> children =
298               zooKeeperExt.getChildrenExt(rootChild, false, false, true);
299           for (String child: children) {
300             if (child.contains("job_local_")) {
301               System.out.println("Cleaning up " + child);
302               zooKeeperExt.deleteExt(child, -1, true);
303             }
304           }
305         }
306       }
307       zooKeeperExt.close();
308     } catch (Exception e) {
309       throw new RuntimeException(e);
310     }
311   }
312 
313   @After
314   public void tearDown() throws IOException {
315     cleanupTemporaryFiles();
316   }
317 
318   /**
319    * Remove temporary files
320    */
321   private void cleanupTemporaryFiles() throws IOException {
322     FileUtils.deletePath(new Configuration(), DEFAULT_TEMP_DIR);
323   }
324 
325   @Override
326   public void process(WatchedEvent event) {
327     // Do nothing
328   }
329 
330   /**
331    * Helper method to remove an old output directory if it exists,
332    * and set the output path for any VertexOutputFormat that uses
333    * FileOutputFormat.
334    *
335    * @param job Job to set the output path for
336    * @param outputPath Path to output
337    * @throws IOException
338    */
339   public static void removeAndSetOutput(GiraphJob job,
340       Path outputPath) throws IOException {
341     FileUtils.deletePath(job.getConfiguration(), outputPath);
342     FileOutputFormatUtil.setOutputPath(job.getInternalJob(), outputPath);
343   }
344 
345   public static String getCallingMethodName() {
346     return Thread.currentThread().getStackTrace()[2].getMethodName();
347   }
348 }