/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.yarn;

import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;

import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.GiraphFileInputFormat;
import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
import org.apache.giraph.io.formats.IntIntNullTextInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;

import org.junit.Test;
515253/**54 * Tests the Giraph on YARN workflow. Basically, the plan is to use a55 * <code>MiniYARNCluster</code> to run a small test job through our56 * GiraphYarnClient -> GiraphApplicationMaster -gt; GiraphYarnTask (2 no-ops)57 * No "real" BSP code need be tested here, as it is not aware it is running on58 * YARN once the job is in progress, so the existing MRv1 BSP tests are fine.59 */60publicclassTestYarnJobimplements Watcher {
61privatestaticfinal Logger LOG = Logger.getLogger(TestYarnJob.class);
62/**63 * Simple No-Op vertex to test if we can run a quick Giraph job on YARN.64 */65privatestaticclassDummyYarnComputationextends BasicComputation<
66 IntWritable, IntWritable, NullWritable, IntWritable> {
67 @Override
68publicvoid compute(Vertex<IntWritable, IntWritable, NullWritable> vertex,
69 Iterable<IntWritable> messages) throws IOException {
70 vertex.voteToHalt();
71 }
72 }
7374/** job name for this integration test */75privatestaticfinal String JOB_NAME = "giraph-TestPureYarnJob";
76/** ZooKeeper port to use for tests, avoiding InternalVertexRunner's port */77privatestaticfinalint LOCAL_ZOOKEEPER_PORT = 22183;
78/** ZooKeeper list system property */79privatestaticfinal String zkList = "localhost:" + LOCAL_ZOOKEEPER_PORT;
80/** Local ZK working dir, avoid InternalVertexRunner naming */81privatestaticfinal String zkDirName = "_bspZooKeeperYarn";
82/** Local ZK Manager working dir, avoid InternalVertexRunner naming */83privatestaticfinal String zkMgrDirName = "_defaultZooKeeperManagerYarn";
8485/** Temp ZK base working dir for integration test */86private File testBaseDir = null;
87/** Fake input dir for integration test */88private File inputDir = null;
89/** Fake output dir for integration test */90private File outputDir = null;
91/** Temp ZK working dir for integration test */92private File zkDir = null;
93/** Temp ZK Manager working dir for integration test */94private File zkMgrDir = null;
95/** Internal ZooKeeper instance for integration test run */96privateInternalZooKeeper zookeeper;
97/** For running the ZK instance locally */98private ExecutorService exec = Executors.newSingleThreadExecutor();
99/** GiraphConfiguration for a "fake YARN job" */100private GiraphConfiguration conf = null;
101/** Counter for # of znode events during integration test */102privateint zkEventCount = 0;
103/** Our YARN test cluster for local integration test */104private MiniYARNCluster cluster = null;
105106 @Test
107publicvoid testPureYarnJob() {
108try {
109 setupYarnConfiguration();
110 initLocalZookeeper();
111 initYarnCluster();
112 GiraphYarnClient testGyc = new GiraphYarnClient(conf, JOB_NAME);
113 Assert.assertTrue(testGyc.run(true));
114 } catch (Exception e) {
115 e.printStackTrace();
116 Assert.fail("Caught exception in TestYarnJob: " + e);
117 } finally {
118 zookeeper.end();
119 exec.shutdown();
120 cluster.stop();
121 deleteTempDirectories();
122 }
123 }
124125/**126 * Logging this stuff will help you debug integration test issues.127 * @param zkEvent incoming event for our current test ZK's znode tree.128 */129 @Override
130publicvoid process(WatchedEvent zkEvent) {
131 String event = zkEvent == null ? "NULL" : zkEvent.toString();
132 LOG.info("TestYarnJob observed ZK event: " + event +
133" for a total of " + (++zkEventCount) + " so far.");
134 }
135136/**137 * Delete our temp dir so checkstyle and rat plugins are happy.138 */139privatevoid deleteTempDirectories() {
140try {
141if (testBaseDir != null && testBaseDir.exists()) {
142 FileUtils.deleteDirectory(testBaseDir);
143 }
144 } catch (IOException ioe) {
145 LOG.error("TestYarnJob#deleteTempDirectories() FAIL at: " + testBaseDir);
146 }
147 }
148149/**150 * Initialize a local ZK instance for our test run.151 */152privatevoid initLocalZookeeper() throws IOException {
153 zookeeper = newInternalZooKeeper();
154 exec.execute(new Runnable() {
155 @Override
156publicvoid run() {
157try {
158// Configure a local zookeeper instance159 Properties zkProperties = generateLocalZkProperties();
160 QuorumPeerConfig qpConfig = new QuorumPeerConfig();
161 qpConfig.parseProperties(zkProperties);
162// run the zookeeper instance163final ServerConfig zkConfig = new ServerConfig();
164 zkConfig.readFrom(qpConfig);
165 zookeeper.runFromConfig(zkConfig);
166 } catch (QuorumPeerConfig.ConfigException qpcce) {
167thrownew RuntimeException("parse of generated ZK config file " +
168"has failed.", qpcce);
169 } catch (IOException e) {
170 e.printStackTrace();
171thrownew RuntimeException("initLocalZookeeper in TestYarnJob: ", e);
172 }
173 }
174175/**176 * Returns pre-created ZK conf properties for Giraph integration test.177 * @return the populated properties sheet.178 */179 Properties generateLocalZkProperties() {
180 Properties zkProperties = new Properties();
181 zkProperties.setProperty("tickTime", "2000");
182 zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
183 zkProperties.setProperty("clientPort",
184 String.valueOf(LOCAL_ZOOKEEPER_PORT));
185 zkProperties.setProperty("maxClientCnxns", "10000");
186 zkProperties.setProperty("minSessionTimeout", "10000");
187 zkProperties.setProperty("maxSessionTimeout", "100000");
188 zkProperties.setProperty("initLimit", "10");
189 zkProperties.setProperty("syncLimit", "5");
190 zkProperties.setProperty("snapCount", "50000");
191return zkProperties;
192 }
193 });
194 }
195196/**197 * Set up the GiraphConfiguration settings we need to run a no-op Giraph198 * job on a MiniYARNCluster as an integration test. Some YARN-specific199 * flags are set inside GiraphYarnClient and won't need to be set here.200 */201privatevoid setupYarnConfiguration() throws IOException {
202 conf = new GiraphConfiguration();
203 conf.setWorkerConfiguration(1, 1, 100.0f);
204 conf.setMaxMasterSuperstepWaitMsecs(30 * 1000);
205 conf.setEventWaitMsecs(3 * 1000);
206 conf.setYarnLibJars(""); // no need207 conf.setYarnTaskHeapMb(256); // small since no work to be done208 conf.setComputationClass(DummyYarnComputation.class);
209 conf.setVertexInputFormatClass(IntIntNullTextInputFormat.class);
210 conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
211 conf.setNumComputeThreads(1);
212 conf.setMaxTaskAttempts(1);
213 conf.setNumInputSplitsThreads(1);
214// Giraph on YARN only ever things its running in "non-local" mode215 conf.setLocalTestMode(false);
216// this has to happen here before we populate the conf with the temp dirs217 setupTempDirectories();
218 conf.set(OUTDIR, new Path(outputDir.getAbsolutePath()).toString());
219 GiraphFileInputFormat.addVertexInputPath(conf, new Path(inputDir.getAbsolutePath()));
220// hand off the ZK info we just created to our no-op job221 GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.set(conf, 500);
222 conf.setZooKeeperConfiguration(zkList);
223 conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.getAbsolutePath());
224 GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf, zkMgrDir.getAbsolutePath());
225// without this, our "real" client won't connect w/"fake" YARN cluster226 conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
227 }
228229/**230 * Initialize the temp dir tree for ZK and I/O for no-op integration test.231 */232privatevoid setupTempDirectories() throws IOException {
233try {
234 testBaseDir =
235new File(System.getProperty("user.dir"), JOB_NAME);
236if (testBaseDir.exists()) {
237 testBaseDir.delete();
238 }
239 testBaseDir.mkdir();
240 inputDir = new File(testBaseDir, "yarninput");
241if (inputDir.exists()) {
242 inputDir.delete();
243 }
244 inputDir.mkdir();
245 File inFile = new File(inputDir, "graph_data.txt");
246 inFile.createNewFile();
247 outputDir = new File(testBaseDir, "yarnoutput");
248if (outputDir.exists()) {
249 outputDir.delete();
250 } // don't actually produce the output dir, let Giraph On YARN do it251 zkDir = new File(testBaseDir, zkDirName);
252if (zkDir.exists()) {
253 zkDir.delete();
254 }
255 zkDir.mkdir();
256 zkMgrDir = new File(testBaseDir, zkMgrDirName);
257if (zkMgrDir.exists()) {
258 zkMgrDir.delete();
259 }
260 zkMgrDir.mkdir();
261 } catch (IOException ioe) {
262 ioe.printStackTrace();
263thrownew IOException("from setupTempDirectories: ", ioe);
264 }
265 }
266267/**268 * Initialize the MiniYARNCluster for the integration test.269 */270privatevoid initYarnCluster() {
271 cluster = new MiniYARNCluster(TestYarnJob.class.getName(), 1, 1, 1);
272 cluster.init(new ImmutableClassesGiraphConfiguration(conf));
273 cluster.start();
274 }
275276/**277 * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown278 */279classInternalZooKeeperextends ZooKeeperServerMain {
280/**281 * Shutdown the ZooKeeper instance.282 */283void end() {
284 shutdown();
285 }
286 }
287 }