This project has retired. For details please refer to its
Attic page.
ConnectionTest xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm;
20
21 import com.google.common.collect.Lists;
22 import org.apache.giraph.comm.netty.NettyClient;
23 import org.apache.giraph.comm.netty.NettyServer;
24 import org.apache.giraph.comm.netty.handler.RequestServerHandler;
25 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
26 import org.apache.giraph.conf.GiraphConfiguration;
27 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
28 import org.apache.giraph.utils.IntNoOpComputation;
29 import org.apache.giraph.utils.MockUtils;
30 import org.apache.giraph.worker.WorkerInfo;
31 import org.apache.hadoop.io.IntWritable;
32 import org.apache.hadoop.mapreduce.Mapper.Context;
33 import org.junit.Before;
34 import org.junit.Test;
35
36 import java.io.IOException;
37 import java.util.List;
38
39 import static org.mockito.Mockito.mock;
40 import static org.mockito.Mockito.when;
41
42
43
44
45 public class ConnectionTest {
46
47 private ImmutableClassesGiraphConfiguration conf;
48
49 @Before
50 public void setUp() {
51 GiraphConfiguration tmpConfig = new GiraphConfiguration();
52 tmpConfig.setComputationClass(IntNoOpComputation.class);
53 conf = new ImmutableClassesGiraphConfiguration(tmpConfig);
54 }
55
56
57
58
59
60
61 @Test
62 public void connectSingleClientServer() throws IOException {
63 @SuppressWarnings("rawtypes")
64 Context context = mock(Context.class);
65 when(context.getConfiguration()).thenReturn(conf);
66
67 ServerData<IntWritable, IntWritable, IntWritable> serverData =
68 MockUtils.createNewServerData(conf, context);
69 WorkerInfo workerInfo = new WorkerInfo();
70 NettyServer server =
71 new NettyServer(conf,
72 new WorkerRequestServerHandler.Factory(serverData), workerInfo,
73 context, new MockExceptionHandler());
74 server.start();
75 workerInfo.setInetSocketAddress(server.getMyAddress(), server.getLocalHostOrIp());
76
77 NettyClient client = new NettyClient(context, conf, new WorkerInfo(),
78 new MockExceptionHandler());
79 server.setFlowControl(client.getFlowControl());
80 client.connectAllAddresses(
81 Lists.<WorkerInfo>newArrayList(workerInfo));
82
83 client.stop();
84 server.stop();
85 }
86
87
88
89
90
91
92 @Test
93 public void connectOneClientToThreeServers() throws IOException {
94 @SuppressWarnings("rawtypes")
95 Context context = mock(Context.class);
96 when(context.getConfiguration()).thenReturn(conf);
97
98 ServerData<IntWritable, IntWritable, IntWritable> serverData =
99 MockUtils.createNewServerData(conf, context);
100 RequestServerHandler.Factory requestServerHandlerFactory =
101 new WorkerRequestServerHandler.Factory(serverData);
102
103 WorkerInfo workerInfo1 = new WorkerInfo();
104 workerInfo1.setTaskId(1);
105 NettyServer server1 =
106 new NettyServer(conf, requestServerHandlerFactory, workerInfo1,
107 context, new MockExceptionHandler());
108 server1.start();
109 workerInfo1.setInetSocketAddress(server1.getMyAddress(), server1.getLocalHostOrIp());
110
111 WorkerInfo workerInfo2 = new WorkerInfo();
112 workerInfo1.setTaskId(2);
113 NettyServer server2 =
114 new NettyServer(conf, requestServerHandlerFactory, workerInfo2,
115 context, new MockExceptionHandler());
116 server2.start();
117 workerInfo2.setInetSocketAddress(server2.getMyAddress(), server1.getLocalHostOrIp());
118
119 WorkerInfo workerInfo3 = new WorkerInfo();
120 workerInfo1.setTaskId(3);
121 NettyServer server3 =
122 new NettyServer(conf, requestServerHandlerFactory, workerInfo3,
123 context, new MockExceptionHandler());
124 server3.start();
125 workerInfo3.setInetSocketAddress(server3.getMyAddress(), server1.getLocalHostOrIp());
126
127 NettyClient client = new NettyClient(context, conf, new WorkerInfo(),
128 new MockExceptionHandler());
129 server1.setFlowControl(client.getFlowControl());
130 server2.setFlowControl(client.getFlowControl());
131 server3.setFlowControl(client.getFlowControl());
132 List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo1,
133 workerInfo2, workerInfo3);
134 client.connectAllAddresses(addresses);
135
136 client.stop();
137 server1.stop();
138 server2.stop();
139 server3.stop();
140 }
141
142
143
144
145
146
147 @Test
148 public void connectThreeClientsToOneServer() throws IOException {
149 @SuppressWarnings("rawtypes")
150 Context context = mock(Context.class);
151 when(context.getConfiguration()).thenReturn(conf);
152
153 ServerData<IntWritable, IntWritable, IntWritable> serverData =
154 MockUtils.createNewServerData(conf, context);
155 WorkerInfo workerInfo = new WorkerInfo();
156 NettyServer server = new NettyServer(conf,
157 new WorkerRequestServerHandler.Factory(serverData), workerInfo,
158 context, new MockExceptionHandler());
159 server.start();
160 workerInfo.setInetSocketAddress(server.getMyAddress(), server.getLocalHostOrIp());
161
162 List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo);
163 NettyClient client1 = new NettyClient(context, conf, new WorkerInfo(),
164 new MockExceptionHandler());
165 client1.connectAllAddresses(addresses);
166 NettyClient client2 = new NettyClient(context, conf, new WorkerInfo(),
167 new MockExceptionHandler());
168 client2.connectAllAddresses(addresses);
169 NettyClient client3 = new NettyClient(context, conf, new WorkerInfo(),
170 new MockExceptionHandler());
171 client3.connectAllAddresses(addresses);
172 server.setFlowControl(client1.getFlowControl());
173
174 client1.stop();
175 client2.stop();
176 client3.stop();
177 server.stop();
178 }
179 }