This project has retired. For details please refer to its Attic page.
BlockApiHandleTest xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework;
19  
20  import org.apache.giraph.block_app.framework.api.BlockApiHandle;
21  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
22  import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
23  import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
24  import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
25  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
26  import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
27  import org.apache.giraph.block_app.framework.block.Block;
28  import org.apache.giraph.block_app.framework.block.BlockWithApiHandle;
29  import org.apache.giraph.block_app.framework.block.PieceCount;
30  import org.apache.giraph.block_app.framework.piece.AbstractPiece;
31  import org.apache.giraph.block_app.framework.piece.DefaultParentPiece;
32  import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
33  import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
34  import org.apache.giraph.conf.GiraphConfiguration;
35  import org.apache.giraph.conf.GiraphConstants;
36  import org.apache.giraph.function.Consumer;
37  import org.apache.giraph.graph.Vertex;
38  import org.apache.giraph.utils.TestGraph;
39  import org.apache.hadoop.io.DoubleWritable;
40  import org.apache.hadoop.io.LongWritable;
41  import org.apache.hadoop.io.NullWritable;
42  import org.apache.hadoop.io.Writable;
43  import org.apache.hadoop.io.WritableComparable;
44  import org.junit.Test;
45  
46  import java.util.Iterator;
47  import java.util.List;
48  
49  import static org.junit.Assert.assertEquals;
50  import static org.junit.Assert.assertFalse;
51  
52  /**
53   * Test the use of {@link BlockApiHandle}.
54   */
55  public class BlockApiHandleTest {
56  
57    private static GiraphConfiguration createConf() {
58      GiraphConfiguration conf = new GiraphConfiguration();
59      GiraphConstants.VERTEX_ID_CLASS.set(conf, LongWritable.class);
60      GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class);
61      GiraphConstants.EDGE_VALUE_CLASS.set(conf, NullWritable.class);
62      return conf;
63    }
64  
65    private static TestGraph<LongWritable, LongWritable, NullWritable>
66    createTestGraph() {
67      TestGraph<LongWritable, LongWritable, NullWritable> graph =
68        new TestGraph<>(createConf());
69      graph.addVertex(new LongWritable(1), new LongWritable());
70      graph.addVertex(new LongWritable(2), new LongWritable());
71      graph.addVertex(new LongWritable(3), new LongWritable());
72      graph.addVertex(new LongWritable(4), new LongWritable());
73      graph.addEdge(new LongWritable(1), new LongWritable(2), NullWritable.get());
74      graph.addEdge(new LongWritable(2), new LongWritable(1), NullWritable.get());
75      graph.addEdge(new LongWritable(2), new LongWritable(3), NullWritable.get());
76      graph.addEdge(new LongWritable(3), new LongWritable(2), NullWritable.get());
77      return graph;
78    }
79  
80    public static class DummyObjectWithApiHandle {
81  
82      private BlockApiHandle handle;
83  
84      public DummyObjectWithApiHandle(BlockApiHandle handle) {
85        this.handle = handle;
86      }
87  
88      public void doSomethingAtWorker() {
89        // checking that the handles have been set
90        assertFalse(handle.isMasterApiSet());
91        assertFalse(handle.isWorkerContextReceiveApiSet());
92        assertFalse(handle.isWorkerContextSendApiSet());
93        assertEquals(1, handle.getWorkerReceiveApi().getWorkerCount());
94        assertEquals(0, handle.getWorkerSendApi().getMyWorkerIndex());
95      }
96  
97      public void doSomethingAtWorkerContext() {
98        // checking that the handles have been set
99        assertFalse(handle.isMasterApiSet());
100       assertFalse(handle.isWorkerReceiveApiSet());
101       assertFalse(handle.isWorkerSendApiSet());
102       assertEquals(1, handle.getWorkerContextReceiveApi().getWorkerCount());
103       assertEquals(0, handle.getWorkerContextSendApi().getMyWorkerIndex());
104     }
105 
106     public void doSomethingAtMaster() {
107       // checking that the handles have been set
108       assertEquals(1, handle.getMasterApi().getWorkerCount());
109       assertFalse(handle.isWorkerReceiveApiSet());
110       assertFalse(handle.isWorkerSendApiSet());
111       assertFalse(handle.isWorkerContextReceiveApiSet());
112       assertFalse(handle.isWorkerContextSendApiSet());
113     }
114   }
115 
116   public static class TestPiece extends DefaultParentPiece<WritableComparable,
117       LongWritable, Writable, NullWritable, Object, DoubleWritable, Object> {
118 
119     private DummyObjectWithApiHandle object;
120 
121     public TestPiece(DummyObjectWithApiHandle object) {
122       this.object = object;
123     }
124 
125     @Override
126     public VertexSender<WritableComparable, LongWritable, Writable>
127     getVertexSender(final BlockWorkerSendApi<WritableComparable, LongWritable,
128       Writable, NullWritable> workerApi, Object executionStage) {
129       return new InnerVertexSender() {
130         @Override
131         public void vertexSend(
132           Vertex<WritableComparable, LongWritable, Writable> vertex) {
133           object.doSomethingAtWorker();
134         }
135       };
136     }
137 
138     @Override
139     public VertexReceiver<WritableComparable, LongWritable, Writable,
140       NullWritable> getVertexReceiver(
141       BlockWorkerReceiveApi<WritableComparable> workerApi,
142       Object executionStage) {
143       return new InnerVertexReceiver() {
144         @Override
145         public void vertexReceive(
146           Vertex<WritableComparable, LongWritable, Writable> vertex,
147           Iterable<NullWritable> messages) {
148           object.doSomethingAtWorker();
149         }
150       };
151     }
152 
153     public void workerContextSend(BlockWorkerContextSendApi<WritableComparable,
154         DoubleWritable> workerContextApi, Object executionStage,
155         Writable workerValue) {
156       object.doSomethingAtWorkerContext();
157     }
158 
159     /**
160      * Override to have worker context receive computation.
161      *
162      * Called once per worker, before all vertices are going to be processed
163      * with getVertexReceiver.
164      */
165     public void workerContextReceive(
166       BlockWorkerContextReceiveApi workerContextApi, Object executionStage,
167       Object workerValue, List<DoubleWritable> workerMessages) {
168       object.doSomethingAtWorkerContext();
169     }
170 
171     @Override
172     public void masterCompute(BlockMasterApi masterApi, Object executionStage) {
173       object.doSomethingAtMaster();
174     }
175 
176     @Override
177     protected Class<NullWritable> getMessageClass() {
178       return NullWritable.class;
179     }
180   }
181 
182   @Test
183   public void testBlockApiHandle() {
184     TestGraph<LongWritable, LongWritable, NullWritable> graph =
185       createTestGraph();
186 
187     final BlockApiHandle handle = new BlockApiHandle();
188     final DefaultParentPiece piece =
189       new TestPiece(new DummyObjectWithApiHandle(handle));
190 
191     Block block = new BlockWithApiHandle() {
192       @Override
193       public Iterator<AbstractPiece> iterator() {
194         return piece.iterator();
195       }
196 
197       @Override
198       public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
199         piece.forAllPossiblePieces(consumer);
200       }
201 
202       @Override
203       public PieceCount getPieceCount() {
204         return piece.getPieceCount();
205       }
206 
207       @Override
208       public BlockApiHandle getBlockApiHandle() {
209         return handle;
210       }
211     };
212 
213     BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS.set(
214       graph.getConf(), Object.class);
215     LocalBlockRunner.runBlock(graph, block, new Object());
216   }
217 }