This project has retired. For details please refer to its
Attic page.
RequestFailureTest xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm;
20
21 import org.apache.giraph.comm.netty.NettyClient;
22 import org.apache.giraph.comm.netty.NettyServer;
23 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
24 import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
25 import org.apache.giraph.comm.requests.WritableRequest;
26 import org.apache.giraph.conf.GiraphConfiguration;
27 import org.apache.giraph.conf.GiraphConstants;
28 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
29 import org.apache.giraph.factories.TestMessageValueFactory;
30 import org.apache.giraph.utils.VertexIdMessages;
31 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
32 import org.apache.giraph.utils.IntNoOpComputation;
33 import org.apache.giraph.utils.MockUtils;
34 import org.apache.giraph.utils.PairList;
35 import org.apache.giraph.worker.WorkerInfo;
36 import org.apache.hadoop.io.IntWritable;
37 import org.apache.hadoop.mapreduce.Counter;
38 import org.apache.hadoop.mapreduce.Mapper.Context;
39 import org.junit.Before;
40 import org.junit.Test;
41
42 import com.google.common.collect.Lists;
43
44 import java.io.IOException;
45
46 import static org.junit.Assert.assertEquals;
47 import static org.mockito.Mockito.mock;
48 import static org.mockito.Mockito.when;
49 import static org.mockito.Mockito.any;
50
51
52
53
54 @SuppressWarnings("unchecked")
55 public class RequestFailureTest {
56
57 private ImmutableClassesGiraphConfiguration conf;
58
59 private ServerData<IntWritable, IntWritable, IntWritable>
60 serverData;
61
62 private NettyServer server;
63
64 private NettyClient client;
65
66 private Context context;
67
68 @Before
69 public void setUp() throws IOException {
70
71 GiraphConfiguration tmpConf = new GiraphConfiguration();
72 tmpConf.setComputationClass(IntNoOpComputation.class);
73 conf = new ImmutableClassesGiraphConfiguration(tmpConf);
74
75 context = mock(Context.class);
76 when(context.getConfiguration()).thenReturn(conf);
77 Counter counter = mock(Counter.class);
78 when(context.getCounter(any(String.class), any(String.class))).thenReturn(
79 counter);
80 }
81
82 private WritableRequest getRequest() {
83
84 final int partitionId = 0;
85 PairList<Integer, VertexIdMessages<IntWritable,
86 IntWritable>>
87 dataToSend = new PairList<Integer,
88 VertexIdMessages<IntWritable, IntWritable>>();
89 dataToSend.initialize();
90 ByteArrayVertexIdMessages<IntWritable,
91 IntWritable> vertexIdMessages =
92 new ByteArrayVertexIdMessages<IntWritable, IntWritable>(
93 new TestMessageValueFactory<IntWritable>(IntWritable.class));
94 vertexIdMessages.setConf(conf);
95 vertexIdMessages.initialize();
96 dataToSend.add(partitionId, vertexIdMessages);
97 for (int i = 1; i < 7; ++i) {
98 IntWritable vertexId = new IntWritable(i);
99 for (int j = 0; j < i; ++j) {
100 vertexIdMessages.add(vertexId, new IntWritable(j));
101 }
102 }
103
104
105 SendWorkerMessagesRequest<IntWritable, IntWritable> request =
106 new SendWorkerMessagesRequest<IntWritable, IntWritable>(dataToSend);
107 request.setConf(conf);
108 return request;
109 }
110
111 private void checkResult(int numRequests) {
112
113 Iterable<IntWritable> vertices =
114 serverData.getIncomingMessageStore().getPartitionDestinationVertices(0);
115 int keySum = 0;
116 int messageSum = 0;
117 for (IntWritable vertexId : vertices) {
118 keySum += vertexId.get();
119 Iterable<IntWritable> messages =
120 serverData.<IntWritable>getIncomingMessageStore().getVertexMessages(
121 vertexId);
122 synchronized (messages) {
123 for (IntWritable message : messages) {
124 messageSum += message.get();
125 }
126 }
127 }
128 assertEquals(21, keySum);
129 assertEquals(35 * numRequests, messageSum);
130 }
131
132 @Test
133 public void send2Requests() throws IOException {
134 checkSendingTwoRequests();
135 }
136
137 @Test
138 public void alreadyProcessedRequest() throws IOException {
139
140 GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED.set(conf, true);
141
142 GiraphConstants.MAX_REQUEST_MILLISECONDS.set(conf, 1000);
143
144 GiraphConstants.WAITING_REQUEST_MSECS.set(conf, 2000);
145
146 checkSendingTwoRequests();
147 }
148
149 @Test
150 public void resendRequest() throws IOException {
151
152 GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED.set(conf, false);
153
154 GiraphConstants.MAX_REQUEST_MILLISECONDS.set(conf, 1000);
155
156 GiraphConstants.WAITING_REQUEST_MSECS.set(conf, 2000);
157
158 checkSendingTwoRequests();
159 }
160
161 private void checkSendingTwoRequests() throws IOException {
162
163 serverData = MockUtils.createNewServerData(conf, context);
164 serverData.prepareSuperstep();
165 WorkerInfo workerInfo = new WorkerInfo();
166 server = new NettyServer(conf,
167 new WorkerRequestServerHandler.Factory(serverData), workerInfo,
168 context, new MockExceptionHandler());
169 server.start();
170 workerInfo.setInetSocketAddress(server.getMyAddress(), server.getLocalHostOrIp());
171 client = new NettyClient(context, conf, new WorkerInfo(),
172 new MockExceptionHandler());
173 server.setFlowControl(client.getFlowControl());
174 client.connectAllAddresses(
175 Lists.<WorkerInfo>newArrayList(workerInfo));
176
177
178 WritableRequest request1 = getRequest();
179 WritableRequest request2 = getRequest();
180 client.sendWritableRequest(workerInfo.getTaskId(), request1);
181 client.sendWritableRequest(workerInfo.getTaskId(), request2);
182 client.waitAllRequests();
183
184
185 client.stop();
186 server.stop();
187
188
189 checkResult(2);
190 }
191 }