This project has retired. For details please refer to its Attic page.
RequestFailureTest xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm;
20  
21  import org.apache.giraph.comm.netty.NettyClient;
22  import org.apache.giraph.comm.netty.NettyServer;
23  import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
24  import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
25  import org.apache.giraph.comm.requests.WritableRequest;
26  import org.apache.giraph.conf.GiraphConfiguration;
27  import org.apache.giraph.conf.GiraphConstants;
28  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
29  import org.apache.giraph.factories.TestMessageValueFactory;
30  import org.apache.giraph.utils.VertexIdMessages;
31  import org.apache.giraph.utils.ByteArrayVertexIdMessages;
32  import org.apache.giraph.utils.IntNoOpComputation;
33  import org.apache.giraph.utils.MockUtils;
34  import org.apache.giraph.utils.PairList;
35  import org.apache.giraph.worker.WorkerInfo;
36  import org.apache.hadoop.io.IntWritable;
37  import org.apache.hadoop.mapreduce.Counter;
38  import org.apache.hadoop.mapreduce.Mapper.Context;
39  import org.junit.Before;
40  import org.junit.Test;
41  
42  import com.google.common.collect.Lists;
43  
44  import java.io.IOException;
45  
46  import static org.junit.Assert.assertEquals;
47  import static org.mockito.Mockito.mock;
48  import static org.mockito.Mockito.when;
49  import static org.mockito.Mockito.any;
50  
51  /**
52   * Test all the netty failure scenarios
53   */
54  @SuppressWarnings("unchecked")
55  public class RequestFailureTest {
56    /** Configuration */
57    private ImmutableClassesGiraphConfiguration conf;
58    /** Server data */
59    private ServerData<IntWritable, IntWritable, IntWritable>
60    serverData;
61    /** Server */
62    private NettyServer server;
63    /** Client */
64    private NettyClient client;
65    /** Mock context */
66    private Context context;
67  
68    @Before
69    public void setUp() throws IOException {
70      // Setup the conf
71      GiraphConfiguration tmpConf = new GiraphConfiguration();
72      tmpConf.setComputationClass(IntNoOpComputation.class);
73      conf = new ImmutableClassesGiraphConfiguration(tmpConf);
74  
75      context = mock(Context.class);
76      when(context.getConfiguration()).thenReturn(conf);
77      Counter counter = mock(Counter.class);
78      when(context.getCounter(any(String.class), any(String.class))).thenReturn(
79          counter);
80    }
81  
82    private WritableRequest getRequest() {
83      // Data to send
84      final int partitionId = 0;
85      PairList<Integer, VertexIdMessages<IntWritable,
86                  IntWritable>>
87          dataToSend = new PairList<Integer,
88          VertexIdMessages<IntWritable, IntWritable>>();
89      dataToSend.initialize();
90      ByteArrayVertexIdMessages<IntWritable,
91              IntWritable> vertexIdMessages =
92          new ByteArrayVertexIdMessages<IntWritable, IntWritable>(
93              new TestMessageValueFactory<IntWritable>(IntWritable.class));
94      vertexIdMessages.setConf(conf);
95      vertexIdMessages.initialize();
96      dataToSend.add(partitionId, vertexIdMessages);
97      for (int i = 1; i < 7; ++i) {
98        IntWritable vertexId = new IntWritable(i);
99        for (int j = 0; j < i; ++j) {
100         vertexIdMessages.add(vertexId, new IntWritable(j));
101       }
102     }
103 
104     // Send the request
105     SendWorkerMessagesRequest<IntWritable, IntWritable> request =
106         new SendWorkerMessagesRequest<IntWritable, IntWritable>(dataToSend);
107     request.setConf(conf);
108     return request;
109   }
110 
111   private void checkResult(int numRequests) {
112     // Check the output
113     Iterable<IntWritable> vertices =
114         serverData.getIncomingMessageStore().getPartitionDestinationVertices(0);
115     int keySum = 0;
116     int messageSum = 0;
117     for (IntWritable vertexId : vertices) {
118       keySum += vertexId.get();
119       Iterable<IntWritable> messages =
120           serverData.<IntWritable>getIncomingMessageStore().getVertexMessages(
121               vertexId);
122       synchronized (messages) {
123         for (IntWritable message : messages) {
124           messageSum += message.get();
125         }
126       }
127     }
128     assertEquals(21, keySum);
129     assertEquals(35 * numRequests, messageSum);
130   }
131 
132   @Test
133   public void send2Requests() throws IOException {
134     checkSendingTwoRequests();
135   }
136 
137   @Test
138   public void alreadyProcessedRequest() throws IOException {
139     // Force a drop of the first request
140     GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED.set(conf, true);
141     // One second to finish a request
142     GiraphConstants.MAX_REQUEST_MILLISECONDS.set(conf, 1000);
143     // Loop every 2 seconds
144     GiraphConstants.WAITING_REQUEST_MSECS.set(conf, 2000);
145 
146     checkSendingTwoRequests();
147   }
148 
149   @Test
150   public void resendRequest() throws IOException {
151     // Force a drop of the first request
152     GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED.set(conf, false);
153     // One second to finish a request
154     GiraphConstants.MAX_REQUEST_MILLISECONDS.set(conf, 1000);
155     // Loop every 2 seconds
156     GiraphConstants.WAITING_REQUEST_MSECS.set(conf, 2000);
157 
158     checkSendingTwoRequests();
159   }
160 
161   private void checkSendingTwoRequests() throws IOException {
162     // Start the service
163     serverData = MockUtils.createNewServerData(conf, context);
164     serverData.prepareSuperstep();
165     WorkerInfo workerInfo = new WorkerInfo();
166     server = new NettyServer(conf,
167         new WorkerRequestServerHandler.Factory(serverData), workerInfo,
168             context, new MockExceptionHandler());
169     server.start();
170     workerInfo.setInetSocketAddress(server.getMyAddress(), server.getLocalHostOrIp());
171     client = new NettyClient(context, conf, new WorkerInfo(),
172         new MockExceptionHandler());
173     server.setFlowControl(client.getFlowControl());
174     client.connectAllAddresses(
175         Lists.<WorkerInfo>newArrayList(workerInfo));
176 
177     // Send the request 2x, but should only be processed once
178     WritableRequest request1 = getRequest();
179     WritableRequest request2 = getRequest();
180     client.sendWritableRequest(workerInfo.getTaskId(), request1);
181     client.sendWritableRequest(workerInfo.getTaskId(), request2);
182     client.waitAllRequests();
183 
184     // Stop the service
185     client.stop();
186     server.stop();
187 
188     // Check the output (should have been only processed once)
189     checkResult(2);
190   }
191 }