This project has been retired. For details, please refer to its
Attic page.
MultipleSimultanousMutationsTest xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.block_app.framework;
20
21 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
22 import org.apache.giraph.block_app.framework.block.Block;
23 import org.apache.giraph.block_app.framework.piece.Piece;
24 import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
25 import org.apache.giraph.block_app.test_setup.NumericTestGraph;
26 import org.apache.giraph.block_app.test_setup.TestGraphChecker;
27 import org.apache.giraph.block_app.test_setup.TestGraphModifier;
28 import org.apache.giraph.block_app.test_setup.TestGraphUtils;
29 import org.apache.giraph.conf.BulkConfigurator;
30 import org.apache.giraph.conf.GiraphConfiguration;
31 import org.apache.giraph.edge.Edge;
32 import org.apache.giraph.edge.ReusableEdge;
33 import org.apache.giraph.graph.Vertex;
34 import org.apache.hadoop.io.LongWritable;
35 import org.apache.hadoop.io.NullWritable;
36 import org.apache.hadoop.io.Writable;
37 import org.junit.Assert;
38 import org.junit.Test;
39
40
41
42
43
44
45 public class MultipleSimultanousMutationsTest {
46 @Test
47 public void createVertexOnMsgsTest() throws Exception {
48 TestGraphUtils.runTest(
49 new TestGraphModifier<LongWritable, Writable, LongWritable>() {
50 @Override
51 public void modifyGraph(NumericTestGraph<LongWritable, Writable, LongWritable> graph) {
52 graph.addEdge(1, 2, 2);
53 }
54 },
55 new TestGraphChecker<LongWritable, Writable, LongWritable>() {
56 @Override
57 public void checkOutput(NumericTestGraph<LongWritable, Writable, LongWritable> graph) {
58 Assert.assertEquals(1, graph.getVertex(1).getNumEdges());
59 Assert.assertNull(graph.getVertex(1).getEdgeValue(new LongWritable(-1)));
60 Assert.assertEquals(2, graph.getVertex(1).getEdgeValue(new LongWritable(2)).get());
61
62 Assert.assertEquals(1, graph.getVertex(2).getNumEdges());
63 Assert.assertEquals(-1, graph.getVertex(2).getEdgeValue(new LongWritable(-1)).get());
64 }
65 },
66 new BulkConfigurator() {
67 @Override
68 public void configure(GiraphConfiguration conf) {
69 BlockUtils.setBlockFactoryClass(conf, SendingAndAddEdgeBlockFactory.class);
70 }
71 });
72 }
73
74 public static class SendingAndAddEdgeBlockFactory extends TestLongNullNullBlockFactory {
75 @Override
76 protected Class<? extends Writable> getEdgeValueClass(GiraphConfiguration conf) {
77 return LongWritable.class;
78 }
79
80 @Override
81 public Block createBlock(GiraphConfiguration conf) {
82 return new Piece<LongWritable, Writable, LongWritable, NullWritable, Object>() {
83 @Override
84 protected Class<NullWritable> getMessageClass() {
85 return NullWritable.class;
86 }
87
88 @Override
89 public VertexSender<LongWritable, Writable, LongWritable> getVertexSender(
90 final BlockWorkerSendApi<LongWritable, Writable, LongWritable, NullWritable> workerApi,
91 Object executionStage) {
92 final ReusableEdge<LongWritable, LongWritable> reusableEdge = workerApi.getConf().createReusableEdge();
93 reusableEdge.setTargetVertexId(new LongWritable(-1));
94 reusableEdge.setValue(new LongWritable(-1));
95 return new VertexSender<LongWritable, Writable, LongWritable>() {
96 @Override
97 public void vertexSend(Vertex<LongWritable, Writable, LongWritable> vertex) {
98 for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {
99 workerApi.addEdgeRequest(edge.getTargetVertexId(), reusableEdge);
100 workerApi.sendMessage(edge.getTargetVertexId(), NullWritable.get());
101 }
102 }
103 };
104 }
105 };
106 }
107 }
108 }