This project has retired. For details please refer to its Attic page.
TestBspBasic xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertSame;
23  import static org.junit.Assert.assertTrue;
24  import static org.junit.Assert.fail;
25  import static org.mockito.Mockito.mock;
26  import static org.mockito.Mockito.when;
27  
28  import java.io.BufferedReader;
29  import java.io.ByteArrayOutputStream;
30  import java.io.DataOutputStream;
31  import java.io.IOException;
32  import java.io.InputStreamReader;
33  import java.lang.reflect.InvocationTargetException;
34  import java.util.ArrayList;
35  import java.util.List;
36  import java.util.Map;
37  
38  import org.apache.giraph.aggregators.TextAggregatorWriter;
39  import org.apache.giraph.combiner.SimpleSumMessageCombiner;
40  import org.apache.giraph.conf.GiraphConfiguration;
41  import org.apache.giraph.conf.GiraphConstants;
42  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
43  import org.apache.giraph.examples.GeneratedVertexReader;
44  import org.apache.giraph.examples.SimpleCombinerComputation;
45  import org.apache.giraph.examples.SimpleFailComputation;
46  import org.apache.giraph.examples.SimpleMasterComputeComputation;
47  import org.apache.giraph.examples.SimpleMsgComputation;
48  import org.apache.giraph.examples.SimplePageRankComputation;
49  import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexInputFormat;
50  import org.apache.giraph.examples.SimpleShortestPathsComputation;
51  import org.apache.giraph.examples.SimpleSuperstepComputation;
52  import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
53  import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat;
54  import org.apache.giraph.graph.Vertex;
55  import org.apache.giraph.io.VertexInputFormat;
56  import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexOutputFormat;
57  import org.apache.giraph.job.GiraphJob;
58  import org.apache.giraph.job.HadoopUtils;
59  import org.apache.giraph.master.input.LocalityAwareInputSplitsMasterOrganizer;
60  import org.apache.giraph.utils.NoOpComputation;
61  import org.apache.giraph.worker.WorkerInfo;
62  import org.apache.hadoop.conf.Configuration;
63  import org.apache.hadoop.fs.FSDataInputStream;
64  import org.apache.hadoop.fs.FileStatus;
65  import org.apache.hadoop.fs.FileSystem;
66  import org.apache.hadoop.fs.Path;
67  import org.apache.hadoop.io.FloatWritable;
68  import org.apache.hadoop.io.IntWritable;
69  import org.apache.hadoop.io.LongWritable;
70  import org.apache.hadoop.io.NullWritable;
71  import org.apache.hadoop.io.Writable;
72  import org.apache.hadoop.mapreduce.InputSplit;
73  import org.apache.zookeeper.KeeperException;
74  import org.junit.Test;
75  
76  import com.google.common.base.Charsets;
77  import com.google.common.collect.Lists;
78  import com.google.common.collect.Maps;
79  import com.google.common.io.Closeables;
80  
81  /**
82   * Unit test for many simple BSP applications.
83   */
84  public class
85      TestBspBasic extends BspCase {
86  
87    public TestBspBasic() {
88      super(TestBspBasic.class.getName());
89    }
90  
91    /**
92     * Just instantiate the vertex (all functions are implemented) and the
93     * VertexInputFormat using reflection.
94     *
95     * @throws IllegalAccessException
96     * @throws InstantiationException
97     * @throws InterruptedException
98     * @throws IOException
99     * @throws InvocationTargetException
100    * @throws IllegalArgumentException
101    * @throws NoSuchMethodException
102    * @throws SecurityException
103    */
104   @Test
105   public void testInstantiateVertex()
106       throws InstantiationException, IllegalAccessException,
107       IOException, InterruptedException, IllegalArgumentException,
108       InvocationTargetException, SecurityException, NoSuchMethodException {
109     System.out.println("testInstantiateVertex: java.class.path=" +
110         System.getProperty("java.class.path"));
111     GiraphConfiguration conf = new GiraphConfiguration();
112     conf.setComputationClass(SimpleSuperstepComputation.class);
113     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
114     GiraphJob job = prepareJob(getCallingMethodName(), conf);
115     ImmutableClassesGiraphConfiguration configuration =
116         new ImmutableClassesGiraphConfiguration(job.getConfiguration());
117     Vertex<LongWritable, IntWritable, FloatWritable> vertex =
118         configuration.createVertex();
119     vertex.initialize(new LongWritable(1), new IntWritable(1));
120     System.out.println("testInstantiateVertex: Got vertex " + vertex);
121     VertexInputFormat<LongWritable, IntWritable, FloatWritable>
122     inputFormat = configuration.createWrappedVertexInputFormat();
123     List<InputSplit> splitArray = inputFormat.getSplits(
124         HadoopUtils.makeJobContext(), 1);
125     ByteArrayOutputStream byteArrayOutputStream =
126         new ByteArrayOutputStream();
127     DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
128     ((Writable) splitArray.get(0)).write(outputStream);
129     System.out.println("testInstantiateVertex: Example output split = " +
130         byteArrayOutputStream.toString());
131   }
132 
133   private static class NullComputation extends NoOpComputation<NullWritable,
134       NullWritable, NullWritable, NullWritable> { }
135 
136   /**
137    * Test whether vertices with NullWritable for vertex value type, edge value
138    * type and message value type can be instantiated.
139    */
140   @Test
141   public void testInstantiateNullVertex() throws IOException {
142     GiraphConfiguration nullConf = new GiraphConfiguration();
143     nullConf.setComputationClass(NullComputation.class);
144     ImmutableClassesGiraphConfiguration<NullWritable, NullWritable,
145         NullWritable> immutableClassesGiraphConfiguration =
146         new ImmutableClassesGiraphConfiguration<
147             NullWritable, NullWritable, NullWritable>(nullConf);
148     NullWritable vertexValue =
149         immutableClassesGiraphConfiguration.createVertexValue();
150     NullWritable edgeValue =
151         immutableClassesGiraphConfiguration.createEdgeValue();
152     Writable messageValue =
153         immutableClassesGiraphConfiguration.createOutgoingMessageValueFactory()
154             .newInstance();
155     assertSame(vertexValue.getClass(), NullWritable.class);
156     assertSame(vertexValue, edgeValue);
157     assertSame(edgeValue, messageValue);
158   }
159 
160   /**
161    * Do some checks for local job runner.
162    *
163    * @throws IOException
164    * @throws ClassNotFoundException
165    * @throws InterruptedException
166    */
167   @Test
168   public void testLocalJobRunnerConfig()
169       throws IOException, InterruptedException, ClassNotFoundException {
170     if (runningInDistributedMode()) {
171       System.out.println("testLocalJobRunnerConfig: Skipping for " +
172           "non-local");
173       return;
174     }
175     GiraphConfiguration conf = new GiraphConfiguration();
176     conf.setComputationClass(SimpleSuperstepComputation.class);
177     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
178     GiraphJob job = prepareJob(getCallingMethodName(), conf);
179     conf = job.getConfiguration();
180     conf.setWorkerConfiguration(5, 5, 100.0f);
181     GiraphConstants.SPLIT_MASTER_WORKER.set(conf, true);
182 
183     try {
184       job.run(true);
185       fail();
186     } catch (IllegalArgumentException e) {
187     }
188 
189     GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
190     try {
191       job.run(true);
192       fail();
193     } catch (IllegalArgumentException e) {
194     }
195 
196     conf.setWorkerConfiguration(1, 1, 100.0f);
197     job.run(true);
198   }
199 
200   /**
201    * Run a sample BSP job in JobTracker, kill a task, and make sure
202    * the job fails (not enough attempts to restart)
203    *
204    * @throws IOException
205    * @throws ClassNotFoundException
206    * @throws InterruptedException
207    */
208   @Test
209   public void testBspFail()
210       throws IOException, InterruptedException, ClassNotFoundException {
211     // Allow this test only to be run on a real Hadoop setup
212     if (!runningInDistributedMode()) {
213       System.out.println("testBspFail: not executed for local setup.");
214       return;
215     }
216 
217     GiraphConfiguration conf = new GiraphConfiguration();
218     conf.setComputationClass(SimpleFailComputation.class);
219     conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
220     GiraphJob job = prepareJob(getCallingMethodName(), conf,
221         getTempPath(getCallingMethodName()));
222     job.getConfiguration().setInt("mapred.map.max.attempts", 1);
223     assertTrue(!job.run(true));
224   }
225 
226   /**
227    * Run a sample BSP job locally and test supersteps.
228    *
229    * @throws IOException
230    * @throws ClassNotFoundException
231    * @throws InterruptedException
232    */
233   @Test
234   public void testBspSuperStep()
235       throws IOException, InterruptedException, ClassNotFoundException {
236     String callingMethod = getCallingMethodName();
237     Path outputPath = getTempPath(callingMethod);
238     GiraphConfiguration conf = new GiraphConfiguration();
239     conf.setComputationClass(SimpleSuperstepComputation.class);
240     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
241     conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
242     GiraphJob job = prepareJob(callingMethod, conf, outputPath);
243     Configuration configuration = job.getConfiguration();
244     // GeneratedInputSplit will generate 10 vertices
245     GeneratedVertexReader.READER_VERTICES.set(configuration, 10);
246     assertTrue(job.run(true));
247     if (!runningInDistributedMode()) {
248       FileStatus fileStatus = getSinglePartFileStatus(configuration, outputPath);
249       assertEquals(49l, fileStatus.getLen());
250     }
251   }
252 
253   /**
254    * Run a sample BSP job locally and test messages.
255    *
256    * @throws IOException
257    * @throws ClassNotFoundException
258    * @throws InterruptedException
259    */
260   @Test
261   public void testBspMsg()
262       throws IOException, InterruptedException, ClassNotFoundException {
263     GiraphConfiguration conf = new GiraphConfiguration();
264     conf.setComputationClass(SimpleMsgComputation.class);
265     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
266     GiraphJob job = prepareJob(getCallingMethodName(), conf);
267     assertTrue(job.run(true));
268   }
269 
270 
271   /**
272    * Run a sample BSP job locally with no vertices and make sure
273    * it completes.
274    *
275    * @throws IOException
276    * @throws ClassNotFoundException
277    * @throws InterruptedException
278    */
279   @Test
280   public void testEmptyVertexInputFormat()
281       throws IOException, InterruptedException, ClassNotFoundException {
282     GiraphConfiguration conf = new GiraphConfiguration();
283     conf.setComputationClass(SimpleMsgComputation.class);
284     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
285     GiraphJob job = prepareJob(getCallingMethodName(), conf);
286     GeneratedVertexReader.READER_VERTICES.set(job.getConfiguration(), 0);
287     assertTrue(job.run(true));
288   }
289 
290   /**
291    * Run a sample BSP job locally with message combiner and
292    * checkout output value.
293    *
294    * @throws IOException
295    * @throws ClassNotFoundException
296    * @throws InterruptedException
297    */
298   @Test
299   public void testBspCombiner()
300       throws IOException, InterruptedException, ClassNotFoundException {
301     GiraphConfiguration conf = new GiraphConfiguration();
302     conf.setComputationClass(SimpleCombinerComputation.class);
303     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
304     conf.setMessageCombinerClass(SimpleSumMessageCombiner.class);
305     GiraphJob job = prepareJob(getCallingMethodName(), conf);
306     assertTrue(job.run(true));
307   }
308 
309   /**
310    * Run a test to see if the InputSplitPathOrganizer can correctly sort
311    * locality information from a mocked znode of data.
312    * @throws IOException
313    * @throws KeeperException
314    * @throws InterruptedException
315    */
316   @Test
317   public void testInputSplitLocality()
318     throws IOException, KeeperException, InterruptedException {
319     List<byte[]> serializedSplits = new ArrayList<>();
320     serializedSplits.add(new byte[]{1});
321     serializedSplits.add(new byte[]{2});
322     serializedSplits.add(new byte[]{3});
323 
324     WorkerInfo workerInfo = mock(WorkerInfo.class);
325     when(workerInfo.getTaskId()).thenReturn(5);
326     when(workerInfo.getHostname()).thenReturn("node.LOCAL.com");
327 
328     List<InputSplit> splits = new ArrayList<>();
329     InputSplit split1 = mock(InputSplit.class);
330     when(split1.getLocations()).thenReturn(new String[]{
331         "node.test1.com", "node.test2.com", "node.test3.com"});
332     splits.add(split1);
333     InputSplit split2 = mock(InputSplit.class);
334     when(split2.getLocations()).thenReturn(new String[]{
335         "node.testx.com", "node.LOCAL.com", "node.testy.com"});
336     splits.add(split2);
337     InputSplit split3 = mock(InputSplit.class);
338     when(split3.getLocations()).thenReturn(new String[]{
339         "node.test4.com", "node.test5.com", "node.test6.com"});
340     splits.add(split3);
341 
342     LocalityAwareInputSplitsMasterOrganizer inputSplitOrganizer =
343         new LocalityAwareInputSplitsMasterOrganizer(
344             serializedSplits,
345             splits,
346             Lists.newArrayList(workerInfo));
347 
348     assertEquals(2,
349         inputSplitOrganizer.getSerializedSplitFor(workerInfo.getTaskId())[0]);
350   }
351 
352   /**
353    * Run a sample BSP job locally and test shortest paths.
354    *
355    * @throws IOException
356    * @throws ClassNotFoundException
357    * @throws InterruptedException
358    */
359   @Test
360   public void testBspShortestPaths()
361       throws IOException, InterruptedException, ClassNotFoundException {
362     Path outputPath = getTempPath(getCallingMethodName());
363     GiraphConfiguration conf = new GiraphConfiguration();
364     conf.setComputationClass(SimpleShortestPathsComputation.class);
365     conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
366     conf.setVertexOutputFormatClass(
367         JsonLongDoubleFloatDoubleVertexOutputFormat.class);
368     SimpleShortestPathsComputation.SOURCE_ID.set(conf, 0);
369     GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
370 
371     assertTrue(job.run(true));
372 
373     int numResults = getNumResults(job.getConfiguration(), outputPath);
374 
375     int expectedNumResults = runningInDistributedMode() ? 15 : 5;
376     assertEquals(expectedNumResults, numResults);
377   }
378 
379   /**
380    * Run a sample BSP job locally and test PageRank with AggregatorWriter.
381    *
382    * @throws IOException
383    * @throws ClassNotFoundException
384    * @throws InterruptedException
385    */
386   @Test
387   public void testBspPageRankWithAggregatorWriter()
388       throws IOException, InterruptedException, ClassNotFoundException {
389     Path outputPath = getTempPath(getCallingMethodName());
390 
391     GiraphConfiguration conf = new GiraphConfiguration();
392     conf.setComputationClass(SimplePageRankComputation.class);
393     conf.setAggregatorWriterClass(TextAggregatorWriter.class);
394     conf.setMasterComputeClass(
395         SimplePageRankComputation.SimplePageRankMasterCompute.class);
396     conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
397     conf.setVertexOutputFormatClass(
398         SimplePageRankComputation.SimplePageRankVertexOutputFormat.class);
399     conf.setWorkerContextClass(
400         SimplePageRankComputation.SimplePageRankWorkerContext.class);
401     GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
402     GiraphConfiguration configuration = job.getConfiguration();
403     Path aggregatorValues = getTempPath("aggregatorValues");
404     configuration.setInt(TextAggregatorWriter.FREQUENCY,
405         TextAggregatorWriter.ALWAYS);
406     configuration.set(TextAggregatorWriter.FILENAME,
407         aggregatorValues.toString());
408 
409     assertTrue(job.run(true));
410 
411     FileSystem fs = FileSystem.get(configuration);
412     Path valuesFile = new Path(aggregatorValues.toString() + "_0");
413 
414     try {
415       if (!runningInDistributedMode()) {
416         double maxPageRank =
417             SimplePageRankComputation.SimplePageRankWorkerContext.getFinalMax();
418         double minPageRank =
419             SimplePageRankComputation.SimplePageRankWorkerContext.getFinalMin();
420         long numVertices =
421             SimplePageRankComputation.SimplePageRankWorkerContext.getFinalSum();
422         System.out.println("testBspPageRank: maxPageRank=" + maxPageRank +
423             " minPageRank=" + minPageRank + " numVertices=" + numVertices);
424 
425         FSDataInputStream in = null;
426         BufferedReader reader = null;
427         try {
428           Map<Integer, Double> minValues = Maps.newHashMap();
429           Map<Integer, Double> maxValues = Maps.newHashMap();
430           Map<Integer, Long> vertexCounts = Maps.newHashMap();
431 
432           in = fs.open(valuesFile);
433           reader = new BufferedReader(new InputStreamReader(in,
434               Charsets.UTF_8));
435           String line;
436           while ((line = reader.readLine()) != null) {
437             String[] tokens = line.split("\t");
438             int superstep = Integer.parseInt(tokens[0].split("=")[1]);
439             String value = (tokens[1].split("=")[1]);
440             String aggregatorName = (tokens[1].split("=")[0]);
441 
442             if ("min".equals(aggregatorName)) {
443               minValues.put(superstep, Double.parseDouble(value));
444             }
445             if ("max".equals(aggregatorName)) {
446               maxValues.put(superstep, Double.parseDouble(value));
447             }
448             if ("sum".equals(aggregatorName)) {
449               vertexCounts.put(superstep, Long.parseLong(value));
450             }
451           }
452 
453           int maxSuperstep = SimplePageRankComputation.MAX_SUPERSTEPS;
454           assertEquals(maxSuperstep + 2, minValues.size());
455           assertEquals(maxSuperstep + 2, maxValues.size());
456           assertEquals(maxSuperstep + 2, vertexCounts.size());
457 
458           assertEquals(maxPageRank, maxValues.get(maxSuperstep), 0d);
459           assertEquals(minPageRank, minValues.get(maxSuperstep), 0d);
460           assertEquals(numVertices, (long) vertexCounts.get(maxSuperstep));
461 
462         } finally {
463           Closeables.close(in, true);
464           Closeables.close(reader, true);
465         }
466       }
467     } finally {
468       fs.delete(valuesFile, false);
469     }
470   }
471 
472   /**
473    * Run a sample BSP job locally and test MasterCompute.
474    *
475    * @throws IOException
476    * @throws ClassNotFoundException
477    * @throws InterruptedException
478    */
479   @Test
480   public void testBspMasterCompute()
481       throws IOException, InterruptedException, ClassNotFoundException {
482     GiraphConfiguration conf = new GiraphConfiguration();
483     conf.setComputationClass(SimpleMasterComputeComputation.class);
484     conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
485     conf.setMasterComputeClass(
486         SimpleMasterComputeComputation.SimpleMasterCompute.class);
487     conf.setWorkerContextClass(
488         SimpleMasterComputeComputation.SimpleMasterComputeWorkerContext.class);
489     GiraphJob job = prepareJob(getCallingMethodName(), conf);
490     assertTrue(job.run(true));
491     if (!runningInDistributedMode()) {
492       double finalSum =
493           SimpleMasterComputeComputation.SimpleMasterComputeWorkerContext.getFinalSum();
494       System.out.println("testBspMasterCompute: finalSum=" + finalSum);
495       assertEquals(32.5, finalSum, 0d);
496     }
497   }
498 
499   /**
500    * Test halting at superstep 0
501    */
502   @Test
503   public void testHaltSuperstep0()
504       throws IOException, InterruptedException, ClassNotFoundException {
505     GiraphConfiguration conf = new GiraphConfiguration();
506     GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS.set(conf, 0);
507     conf.setComputationClass(SimpleMsgComputation.class);
508     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
509     GiraphJob job = prepareJob(getCallingMethodName(), conf);
510     assertTrue(job.run(true));
511   }
512 }