This project has retired. For details please refer to its Attic page.
ConnectionTest xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm;
20  
21  import com.google.common.collect.Lists;
22  import org.apache.giraph.comm.netty.NettyClient;
23  import org.apache.giraph.comm.netty.NettyServer;
24  import org.apache.giraph.comm.netty.handler.RequestServerHandler;
25  import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
26  import org.apache.giraph.conf.GiraphConfiguration;
27  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
28  import org.apache.giraph.utils.IntNoOpComputation;
29  import org.apache.giraph.utils.MockUtils;
30  import org.apache.giraph.worker.WorkerInfo;
31  import org.apache.hadoop.io.IntWritable;
32  import org.apache.hadoop.mapreduce.Mapper.Context;
33  import org.junit.Before;
34  import org.junit.Test;
35  
36  import java.io.IOException;
37  import java.util.List;
38  
39  import static org.mockito.Mockito.mock;
40  import static org.mockito.Mockito.when;
41  
42  /**
43   * Test the netty connections
44   */
45  public class ConnectionTest {
46    /** Class configuration */
47    private ImmutableClassesGiraphConfiguration conf;
48  
49    @Before
50    public void setUp() {
51      GiraphConfiguration tmpConfig = new GiraphConfiguration();
52      tmpConfig.setComputationClass(IntNoOpComputation.class);
53      conf = new ImmutableClassesGiraphConfiguration(tmpConfig);
54    }
55  
56    /**
57     * Test connecting a single client to a single server.
58     *
59     * @throws IOException
60     */
61    @Test
62    public void connectSingleClientServer() throws IOException {
63      @SuppressWarnings("rawtypes")
64      Context context = mock(Context.class);
65      when(context.getConfiguration()).thenReturn(conf);
66  
67      ServerData<IntWritable, IntWritable, IntWritable> serverData =
68          MockUtils.createNewServerData(conf, context);
69      WorkerInfo workerInfo = new WorkerInfo();
70      NettyServer server =
71          new NettyServer(conf,
72              new WorkerRequestServerHandler.Factory(serverData), workerInfo,
73              context, new MockExceptionHandler());
74      server.start();
75      workerInfo.setInetSocketAddress(server.getMyAddress(), server.getLocalHostOrIp());
76  
77      NettyClient client = new NettyClient(context, conf, new WorkerInfo(),
78          new MockExceptionHandler());
79      server.setFlowControl(client.getFlowControl());
80      client.connectAllAddresses(
81          Lists.<WorkerInfo>newArrayList(workerInfo));
82  
83      client.stop();
84      server.stop();
85    }
86  
87    /**
88     * Test connecting one client to three servers.
89     *
90     * @throws IOException
91     */
92    @Test
93    public void connectOneClientToThreeServers() throws IOException {
94      @SuppressWarnings("rawtypes")
95      Context context = mock(Context.class);
96      when(context.getConfiguration()).thenReturn(conf);
97  
98      ServerData<IntWritable, IntWritable, IntWritable> serverData =
99          MockUtils.createNewServerData(conf, context);
100    RequestServerHandler.Factory requestServerHandlerFactory =
101        new WorkerRequestServerHandler.Factory(serverData);
102 
103     WorkerInfo workerInfo1 = new WorkerInfo();
104     workerInfo1.setTaskId(1);
105     NettyServer server1 =
106         new NettyServer(conf, requestServerHandlerFactory, workerInfo1,
107             context, new MockExceptionHandler());
108     server1.start();
109     workerInfo1.setInetSocketAddress(server1.getMyAddress(), server1.getLocalHostOrIp());
110 
111     WorkerInfo workerInfo2 = new WorkerInfo();
112     workerInfo1.setTaskId(2);
113     NettyServer server2 =
114         new NettyServer(conf, requestServerHandlerFactory, workerInfo2,
115             context, new MockExceptionHandler());
116     server2.start();
117     workerInfo2.setInetSocketAddress(server2.getMyAddress(), server1.getLocalHostOrIp());
118 
119     WorkerInfo workerInfo3 = new WorkerInfo();
120     workerInfo1.setTaskId(3);
121     NettyServer server3 =
122         new NettyServer(conf, requestServerHandlerFactory, workerInfo3,
123             context, new MockExceptionHandler());
124     server3.start();
125     workerInfo3.setInetSocketAddress(server3.getMyAddress(), server1.getLocalHostOrIp());
126 
127     NettyClient client = new NettyClient(context, conf, new WorkerInfo(),
128         new MockExceptionHandler());
129     server1.setFlowControl(client.getFlowControl());
130     server2.setFlowControl(client.getFlowControl());
131     server3.setFlowControl(client.getFlowControl());
132     List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo1,
133         workerInfo2, workerInfo3);
134     client.connectAllAddresses(addresses);
135 
136     client.stop();
137     server1.stop();
138     server2.stop();
139     server3.stop();
140   }
141 
142   /**
143    * Test connecting three clients to one server.
144    *
145    * @throws IOException
146    */
147   @Test
148   public void connectThreeClientsToOneServer() throws IOException {
149     @SuppressWarnings("rawtypes")
150     Context context = mock(Context.class);
151     when(context.getConfiguration()).thenReturn(conf);
152 
153     ServerData<IntWritable, IntWritable, IntWritable> serverData =
154         MockUtils.createNewServerData(conf, context);
155     WorkerInfo workerInfo = new WorkerInfo();
156     NettyServer server = new NettyServer(conf,
157         new WorkerRequestServerHandler.Factory(serverData), workerInfo,
158             context, new MockExceptionHandler());
159     server.start();
160     workerInfo.setInetSocketAddress(server.getMyAddress(), server.getLocalHostOrIp());
161 
162     List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo);
163     NettyClient client1 = new NettyClient(context, conf, new WorkerInfo(),
164         new MockExceptionHandler());
165     client1.connectAllAddresses(addresses);
166     NettyClient client2 = new NettyClient(context, conf, new WorkerInfo(),
167         new MockExceptionHandler());
168     client2.connectAllAddresses(addresses);
169     NettyClient client3 = new NettyClient(context, conf, new WorkerInfo(),
170         new MockExceptionHandler());
171     client3.connectAllAddresses(addresses);
172     server.setFlowControl(client1.getFlowControl());
173 
174     client1.stop();
175     client2.stop();
176     client3.stop();
177     server.stop();
178   }
179 }