View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework;
19  
20  import org.apache.giraph.block_app.framework.api.BlockApiHandle;
21  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
22  import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
23  import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
24  import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
25  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
26  import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
27  import org.apache.giraph.block_app.framework.block.Block;
28  import org.apache.giraph.block_app.framework.block.BlockWithApiHandle;
29  import org.apache.giraph.block_app.framework.piece.AbstractPiece;
30  import org.apache.giraph.block_app.framework.piece.DefaultParentPiece;
31  import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
32  import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
33  import org.apache.giraph.conf.GiraphConfiguration;
34  import org.apache.giraph.conf.GiraphConstants;
35  import org.apache.giraph.function.Consumer;
36  import org.apache.giraph.graph.Vertex;
37  import org.apache.giraph.utils.TestGraph;
38  import org.apache.hadoop.io.DoubleWritable;
39  import org.apache.hadoop.io.LongWritable;
40  import org.apache.hadoop.io.NullWritable;
41  import org.apache.hadoop.io.Writable;
42  import org.apache.hadoop.io.WritableComparable;
43  import org.junit.Test;
44  
45  import java.util.Iterator;
46  import java.util.List;
47  
48  import static org.junit.Assert.assertEquals;
49  import static org.junit.Assert.assertFalse;
50  
51  /**
52   * Test the use of {@link BlockApiHandle}.
53   */
54  public class BlockApiHandleTest {
55  
56    private static GiraphConfiguration createConf() {
57      GiraphConfiguration conf = new GiraphConfiguration();
58      GiraphConstants.VERTEX_ID_CLASS.set(conf, LongWritable.class);
59      GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class);
60      GiraphConstants.EDGE_VALUE_CLASS.set(conf, NullWritable.class);
61      return conf;
62    }
63  
64    private static TestGraph<LongWritable, LongWritable, NullWritable>
65    createTestGraph() {
66      TestGraph<LongWritable, LongWritable, NullWritable> graph =
67        new TestGraph<>(createConf());
68      graph.addVertex(new LongWritable(1), new LongWritable());
69      graph.addVertex(new LongWritable(2), new LongWritable());
70      graph.addVertex(new LongWritable(3), new LongWritable());
71      graph.addVertex(new LongWritable(4), new LongWritable());
72      graph.addEdge(new LongWritable(1), new LongWritable(2), NullWritable.get());
73      graph.addEdge(new LongWritable(2), new LongWritable(1), NullWritable.get());
74      graph.addEdge(new LongWritable(2), new LongWritable(3), NullWritable.get());
75      graph.addEdge(new LongWritable(3), new LongWritable(2), NullWritable.get());
76      return graph;
77    }
78  
79    public static class DummyObjectWithApiHandle {
80  
81      private BlockApiHandle handle;
82  
83      public DummyObjectWithApiHandle(BlockApiHandle handle) {
84        this.handle = handle;
85      }
86  
87      public void doSomethingAtWorker() {
88        // checking that the handles have been set
89        assertFalse(handle.isMasterApiSet());
90        assertFalse(handle.isWorkerContextReceiveApiSet());
91        assertFalse(handle.isWorkerContextSendApiSet());
92        assertEquals(1, handle.getWorkerReceiveApi().getWorkerCount());
93        assertEquals(0, handle.getWorkerSendApi().getMyWorkerIndex());
94      }
95  
96      public void doSomethingAtWorkerContext() {
97        // checking that the handles have been set
98        assertFalse(handle.isMasterApiSet());
99        assertFalse(handle.isWorkerReceiveApiSet());
100       assertFalse(handle.isWorkerSendApiSet());
101       assertEquals(1, handle.getWorkerContextReceiveApi().getWorkerCount());
102       assertEquals(0, handle.getWorkerContextSendApi().getMyWorkerIndex());
103     }
104 
105     public void doSomethingAtMaster() {
106       // checking that the handles have been set
107       assertEquals(1, handle.getMasterApi().getWorkerCount());
108       assertFalse(handle.isWorkerReceiveApiSet());
109       assertFalse(handle.isWorkerSendApiSet());
110       assertFalse(handle.isWorkerContextReceiveApiSet());
111       assertFalse(handle.isWorkerContextSendApiSet());
112     }
113   }
114 
115   public static class TestPiece extends DefaultParentPiece<WritableComparable,
116       LongWritable, Writable, NullWritable, Object, DoubleWritable, Object> {
117 
118     private DummyObjectWithApiHandle object;
119 
120     public TestPiece(DummyObjectWithApiHandle object) {
121       this.object = object;
122     }
123 
124     @Override
125     public VertexSender<WritableComparable, LongWritable, Writable>
126     getVertexSender(final BlockWorkerSendApi<WritableComparable, LongWritable,
127       Writable, NullWritable> workerApi, Object executionStage) {
128       return new InnerVertexSender() {
129         @Override
130         public void vertexSend(
131           Vertex<WritableComparable, LongWritable, Writable> vertex) {
132           object.doSomethingAtWorker();
133         }
134       };
135     }
136 
137     @Override
138     public VertexReceiver<WritableComparable, LongWritable, Writable,
139       NullWritable> getVertexReceiver(
140       BlockWorkerReceiveApi<WritableComparable> workerApi,
141       Object executionStage) {
142       return new InnerVertexReceiver() {
143         @Override
144         public void vertexReceive(
145           Vertex<WritableComparable, LongWritable, Writable> vertex,
146           Iterable<NullWritable> messages) {
147           object.doSomethingAtWorker();
148         }
149       };
150     }
151 
152     public void workerContextSend(BlockWorkerContextSendApi<WritableComparable,
153         DoubleWritable> workerContextApi, Object executionStage,
154         Writable workerValue) {
155       object.doSomethingAtWorkerContext();
156     }
157 
158     /**
159      * Override to have worker context receive computation.
160      *
161      * Called once per worker, before all vertices are going to be processed
162      * with getVertexReceiver.
163      */
164     public void workerContextReceive(
165       BlockWorkerContextReceiveApi workerContextApi, Object executionStage,
166       Object workerValue, List<DoubleWritable> workerMessages) {
167       object.doSomethingAtWorkerContext();
168     }
169 
170     @Override
171     public void masterCompute(BlockMasterApi masterApi, Object executionStage) {
172       object.doSomethingAtMaster();
173     }
174 
175     @Override
176     protected Class<NullWritable> getMessageClass() {
177       return NullWritable.class;
178     }
179   }
180 
181   @Test
182   public void testBlockApiHandle() {
183     TestGraph<LongWritable, LongWritable, NullWritable> graph =
184       createTestGraph();
185 
186     final BlockApiHandle handle = new BlockApiHandle();
187     final DefaultParentPiece piece =
188       new TestPiece(new DummyObjectWithApiHandle(handle));
189 
190     Block block = new BlockWithApiHandle() {
191       @Override
192       public Iterator<AbstractPiece> iterator() {
193         return piece.iterator();
194       }
195 
196       @Override
197       public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
198         piece.forAllPossiblePieces(consumer);
199       }
200 
201       @Override
202       public BlockApiHandle getBlockApiHandle() {
203         return handle;
204       }
205     };
206 
207     BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS.set(
208       graph.getConf(), Object.class);
209     LocalBlockRunner.runBlock(graph, block, new Object());
210   }
211 }