This project has retired. For details please refer to its Attic page.
MultipleSimultanousMutationsTest xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.block_app.framework;
20  
21  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
22  import org.apache.giraph.block_app.framework.block.Block;
23  import org.apache.giraph.block_app.framework.piece.Piece;
24  import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
25  import org.apache.giraph.block_app.test_setup.NumericTestGraph;
26  import org.apache.giraph.block_app.test_setup.TestGraphChecker;
27  import org.apache.giraph.block_app.test_setup.TestGraphModifier;
28  import org.apache.giraph.block_app.test_setup.TestGraphUtils;
29  import org.apache.giraph.conf.BulkConfigurator;
30  import org.apache.giraph.conf.GiraphConfiguration;
31  import org.apache.giraph.edge.Edge;
32  import org.apache.giraph.edge.ReusableEdge;
33  import org.apache.giraph.graph.Vertex;
34  import org.apache.hadoop.io.LongWritable;
35  import org.apache.hadoop.io.NullWritable;
36  import org.apache.hadoop.io.Writable;
37  import org.junit.Assert;
38  import org.junit.Test;
39  
40  /**
41   * Test when vertex gets multiple simultaneous mutations
42   * (i.e. to non-existent vertex, send a message and do add edge request)
43   * and confirm all mutations are correctly processed
44   */
45  public class MultipleSimultanousMutationsTest {
46    @Test
47    public void createVertexOnMsgsTest() throws Exception {
48      TestGraphUtils.runTest(
49          new TestGraphModifier<LongWritable, Writable, LongWritable>() {
50            @Override
51            public void modifyGraph(NumericTestGraph<LongWritable, Writable, LongWritable> graph) {
52              graph.addEdge(1, 2, 2);
53            }
54          },
55          new TestGraphChecker<LongWritable, Writable, LongWritable>() {
56            @Override
57            public void checkOutput(NumericTestGraph<LongWritable, Writable, LongWritable> graph) {
58              Assert.assertEquals(1, graph.getVertex(1).getNumEdges());
59              Assert.assertNull(graph.getVertex(1).getEdgeValue(new LongWritable(-1)));
60              Assert.assertEquals(2, graph.getVertex(1).getEdgeValue(new LongWritable(2)).get());
61  
62              Assert.assertEquals(1, graph.getVertex(2).getNumEdges());
63              Assert.assertEquals(-1, graph.getVertex(2).getEdgeValue(new LongWritable(-1)).get());
64            }
65          },
66          new BulkConfigurator() {
67            @Override
68            public void configure(GiraphConfiguration conf) {
69              BlockUtils.setBlockFactoryClass(conf, SendingAndAddEdgeBlockFactory.class);
70            }
71          });
72    }
73  
74    public static class SendingAndAddEdgeBlockFactory extends TestLongNullNullBlockFactory {
75      @Override
76      protected Class<? extends Writable> getEdgeValueClass(GiraphConfiguration conf) {
77        return LongWritable.class;
78      }
79  
80      @Override
81      public Block createBlock(GiraphConfiguration conf) {
82        return new Piece<LongWritable, Writable, LongWritable, NullWritable, Object>() {
83          @Override
84          protected Class<NullWritable> getMessageClass() {
85            return NullWritable.class;
86          }
87  
88          @Override
89          public VertexSender<LongWritable, Writable, LongWritable> getVertexSender(
90              final BlockWorkerSendApi<LongWritable, Writable, LongWritable, NullWritable> workerApi,
91              Object executionStage) {
92            final ReusableEdge<LongWritable, LongWritable> reusableEdge = workerApi.getConf().createReusableEdge();
93            reusableEdge.setTargetVertexId(new LongWritable(-1));
94            reusableEdge.setValue(new LongWritable(-1));
95            return new VertexSender<LongWritable, Writable, LongWritable>() {
96              @Override
97              public void vertexSend(Vertex<LongWritable, Writable, LongWritable> vertex) {
98                for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {
99                  workerApi.addEdgeRequest(edge.getTargetVertexId(), reusableEdge);
100                 workerApi.sendMessage(edge.getTargetVertexId(), NullWritable.get());
101               }
102             }
103           };
104         }
105       };
106     }
107   }
108 }