View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm;
20  
21  import org.apache.giraph.comm.netty.NettyClient;
22  import org.apache.giraph.comm.netty.NettyServer;
23  import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
24  import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
25  import org.apache.giraph.comm.requests.WritableRequest;
26  import org.apache.giraph.conf.GiraphConfiguration;
27  import org.apache.giraph.conf.GiraphConstants;
28  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
29  import org.apache.giraph.factories.TestMessageValueFactory;
30  import org.apache.giraph.utils.VertexIdMessages;
31  import org.apache.giraph.utils.ByteArrayVertexIdMessages;
32  import org.apache.giraph.utils.IntNoOpComputation;
33  import org.apache.giraph.utils.MockUtils;
34  import org.apache.giraph.utils.PairList;
35  import org.apache.giraph.worker.WorkerInfo;
36  import org.apache.hadoop.io.IntWritable;
37  import org.apache.hadoop.mapreduce.Mapper.Context;
38  import org.junit.Before;
39  import org.junit.Test;
40  
41  import com.google.common.collect.Lists;
42  
43  import java.io.IOException;
44  
45  import static org.junit.Assert.assertEquals;
46  import static org.mockito.Mockito.mock;
47  import static org.mockito.Mockito.when;
48  
49  /**
50   * Test all the netty failure scenarios
51   */
52  @SuppressWarnings("unchecked")
53  public class RequestFailureTest {
54    /** Configuration */
55    private ImmutableClassesGiraphConfiguration conf;
56    /** Server data */
57    private ServerData<IntWritable, IntWritable, IntWritable>
58    serverData;
59    /** Server */
60    private NettyServer server;
61    /** Client */
62    private NettyClient client;
63    /** Mock context */
64    private Context context;
65  
66    @Before
67    public void setUp() throws IOException {
68      // Setup the conf
69      GiraphConfiguration tmpConf = new GiraphConfiguration();
70      tmpConf.setComputationClass(IntNoOpComputation.class);
71      conf = new ImmutableClassesGiraphConfiguration(tmpConf);
72  
73      context = mock(Context.class);
74      when(context.getConfiguration()).thenReturn(conf);
75    }
76  
77    private WritableRequest getRequest() {
78      // Data to send
79      final int partitionId = 0;
80      PairList<Integer, VertexIdMessages<IntWritable,
81                  IntWritable>>
82          dataToSend = new PairList<Integer,
83          VertexIdMessages<IntWritable, IntWritable>>();
84      dataToSend.initialize();
85      ByteArrayVertexIdMessages<IntWritable,
86              IntWritable> vertexIdMessages =
87          new ByteArrayVertexIdMessages<IntWritable, IntWritable>(
88              new TestMessageValueFactory<IntWritable>(IntWritable.class));
89      vertexIdMessages.setConf(conf);
90      vertexIdMessages.initialize();
91      dataToSend.add(partitionId, vertexIdMessages);
92      for (int i = 1; i < 7; ++i) {
93        IntWritable vertexId = new IntWritable(i);
94        for (int j = 0; j < i; ++j) {
95          vertexIdMessages.add(vertexId, new IntWritable(j));
96        }
97      }
98  
99      // Send the request
100     SendWorkerMessagesRequest<IntWritable, IntWritable> request =
101         new SendWorkerMessagesRequest<IntWritable, IntWritable>(dataToSend);
102     request.setConf(conf);
103     return request;
104   }
105 
106   private void checkResult(int numRequests) {
107     // Check the output
108     Iterable<IntWritable> vertices =
109         serverData.getIncomingMessageStore().getPartitionDestinationVertices(0);
110     int keySum = 0;
111     int messageSum = 0;
112     for (IntWritable vertexId : vertices) {
113       keySum += vertexId.get();
114       Iterable<IntWritable> messages =
115           serverData.<IntWritable>getIncomingMessageStore().getVertexMessages(
116               vertexId);
117       synchronized (messages) {
118         for (IntWritable message : messages) {
119           messageSum += message.get();
120         }
121       }
122     }
123     assertEquals(21, keySum);
124     assertEquals(35 * numRequests, messageSum);
125   }
126 
127   @Test
128   public void send2Requests() throws IOException {
129     checkSendingTwoRequests();
130   }
131 
132   @Test
133   public void alreadyProcessedRequest() throws IOException {
134     // Force a drop of the first request
135     GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED.set(conf, true);
136     // One second to finish a request
137     GiraphConstants.MAX_REQUEST_MILLISECONDS.set(conf, 1000);
138     // Loop every 2 seconds
139     GiraphConstants.WAITING_REQUEST_MSECS.set(conf, 2000);
140 
141     checkSendingTwoRequests();
142   }
143 
144   @Test
145   public void resendRequest() throws IOException {
146     // Force a drop of the first request
147     GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED.set(conf, false);
148     // One second to finish a request
149     GiraphConstants.MAX_REQUEST_MILLISECONDS.set(conf, 1000);
150     // Loop every 2 seconds
151     GiraphConstants.WAITING_REQUEST_MSECS.set(conf, 2000);
152 
153     checkSendingTwoRequests();
154   }
155 
156   private void checkSendingTwoRequests() throws IOException {
157     // Start the service
158     serverData = MockUtils.createNewServerData(conf, context);
159     serverData.prepareSuperstep();
160     WorkerInfo workerInfo = new WorkerInfo();
161     server = new NettyServer(conf,
162         new WorkerRequestServerHandler.Factory(serverData), workerInfo,
163             context, new MockExceptionHandler());
164     server.start();
165     workerInfo.setInetSocketAddress(server.getMyAddress(), server.getLocalHostOrIp());
166     client = new NettyClient(context, conf, new WorkerInfo(),
167         new MockExceptionHandler());
168     server.setFlowControl(client.getFlowControl());
169     client.connectAllAddresses(
170         Lists.<WorkerInfo>newArrayList(workerInfo));
171 
172     // Send the request 2x, but should only be processed once
173     WritableRequest request1 = getRequest();
174     WritableRequest request2 = getRequest();
175     client.sendWritableRequest(workerInfo.getTaskId(), request1);
176     client.sendWritableRequest(workerInfo.getTaskId(), request2);
177     client.waitAllRequests();
178 
179     // Stop the service
180     client.stop();
181     server.stop();
182 
183     // Check the output (should have been only processed once)
184     checkResult(2);
185   }
186 }