This project has retired. For details please refer to its Attic page.
TestYarnJob xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.yarn;
20  
21  import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
22  
23  import java.io.File;
24  import java.io.IOException;
25  import java.util.Properties;
26  import java.util.concurrent.ExecutorService;
27  import java.util.concurrent.Executors;
28  import junit.framework.Assert;
29  import org.apache.commons.io.FileUtils;
30  import org.apache.giraph.conf.GiraphConfiguration;
31  import org.apache.giraph.conf.GiraphConstants;
32  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
33  import org.apache.giraph.graph.BasicComputation;
34  import org.apache.giraph.graph.Vertex;
35  import org.apache.giraph.io.formats.GiraphFileInputFormat;
36  import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
37  import org.apache.giraph.io.formats.IntIntNullTextInputFormat;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.io.IntWritable;
40  import org.apache.hadoop.io.NullWritable;
41  import org.apache.hadoop.yarn.conf.YarnConfiguration;
42  import org.apache.hadoop.yarn.server.MiniYARNCluster;
43  import org.apache.log4j.Logger;
44  import org.apache.zookeeper.WatchedEvent;
45  import org.apache.zookeeper.Watcher;
46  import org.apache.zookeeper.server.ServerConfig;
47  import org.apache.zookeeper.server.ZooKeeperServerMain;
48  import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
49  
50  import org.junit.Test;
51  
52  
53  /**
54   * Tests the Giraph on YARN workflow. Basically, the plan is to use a
55   * <code>MiniYARNCluster</code> to run a small test job through our
56   * GiraphYarnClient -&gt; GiraphApplicationMaster -&gt; GiraphYarnTask (2 no-ops)
57   * No "real" BSP code need be tested here, as it is not aware it is running on
58   * YARN once the job is in progress, so the existing MRv1 BSP tests are fine.
59   */
60  public class TestYarnJob implements Watcher {
61    private static final Logger LOG = Logger.getLogger(TestYarnJob.class);
62    /**
63     * Simple No-Op vertex to test if we can run a quick Giraph job on YARN.
64     */
65    private static class DummyYarnComputation extends BasicComputation<
66        IntWritable, IntWritable, NullWritable, IntWritable> {
67      @Override
68      public void compute(Vertex<IntWritable, IntWritable, NullWritable> vertex,
69          Iterable<IntWritable> messages) throws IOException {
70        vertex.voteToHalt();
71      }
72    }
73  
74    /** job name for this integration test */
75    private static final String JOB_NAME = "giraph-TestPureYarnJob";
76    /** ZooKeeper port to use for tests, avoiding InternalVertexRunner's port */
77    private static final int LOCAL_ZOOKEEPER_PORT = 22183;
78    /** ZooKeeper list system property */
79    private static final String zkList = "localhost:" + LOCAL_ZOOKEEPER_PORT;
80    /** Local ZK working dir, avoid InternalVertexRunner naming */
81    private static final String zkDirName = "_bspZooKeeperYarn";
82    /** Local ZK Manager working dir, avoid InternalVertexRunner naming */
83    private static final String zkMgrDirName = "_defaultZooKeeperManagerYarn";
84  
85    /** Temp ZK base working dir for integration test */
86    private File testBaseDir = null;
87    /** Fake input dir for integration test */
88    private File inputDir = null;
89    /** Fake output dir for integration test */
90    private File outputDir = null;
91    /** Temp ZK working dir for integration test */
92    private File zkDir = null;
93    /** Temp ZK Manager working dir for integration test */
94    private File zkMgrDir = null;
95    /** Internal ZooKeeper instance for integration test run */
96    private InternalZooKeeper zookeeper;
97    /** For running the ZK instance locally */
98    private ExecutorService exec = Executors.newSingleThreadExecutor();
99    /** GiraphConfiguration for a "fake YARN job" */
100   private GiraphConfiguration conf = null;
101   /** Counter for # of znode events during integration test */
102   private int zkEventCount = 0;
103   /** Our YARN test cluster for local integration test */
104   private MiniYARNCluster cluster = null;
105 
106   @Test
107   public void testPureYarnJob() {
108     try {
109       setupYarnConfiguration();
110       initLocalZookeeper();
111       initYarnCluster();
112       GiraphYarnClient testGyc = new GiraphYarnClient(conf, JOB_NAME);
113       Assert.assertTrue(testGyc.run(true));
114     } catch (Exception e) {
115       e.printStackTrace();
116       Assert.fail("Caught exception in TestYarnJob: " + e);
117     } finally {
118       zookeeper.end();
119       exec.shutdown();
120       cluster.stop();
121       deleteTempDirectories();
122     }
123   }
124 
125   /**
126    * Logging this stuff will help you debug integration test issues.
127    * @param zkEvent incoming event for our current test ZK's znode tree.
128    */
129   @Override
130   public void process(WatchedEvent zkEvent) {
131     String event = zkEvent == null ? "NULL" : zkEvent.toString();
132     LOG.info("TestYarnJob observed ZK event: " + event +
133       " for a total of " + (++zkEventCount) + " so far.");
134   }
135 
136   /**
137    * Delete our temp dir so checkstyle and rat plugins are happy.
138    */
139   private void deleteTempDirectories() {
140     try {
141       if (testBaseDir != null && testBaseDir.exists()) {
142         FileUtils.deleteDirectory(testBaseDir);
143       }
144     } catch (IOException ioe) {
145       LOG.error("TestYarnJob#deleteTempDirectories() FAIL at: " + testBaseDir);
146     }
147   }
148 
149   /**
150    * Initialize a local ZK instance for our test run.
151    */
152   private void initLocalZookeeper() throws IOException {
153     zookeeper = new InternalZooKeeper();
154     exec.execute(new Runnable() {
155       @Override
156       public void run() {
157         try {
158           // Configure a local zookeeper instance
159           Properties zkProperties = generateLocalZkProperties();
160           QuorumPeerConfig qpConfig = new QuorumPeerConfig();
161           qpConfig.parseProperties(zkProperties);
162           // run the zookeeper instance
163           final ServerConfig zkConfig = new ServerConfig();
164           zkConfig.readFrom(qpConfig);
165           zookeeper.runFromConfig(zkConfig);
166         } catch (QuorumPeerConfig.ConfigException qpcce) {
167           throw new RuntimeException("parse of generated ZK config file " +
168                                        "has failed.", qpcce);
169         } catch (IOException e) {
170           e.printStackTrace();
171           throw new RuntimeException("initLocalZookeeper in TestYarnJob: ", e);
172         }
173       }
174 
175       /**
176        * Returns pre-created ZK conf properties for Giraph integration test.
177        * @return the populated properties sheet.
178        */
179       Properties generateLocalZkProperties() {
180         Properties zkProperties = new Properties();
181         zkProperties.setProperty("tickTime", "2000");
182         zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
183         zkProperties.setProperty("clientPort",
184                                   String.valueOf(LOCAL_ZOOKEEPER_PORT));
185         zkProperties.setProperty("maxClientCnxns", "10000");
186         zkProperties.setProperty("minSessionTimeout", "10000");
187         zkProperties.setProperty("maxSessionTimeout", "100000");
188         zkProperties.setProperty("initLimit", "10");
189         zkProperties.setProperty("syncLimit", "5");
190         zkProperties.setProperty("snapCount", "50000");
191         return zkProperties;
192       }
193     });
194   }
195 
196   /**
197    * Set up the GiraphConfiguration settings we need to run a no-op Giraph
198    * job on a MiniYARNCluster as an integration test. Some YARN-specific
199    * flags are set inside GiraphYarnClient and won't need to be set here.
200    */
201   private void setupYarnConfiguration() throws IOException {
202     conf = new GiraphConfiguration();
203     conf.setWorkerConfiguration(1, 1, 100.0f);
204     conf.setMaxMasterSuperstepWaitMsecs(30 * 1000);
205     conf.setEventWaitMsecs(3 * 1000);
206     conf.setYarnLibJars(""); // no need
207     conf.setYarnTaskHeapMb(256); // small since no work to be done
208     conf.setComputationClass(DummyYarnComputation.class);
209     conf.setVertexInputFormatClass(IntIntNullTextInputFormat.class);
210     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
211     conf.setNumComputeThreads(1);
212     conf.setMaxTaskAttempts(1);
213     conf.setNumInputSplitsThreads(1);
214     // Giraph on YARN only ever things its running in "non-local" mode
215     conf.setLocalTestMode(false);
216     // this has to happen here before we populate the conf with the temp dirs
217     setupTempDirectories();
218     conf.set(OUTDIR, new Path(outputDir.getAbsolutePath()).toString());
219     GiraphFileInputFormat.addVertexInputPath(conf, new Path(inputDir.getAbsolutePath()));
220     // hand off the ZK info we just created to our no-op job
221     GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.set(conf, 500);
222     conf.setZooKeeperConfiguration(zkList);
223     conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.getAbsolutePath());
224     GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf, zkMgrDir.getAbsolutePath());
225     // without this, our "real" client won't connect w/"fake" YARN cluster
226     conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
227   }
228 
229   /**
230    * Initialize the temp dir tree for ZK and I/O for no-op integration test.
231    */
232   private void setupTempDirectories() throws IOException {
233     try {
234     testBaseDir =
235       new File(System.getProperty("user.dir"), JOB_NAME);
236     if (testBaseDir.exists()) {
237       testBaseDir.delete();
238     }
239     testBaseDir.mkdir();
240     inputDir = new File(testBaseDir, "yarninput");
241     if (inputDir.exists()) {
242       inputDir.delete();
243     }
244     inputDir.mkdir();
245     File inFile = new File(inputDir, "graph_data.txt");
246     inFile.createNewFile();
247     outputDir = new File(testBaseDir, "yarnoutput");
248     if (outputDir.exists()) {
249       outputDir.delete();
250     } // don't actually produce the output dir, let Giraph On YARN do it
251     zkDir = new File(testBaseDir, zkDirName);
252     if (zkDir.exists()) {
253       zkDir.delete();
254     }
255     zkDir.mkdir();
256     zkMgrDir = new File(testBaseDir, zkMgrDirName);
257     if (zkMgrDir.exists()) {
258       zkMgrDir.delete();
259     }
260     zkMgrDir.mkdir();
261     } catch (IOException ioe) {
262       ioe.printStackTrace();
263       throw new IOException("from setupTempDirectories: ", ioe);
264     }
265   }
266 
267   /**
268    * Initialize the MiniYARNCluster for the integration test.
269    */
270   private void initYarnCluster() {
271     cluster = new MiniYARNCluster(TestYarnJob.class.getName(), 1, 1, 1);
272     cluster.init(new ImmutableClassesGiraphConfiguration(conf));
273     cluster.start();
274   }
275 
276   /**
277    * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown
278    */
279   class InternalZooKeeper extends ZooKeeperServerMain {
280     /**
281      * Shutdown the ZooKeeper instance.
282      */
283     void end() {
284       shutdown();
285     }
286   }
287 }