This project has retired. For details please refer to its Attic page.
MockUtils xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.utils;
20  
21  import org.apache.giraph.bsp.CentralizedServiceWorker;
22  import org.apache.giraph.comm.ServerData;
23  import org.apache.giraph.comm.WorkerClientRequestProcessor;
24  import org.apache.giraph.comm.WorkerServer;
25  import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
26  import org.apache.giraph.conf.GiraphConfiguration;
27  import org.apache.giraph.conf.GiraphConstants;
28  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
29  import org.apache.giraph.edge.ArrayListEdges;
30  import org.apache.giraph.graph.Computation;
31  import org.apache.giraph.graph.GraphState;
32  import org.apache.giraph.graph.Vertex;
33  import org.apache.giraph.partition.BasicPartitionOwner;
34  import org.apache.giraph.partition.PartitionOwner;
35  import org.apache.giraph.partition.SimplePartition;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.io.IntWritable;
38  import org.apache.hadoop.io.Writable;
39  import org.apache.hadoop.io.WritableComparable;
40  import org.apache.hadoop.mapreduce.Mapper;
41  import org.mockito.Mockito;
42  import org.mockito.invocation.InvocationOnMock;
43  import org.mockito.stubbing.Answer;
44  
45  /** simplify mocking for unit testing vertices */
46  public class MockUtils {
47  
48      private MockUtils() {
49      }
50  
51      /**
52       * mocks and holds  "environment objects" that are injected into a vertex
53       *
54       * @param <I> vertex id
55       * @param <V> vertex data
56       * @param <E> edge data
57       * @param <M> message data
58       */
59      public static class MockedEnvironment<I extends WritableComparable,
60              V extends Writable, E extends Writable, M extends Writable> {
61  
62          private final GraphState graphState;
63          private final Mapper.Context context;
64          private final Configuration conf;
65          private final WorkerClientRequestProcessor workerClientRequestProcessor;
66  
67          public MockedEnvironment() {
68              graphState = Mockito.mock(GraphState.class);
69              context = Mockito.mock(Mapper.Context.class);
70              conf = Mockito.mock(Configuration.class);
71              workerClientRequestProcessor =
72                  Mockito.mock(WorkerClientRequestProcessor.class);
73          }
74  
75          /** the injected graph state */
76          public GraphState getGraphState() {
77              return graphState;
78          }
79  
80          /** the injected mapper context  */
81          public Mapper.Context getContext() {
82              return context;
83          }
84  
85          /** the injected hadoop configuration */
86          public Configuration getConfiguration() {
87              return conf;
88          }
89  
90          /** the injected worker communications */
91          public WorkerClientRequestProcessor getWorkerClientRequestProcessor() {
92              return workerClientRequestProcessor;
93          }
94  
95          /** assert that the test vertex message has been sent to a particular vertex */
96          public void verifyMessageSent(I targetVertexId, M message) {
97              Mockito.verify(workerClientRequestProcessor).sendMessageRequest
98                  (targetVertexId, message);
99          }
100 
101         public void verifyMessageSentToAllEdges(Vertex<I, V, E> vertex, M message) {
102           Mockito.verify(workerClientRequestProcessor).sendMessageToAllRequest(vertex, message);
103       }
104 
105         /** assert that the test vertex has sent no message to a particular vertex */
106         public void verifyNoMessageSent() {
107             Mockito.verifyZeroInteractions(workerClientRequestProcessor);
108         }
109     }
110 
111     /**
112      * prepare a vertex and computation for use in a unit test by setting its
113      * internal state and injecting mocked dependencies,
114      *
115      * @param vertex Vertex
116      * @param vertexId initial vertex id
117      * @param vertexValue initial vertex value
118      * @param isHalted initial halted state of the vertex
119      * @param computation Computation
120      * @param superstep Superstep
121      * @param <I> vertex id
122      * @param <V> vertex data
123      * @param <E> edge data
124      * @param <M> message data
125      * @return
126      * @throws Exception
127      */
128   public static <I extends WritableComparable, V extends Writable,
129       E extends Writable, M extends Writable>
130   MockedEnvironment<I, V, E, M> prepareVertexAndComputation(
131       Vertex<I, V, E> vertex, I vertexId, V vertexValue, boolean isHalted,
132       Computation<I, V, E, M, M> computation, long superstep) throws
133       Exception {
134     MockedEnvironment<I, V, E, M> env = new MockedEnvironment<I, V, E, M>();
135     Mockito.when(env.getGraphState().getSuperstep()).thenReturn(superstep);
136     Mockito.when(env.getGraphState().getContext())
137         .thenReturn(env.getContext());
138     Mockito.when(env.getContext().getConfiguration())
139         .thenReturn(env.getConfiguration());
140     computation.initialize(env.getGraphState(),
141         env.getWorkerClientRequestProcessor(), null, null);
142 
143     GiraphConfiguration giraphConf = new GiraphConfiguration();
144     giraphConf.setComputationClass(computation.getClass());
145     giraphConf.setOutEdgesClass(ArrayListEdges.class);
146     ImmutableClassesGiraphConfiguration<I, V, E> conf =
147         new ImmutableClassesGiraphConfiguration<I, V, E>(giraphConf);
148     computation.setConf(conf);
149 
150     vertex.setConf(conf);
151     vertex.initialize(vertexId, vertexValue);
152     if (isHalted) {
153       vertex.voteToHalt();
154     }
155 
156     return env;
157   }
158 
159   /**
160    * Prepare a mocked CentralizedServiceWorker.
161    *
162    * @param numOfPartitions The number of partitions
163    * @return CentralizedServiceWorker
164    */
165   public static CentralizedServiceWorker<IntWritable, IntWritable,
166       IntWritable> mockServiceGetVertexPartitionOwner(final int
167       numOfPartitions) {
168     CentralizedServiceWorker<IntWritable, IntWritable, IntWritable> service =
169         Mockito.mock(CentralizedServiceWorker.class);
170     Answer<PartitionOwner> answerOwner = new Answer<PartitionOwner>() {
171       @Override
172       public PartitionOwner answer(InvocationOnMock invocation) throws
173           Throwable {
174         IntWritable vertexId = (IntWritable) invocation.getArguments()[0];
175         return new BasicPartitionOwner(vertexId.get() % numOfPartitions, null);
176       }
177     };
178     Mockito.when(service.getVertexPartitionOwner(
179       Mockito.any(IntWritable.class))).thenAnswer(answerOwner);
180 
181     Answer<Integer> answerId = new Answer<Integer>() {
182       @Override
183       public Integer answer(InvocationOnMock invocation) throws
184           Throwable {
185         IntWritable vertexId = (IntWritable) invocation.getArguments()[0];
186         return vertexId.get() % numOfPartitions;
187       }
188     };
189     Mockito.when(service.getPartitionId(
190       Mockito.any(IntWritable.class))).thenAnswer(answerId);
191     return service;
192   }
193 
194   /**
195    * Prepare a ServerData object.
196    *
197    * @param conf Configuration
198    * @param context Context
199    * @return ServerData
200    */
201   public static ServerData<IntWritable, IntWritable, IntWritable>
202     createNewServerData(
203     ImmutableClassesGiraphConfiguration conf, Mapper.Context context) {
204     CentralizedServiceWorker<IntWritable, IntWritable, IntWritable> serviceWorker =
205       MockUtils.mockServiceGetVertexPartitionOwner(1);
206     GiraphConstants.MESSAGE_STORE_FACTORY_CLASS.set(conf,
207         ByteArrayMessagesPerVertexStore.newFactory(serviceWorker, conf)
208             .getClass());
209 
210     WorkerServer workerServer = Mockito.mock(WorkerServer.class);
211     ServerData<IntWritable, IntWritable, IntWritable> serverData =
212       new ServerData<IntWritable, IntWritable, IntWritable>(
213           serviceWorker, workerServer, conf, context);
214     // Here we add a partition to simulate the case that there is one partition.
215     serverData.getPartitionStore().addPartition(new SimplePartition());
216     return serverData;
217   }
218 }