View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.utils;
20  
21  import org.apache.giraph.bsp.CentralizedServiceWorker;
22  import org.apache.giraph.comm.ServerData;
23  import org.apache.giraph.comm.WorkerClientRequestProcessor;
24  import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
25  import org.apache.giraph.conf.GiraphConfiguration;
26  import org.apache.giraph.conf.GiraphConstants;
27  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
28  import org.apache.giraph.edge.ArrayListEdges;
29  import org.apache.giraph.graph.Computation;
30  import org.apache.giraph.graph.GraphState;
31  import org.apache.giraph.graph.Vertex;
32  import org.apache.giraph.partition.BasicPartitionOwner;
33  import org.apache.giraph.partition.PartitionOwner;
34  import org.apache.giraph.partition.SimplePartition;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.io.IntWritable;
37  import org.apache.hadoop.io.Writable;
38  import org.apache.hadoop.io.WritableComparable;
39  import org.apache.hadoop.mapreduce.Mapper;
40  import org.mockito.Mockito;
41  import org.mockito.invocation.InvocationOnMock;
42  import org.mockito.stubbing.Answer;
43  
44  /** simplify mocking for unit testing vertices */
45  public class MockUtils {
46  
47      private MockUtils() {
48      }
49  
50      /**
51       * mocks and holds  "environment objects" that are injected into a vertex
52       *
53       * @param <I> vertex id
54       * @param <V> vertex data
55       * @param <E> edge data
56       * @param <M> message data
57       */
58      public static class MockedEnvironment<I extends WritableComparable,
59              V extends Writable, E extends Writable, M extends Writable> {
60  
61          private final GraphState graphState;
62          private final Mapper.Context context;
63          private final Configuration conf;
64          private final WorkerClientRequestProcessor workerClientRequestProcessor;
65  
66          public MockedEnvironment() {
67              graphState = Mockito.mock(GraphState.class);
68              context = Mockito.mock(Mapper.Context.class);
69              conf = Mockito.mock(Configuration.class);
70              workerClientRequestProcessor =
71                  Mockito.mock(WorkerClientRequestProcessor.class);
72          }
73  
74          /** the injected graph state */
75          public GraphState getGraphState() {
76              return graphState;
77          }
78  
79          /** the injected mapper context  */
80          public Mapper.Context getContext() {
81              return context;
82          }
83  
84          /** the injected hadoop configuration */
85          public Configuration getConfiguration() {
86              return conf;
87          }
88  
89          /** the injected worker communications */
90          public WorkerClientRequestProcessor getWorkerClientRequestProcessor() {
91              return workerClientRequestProcessor;
92          }
93  
94          /** assert that the test vertex message has been sent to a particular vertex */
95          public void verifyMessageSent(I targetVertexId, M message) {
96              Mockito.verify(workerClientRequestProcessor).sendMessageRequest
97                  (targetVertexId, message);
98          }
99  
100         public void verifyMessageSentToAllEdges(Vertex<I, V, E> vertex, M message) {
101           Mockito.verify(workerClientRequestProcessor).sendMessageToAllRequest(vertex, message);
102       }
103 
104         /** assert that the test vertex has sent no message to a particular vertex */
105         public void verifyNoMessageSent() {
106             Mockito.verifyZeroInteractions(workerClientRequestProcessor);
107         }
108     }
109 
110     /**
111      * prepare a vertex and computation for use in a unit test by setting its
112      * internal state and injecting mocked dependencies,
113      *
114      * @param vertex Vertex
115      * @param vertexId initial vertex id
116      * @param vertexValue initial vertex value
117      * @param isHalted initial halted state of the vertex
118      * @param computation Computation
119      * @param superstep Superstep
120      * @param <I> vertex id
121      * @param <V> vertex data
122      * @param <E> edge data
123      * @param <M> message data
124      * @return
125      * @throws Exception
126      */
127   public static <I extends WritableComparable, V extends Writable,
128       E extends Writable, M extends Writable>
129   MockedEnvironment<I, V, E, M> prepareVertexAndComputation(
130       Vertex<I, V, E> vertex, I vertexId, V vertexValue, boolean isHalted,
131       Computation<I, V, E, M, M> computation, long superstep) throws
132       Exception {
133     MockedEnvironment<I, V, E, M> env = new MockedEnvironment<I, V, E, M>();
134     Mockito.when(env.getGraphState().getSuperstep()).thenReturn(superstep);
135     Mockito.when(env.getGraphState().getContext())
136         .thenReturn(env.getContext());
137     Mockito.when(env.getContext().getConfiguration())
138         .thenReturn(env.getConfiguration());
139     computation.initialize(env.getGraphState(),
140         env.getWorkerClientRequestProcessor(), null, null);
141 
142     GiraphConfiguration giraphConf = new GiraphConfiguration();
143     giraphConf.setComputationClass(computation.getClass());
144     giraphConf.setOutEdgesClass(ArrayListEdges.class);
145     ImmutableClassesGiraphConfiguration<I, V, E> conf =
146         new ImmutableClassesGiraphConfiguration<I, V, E>(giraphConf);
147     computation.setConf(conf);
148 
149     vertex.setConf(conf);
150     vertex.initialize(vertexId, vertexValue);
151     if (isHalted) {
152       vertex.voteToHalt();
153     }
154 
155     return env;
156   }
157 
158   /**
159    * Prepare a mocked CentralizedServiceWorker.
160    *
161    * @param numOfPartitions The number of partitions
162    * @return CentralizedServiceWorker
163    */
164   public static CentralizedServiceWorker<IntWritable, IntWritable,
165       IntWritable> mockServiceGetVertexPartitionOwner(final int
166       numOfPartitions) {
167     CentralizedServiceWorker<IntWritable, IntWritable, IntWritable> service =
168         Mockito.mock(CentralizedServiceWorker.class);
169     Answer<PartitionOwner> answer = new Answer<PartitionOwner>() {
170       @Override
171       public PartitionOwner answer(InvocationOnMock invocation) throws
172           Throwable {
173         IntWritable vertexId = (IntWritable) invocation.getArguments()[0];
174         return new BasicPartitionOwner(vertexId.get() % numOfPartitions, null);
175       }
176     };
177     Mockito.when(service.getVertexPartitionOwner(
178         Mockito.any(IntWritable.class))).thenAnswer(answer);
179     return service;
180   }
181 
182   /**
183    * Prepare a ServerData object.
184    *
185    * @param conf Configuration
186    * @param context Context
187    * @return ServerData
188    */
189   public static ServerData<IntWritable, IntWritable, IntWritable>
190     createNewServerData(
191     ImmutableClassesGiraphConfiguration conf, Mapper.Context context) {
192     CentralizedServiceWorker<IntWritable, IntWritable, IntWritable> serviceWorker =
193       MockUtils.mockServiceGetVertexPartitionOwner(1);
194     GiraphConstants.MESSAGE_STORE_FACTORY_CLASS.set(conf,
195         ByteArrayMessagesPerVertexStore.newFactory(serviceWorker, conf)
196             .getClass());
197 
198     ServerData<IntWritable, IntWritable, IntWritable> serverData =
199       new ServerData<IntWritable, IntWritable, IntWritable>(
200           serviceWorker, conf, context);
201     // Here we add a partition to simulate the case that there is one partition.
202     serverData.getPartitionStore().addPartition(new SimplePartition());
203     return serverData;
204   }
205 }