This project has retired. For details please refer to its Attic page.
NettyClient xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty;
20  
21  import io.netty.handler.flush.FlushConsolidationHandler;
22  import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
23  import org.apache.giraph.comm.flow_control.FlowControl;
24  import org.apache.giraph.comm.flow_control.NoOpFlowControl;
25  import org.apache.giraph.comm.flow_control.StaticFlowControl;
26  import org.apache.giraph.comm.netty.handler.AckSignalFlag;
27  import org.apache.giraph.comm.netty.handler.TaskRequestIdGenerator;
28  import org.apache.giraph.comm.netty.handler.ClientRequestId;
29  import org.apache.giraph.comm.netty.handler.RequestEncoder;
30  import org.apache.giraph.comm.netty.handler.RequestInfo;
31  import org.apache.giraph.comm.netty.handler.RequestServerHandler;
32  import org.apache.giraph.comm.netty.handler.ResponseClientHandler;
33  /*if_not[HADOOP_NON_SECURE]*/
34  import org.apache.giraph.comm.netty.handler.SaslClientHandler;
35  import org.apache.giraph.comm.requests.RequestType;
36  import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
37  /*end[HADOOP_NON_SECURE]*/
38  import org.apache.giraph.comm.requests.WritableRequest;
39  import org.apache.giraph.conf.BooleanConfOption;
40  import org.apache.giraph.conf.GiraphConstants;
41  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
42  import org.apache.giraph.counters.GiraphHadoopCounter;
43  import org.apache.giraph.function.Predicate;
44  import org.apache.giraph.graph.TaskInfo;
45  import org.apache.giraph.master.MasterInfo;
46  import org.apache.giraph.utils.PipelineUtils;
47  import org.apache.giraph.utils.ProgressableUtils;
48  import org.apache.giraph.utils.ThreadUtils;
49  import org.apache.giraph.utils.TimedLogger;
50  import org.apache.hadoop.mapreduce.Mapper;
51  import org.apache.log4j.Logger;
52  
53  import com.google.common.collect.Lists;
54  import com.google.common.collect.MapMaker;
55  import com.google.common.collect.Maps;
56  
57  /*if_not[HADOOP_NON_SECURE]*/
58  import java.io.IOException;
59  /*end[HADOOP_NON_SECURE]*/
60  import java.net.InetSocketAddress;
61  import java.util.Collection;
62  import java.util.Collections;
63  import java.util.Comparator;
64  import java.util.HashMap;
65  import java.util.HashSet;
66  import java.util.List;
67  import java.util.Map;
68  import java.util.Set;
69  import java.util.concurrent.ConcurrentMap;
70  import java.util.concurrent.atomic.AtomicInteger;
71  import java.util.concurrent.atomic.AtomicLong;
72  
73  import io.netty.bootstrap.Bootstrap;
74  import io.netty.channel.Channel;
75  import io.netty.channel.ChannelFuture;
76  import io.netty.channel.ChannelFutureListener;
77  import io.netty.channel.ChannelHandlerContext;
78  import io.netty.channel.ChannelInitializer;
79  import io.netty.channel.ChannelOption;
80  import io.netty.channel.EventLoopGroup;
81  import io.netty.channel.nio.NioEventLoopGroup;
82  import io.netty.channel.socket.SocketChannel;
83  import io.netty.channel.socket.nio.NioSocketChannel;
84  import io.netty.handler.codec.FixedLengthFrameDecoder;
85  /*if_not[HADOOP_NON_SECURE]*/
86  import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
87  import io.netty.util.AttributeKey;
88  /*end[HADOOP_NON_SECURE]*/
89  import io.netty.util.concurrent.BlockingOperationException;
90  import io.netty.util.concurrent.DefaultEventExecutorGroup;
91  import io.netty.util.concurrent.EventExecutorGroup;
92  
93  import static com.google.common.base.Preconditions.checkState;
94  import static org.apache.giraph.conf.GiraphConstants.CLIENT_RECEIVE_BUFFER_SIZE;
95  import static org.apache.giraph.conf.GiraphConstants.CLIENT_SEND_BUFFER_SIZE;
96  import static org.apache.giraph.conf.GiraphConstants.MAX_REQUEST_MILLISECONDS;
97  import static org.apache.giraph.conf.GiraphConstants.MAX_RESOLVE_ADDRESS_ATTEMPTS;
98  import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_AFTER_HANDLER;
99  import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS;
100 import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER;
101 import static org.apache.giraph.conf.GiraphConstants.NETTY_MAX_CONNECTION_FAILURES;
102 import static org.apache.giraph.conf.GiraphConstants.RESEND_TIMED_OUT_REQUESTS;
103 import static org.apache.giraph.conf.GiraphConstants.WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS;
104 import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
105 
106 /**
107  * Netty client for sending requests.  Thread-safe.
108  */
109 public class NettyClient {
110   /** Do we have a limit on number of open requests we can have */
111   public static final BooleanConfOption LIMIT_NUMBER_OF_OPEN_REQUESTS =
112       new BooleanConfOption("giraph.waitForRequestsConfirmation", false,
113           "Whether to have a limit on number of open requests or not");
114   /**
115    * Do we have a limit on number of open requests we can have for each worker.
116    * Note that if this option is enabled, Netty will not keep more than a
117    * certain number of requests open for each other worker in the job. If there
118    * are more requests generated for a worker, Netty will not actually send the
119    * surplus requests, instead, it caches the requests in a local buffer. The
120    * maximum number of these unsent requests in the cache is another
121    * user-defined parameter (MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER).
122    */
123   public static final BooleanConfOption LIMIT_OPEN_REQUESTS_PER_WORKER =
124       new BooleanConfOption("giraph.waitForPerWorkerRequests", false,
125           "Whether to have a limit on number of open requests for each worker" +
126               "or not");
127   /** Maximum number of requests to list (for debugging) */
128   public static final int MAX_REQUESTS_TO_LIST = 10;
129   /**
130    * Maximum number of destination task ids with open requests to list
131    * (for debugging)
132    */
133   public static final int MAX_DESTINATION_TASK_IDS_TO_LIST = 10;
134   /** 30 seconds to connect by default */
135   public static final int MAX_CONNECTION_MILLISECONDS_DEFAULT = 30 * 1000;
136 /*if_not[HADOOP_NON_SECURE]*/
137   /** Used to authenticate with other workers acting as servers */
138   public static final AttributeKey<SaslNettyClient> SASL =
139       AttributeKey.valueOf("saslNettyClient");
140 /*end[HADOOP_NON_SECURE]*/
141 
142   /** Group name for netty counters */
143   public static final String NETTY_COUNTERS_GROUP = "Netty counters";
144   /** How many network requests were resent because they took too long */
145   public static final String NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME =
146       "Network requests resent for timeout";
147   /** How many network requests were resent because channel failed */
148   public static final String NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME =
149       "Network requests resent for channel failure";
150   /** How many network requests were resent because connection failed */
151   public static final String
152       NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME =
153       "Network requests resent for connection or request failure";
154 
155   /** Class logger */
156   private static final Logger LOG = Logger.getLogger(NettyClient.class);
157   /** Netty related counter names */
158   private static Map<String, Set<String>> COUNTER_GROUP_AND_NAMES =
159           new HashMap<>();
160   /** Context used to report progress */
161   private final Mapper<?, ?, ?, ?>.Context context;
162   /** Client bootstrap */
163   private final Bootstrap bootstrap;
164   /**
165    * Map of the peer connections, mapping from remote socket address to client
166    * meta data
167    */
168   private final ConcurrentMap<InetSocketAddress, ChannelRotater>
169   addressChannelMap = new MapMaker().makeMap();
170   /**
171    * Map from task id to address of its server
172    */
173   private final Map<Integer, InetSocketAddress> taskIdAddressMap =
174       new MapMaker().makeMap();
175   /**
176    * Request map of client request ids to request information.
177    */
178   private final ConcurrentMap<ClientRequestId, RequestInfo>
179   clientRequestIdRequestInfoMap;
180   /** Number of channels per server */
181   private final int channelsPerServer;
182   /** Inbound byte counter for this client */
183   private final InboundByteCounter inboundByteCounter = new
184       InboundByteCounter();
185   /** Outbound byte counter for this client */
186   private final OutboundByteCounter outboundByteCounter = new
187       OutboundByteCounter();
188   /** Send buffer size */
189   private final int sendBufferSize;
190   /** Receive buffer size */
191   private final int receiveBufferSize;
192   /** Warn if request size is bigger than the buffer size by this factor */
193   private final float requestSizeWarningThreshold;
194   /** Maximum number of connection failures */
195   private final int maxConnectionFailures;
196   /** How long to wait before trying to reconnect failed connections */
197   private final long waitTimeBetweenConnectionRetriesMs;
198   /** Maximum number of milliseconds for a request */
199   private final int maxRequestMilliseconds;
200   /**
201    * Whether to resend request which timed out or fail the job if timeout
202    * happens
203    */
204   private final boolean resendTimedOutRequests;
205   /** Waiting interval for checking outstanding requests msecs */
206   private final int waitingRequestMsecs;
207   /** Timed logger for printing request debugging */
208   private final TimedLogger requestLogger;
209   /** Worker executor group */
210   private final EventLoopGroup workerGroup;
211   /** Task request id generator */
212   private final TaskRequestIdGenerator taskRequestIdGenerator =
213       new TaskRequestIdGenerator();
214   /** Task info */
215   private final TaskInfo myTaskInfo;
216   /** Maximum thread pool size */
217   private final int maxPoolSize;
218   /** Maximum number of attempts to resolve an address*/
219   private final int maxResolveAddressAttempts;
220   /** Use execution handler? */
221   private final boolean useExecutionGroup;
222   /** EventExecutor Group (if used) */
223   private final EventExecutorGroup executionGroup;
224   /** Name of the handler to use execution group for (if used) */
225   private final String handlerToUseExecutionGroup;
226   /** When was the last time we checked if we should resend some requests */
227   private final AtomicLong lastTimeCheckedRequestsForProblems =
228       new AtomicLong(0);
229   /**
230    * Logger used to dump stack traces for every exception that happens
231    * in netty client threads.
232    */
233   private final LogOnErrorChannelFutureListener logErrorListener =
234       new LogOnErrorChannelFutureListener();
235   /** Flow control policy used */
236   private final FlowControl flowControl;
237 
238   /** How many network requests were resent because they took too long */
239   private final GiraphHadoopCounter networkRequestsResentForTimeout;
240   /** How many network requests were resent because channel failed */
241   private final GiraphHadoopCounter networkRequestsResentForChannelFailure;
242   /** How many network requests were resent because connection failed */
243   private final GiraphHadoopCounter networkRequestsResentForConnectionFailure;
244 
245   /**
246    * Keeps track of the number of reconnect failures. Once this exceeds the
247    * value of {@link #maxConnectionFailures}, the job will fail.
248    */
249   private int reconnectFailures = 0;
250 
  /**
   * Only constructor. Reads the client settings from the configuration,
   * selects the flow control policy, registers the resend counters, creates
   * the Netty worker group (and the optional execution group), configures the
   * bootstrap together with the per-channel handler pipeline, and finally
   * starts the "open-requests-observer" thread. No connection is opened here;
   * this constructor never calls {@code bootstrap.connect}.
   *
   * @param context Context for progress
   * @param conf Configuration
   * @param myTaskInfo Current task info
   * @param exceptionHandler handler for uncaught exception. Will
   *                         terminate job.
   * @throws IllegalStateException if both
   *         {@link #LIMIT_NUMBER_OF_OPEN_REQUESTS} and
   *         {@link #LIMIT_OPEN_REQUESTS_PER_WORKER} are enabled
   */
  public NettyClient(Mapper<?, ?, ?, ?>.Context context,
    final ImmutableClassesGiraphConfiguration conf, TaskInfo myTaskInfo,
    final Thread.UncaughtExceptionHandler exceptionHandler) {

    this.context = context;
    this.myTaskInfo = myTaskInfo;
    this.channelsPerServer = GiraphConstants.CHANNELS_PER_SERVER.get(conf);
    sendBufferSize = CLIENT_SEND_BUFFER_SIZE.get(conf);
    receiveBufferSize = CLIENT_RECEIVE_BUFFER_SIZE.get(conf);
    this.requestSizeWarningThreshold =
        GiraphConstants.REQUEST_SIZE_WARNING_THRESHOLD.get(conf);

    // The two request-limiting modes are mutually exclusive; exactly one
    // flow control implementation is chosen below.
    boolean limitNumberOfOpenRequests = LIMIT_NUMBER_OF_OPEN_REQUESTS.get(conf);
    boolean limitOpenRequestsPerWorker =
        LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
    checkState(!limitNumberOfOpenRequests || !limitOpenRequestsPerWorker,
        "NettyClient: it is not allowed to have both limitations on the " +
            "number of total open requests, and on the number of open " +
            "requests per worker!");
    // NOTE(review): 'this' is handed to the flow control constructors while
    // most final fields of this client are still unassigned - confirm those
    // constructors only store the reference.
    if (limitNumberOfOpenRequests) {
      flowControl = new StaticFlowControl(conf, this);
    } else if (limitOpenRequestsPerWorker) {
      flowControl = new CreditBasedFlowControl(conf, this, exceptionHandler);
    } else {
      flowControl = new NoOpFlowControl(this);
    }

    // Record the counter names in the static registry, then bind the three
    // resend counters to this task's context.
    initialiseCounters();
    networkRequestsResentForTimeout =
        new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
            NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME));
    networkRequestsResentForChannelFailure =
        new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
            NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME));
    networkRequestsResentForConnectionFailure =
      new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
        NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME));

    maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
    resendTimedOutRequests = RESEND_TIMED_OUT_REQUESTS.get(conf);
    maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
    waitTimeBetweenConnectionRetriesMs =
        WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS.get(conf);
    waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
    requestLogger = new TimedLogger(waitingRequestMsecs, LOG);
    maxPoolSize = GiraphConstants.NETTY_CLIENT_THREADS.get(conf);
    maxResolveAddressAttempts = MAX_RESOLVE_ADDRESS_ATTEMPTS.get(conf);

    // The open-request map is sized for as many concurrent writers as there
    // are Netty client threads.
    clientRequestIdRequestInfoMap =
        new MapMaker().concurrencyLevel(maxPoolSize).makeMap();

    // The execution group is optional; when it is disabled the field stays
    // null and is passed as null to every addLastWithExecutorCheck call.
    handlerToUseExecutionGroup =
        NETTY_CLIENT_EXECUTION_AFTER_HANDLER.get(conf);
    useExecutionGroup = NETTY_CLIENT_USE_EXECUTION_HANDLER.get(conf);
    if (useExecutionGroup) {
      int executionThreads = NETTY_CLIENT_EXECUTION_THREADS.get(conf);
      executionGroup = new DefaultEventExecutorGroup(executionThreads,
          ThreadUtils.createThreadFactory(
              "netty-client-exec-%d", exceptionHandler));
      if (LOG.isInfoEnabled()) {
        LOG.info("NettyClient: Using execution handler with " +
            executionThreads + " threads after " +
            handlerToUseExecutionGroup + ".");
      }
    } else {
      executionGroup = null;
    }

    workerGroup = new NioEventLoopGroup(maxPoolSize,
        ThreadUtils.createThreadFactory(
            "netty-client-worker-%d", exceptionHandler));

    // Every channel created through this bootstrap shares the socket options
    // set here and gets its pipeline built by the initializer below.
    bootstrap = new Bootstrap();
    bootstrap.group(workerGroup)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
            MAX_CONNECTION_MILLISECONDS_DEFAULT)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.SO_SNDBUF, sendBufferSize)
        .option(ChannelOption.SO_RCVBUF, receiveBufferSize)
        .option(ChannelOption.ALLOCATOR, conf.getNettyAllocator())
        .handler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            // The if_not/end markers below are build-time directives, not
            // ordinary comments: with HADOOP_NON_SECURE defined only the
            // non-authenticated pipeline remains.
/*if_not[HADOOP_NON_SECURE]*/
            if (conf.authenticate()) {
              LOG.info("Using Netty with authentication.");
              PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
                new FlushConsolidationHandler(FlushConsolidationHandler
                  .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
                handlerToUseExecutionGroup, executionGroup, ch);
              // Our pipeline starts with just byteCounter, and then we use
              // addLast() to incrementally add pipeline elements, so that we
              // can name them for identification for removal or replacement
              // after client is authenticated by server.
              // After authentication is complete, the pipeline's SASL-specific
              // functionality is removed, restoring the pipeline to exactly the
              // same configuration as it would be without authentication.
              PipelineUtils.addLastWithExecutorCheck("clientInboundByteCounter",
                  inboundByteCounter, handlerToUseExecutionGroup,
                  executionGroup, ch);
              if (conf.doCompression()) {
                PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
                    conf.getNettyCompressionDecoder(),
                    handlerToUseExecutionGroup, executionGroup, ch);
              }
              PipelineUtils.addLastWithExecutorCheck(
                  "clientOutboundByteCounter",
                  outboundByteCounter, handlerToUseExecutionGroup,
                  executionGroup, ch);
              if (conf.doCompression()) {
                PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
                    conf.getNettyCompressionEncoder(),
                    handlerToUseExecutionGroup, executionGroup, ch);
              }
              // The following pipeline component is needed to decode the
              // server's SASL tokens. It is replaced with a
              // FixedLengthFrameDecoder (same as used with the
              // non-authenticated pipeline) after authentication
              // completes (as in non-auth pipeline below).
              PipelineUtils.addLastWithExecutorCheck(
                  "length-field-based-frame-decoder",
                  new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4),
                  handlerToUseExecutionGroup, executionGroup, ch);
              PipelineUtils.addLastWithExecutorCheck("request-encoder",
                  new RequestEncoder(conf), handlerToUseExecutionGroup,
                  executionGroup, ch);
              // The following pipeline component responds to the server's SASL
              // tokens with its own responses. Both client and server share the
              // same Hadoop Job token, which is used to create the SASL
              // tokens to authenticate with each other.
              // After authentication finishes, this pipeline component
              // is removed.
              PipelineUtils.addLastWithExecutorCheck("sasl-client-handler",
                  new SaslClientHandler(conf), handlerToUseExecutionGroup,
                  executionGroup, ch);
              PipelineUtils.addLastWithExecutorCheck("response-handler",
                  new ResponseClientHandler(NettyClient.this, conf),
                  handlerToUseExecutionGroup, executionGroup, ch);
            } else {
              LOG.info("Using Netty without authentication.");
/*end[HADOOP_NON_SECURE]*/
              // Non-authenticated pipeline: same handlers as above minus the
              // SASL handler, and with a fixed-length frame decoder sized to
              // RequestServerHandler.RESPONSE_BYTES instead of the
              // length-field based one.
              PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
                new FlushConsolidationHandler(FlushConsolidationHandler
                    .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
                handlerToUseExecutionGroup, executionGroup, ch);
              PipelineUtils.addLastWithExecutorCheck("clientInboundByteCounter",
                  inboundByteCounter, handlerToUseExecutionGroup,
                  executionGroup, ch);
              if (conf.doCompression()) {
                PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
                    conf.getNettyCompressionDecoder(),
                    handlerToUseExecutionGroup, executionGroup, ch);
              }
              PipelineUtils.addLastWithExecutorCheck(
                  "clientOutboundByteCounter",
                  outboundByteCounter, handlerToUseExecutionGroup,
                  executionGroup, ch);
              if (conf.doCompression()) {
                PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
                    conf.getNettyCompressionEncoder(),
                    handlerToUseExecutionGroup, executionGroup, ch);
              }
              PipelineUtils.addLastWithExecutorCheck(
                  "fixed-length-frame-decoder",
                  new FixedLengthFrameDecoder(
                      RequestServerHandler.RESPONSE_BYTES),
                 handlerToUseExecutionGroup, executionGroup, ch);
              PipelineUtils.addLastWithExecutorCheck("request-encoder",
                    new RequestEncoder(conf), handlerToUseExecutionGroup,
                  executionGroup, ch);
              PipelineUtils.addLastWithExecutorCheck("response-handler",
                  new ResponseClientHandler(NettyClient.this, conf),
                  handlerToUseExecutionGroup, executionGroup, ch);

/*if_not[HADOOP_NON_SECURE]*/
            }
/*end[HADOOP_NON_SECURE]*/
          }

          @Override
          public void channelUnregistered(ChannelHandlerContext ctx) throws
              Exception {
            // Every unregistration is logged at error level as a channel
            // failure, and the open requests tied to that channel are
            // re-examined.
            super.channelUnregistered(ctx);
            LOG.error("Channel failed " + ctx.channel());
            checkRequestsAfterChannelFailure(ctx.channel());
          }
        });

    // Start a thread which will observe if there are any problems with open
    // requests. The loop below has no exit condition: it sleeps for
    // waitingRequestMsecs and then runs the check, forever.
    // NOTE(review): whether ThreadUtils.startThread creates a daemon thread
    // is not visible here - confirm it cannot keep the JVM alive.
    ThreadUtils.startThread(new Runnable() {
      @Override
      public void run() {
        while (true) {
          ThreadUtils.trySleep(waitingRequestMsecs);
          checkRequestsForProblems();
        }
      }
    }, "open-requests-observer");
  }
462 
463   /**
464    * Put the Netty-related counters in a single map which will be accessed
465    * from the worker/master
466    */
467   private void initialiseCounters() {
468     Set<String> counters = COUNTER_GROUP_AND_NAMES.getOrDefault(
469             NETTY_COUNTERS_GROUP, new HashSet<>());
470     counters.add(NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME);
471     counters.add(NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME);
472     counters.add(NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME);
473     COUNTER_GROUP_AND_NAMES.put(NETTY_COUNTERS_GROUP, counters);
474   }
475 
  /**
   * Get the Netty-related counter names, keyed by counter group.
   *
   * The returned map is the live static registry itself, not a copy: it is
   * filled by {@code initialiseCounters()} when a client is constructed, and
   * any change made through the returned reference is seen by every caller.
   *
   * @return Map from counter group name to the counter names in that group
   */
  public static Map<String, Set<String>> getCounterGroupsAndNames() {
    return COUNTER_GROUP_AND_NAMES;
  }
479 
480   /**
481    * Whether master task is involved in the communication with a given client
482    *
483    * @param clientId id of the communication (on the end of the communication)
484    * @return true if master is on one end of the communication
485    */
486   public boolean masterInvolved(int clientId) {
487     return myTaskInfo.getTaskId() == MasterInfo.MASTER_TASK_ID ||
488         clientId == MasterInfo.MASTER_TASK_ID;
489   }
490 
491   /**
492    * Pair object for connectAllAddresses().
493    */
494   private static class ChannelFutureAddress {
495     /** Future object */
496     private final ChannelFuture future;
497     /** Address of the future */
498     private final InetSocketAddress address;
499     /** Task id */
500     private final Integer taskId;
501 
502     /**
503      * Constructor.
504      *
505      * @param future Immutable future
506      * @param address Immutable address
507      * @param taskId Immutable taskId
508      */
509     ChannelFutureAddress(
510         ChannelFuture future, InetSocketAddress address, Integer taskId) {
511       this.future = future;
512       this.address = address;
513       this.taskId = taskId;
514     }
515 
516     @Override
517     public String toString() {
518       return "(future=" + future + ",address=" + address + ",taskId=" +
519           taskId + ")";
520     }
521   }
522 
523   /**
524    * Connect to a collection of tasks servers
525    *
526    * @param tasks Tasks to connect to (if haven't already connected)
527    */
528   public void connectAllAddresses(Collection<? extends TaskInfo> tasks) {
529     List<ChannelFutureAddress> waitingConnectionList =
530         Lists.newArrayListWithCapacity(tasks.size() * channelsPerServer);
531     for (TaskInfo taskInfo : tasks) {
532       context.progress();
533       int taskId = taskInfo.getTaskId();
534       InetSocketAddress address = taskIdAddressMap.get(taskId);
535       if (address == null ||
536           !address.getHostName().equals(taskInfo.getHostname()) ||
537           address.getPort() != taskInfo.getPort()) {
538         address = resolveAddress(maxResolveAddressAttempts,
539             taskInfo.getHostOrIp(), taskInfo.getPort());
540         taskIdAddressMap.put(taskId, address);
541       }
542       if (address == null || address.getHostName() == null ||
543           address.getHostName().isEmpty()) {
544         throw new IllegalStateException("connectAllAddresses: Null address " +
545             "in addresses " + tasks);
546       }
547       if (address.isUnresolved()) {
548         throw new IllegalStateException("connectAllAddresses: Unresolved " +
549             "address " + address);
550       }
551 
552       if (addressChannelMap.containsKey(address)) {
553         continue;
554       }
555 
556       // Start connecting to the remote server up to n time
557       for (int i = 0; i < channelsPerServer; ++i) {
558         ChannelFuture connectionFuture = bootstrap.connect(address);
559 
560         waitingConnectionList.add(
561             new ChannelFutureAddress(
562                 connectionFuture, address, taskId));
563       }
564     }
565 
566     // Wait for all the connections to succeed up to n tries
567     int failures = 0;
568     int connected = 0;
569     while (failures < maxConnectionFailures) {
570       List<ChannelFutureAddress> nextCheckFutures = Lists.newArrayList();
571       boolean isFirstFailure = true;
572       for (ChannelFutureAddress waitingConnection : waitingConnectionList) {
573         context.progress();
574         ChannelFuture future = waitingConnection.future;
575         ProgressableUtils.awaitChannelFuture(future, context);
576         if (!future.isSuccess() || !future.channel().isOpen()) {
577           // Make a short pause before trying to reconnect failed addresses
578           // again, but to do it just once per iterating through channels
579           if (isFirstFailure) {
580             isFirstFailure = false;
581             try {
582               Thread.sleep(waitTimeBetweenConnectionRetriesMs);
583             } catch (InterruptedException e) {
584               throw new IllegalStateException(
585                   "connectAllAddresses: InterruptedException occurred", e);
586             }
587           }
588 
589           LOG.warn("connectAllAddresses: Future failed " +
590               "to connect with " + waitingConnection.address + " with " +
591               failures + " failures because of " + future.cause());
592 
593           ChannelFuture connectionFuture =
594               bootstrap.connect(waitingConnection.address);
595           nextCheckFutures.add(new ChannelFutureAddress(connectionFuture,
596               waitingConnection.address, waitingConnection.taskId));
597           ++failures;
598         } else {
599           Channel channel = future.channel();
600           if (LOG.isDebugEnabled()) {
601             LOG.debug("connectAllAddresses: Connected to " +
602                 channel.remoteAddress() + ", open = " + channel.isOpen());
603           }
604 
605           if (channel.remoteAddress() == null) {
606             throw new IllegalStateException(
607                 "connectAllAddresses: Null remote address!");
608           }
609 
610           ChannelRotater rotater =
611               addressChannelMap.get(waitingConnection.address);
612           if (rotater == null) {
613             ChannelRotater newRotater =
614                 new ChannelRotater(waitingConnection.taskId,
615                     waitingConnection.address);
616             rotater = addressChannelMap.putIfAbsent(
617                 waitingConnection.address, newRotater);
618             if (rotater == null) {
619               rotater = newRotater;
620             }
621           }
622           rotater.addChannel(future.channel());
623           ++connected;
624         }
625       }
626       LOG.info("connectAllAddresses: Successfully added " +
627           (waitingConnectionList.size() - nextCheckFutures.size()) +
628           " connections, (" + connected + " total connected) " +
629           nextCheckFutures.size() + " failed, " +
630           failures + " failures total.");
631       if (nextCheckFutures.isEmpty()) {
632         break;
633       }
634       waitingConnectionList = nextCheckFutures;
635     }
636     if (failures >= maxConnectionFailures) {
637       throw new IllegalStateException(
638           "connectAllAddresses: Too many failures (" + failures + ").");
639     }
640   }
641 
642 /*if_not[HADOOP_NON_SECURE]*/
643   /**
644    * Authenticate all servers in addressChannelMap.
645    */
646   public void authenticate() {
647     LOG.info("authenticate: NettyClient starting authentication with " +
648         "servers.");
649     for (Map.Entry<InetSocketAddress, ChannelRotater> entry :
650       addressChannelMap.entrySet()) {
651       if (LOG.isDebugEnabled()) {
652         LOG.debug("authenticate: Authenticating with address:" +
653           entry.getKey());
654       }
655       ChannelRotater channelRotater = entry.getValue();
656       for (Channel channel: channelRotater.getChannels()) {
657         if (LOG.isDebugEnabled()) {
658           LOG.debug("authenticate: Authenticating with server on channel: " +
659               channel);
660         }
661         authenticateOnChannel(channelRotater.getTaskId(), channel);
662       }
663     }
664     if (LOG.isInfoEnabled()) {
665       LOG.info("authenticate: NettyClient successfully authenticated with " +
666           addressChannelMap.size() + " server" +
667           ((addressChannelMap.size() != 1) ? "s" : "") +
668           " - continuing with normal work.");
669     }
670   }
671 
672   /**
673    * Authenticate with server connected at given channel.
674    *
675    * @param taskId Task id of the channel
676    * @param channel Connection to server to authenticate with.
677    */
678   private void authenticateOnChannel(Integer taskId, Channel channel) {
679     try {
680       SaslNettyClient saslNettyClient = channel.attr(SASL).get();
681       if (channel.attr(SASL).get() == null) {
682         if (LOG.isDebugEnabled()) {
683           LOG.debug("authenticateOnChannel: Creating saslNettyClient now " +
684               "for channel: " + channel);
685         }
686         saslNettyClient = new SaslNettyClient();
687         channel.attr(SASL).set(saslNettyClient);
688       }
689       if (!saslNettyClient.isComplete()) {
690         if (LOG.isDebugEnabled()) {
691           LOG.debug("authenticateOnChannel: Waiting for authentication " +
692               "to complete..");
693         }
694         SaslTokenMessageRequest saslTokenMessage = saslNettyClient.firstToken();
695         sendWritableRequest(taskId, saslTokenMessage);
696         // We now wait for Netty's thread pool to communicate over this
697         // channel to authenticate with another worker acting as a server.
698         try {
699           synchronized (saslNettyClient.getAuthenticated()) {
700             while (!saslNettyClient.isComplete()) {
701               saslNettyClient.getAuthenticated().wait();
702             }
703           }
704         } catch (InterruptedException e) {
705           LOG.error("authenticateOnChannel: Interrupted while waiting for " +
706               "authentication.");
707         }
708       }
709       if (LOG.isDebugEnabled()) {
710         LOG.debug("authenticateOnChannel: Authentication on channel: " +
711             channel + " has completed successfully.");
712       }
713     } catch (IOException e) {
714       LOG.error("authenticateOnChannel: Failed to authenticate with server " +
715           "due to error: " + e);
716     }
717     return;
718   }
719 /*end[HADOOP_NON_SECURE]*/
720 
721   /**
722    * Stop the client.
723    */
724   public void stop() {
725     if (LOG.isInfoEnabled()) {
726       LOG.info("stop: Halting netty client");
727     }
728     // Close connections asynchronously, in a Netty-approved
729     // way, without cleaning up thread pools until all channels
730     // in addressChannelMap are closed (success or failure)
731     int channelCount = 0;
732     for (ChannelRotater channelRotater : addressChannelMap.values()) {
733       channelCount += channelRotater.size();
734     }
735     final int done = channelCount;
736     final AtomicInteger count = new AtomicInteger(0);
737     for (ChannelRotater channelRotater : addressChannelMap.values()) {
738       channelRotater.closeChannels(new ChannelFutureListener() {
739         @Override
740         public void operationComplete(ChannelFuture cf) {
741           context.progress();
742           if (count.incrementAndGet() == done) {
743             if (LOG.isInfoEnabled()) {
744               LOG.info("stop: reached wait threshold, " +
745                   done + " connections closed, releasing " +
746                   "resources now.");
747             }
748             workerGroup.shutdownGracefully();
749             if (executionGroup != null) {
750               executionGroup.shutdownGracefully();
751             }
752           }
753         }
754       });
755     }
756     ProgressableUtils.awaitTerminationFuture(workerGroup, context);
757     if (executionGroup != null) {
758       ProgressableUtils.awaitTerminationFuture(executionGroup, context);
759     }
760     if (LOG.isInfoEnabled()) {
761       LOG.info("stop: Netty client halted");
762     }
763   }
764 
765   /**
766    * Get the next available channel, reconnecting if necessary
767    *
768    * @param remoteServer Remote server to get a channel for
769    * @return Available channel for this remote server
770    */
771   private Channel getNextChannel(InetSocketAddress remoteServer) {
772     Channel channel = addressChannelMap.get(remoteServer).nextChannel();
773     if (channel == null) {
774       LOG.warn("getNextChannel: No channel exists for " + remoteServer);
775     } else {
776       // Return this channel if it is connected
777       if (channel.isActive()) {
778         return channel;
779       }
780 
781       // Get rid of the failed channel
782       if (addressChannelMap.get(remoteServer).removeChannel(channel)) {
783         LOG.warn("getNextChannel: Unlikely event that the channel " +
784           channel + " was already removed!");
785       }
786       if (LOG.isInfoEnabled()) {
787         LOG.info("getNextChannel: Fixing disconnected channel to " +
788           remoteServer + ", open = " + channel.isOpen() + ", " +
789           "bound = " + channel.isRegistered());
790       }
791     }
792 
793     while (reconnectFailures < maxConnectionFailures) {
794       ChannelFuture connectionFuture = bootstrap.connect(remoteServer);
795       try {
796         ProgressableUtils.awaitChannelFuture(connectionFuture, context);
797       } catch (BlockingOperationException e) {
798         LOG.warn("getNextChannel: Failed connecting to " + remoteServer, e);
799       }
800       if (connectionFuture.isSuccess()) {
801         if (LOG.isInfoEnabled()) {
802           LOG.info("getNextChannel: Connected to " + remoteServer + "!");
803         }
804         addressChannelMap.get(remoteServer).addChannel(
805             connectionFuture.channel());
806         return connectionFuture.channel();
807       }
808       ++reconnectFailures;
809       LOG.warn("getNextChannel: Failed to reconnect to " +  remoteServer +
810           " on attempt " + reconnectFailures + " out of " +
811           maxConnectionFailures + " max attempts, sleeping for 5 secs",
812           connectionFuture.cause());
813       ThreadUtils.trySleep(5000);
814     }
815     throw new IllegalStateException("getNextChannel: Failed to connect " +
816         "to " + remoteServer + " in " + reconnectFailures +
817         " connect attempts");
818   }
819 
820   /**
821    * Send a request to a remote server honoring the flow control mechanism
822    * (should be already connected)
823    *
824    * @param destTaskId Destination task id
825    * @param request Request to send
826    */
827   public void sendWritableRequest(int destTaskId, WritableRequest request) {
828     flowControl.sendRequest(destTaskId, request);
829   }
830 
831   /**
832    * Actual send of a request.
833    *
834    * @param destTaskId destination to send the request to
835    * @param request request itself
836    * @return request id generated for sending the request
837    */
838   public Long doSend(int destTaskId, WritableRequest request) {
839     InetSocketAddress remoteServer = taskIdAddressMap.get(destTaskId);
840     if (clientRequestIdRequestInfoMap.isEmpty()) {
841       inboundByteCounter.resetAll();
842       outboundByteCounter.resetAll();
843     }
844     boolean registerRequest = true;
845     Long requestId = null;
846 /*if_not[HADOOP_NON_SECURE]*/
847     if (request.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) {
848       registerRequest = false;
849     }
850 /*end[HADOOP_NON_SECURE]*/
851 
852     RequestInfo newRequestInfo = new RequestInfo(remoteServer, request);
853     if (registerRequest) {
854       request.setClientId(myTaskInfo.getTaskId());
855       requestId = taskRequestIdGenerator.getNextRequestId(destTaskId);
856       request.setRequestId(requestId);
857       ClientRequestId clientRequestId =
858         new ClientRequestId(destTaskId, request.getRequestId());
859       RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
860         clientRequestId, newRequestInfo);
861       if (oldRequestInfo != null) {
862         throw new IllegalStateException("sendWritableRequest: Impossible to " +
863           "have a previous request id = " + request.getRequestId() + ", " +
864           "request info of " + oldRequestInfo);
865       }
866     }
867     if (request.getSerializedSize() >
868         requestSizeWarningThreshold * sendBufferSize) {
869       LOG.warn("Creating large request of type " + request.getClass() +
870         ", size " + request.getSerializedSize() +
871         " bytes. Check netty buffer size.");
872     }
873     writeRequestToChannel(newRequestInfo);
874     return requestId;
875   }
876 
877   /**
878    * Write request to a channel for its destination.
879    *
880    * Whenever we write to the channel, we also call flush, but we have added a
881    * {@link FlushConsolidationHandler} in the pipeline, which batches the
882    * flushes.
883    *
884    * @param requestInfo Request info
885    */
886   private void writeRequestToChannel(RequestInfo requestInfo) {
887     Channel channel = getNextChannel(requestInfo.getDestinationAddress());
888     ChannelFuture writeFuture = channel.writeAndFlush(requestInfo.getRequest());
889     requestInfo.setWriteFuture(writeFuture);
890     writeFuture.addListener(logErrorListener);
891   }
892 
893   /**
894    * Handle receipt of a message. Called by response handler.
895    *
896    * @param senderId Id of sender of the message
897    * @param requestId Id of the request
898    * @param response Actual response
899    * @param shouldDrop Drop the message?
900    */
901   public void messageReceived(int senderId, long requestId, int response,
902       boolean shouldDrop) {
903     if (shouldDrop) {
904       synchronized (clientRequestIdRequestInfoMap) {
905         clientRequestIdRequestInfoMap.notifyAll();
906       }
907       return;
908     }
909     AckSignalFlag responseFlag = flowControl.getAckSignalFlag(response);
910     if (responseFlag == AckSignalFlag.DUPLICATE_REQUEST) {
911       LOG.info("messageReceived: Already completed request (taskId = " +
912           senderId + ", requestId = " + requestId + ")");
913     } else if (responseFlag != AckSignalFlag.NEW_REQUEST) {
914       throw new IllegalStateException(
915           "messageReceived: Got illegal response " + response);
916     }
917     RequestInfo requestInfo = clientRequestIdRequestInfoMap
918         .remove(new ClientRequestId(senderId, requestId));
919     if (requestInfo == null) {
920       LOG.info("messageReceived: Already received response for (taskId = " +
921           senderId + ", requestId = " + requestId + ")");
922     } else {
923       if (LOG.isDebugEnabled()) {
924         LOG.debug("messageReceived: Completed (taskId = " + senderId + ")" +
925             requestInfo + ".  Waiting on " +
926             clientRequestIdRequestInfoMap.size() + " requests");
927       }
928       flowControl.messageAckReceived(senderId, requestId, response);
929       // Help #waitAllRequests() to finish faster
930       synchronized (clientRequestIdRequestInfoMap) {
931         clientRequestIdRequestInfoMap.notifyAll();
932       }
933     }
934   }
935 
936   /**
937    * Ensure all the request sent so far are complete. Periodically check the
938    * state of current open requests. If there is an issue in any of them,
939    * re-send the request.
940    */
941   public void waitAllRequests() {
942     flowControl.waitAllRequests();
943     checkState(flowControl.getNumberOfUnsentRequests() == 0);
944     while (clientRequestIdRequestInfoMap.size() > 0) {
945       // Wait for requests to complete for some time
946       synchronized (clientRequestIdRequestInfoMap) {
947         if (clientRequestIdRequestInfoMap.size() == 0) {
948           break;
949         }
950         try {
951           clientRequestIdRequestInfoMap.wait(waitingRequestMsecs);
952         } catch (InterruptedException e) {
953           throw new IllegalStateException("waitAllRequests: Got unexpected " +
954               "InterruptedException", e);
955         }
956       }
957       logAndSanityCheck();
958     }
959     if (LOG.isInfoEnabled()) {
960       LOG.info("waitAllRequests: Finished all requests. " +
961           inboundByteCounter.getMetrics() + "\n" + outboundByteCounter
962           .getMetrics());
963     }
964   }
965 
966   /**
967    * Log information about the requests and check for problems in requests
968    */
969   public void logAndSanityCheck() {
970     logInfoAboutOpenRequests();
971     // Make sure that waiting doesn't kill the job
972     context.progress();
973   }
974 
975   /**
976    * Log the status of open requests.
977    */
978   private void logInfoAboutOpenRequests() {
979     if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {
980       LOG.info("logInfoAboutOpenRequests: Waiting interval of " +
981           waitingRequestMsecs + " msecs, " +
982           clientRequestIdRequestInfoMap.size() +
983           " open requests, " + inboundByteCounter.getMetrics() + "\n" +
984           outboundByteCounter.getMetrics());
985 
986       if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) {
987         for (Map.Entry<ClientRequestId, RequestInfo> entry :
988             clientRequestIdRequestInfoMap.entrySet()) {
989           LOG.info("logInfoAboutOpenRequests: Waiting for request " +
990               entry.getKey() + " - " + entry.getValue());
991         }
992       }
993 
994       // Count how many open requests each task has
995       Map<Integer, Integer> openRequestCounts = Maps.newHashMap();
996       for (ClientRequestId clientRequestId :
997           clientRequestIdRequestInfoMap.keySet()) {
998         int taskId = clientRequestId.getDestinationTaskId();
999         Integer currentCount = openRequestCounts.get(taskId);
1000         openRequestCounts.put(taskId,
1001             (currentCount == null ? 0 : currentCount) + 1);
1002       }
1003       // Sort it in decreasing order of number of open requests
1004       List<Map.Entry<Integer, Integer>> sorted =
1005           Lists.newArrayList(openRequestCounts.entrySet());
1006       Collections.sort(sorted, new Comparator<Map.Entry<Integer, Integer>>() {
1007         @Override
1008         public int compare(Map.Entry<Integer, Integer> entry1,
1009             Map.Entry<Integer, Integer> entry2) {
1010           int value1 = entry1.getValue();
1011           int value2 = entry2.getValue();
1012           return (value1 < value2) ? 1 : ((value1 == value2) ? 0 : -1);
1013         }
1014       });
1015       // Print task ids which have the most open requests
1016       StringBuilder message = new StringBuilder();
1017       message.append("logInfoAboutOpenRequests: ");
1018       int itemsToPrint =
1019           Math.min(MAX_DESTINATION_TASK_IDS_TO_LIST, sorted.size());
1020       for (int i = 0; i < itemsToPrint; i++) {
1021         message.append(sorted.get(i).getValue())
1022             .append(" requests for taskId=")
1023             .append(sorted.get(i).getKey())
1024             .append(", ");
1025       }
1026       LOG.info(message);
1027       flowControl.logInfo();
1028     }
1029   }
1030 
1031   /**
1032    * Check if there are some open requests which have been sent a long time
1033    * ago, and if so resend them.
1034    */
1035   private void checkRequestsForProblems() {
1036     long lastTimeChecked = lastTimeCheckedRequestsForProblems.get();
1037     // If not enough time passed from the previous check, return
1038     if (System.currentTimeMillis() < lastTimeChecked + waitingRequestMsecs) {
1039       return;
1040     }
1041     // If another thread did the check already, return
1042     if (!lastTimeCheckedRequestsForProblems.compareAndSet(lastTimeChecked,
1043         System.currentTimeMillis())) {
1044       return;
1045     }
1046     resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
1047       @Override
1048       public boolean apply(RequestInfo requestInfo) {
1049         // If the request is taking too long, re-establish and resend
1050         return requestInfo.getElapsedMsecs() > maxRequestMilliseconds;
1051       }
1052     }, networkRequestsResentForTimeout, resendTimedOutRequests);
1053     resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
1054       @Override
1055       public boolean apply(RequestInfo requestInfo) {
1056         ChannelFuture writeFuture = requestInfo.getWriteFuture();
1057         // If not connected anymore or request failed re-establish and resend
1058         return writeFuture != null && (!writeFuture.channel().isActive() ||
1059             (writeFuture.isDone() && !writeFuture.isSuccess()));
1060       }
1061     }, networkRequestsResentForConnectionFailure, true);
1062   }
1063 
1064   /**
1065    * Resend requests which satisfy predicate
1066    *  @param shouldResendRequestPredicate Predicate to use to check whether
1067    *                                     request should be resent
1068    * @param counter Counter to increment for every resent network request
1069    * @param resendProblematicRequest Whether to resend problematic request or
1070    *                                fail the job if such request is found
1071    */
1072   private void resendRequestsWhenNeeded(
1073       Predicate<RequestInfo> shouldResendRequestPredicate,
1074       GiraphHadoopCounter counter,
1075       boolean resendProblematicRequest) {
1076     // Check if there are open requests which have been sent a long time ago,
1077     // and if so, resend them.
1078     List<ClientRequestId> addedRequestIds = Lists.newArrayList();
1079     List<RequestInfo> addedRequestInfos = Lists.newArrayList();
1080     // Check all the requests for problems
1081     for (Map.Entry<ClientRequestId, RequestInfo> entry :
1082         clientRequestIdRequestInfoMap.entrySet()) {
1083       RequestInfo requestInfo = entry.getValue();
1084       // If request should be resent
1085       if (shouldResendRequestPredicate.apply(requestInfo)) {
1086         if (!resendProblematicRequest) {
1087           throw new IllegalStateException("Problem with request id " +
1088               entry.getKey() + " for " + requestInfo.getDestinationAddress() +
1089               ", failing the job");
1090         }
1091         ChannelFuture writeFuture = requestInfo.getWriteFuture();
1092         String logMessage;
1093         if (writeFuture == null) {
1094           logMessage = "wasn't sent successfully";
1095         } else {
1096           logMessage = "connected = " +
1097               writeFuture.channel().isActive() +
1098               ", future done = " + writeFuture.isDone() + ", " +
1099               "success = " + writeFuture.isSuccess() + ", " +
1100               "cause = " + writeFuture.cause() + ", " +
1101               "channelId = " + writeFuture.channel().hashCode();
1102         }
1103         LOG.warn("checkRequestsForProblems: Problem with request id " +
1104             entry.getKey() + ", " + logMessage + ", " +
1105             "elapsed time = " + requestInfo.getElapsedMsecs() + ", " +
1106             "destination = " + requestInfo.getDestinationAddress() +
1107             " " + requestInfo);
1108         addedRequestIds.add(entry.getKey());
1109         addedRequestInfos.add(new RequestInfo(
1110             requestInfo.getDestinationAddress(), requestInfo.getRequest()));
1111         counter.increment();
1112       }
1113     }
1114 
1115     // Add any new requests to the system, connect if necessary, and re-send
1116     for (int i = 0; i < addedRequestIds.size(); ++i) {
1117       ClientRequestId requestId = addedRequestIds.get(i);
1118       RequestInfo requestInfo = addedRequestInfos.get(i);
1119 
1120       if (clientRequestIdRequestInfoMap.put(requestId, requestInfo) == null) {
1121         LOG.warn("checkRequestsForProblems: Request " + requestId +
1122             " completed prior to sending the next request");
1123         clientRequestIdRequestInfoMap.remove(requestId);
1124       }
1125       if (LOG.isInfoEnabled()) {
1126         LOG.info("checkRequestsForProblems: Re-issuing request " + requestInfo);
1127       }
1128       writeRequestToChannel(requestInfo);
1129       if (LOG.isInfoEnabled()) {
1130         LOG.info("checkRequestsForProblems: Request " + requestId +
1131             " was resent through channelId=" +
1132             requestInfo.getWriteFuture().channel().hashCode());
1133       }
1134     }
1135     addedRequestIds.clear();
1136     addedRequestInfos.clear();
1137   }
1138 
1139   /**
1140    * Utility method for resolving addresses
1141    *
1142    * @param maxResolveAddressAttempts Maximum number of attempts to resolve the
1143    *        address
1144    * @param hostOrIp Known IP or host name
1145    * @param port Target port number
1146    * @return The successfully resolved address.
1147    * @throws IllegalStateException if the address is not resolved
1148    *         in <code>maxResolveAddressAttempts</code> tries.
1149    */
1150   private static InetSocketAddress resolveAddress(
1151       int maxResolveAddressAttempts, String hostOrIp, int port) {
1152     int resolveAttempts = 0;
1153     InetSocketAddress address = new InetSocketAddress(hostOrIp, port);
1154     while (address.isUnresolved() &&
1155         resolveAttempts < maxResolveAddressAttempts) {
1156       ++resolveAttempts;
1157       LOG.warn("resolveAddress: Failed to resolve " + address +
1158           " on attempt " + resolveAttempts + " of " +
1159           maxResolveAddressAttempts + " attempts, sleeping for 5 seconds");
1160       ThreadUtils.trySleep(5000);
1161       address = new InetSocketAddress(hostOrIp,
1162           address.getPort());
1163     }
1164     if (resolveAttempts >= maxResolveAddressAttempts) {
1165       throw new IllegalStateException("resolveAddress: Couldn't " +
1166           "resolve " + address + " in " +  resolveAttempts + " tries.");
1167     }
1168     return address;
1169   }
1170 
1171   public FlowControl getFlowControl() {
1172     return flowControl;
1173   }
1174 
1175   /**
1176    * Generate and get the next request id to be used for a given worker
1177    *
1178    * @param taskId id of the worker to generate the next request id
1179    * @return request id
1180    */
1181   public Long getNextRequestId(int taskId) {
1182     return taskRequestIdGenerator.getNextRequestId(taskId);
1183   }
1184 
1185   /**
1186    * @return number of open requests
1187    */
1188   public int getNumberOfOpenRequests() {
1189     return clientRequestIdRequestInfoMap.size();
1190   }
1191 
1192   /**
1193    * Resend requests related to channel which failed
1194    *
1195    * @param channel Channel which failed
1196    */
1197   private void checkRequestsAfterChannelFailure(final Channel channel) {
1198     resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
1199       @Override
1200       public boolean apply(RequestInfo requestInfo) {
1201         if (requestInfo.getWriteFuture() == null ||
1202             requestInfo.getWriteFuture().channel() == null) {
1203           return false;
1204         }
1205         return requestInfo.getWriteFuture().channel().equals(channel);
1206       }
1207     }, networkRequestsResentForChannelFailure, true);
1208   }
1209 
1210   /**
1211    * This listener class just dumps exception stack traces if
1212    * something happens.
1213    */
1214   private static class LogOnErrorChannelFutureListener
1215       implements ChannelFutureListener {
1216 
1217     @Override
1218     public void operationComplete(ChannelFuture future) throws Exception {
1219       if (future.isDone() && !future.isSuccess()) {
1220         LOG.error("Channel failed channelId=" + future.channel().hashCode(),
1221             future.cause());
1222       }
1223     }
1224   }
1225 }