This project has been retired. For details, please refer to its Attic page.
TestBspBasic xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertSame;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
25 import static org.mockito.Mockito.mock;
26 import static org.mockito.Mockito.when;
27
28 import java.io.BufferedReader;
29 import java.io.ByteArrayOutputStream;
30 import java.io.DataOutputStream;
31 import java.io.IOException;
32 import java.io.InputStreamReader;
33 import java.lang.reflect.InvocationTargetException;
34 import java.util.ArrayList;
35 import java.util.List;
36 import java.util.Map;
37
38 import org.apache.giraph.aggregators.TextAggregatorWriter;
39 import org.apache.giraph.combiner.SimpleSumMessageCombiner;
40 import org.apache.giraph.conf.GiraphConfiguration;
41 import org.apache.giraph.conf.GiraphConstants;
42 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
43 import org.apache.giraph.examples.GeneratedVertexReader;
44 import org.apache.giraph.examples.SimpleCombinerComputation;
45 import org.apache.giraph.examples.SimpleFailComputation;
46 import org.apache.giraph.examples.SimpleMasterComputeComputation;
47 import org.apache.giraph.examples.SimpleMsgComputation;
48 import org.apache.giraph.examples.SimplePageRankComputation;
49 import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexInputFormat;
50 import org.apache.giraph.examples.SimpleShortestPathsComputation;
51 import org.apache.giraph.examples.SimpleSuperstepComputation;
52 import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
53 import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat;
54 import org.apache.giraph.graph.Vertex;
55 import org.apache.giraph.io.VertexInputFormat;
56 import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexOutputFormat;
57 import org.apache.giraph.job.GiraphJob;
58 import org.apache.giraph.job.HadoopUtils;
59 import org.apache.giraph.master.input.LocalityAwareInputSplitsMasterOrganizer;
60 import org.apache.giraph.utils.NoOpComputation;
61 import org.apache.giraph.worker.WorkerInfo;
62 import org.apache.hadoop.conf.Configuration;
63 import org.apache.hadoop.fs.FSDataInputStream;
64 import org.apache.hadoop.fs.FileStatus;
65 import org.apache.hadoop.fs.FileSystem;
66 import org.apache.hadoop.fs.Path;
67 import org.apache.hadoop.io.FloatWritable;
68 import org.apache.hadoop.io.IntWritable;
69 import org.apache.hadoop.io.LongWritable;
70 import org.apache.hadoop.io.NullWritable;
71 import org.apache.hadoop.io.Writable;
72 import org.apache.hadoop.mapreduce.InputSplit;
73 import org.apache.zookeeper.KeeperException;
74 import org.junit.Test;
75
76 import com.google.common.base.Charsets;
77 import com.google.common.collect.Lists;
78 import com.google.common.collect.Maps;
79 import com.google.common.io.Closeables;
80
81
82
83
84 public class
85 TestBspBasic extends BspCase {
86
/**
 * Passes this class's fully-qualified name to the {@link BspCase}
 * constructor.
 */
public TestBspBasic() {
  super(TestBspBasic.class.getName());
}
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104 @Test
105 public void testInstantiateVertex()
106 throws InstantiationException, IllegalAccessException,
107 IOException, InterruptedException, IllegalArgumentException,
108 InvocationTargetException, SecurityException, NoSuchMethodException {
109 System.out.println("testInstantiateVertex: java.class.path=" +
110 System.getProperty("java.class.path"));
111 GiraphConfiguration conf = new GiraphConfiguration();
112 conf.setComputationClass(SimpleSuperstepComputation.class);
113 conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
114 GiraphJob job = prepareJob(getCallingMethodName(), conf);
115 ImmutableClassesGiraphConfiguration configuration =
116 new ImmutableClassesGiraphConfiguration(job.getConfiguration());
117 Vertex<LongWritable, IntWritable, FloatWritable> vertex =
118 configuration.createVertex();
119 vertex.initialize(new LongWritable(1), new IntWritable(1));
120 System.out.println("testInstantiateVertex: Got vertex " + vertex);
121 VertexInputFormat<LongWritable, IntWritable, FloatWritable>
122 inputFormat = configuration.createWrappedVertexInputFormat();
123 List<InputSplit> splitArray = inputFormat.getSplits(
124 HadoopUtils.makeJobContext(), 1);
125 ByteArrayOutputStream byteArrayOutputStream =
126 new ByteArrayOutputStream();
127 DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
128 ((Writable) splitArray.get(0)).write(outputStream);
129 System.out.println("testInstantiateVertex: Example output split = " +
130 byteArrayOutputStream.toString());
131 }
132
133 private static class NullComputation extends NoOpComputation<NullWritable,
134 NullWritable, NullWritable, NullWritable> { }
135
136
137
138
139
140 @Test
141 public void testInstantiateNullVertex() throws IOException {
142 GiraphConfiguration nullConf = new GiraphConfiguration();
143 nullConf.setComputationClass(NullComputation.class);
144 ImmutableClassesGiraphConfiguration<NullWritable, NullWritable,
145 NullWritable> immutableClassesGiraphConfiguration =
146 new ImmutableClassesGiraphConfiguration<
147 NullWritable, NullWritable, NullWritable>(nullConf);
148 NullWritable vertexValue =
149 immutableClassesGiraphConfiguration.createVertexValue();
150 NullWritable edgeValue =
151 immutableClassesGiraphConfiguration.createEdgeValue();
152 Writable messageValue =
153 immutableClassesGiraphConfiguration.createOutgoingMessageValueFactory()
154 .newInstance();
155 assertSame(vertexValue.getClass(), NullWritable.class);
156 assertSame(vertexValue, edgeValue);
157 assertSame(edgeValue, messageValue);
158 }
159
160
161
162
163
164
165
166
167 @Test
168 public void testLocalJobRunnerConfig()
169 throws IOException, InterruptedException, ClassNotFoundException {
170 if (runningInDistributedMode()) {
171 System.out.println("testLocalJobRunnerConfig: Skipping for " +
172 "non-local");
173 return;
174 }
175 GiraphConfiguration conf = new GiraphConfiguration();
176 conf.setComputationClass(SimpleSuperstepComputation.class);
177 conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
178 GiraphJob job = prepareJob(getCallingMethodName(), conf);
179 conf = job.getConfiguration();
180 conf.setWorkerConfiguration(5, 5, 100.0f);
181 GiraphConstants.SPLIT_MASTER_WORKER.set(conf, true);
182
183 try {
184 job.run(true);
185 fail();
186 } catch (IllegalArgumentException e) {
187 }
188
189 GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
190 try {
191 job.run(true);
192 fail();
193 } catch (IllegalArgumentException e) {
194 }
195
196 conf.setWorkerConfiguration(1, 1, 100.0f);
197 job.run(true);
198 }
199
200
201
202
203
204
205
206
207
208 @Test
209 public void testBspFail()
210 throws IOException, InterruptedException, ClassNotFoundException {
211
212 if (!runningInDistributedMode()) {
213 System.out.println("testBspFail: not executed for local setup.");
214 return;
215 }
216
217 GiraphConfiguration conf = new GiraphConfiguration();
218 conf.setComputationClass(SimpleFailComputation.class);
219 conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
220 GiraphJob job = prepareJob(getCallingMethodName(), conf,
221 getTempPath(getCallingMethodName()));
222 job.getConfiguration().setInt("mapred.map.max.attempts", 1);
223 assertTrue(!job.run(true));
224 }
225
226
227
228
229
230
231
232
233 @Test
234 public void testBspSuperStep()
235 throws IOException, InterruptedException, ClassNotFoundException {
236 String callingMethod = getCallingMethodName();
237 Path outputPath = getTempPath(callingMethod);
238 GiraphConfiguration conf = new GiraphConfiguration();
239 conf.setComputationClass(SimpleSuperstepComputation.class);
240 conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
241 conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
242 GiraphJob job = prepareJob(callingMethod, conf, outputPath);
243 Configuration configuration = job.getConfiguration();
244
245 GeneratedVertexReader.READER_VERTICES.set(configuration, 10);
246 assertTrue(job.run(true));
247 if (!runningInDistributedMode()) {
248 FileStatus fileStatus = getSinglePartFileStatus(configuration, outputPath);
249 assertEquals(49l, fileStatus.getLen());
250 }
251 }
252
253
254
255
256
257
258
259
260 @Test
261 public void testBspMsg()
262 throws IOException, InterruptedException, ClassNotFoundException {
263 GiraphConfiguration conf = new GiraphConfiguration();
264 conf.setComputationClass(SimpleMsgComputation.class);
265 conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
266 GiraphJob job = prepareJob(getCallingMethodName(), conf);
267 assertTrue(job.run(true));
268 }
269
270
271
272
273
274
275
276
277
278
279 @Test
280 public void testEmptyVertexInputFormat()
281 throws IOException, InterruptedException, ClassNotFoundException {
282 GiraphConfiguration conf = new GiraphConfiguration();
283 conf.setComputationClass(SimpleMsgComputation.class);
284 conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
285 GiraphJob job = prepareJob(getCallingMethodName(), conf);
286 GeneratedVertexReader.READER_VERTICES.set(job.getConfiguration(), 0);
287 assertTrue(job.run(true));
288 }
289
290
291
292
293
294
295
296
297
298 @Test
299 public void testBspCombiner()
300 throws IOException, InterruptedException, ClassNotFoundException {
301 GiraphConfiguration conf = new GiraphConfiguration();
302 conf.setComputationClass(SimpleCombinerComputation.class);
303 conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
304 conf.setMessageCombinerClass(SimpleSumMessageCombiner.class);
305 GiraphJob job = prepareJob(getCallingMethodName(), conf);
306 assertTrue(job.run(true));
307 }
308
309
310
311
312
313
314
315
316 @Test
317 public void testInputSplitLocality()
318 throws IOException, KeeperException, InterruptedException {
319 List<byte[]> serializedSplits = new ArrayList<>();
320 serializedSplits.add(new byte[]{1});
321 serializedSplits.add(new byte[]{2});
322 serializedSplits.add(new byte[]{3});
323
324 WorkerInfo workerInfo = mock(WorkerInfo.class);
325 when(workerInfo.getTaskId()).thenReturn(5);
326 when(workerInfo.getHostname()).thenReturn("node.LOCAL.com");
327
328 List<InputSplit> splits = new ArrayList<>();
329 InputSplit split1 = mock(InputSplit.class);
330 when(split1.getLocations()).thenReturn(new String[]{
331 "node.test1.com", "node.test2.com", "node.test3.com"});
332 splits.add(split1);
333 InputSplit split2 = mock(InputSplit.class);
334 when(split2.getLocations()).thenReturn(new String[]{
335 "node.testx.com", "node.LOCAL.com", "node.testy.com"});
336 splits.add(split2);
337 InputSplit split3 = mock(InputSplit.class);
338 when(split3.getLocations()).thenReturn(new String[]{
339 "node.test4.com", "node.test5.com", "node.test6.com"});
340 splits.add(split3);
341
342 LocalityAwareInputSplitsMasterOrganizer inputSplitOrganizer =
343 new LocalityAwareInputSplitsMasterOrganizer(
344 serializedSplits,
345 splits,
346 Lists.newArrayList(workerInfo));
347
348 assertEquals(2,
349 inputSplitOrganizer.getSerializedSplitFor(workerInfo.getTaskId())[0]);
350 }
351
352
353
354
355
356
357
358
359 @Test
360 public void testBspShortestPaths()
361 throws IOException, InterruptedException, ClassNotFoundException {
362 Path outputPath = getTempPath(getCallingMethodName());
363 GiraphConfiguration conf = new GiraphConfiguration();
364 conf.setComputationClass(SimpleShortestPathsComputation.class);
365 conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
366 conf.setVertexOutputFormatClass(
367 JsonLongDoubleFloatDoubleVertexOutputFormat.class);
368 SimpleShortestPathsComputation.SOURCE_ID.set(conf, 0);
369 GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
370
371 assertTrue(job.run(true));
372
373 int numResults = getNumResults(job.getConfiguration(), outputPath);
374
375 int expectedNumResults = runningInDistributedMode() ? 15 : 5;
376 assertEquals(expectedNumResults, numResults);
377 }
378
379
380
381
382
383
384
385
386 @Test
387 public void testBspPageRankWithAggregatorWriter()
388 throws IOException, InterruptedException, ClassNotFoundException {
389 Path outputPath = getTempPath(getCallingMethodName());
390
391 GiraphConfiguration conf = new GiraphConfiguration();
392 conf.setComputationClass(SimplePageRankComputation.class);
393 conf.setAggregatorWriterClass(TextAggregatorWriter.class);
394 conf.setMasterComputeClass(
395 SimplePageRankComputation.SimplePageRankMasterCompute.class);
396 conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
397 conf.setVertexOutputFormatClass(
398 SimplePageRankComputation.SimplePageRankVertexOutputFormat.class);
399 conf.setWorkerContextClass(
400 SimplePageRankComputation.SimplePageRankWorkerContext.class);
401 GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
402 GiraphConfiguration configuration = job.getConfiguration();
403 Path aggregatorValues = getTempPath("aggregatorValues");
404 configuration.setInt(TextAggregatorWriter.FREQUENCY,
405 TextAggregatorWriter.ALWAYS);
406 configuration.set(TextAggregatorWriter.FILENAME,
407 aggregatorValues.toString());
408
409 assertTrue(job.run(true));
410
411 FileSystem fs = FileSystem.get(configuration);
412 Path valuesFile = new Path(aggregatorValues.toString() + "_0");
413
414 try {
415 if (!runningInDistributedMode()) {
416 double maxPageRank =
417 SimplePageRankComputation.SimplePageRankWorkerContext.getFinalMax();
418 double minPageRank =
419 SimplePageRankComputation.SimplePageRankWorkerContext.getFinalMin();
420 long numVertices =
421 SimplePageRankComputation.SimplePageRankWorkerContext.getFinalSum();
422 System.out.println("testBspPageRank: maxPageRank=" + maxPageRank +
423 " minPageRank=" + minPageRank + " numVertices=" + numVertices);
424
425 FSDataInputStream in = null;
426 BufferedReader reader = null;
427 try {
428 Map<Integer, Double> minValues = Maps.newHashMap();
429 Map<Integer, Double> maxValues = Maps.newHashMap();
430 Map<Integer, Long> vertexCounts = Maps.newHashMap();
431
432 in = fs.open(valuesFile);
433 reader = new BufferedReader(new InputStreamReader(in,
434 Charsets.UTF_8));
435 String line;
436 while ((line = reader.readLine()) != null) {
437 String[] tokens = line.split("\t");
438 int superstep = Integer.parseInt(tokens[0].split("=")[1]);
439 String value = (tokens[1].split("=")[1]);
440 String aggregatorName = (tokens[1].split("=")[0]);
441
442 if ("min".equals(aggregatorName)) {
443 minValues.put(superstep, Double.parseDouble(value));
444 }
445 if ("max".equals(aggregatorName)) {
446 maxValues.put(superstep, Double.parseDouble(value));
447 }
448 if ("sum".equals(aggregatorName)) {
449 vertexCounts.put(superstep, Long.parseLong(value));
450 }
451 }
452
453 int maxSuperstep = SimplePageRankComputation.MAX_SUPERSTEPS;
454 assertEquals(maxSuperstep + 2, minValues.size());
455 assertEquals(maxSuperstep + 2, maxValues.size());
456 assertEquals(maxSuperstep + 2, vertexCounts.size());
457
458 assertEquals(maxPageRank, maxValues.get(maxSuperstep), 0d);
459 assertEquals(minPageRank, minValues.get(maxSuperstep), 0d);
460 assertEquals(numVertices, (long) vertexCounts.get(maxSuperstep));
461
462 } finally {
463 Closeables.close(in, true);
464 Closeables.close(reader, true);
465 }
466 }
467 } finally {
468 fs.delete(valuesFile, false);
469 }
470 }
471
472
473
474
475
476
477
478
479 @Test
480 public void testBspMasterCompute()
481 throws IOException, InterruptedException, ClassNotFoundException {
482 GiraphConfiguration conf = new GiraphConfiguration();
483 conf.setComputationClass(SimpleMasterComputeComputation.class);
484 conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
485 conf.setMasterComputeClass(
486 SimpleMasterComputeComputation.SimpleMasterCompute.class);
487 conf.setWorkerContextClass(
488 SimpleMasterComputeComputation.SimpleMasterComputeWorkerContext.class);
489 GiraphJob job = prepareJob(getCallingMethodName(), conf);
490 assertTrue(job.run(true));
491 if (!runningInDistributedMode()) {
492 double finalSum =
493 SimpleMasterComputeComputation.SimpleMasterComputeWorkerContext.getFinalSum();
494 System.out.println("testBspMasterCompute: finalSum=" + finalSum);
495 assertEquals(32.5, finalSum, 0d);
496 }
497 }
498
499
500
501
502 @Test
503 public void testHaltSuperstep0()
504 throws IOException, InterruptedException, ClassNotFoundException {
505 GiraphConfiguration conf = new GiraphConfiguration();
506 GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS.set(conf, 0);
507 conf.setComputationClass(SimpleMsgComputation.class);
508 conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
509 GiraphJob job = prepareJob(getCallingMethodName(), conf);
510 assertTrue(job.run(true));
511 }
512 }