This project has retired. For details please refer to its Attic page.
TestWorkerMessages xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework;
19  
20  import java.util.HashSet;
21  import java.util.List;
22  
23  import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
24  import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
25  import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
26  import org.apache.giraph.block_app.framework.block.Block;
27  import org.apache.giraph.block_app.framework.block.SequenceBlock;
28  import org.apache.giraph.block_app.framework.piece.PieceWithWorkerContext;
29  import org.apache.giraph.block_app.test_setup.NumericTestGraph;
30  import org.apache.giraph.block_app.test_setup.TestGraphModifier;
31  import org.apache.giraph.block_app.test_setup.TestGraphUtils;
32  import org.apache.giraph.conf.BulkConfigurator;
33  import org.apache.giraph.conf.GiraphConfiguration;
34  import org.apache.giraph.types.NoMessage;
35  import org.apache.giraph.utils.TestGraph;
36  import org.apache.hadoop.io.LongWritable;
37  import org.apache.hadoop.io.NullWritable;
38  import org.apache.hadoop.io.Writable;
39  import org.apache.hadoop.io.WritableComparable;
40  import org.junit.Assert;
41  import org.junit.Test;
42  
43  /**
44   * Test sending worker to worker messages
45   */
46  public class TestWorkerMessages {
47    @Test
48    public void testWorkerMessages() throws Exception {
49      GiraphConfiguration conf = new GiraphConfiguration();
50      BlockUtils.setAndInitBlockFactoryClass(conf, TestWorkerMessagesBlockFactory.class);
51      TestGraph testGraph = new TestGraph(conf);
52      testGraph.addEdge(new LongWritable(1), new LongWritable(2), NullWritable.get());
53      LocalBlockRunner.runApp(testGraph);
54    }
55  
56    @Test
57    public void testWithTestSetup() throws Exception {
58      TestGraphUtils.runTest(
59          new TestGraphModifier<WritableComparable, Writable, Writable>() {
60            @Override
61            public void modifyGraph(NumericTestGraph<WritableComparable, Writable, Writable> graph) {
62              graph.addEdge(1, 2);
63            }
64          },
65          null,
66          new BulkConfigurator() {
67            @Override
68            public void configure(GiraphConfiguration conf) {
69              BlockUtils.setBlockFactoryClass(conf, TestWorkerMessagesBlockFactory.class);
70            }
71          });
72    }
73  
74    public static class TestWorkerMessagesBlockFactory extends TestLongNullNullBlockFactory {
75      @Override
76      public Block createBlock(GiraphConfiguration conf) {
77        return new SequenceBlock(
78            new TestWorkerMessagesPiece(2, 4, 11),
79            new TestWorkerMessagesPiece(3, 5, 2, 100));
80      }
81    }
82  
83    public static class TestWorkerMessagesPiece extends PieceWithWorkerContext<LongWritable,
84        Writable, Writable, NoMessage, Object, LongWritable, Object> {
85      private final HashSet<Long> values;
86  
87      public TestWorkerMessagesPiece(long... values) {
88        this.values = new HashSet<>();
89        for (long value : values) {
90          this.values.add(value);
91        }
92      }
93  
94      @Override
95      public void workerContextSend(
96          BlockWorkerContextSendApi<LongWritable, LongWritable> workerContextApi,
97          Object executionStage, Object workerValue) {
98        for (long value : values) {
99          workerContextApi.sendMessageToWorker(new LongWritable(value),
100             workerContextApi.getMyWorkerIndex());
101       }
102     }
103 
104     @Override
105     public void workerContextReceive(BlockWorkerContextReceiveApi workerContextApi,
106         Object executionStage, Object workerValue, List<LongWritable> workerMessages) {
107       Assert.assertEquals(values.size(), workerMessages.size());
108       for (LongWritable workerMessage : workerMessages) {
109         Assert.assertTrue(values.remove(workerMessage.get()));
110       }
111     }
112   }
113 }