View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty;
20  
21  import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
22  import org.apache.giraph.comm.flow_control.FlowControl;
23  import org.apache.giraph.comm.flow_control.NoOpFlowControl;
24  import org.apache.giraph.comm.flow_control.StaticFlowControl;
25  import org.apache.giraph.comm.netty.handler.AckSignalFlag;
26  import org.apache.giraph.comm.netty.handler.TaskRequestIdGenerator;
27  import org.apache.giraph.comm.netty.handler.ClientRequestId;
28  import org.apache.giraph.comm.netty.handler.RequestEncoder;
29  import org.apache.giraph.comm.netty.handler.RequestInfo;
30  import org.apache.giraph.comm.netty.handler.RequestServerHandler;
31  import org.apache.giraph.comm.netty.handler.ResponseClientHandler;
32  /*if_not[HADOOP_NON_SECURE]*/
33  import org.apache.giraph.comm.netty.handler.SaslClientHandler;
34  import org.apache.giraph.comm.requests.RequestType;
35  import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
36  /*end[HADOOP_NON_SECURE]*/
37  import org.apache.giraph.comm.requests.WritableRequest;
38  import org.apache.giraph.conf.BooleanConfOption;
39  import org.apache.giraph.conf.GiraphConstants;
40  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
41  import org.apache.giraph.counters.GiraphHadoopCounter;
42  import org.apache.giraph.function.Predicate;
43  import org.apache.giraph.graph.TaskInfo;
44  import org.apache.giraph.master.MasterInfo;
45  import org.apache.giraph.utils.PipelineUtils;
46  import org.apache.giraph.utils.ProgressableUtils;
47  import org.apache.giraph.utils.ThreadUtils;
48  import org.apache.giraph.utils.TimedLogger;
49  import org.apache.hadoop.mapreduce.Mapper;
50  import org.apache.log4j.Logger;
51  
52  import com.google.common.collect.Lists;
53  import com.google.common.collect.MapMaker;
54  import com.google.common.collect.Maps;
55  
56  /*if_not[HADOOP_NON_SECURE]*/
57  import java.io.IOException;
58  /*end[HADOOP_NON_SECURE]*/
59  import java.net.InetSocketAddress;
60  import java.util.Collection;
61  import java.util.Collections;
62  import java.util.Comparator;
63  import java.util.List;
64  import java.util.Map;
65  import java.util.concurrent.ConcurrentMap;
66  import java.util.concurrent.atomic.AtomicInteger;
67  import java.util.concurrent.atomic.AtomicLong;
68  
69  import io.netty.bootstrap.Bootstrap;
70  import io.netty.channel.Channel;
71  import io.netty.channel.ChannelFuture;
72  import io.netty.channel.ChannelFutureListener;
73  import io.netty.channel.ChannelHandlerContext;
74  import io.netty.channel.ChannelInitializer;
75  import io.netty.channel.ChannelOption;
76  import io.netty.channel.EventLoopGroup;
77  import io.netty.channel.nio.NioEventLoopGroup;
78  import io.netty.channel.socket.SocketChannel;
79  import io.netty.channel.socket.nio.NioSocketChannel;
80  import io.netty.handler.codec.FixedLengthFrameDecoder;
81  /*if_not[HADOOP_NON_SECURE]*/
82  import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
83  import io.netty.util.AttributeKey;
84  /*end[HADOOP_NON_SECURE]*/
85  import io.netty.util.concurrent.BlockingOperationException;
86  import io.netty.util.concurrent.DefaultEventExecutorGroup;
87  import io.netty.util.concurrent.EventExecutorGroup;
88  
89  import static com.google.common.base.Preconditions.checkState;
90  import static org.apache.giraph.conf.GiraphConstants.CLIENT_RECEIVE_BUFFER_SIZE;
91  import static org.apache.giraph.conf.GiraphConstants.CLIENT_SEND_BUFFER_SIZE;
92  import static org.apache.giraph.conf.GiraphConstants.MAX_REQUEST_MILLISECONDS;
93  import static org.apache.giraph.conf.GiraphConstants.MAX_RESOLVE_ADDRESS_ATTEMPTS;
94  import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_AFTER_HANDLER;
95  import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS;
96  import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER;
97  import static org.apache.giraph.conf.GiraphConstants.NETTY_MAX_CONNECTION_FAILURES;
98  import static org.apache.giraph.conf.GiraphConstants.RESEND_TIMED_OUT_REQUESTS;
99  import static org.apache.giraph.conf.GiraphConstants.WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS;
100 import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
101 
/**
 * Netty client for sending requests.  Thread-safe.
 */
public class NettyClient {
  /**
   * Do we have a limit on number of open requests we can have.
   * When true, a {@link StaticFlowControl} policy caps the total number of
   * open requests across all workers (see constructor).
   */
  public static final BooleanConfOption LIMIT_NUMBER_OF_OPEN_REQUESTS =
      new BooleanConfOption("giraph.waitForRequestsConfirmation", false,
          "Whether to have a limit on number of open requests or not");
110   /**
111    * Do we have a limit on number of open requests we can have for each worker.
112    * Note that if this option is enabled, Netty will not keep more than a
113    * certain number of requests open for each other worker in the job. If there
114    * are more requests generated for a worker, Netty will not actually send the
115    * surplus requests, instead, it caches the requests in a local buffer. The
116    * maximum number of these unsent requests in the cache is another
117    * user-defined parameter (MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER).
118    */
119   public static final BooleanConfOption LIMIT_OPEN_REQUESTS_PER_WORKER =
120       new BooleanConfOption("giraph.waitForPerWorkerRequests", false,
121           "Whether to have a limit on number of open requests for each worker" +
122               "or not");
  /** Maximum number of requests to list (for debugging) */
  public static final int MAX_REQUESTS_TO_LIST = 10;
  /**
   * Maximum number of destination task ids with open requests to list
   * (for debugging)
   */
  public static final int MAX_DESTINATION_TASK_IDS_TO_LIST = 10;
  /** 30 seconds to connect by default */
  public static final int MAX_CONNECTION_MILLISECONDS_DEFAULT = 30 * 1000;
/*if_not[HADOOP_NON_SECURE]*/
  /** Used to authenticate with other workers acting as servers */
  public static final AttributeKey<SaslNettyClient> SASL =
      AttributeKey.valueOf("saslNettyClient");
/*end[HADOOP_NON_SECURE]*/

  /** Group name for netty counters */
  public static final String NETTY_COUNTERS_GROUP = "Netty counters";
  /** How many network requests were resent because they took too long */
  public static final String NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME =
      "Network requests resent for timeout";
  /** How many network requests were resent because channel failed */
  public static final String NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME =
      "Network requests resent for channel failure";
  /** How many network requests were resent because connection failed */
  public static final String
      NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME =
      "Network requests resent for connection or request failure";

  /** Class logger */
  private static final Logger LOG = Logger.getLogger(NettyClient.class);
  /** Context used to report progress */
  private final Mapper<?, ?, ?, ?>.Context context;
  /** Client bootstrap */
  private final Bootstrap bootstrap;
  /**
   * Map of the peer connections, mapping from remote socket address to client
   * meta data (channel rotater holding the open channels to that address)
   */
  private final ConcurrentMap<InetSocketAddress, ChannelRotater>
  addressChannelMap = new MapMaker().makeMap();
  /**
   * Map from task id to address of its server
   */
  private final Map<Integer, InetSocketAddress> taskIdAddressMap =
      new MapMaker().makeMap();
  /**
   * Request map of client request ids to request information.
   * Entries track in-flight requests so they can be resent on
   * timeout or channel failure (see constructor's observer thread).
   */
  private final ConcurrentMap<ClientRequestId, RequestInfo>
  clientRequestIdRequestInfoMap;
  /** Number of channels per server */
  private final int channelsPerServer;
  /** Inbound byte counter for this client */
  private final InboundByteCounter inboundByteCounter = new
      InboundByteCounter();
  /** Outbound byte counter for this client */
  private final OutboundByteCounter outboundByteCounter = new
      OutboundByteCounter();
  /** Send buffer size */
  private final int sendBufferSize;
  /** Receive buffer size */
  private final int receiveBufferSize;
  /** Warn if request size is bigger than the buffer size by this factor */
  private final float requestSizeWarningThreshold;
  /** Maximum number of connection failures */
  private final int maxConnectionFailures;
  /** How long to wait before trying to reconnect failed connections */
  private final long waitTimeBetweenConnectionRetriesMs;
  /** Maximum number of milliseconds for a request */
  private final int maxRequestMilliseconds;
  /**
   * Whether to resend request which timed out or fail the job if timeout
   * happens
   */
  private final boolean resendTimedOutRequests;
  /** Waiting interval for checking outstanding requests msecs */
  private final int waitingRequestMsecs;
  /** Timed logger for printing request debugging (rate-limited) */
  private final TimedLogger requestLogger;
  /** Worker executor group */
  private final EventLoopGroup workerGroup;
  /** Task request id generator */
  private final TaskRequestIdGenerator taskRequestIdGenerator =
      new TaskRequestIdGenerator();
  /** Task info */
  private final TaskInfo myTaskInfo;
  /** Maximum thread pool size */
  private final int maxPoolSize;
  /** Maximum number of attempts to resolve an address*/
  private final int maxResolveAddressAttempts;
  /** Use execution handler? */
  private final boolean useExecutionGroup;
  /** EventExecutor Group (if used) */
  private final EventExecutorGroup executionGroup;
  /** Name of the handler to use execution group for (if used) */
  private final String handlerToUseExecutionGroup;
  /** When was the last time we checked if we should resend some requests */
  private final AtomicLong lastTimeCheckedRequestsForProblems =
      new AtomicLong(0);
  /**
   * Logger used to dump stack traces for every exception that happens
   * in netty client threads.
   */
  private final LogOnErrorChannelFutureListener logErrorListener =
      new LogOnErrorChannelFutureListener();
  /** Flow control policy used */
  private final FlowControl flowControl;

  /** How many network requests were resent because they took too long */
  private final GiraphHadoopCounter networkRequestsResentForTimeout;
  /** How many network requests were resent because channel failed */
  private final GiraphHadoopCounter networkRequestsResentForChannelFailure;
  /** How many network requests were resent because connection failed */
  private final GiraphHadoopCounter networkRequestsResentForConnectionFailure;
237 
  /**
   * Only constructor
   *
   * @param context Context for progress
   * @param conf Configuration
   * @param myTaskInfo Current task info
   * @param exceptionHandler handler for uncaught exception. Will
   *                         terminate job.
   */
  public NettyClient(Mapper<?, ?, ?, ?>.Context context,
                     final ImmutableClassesGiraphConfiguration conf,
                     TaskInfo myTaskInfo,
                     final Thread.UncaughtExceptionHandler exceptionHandler) {
    this.context = context;
    this.myTaskInfo = myTaskInfo;
    this.channelsPerServer = GiraphConstants.CHANNELS_PER_SERVER.get(conf);
    sendBufferSize = CLIENT_SEND_BUFFER_SIZE.get(conf);
    receiveBufferSize = CLIENT_RECEIVE_BUFFER_SIZE.get(conf);
    this.requestSizeWarningThreshold =
        GiraphConstants.REQUEST_SIZE_WARNING_THRESHOLD.get(conf);

    // Pick the flow control policy: the two limiting options are mutually
    // exclusive; with neither enabled no limiting is done at all.
    boolean limitNumberOfOpenRequests = LIMIT_NUMBER_OF_OPEN_REQUESTS.get(conf);
    boolean limitOpenRequestsPerWorker =
        LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
    checkState(!limitNumberOfOpenRequests || !limitOpenRequestsPerWorker,
        "NettyClient: it is not allowed to have both limitations on the " +
            "number of total open requests, and on the number of open " +
            "requests per worker!");
    if (limitNumberOfOpenRequests) {
      flowControl = new StaticFlowControl(conf, this);
    } else if (limitOpenRequestsPerWorker) {
      flowControl = new CreditBasedFlowControl(conf, this, exceptionHandler);
    } else {
      flowControl = new NoOpFlowControl(this);
    }

    // Hadoop counters tracking the three distinct resend causes
    networkRequestsResentForTimeout =
        new GiraphHadoopCounter(context.getCounter(
            NETTY_COUNTERS_GROUP,
            NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME));
    networkRequestsResentForChannelFailure =
        new GiraphHadoopCounter(context.getCounter(
            NETTY_COUNTERS_GROUP,
            NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME));
    networkRequestsResentForConnectionFailure =
      new GiraphHadoopCounter(context.getCounter(
        NETTY_COUNTERS_GROUP,
        NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME));

    maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
    resendTimedOutRequests = RESEND_TIMED_OUT_REQUESTS.get(conf);
    maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
    waitTimeBetweenConnectionRetriesMs =
        WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS.get(conf);
    waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
    requestLogger = new TimedLogger(waitingRequestMsecs, LOG);
    maxPoolSize = GiraphConstants.NETTY_CLIENT_THREADS.get(conf);
    maxResolveAddressAttempts = MAX_RESOLVE_ADDRESS_ATTEMPTS.get(conf);

    clientRequestIdRequestInfoMap =
        new MapMaker().concurrencyLevel(maxPoolSize).makeMap();

    // Optionally run part of the pipeline (everything after the named
    // handler) on a separate executor group instead of the I/O threads
    handlerToUseExecutionGroup =
        NETTY_CLIENT_EXECUTION_AFTER_HANDLER.get(conf);
    useExecutionGroup = NETTY_CLIENT_USE_EXECUTION_HANDLER.get(conf);
    if (useExecutionGroup) {
      int executionThreads = NETTY_CLIENT_EXECUTION_THREADS.get(conf);
      executionGroup = new DefaultEventExecutorGroup(executionThreads,
          ThreadUtils.createThreadFactory(
              "netty-client-exec-%d", exceptionHandler));
      if (LOG.isInfoEnabled()) {
        LOG.info("NettyClient: Using execution handler with " +
            executionThreads + " threads after " +
            handlerToUseExecutionGroup + ".");
      }
    } else {
      executionGroup = null;
    }

    workerGroup = new NioEventLoopGroup(maxPoolSize,
        ThreadUtils.createThreadFactory(
            "netty-client-worker-%d", exceptionHandler));

    bootstrap = new Bootstrap();
    bootstrap.group(workerGroup)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
            MAX_CONNECTION_MILLISECONDS_DEFAULT)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.SO_SNDBUF, sendBufferSize)
        .option(ChannelOption.SO_RCVBUF, receiveBufferSize)
        .option(ChannelOption.ALLOCATOR, conf.getNettyAllocator())
        .handler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
/*if_not[HADOOP_NON_SECURE]*/
            if (conf.authenticate()) {
              LOG.info("Using Netty with authentication.");

              // Our pipeline starts with just byteCounter, and then we use
              // addLast() to incrementally add pipeline elements, so that we
              // can name them for identification for removal or replacement
              // after client is authenticated by server.
              // After authentication is complete, the pipeline's SASL-specific
              // functionality is removed, restoring the pipeline to exactly the
              // same configuration as it would be without authentication.
              PipelineUtils.addLastWithExecutorCheck("clientInboundByteCounter",
                  inboundByteCounter, handlerToUseExecutionGroup,
                  executionGroup, ch);
              if (conf.doCompression()) {
                PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
                    conf.getNettyCompressionDecoder(),
                    handlerToUseExecutionGroup, executionGroup, ch);
              }
              PipelineUtils.addLastWithExecutorCheck(
                  "clientOutboundByteCounter",
                  outboundByteCounter, handlerToUseExecutionGroup,
                  executionGroup, ch);
              if (conf.doCompression()) {
                PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
                    conf.getNettyCompressionEncoder(),
                    handlerToUseExecutionGroup, executionGroup, ch);
              }
              // The following pipeline component is needed to decode the
              // server's SASL tokens. It is replaced with a
              // FixedLengthFrameDecoder (same as used with the
              // non-authenticated pipeline) after authentication
              // completes (as in non-auth pipeline below).
              PipelineUtils.addLastWithExecutorCheck(
                  "length-field-based-frame-decoder",
                  new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4),
                  handlerToUseExecutionGroup, executionGroup, ch);
              PipelineUtils.addLastWithExecutorCheck("request-encoder",
                  new RequestEncoder(conf), handlerToUseExecutionGroup,
                  executionGroup, ch);
              // The following pipeline component responds to the server's SASL
              // tokens with its own responses. Both client and server share the
              // same Hadoop Job token, which is used to create the SASL
              // tokens to authenticate with each other.
              // After authentication finishes, this pipeline component
              // is removed.
              PipelineUtils.addLastWithExecutorCheck("sasl-client-handler",
                  new SaslClientHandler(conf), handlerToUseExecutionGroup,
                  executionGroup, ch);
              PipelineUtils.addLastWithExecutorCheck("response-handler",
                  new ResponseClientHandler(NettyClient.this, conf),
                  handlerToUseExecutionGroup, executionGroup, ch);
            } else {
              LOG.info("Using Netty without authentication.");
/*end[HADOOP_NON_SECURE]*/
              PipelineUtils.addLastWithExecutorCheck("clientInboundByteCounter",
                  inboundByteCounter, handlerToUseExecutionGroup,
                  executionGroup, ch);
              if (conf.doCompression()) {
                PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
                    conf.getNettyCompressionDecoder(),
                    handlerToUseExecutionGroup, executionGroup, ch);
              }
              PipelineUtils.addLastWithExecutorCheck(
                  "clientOutboundByteCounter",
                  outboundByteCounter, handlerToUseExecutionGroup,
                  executionGroup, ch);
              if (conf.doCompression()) {
                PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
                    conf.getNettyCompressionEncoder(),
                    handlerToUseExecutionGroup, executionGroup, ch);
              }
              PipelineUtils.addLastWithExecutorCheck(
                  "fixed-length-frame-decoder",
                  new FixedLengthFrameDecoder(
                      RequestServerHandler.RESPONSE_BYTES),
                 handlerToUseExecutionGroup, executionGroup, ch);
              PipelineUtils.addLastWithExecutorCheck("request-encoder",
                    new RequestEncoder(conf), handlerToUseExecutionGroup,
                  executionGroup, ch);
              PipelineUtils.addLastWithExecutorCheck("response-handler",
                  new ResponseClientHandler(NettyClient.this, conf),
                  handlerToUseExecutionGroup, executionGroup, ch);

/*if_not[HADOOP_NON_SECURE]*/
            }
/*end[HADOOP_NON_SECURE]*/
          }

          @Override
          public void channelUnregistered(ChannelHandlerContext ctx) throws
              Exception {
            super.channelUnregistered(ctx);
            LOG.error("Channel failed " + ctx.channel());
            checkRequestsAfterChannelFailure(ctx.channel());
          }
        });

    // Start a thread which will observe if there are any problems with open
    // requests
    ThreadUtils.startThread(new Runnable() {
      @Override
      public void run() {
        while (true) {
          ThreadUtils.trySleep(waitingRequestMsecs);
          checkRequestsForProblems();
        }
      }
    }, "open-requests-observer");
  }
444 
445   /**
446    * Whether master task is involved in the communication with a given client
447    *
448    * @param clientId id of the communication (on the end of the communication)
449    * @return true if master is on one end of the communication
450    */
451   public boolean masterInvolved(int clientId) {
452     return myTaskInfo.getTaskId() == MasterInfo.MASTER_TASK_ID ||
453         clientId == MasterInfo.MASTER_TASK_ID;
454   }
455 
456   /**
457    * Pair object for connectAllAddresses().
458    */
459   private static class ChannelFutureAddress {
460     /** Future object */
461     private final ChannelFuture future;
462     /** Address of the future */
463     private final InetSocketAddress address;
464     /** Task id */
465     private final Integer taskId;
466 
467     /**
468      * Constructor.
469      *
470      * @param future Immutable future
471      * @param address Immutable address
472      * @param taskId Immutable taskId
473      */
474     ChannelFutureAddress(
475         ChannelFuture future, InetSocketAddress address, Integer taskId) {
476       this.future = future;
477       this.address = address;
478       this.taskId = taskId;
479     }
480 
481     @Override
482     public String toString() {
483       return "(future=" + future + ",address=" + address + ",taskId=" +
484           taskId + ")";
485     }
486   }
487 
  /**
   * Connect to a collection of tasks servers
   *
   * @param tasks Tasks to connect to (if haven't already connected)
   */
  public void connectAllAddresses(Collection<? extends TaskInfo> tasks) {
    List<ChannelFutureAddress> waitingConnectionList =
        Lists.newArrayListWithCapacity(tasks.size() * channelsPerServer);
    for (TaskInfo taskInfo : tasks) {
      context.progress();
      int taskId = taskInfo.getTaskId();
      // Re-resolve the address if we have no cached entry or the task has
      // moved to a different host/port since the last resolution
      InetSocketAddress address = taskIdAddressMap.get(taskId);
      if (address == null ||
          !address.getHostName().equals(taskInfo.getHostname()) ||
          address.getPort() != taskInfo.getPort()) {
        address = resolveAddress(maxResolveAddressAttempts,
            taskInfo.getHostOrIp(), taskInfo.getPort());
        taskIdAddressMap.put(taskId, address);
      }
      if (address == null || address.getHostName() == null ||
          address.getHostName().isEmpty()) {
        throw new IllegalStateException("connectAllAddresses: Null address " +
            "in addresses " + tasks);
      }
      if (address.isUnresolved()) {
        throw new IllegalStateException("connectAllAddresses: Unresolved " +
            "address " + address);
      }

      // Already connected to this server; nothing to do
      if (addressChannelMap.containsKey(address)) {
        continue;
      }

      // Start connecting to the remote server, opening channelsPerServer
      // channels per server; futures are collected and awaited below
      for (int i = 0; i < channelsPerServer; ++i) {
        ChannelFuture connectionFuture = bootstrap.connect(address);

        waitingConnectionList.add(
            new ChannelFutureAddress(
                connectionFuture, address, taskId));
      }
    }

    // Wait for all the connections to succeed up to n tries
    int failures = 0;
    int connected = 0;
    while (failures < maxConnectionFailures) {
      List<ChannelFutureAddress> nextCheckFutures = Lists.newArrayList();
      boolean isFirstFailure = true;
      for (ChannelFutureAddress waitingConnection : waitingConnectionList) {
        context.progress();
        ChannelFuture future = waitingConnection.future;
        ProgressableUtils.awaitChannelFuture(future, context);
        if (!future.isSuccess() || !future.channel().isOpen()) {
          // Make a short pause before trying to reconnect failed addresses
          // again, but to do it just once per iterating through channels
          if (isFirstFailure) {
            isFirstFailure = false;
            try {
              Thread.sleep(waitTimeBetweenConnectionRetriesMs);
            } catch (InterruptedException e) {
              throw new IllegalStateException(
                  "connectAllAddresses: InterruptedException occurred", e);
            }
          }

          LOG.warn("connectAllAddresses: Future failed " +
              "to connect with " + waitingConnection.address + " with " +
              failures + " failures because of " + future.cause());

          // Retry this address in the next pass of the while loop
          ChannelFuture connectionFuture =
              bootstrap.connect(waitingConnection.address);
          nextCheckFutures.add(new ChannelFutureAddress(connectionFuture,
              waitingConnection.address, waitingConnection.taskId));
          ++failures;
        } else {
          Channel channel = future.channel();
          if (LOG.isDebugEnabled()) {
            LOG.debug("connectAllAddresses: Connected to " +
                channel.remoteAddress() + ", open = " + channel.isOpen());
          }

          if (channel.remoteAddress() == null) {
            throw new IllegalStateException(
                "connectAllAddresses: Null remote address!");
          }

          // Register the channel with the address's rotater, creating it
          // via putIfAbsent to win or adopt the concurrent-creation race
          ChannelRotater rotater =
              addressChannelMap.get(waitingConnection.address);
          if (rotater == null) {
            ChannelRotater newRotater =
                new ChannelRotater(waitingConnection.taskId,
                    waitingConnection.address);
            rotater = addressChannelMap.putIfAbsent(
                waitingConnection.address, newRotater);
            if (rotater == null) {
              rotater = newRotater;
            }
          }
          rotater.addChannel(future.channel());
          ++connected;
        }
      }
      LOG.info("connectAllAddresses: Successfully added " +
          (waitingConnectionList.size() - nextCheckFutures.size()) +
          " connections, (" + connected + " total connected) " +
          nextCheckFutures.size() + " failed, " +
          failures + " failures total.");
      if (nextCheckFutures.isEmpty()) {
        break;
      }
      waitingConnectionList = nextCheckFutures;
    }
    if (failures >= maxConnectionFailures) {
      throw new IllegalStateException(
          "connectAllAddresses: Too many failures (" + failures + ").");
    }
  }
606 
607 /*if_not[HADOOP_NON_SECURE]*/
608   /**
609    * Authenticate all servers in addressChannelMap.
610    */
611   public void authenticate() {
612     LOG.info("authenticate: NettyClient starting authentication with " +
613         "servers.");
614     for (InetSocketAddress address: addressChannelMap.keySet()) {
615       if (LOG.isDebugEnabled()) {
616         LOG.debug("authenticate: Authenticating with address:" + address);
617       }
618       ChannelRotater channelRotater = addressChannelMap.get(address);
619       for (Channel channel: channelRotater.getChannels()) {
620         if (LOG.isDebugEnabled()) {
621           LOG.debug("authenticate: Authenticating with server on channel: " +
622               channel);
623         }
624         authenticateOnChannel(channelRotater.getTaskId(), channel);
625       }
626     }
627     if (LOG.isInfoEnabled()) {
628       LOG.info("authenticate: NettyClient successfully authenticated with " +
629           addressChannelMap.size() + " server" +
630           ((addressChannelMap.size() != 1) ? "s" : "") +
631           " - continuing with normal work.");
632     }
633   }
634 
635   /**
636    * Authenticate with server connected at given channel.
637    *
638    * @param taskId Task id of the channel
639    * @param channel Connection to server to authenticate with.
640    */
641   private void authenticateOnChannel(Integer taskId, Channel channel) {
642     try {
643       SaslNettyClient saslNettyClient = channel.attr(SASL).get();
644       if (channel.attr(SASL).get() == null) {
645         if (LOG.isDebugEnabled()) {
646           LOG.debug("authenticateOnChannel: Creating saslNettyClient now " +
647               "for channel: " + channel);
648         }
649         saslNettyClient = new SaslNettyClient();
650         channel.attr(SASL).set(saslNettyClient);
651       }
652       if (!saslNettyClient.isComplete()) {
653         if (LOG.isDebugEnabled()) {
654           LOG.debug("authenticateOnChannel: Waiting for authentication " +
655               "to complete..");
656         }
657         SaslTokenMessageRequest saslTokenMessage = saslNettyClient.firstToken();
658         sendWritableRequest(taskId, saslTokenMessage);
659         // We now wait for Netty's thread pool to communicate over this
660         // channel to authenticate with another worker acting as a server.
661         try {
662           synchronized (saslNettyClient.getAuthenticated()) {
663             while (!saslNettyClient.isComplete()) {
664               saslNettyClient.getAuthenticated().wait();
665             }
666           }
667         } catch (InterruptedException e) {
668           LOG.error("authenticateOnChannel: Interrupted while waiting for " +
669               "authentication.");
670         }
671       }
672       if (LOG.isDebugEnabled()) {
673         LOG.debug("authenticateOnChannel: Authentication on channel: " +
674             channel + " has completed successfully.");
675       }
676     } catch (IOException e) {
677       LOG.error("authenticateOnChannel: Failed to authenticate with server " +
678           "due to error: " + e);
679     }
680     return;
681   }
682 /*end[HADOOP_NON_SECURE]*/
683 
684   /**
685    * Stop the client.
686    */
687   public void stop() {
688     if (LOG.isInfoEnabled()) {
689       LOG.info("stop: Halting netty client");
690     }
691     // Close connections asynchronously, in a Netty-approved
692     // way, without cleaning up thread pools until all channels
693     // in addressChannelMap are closed (success or failure)
694     int channelCount = 0;
695     for (ChannelRotater channelRotater : addressChannelMap.values()) {
696       channelCount += channelRotater.size();
697     }
698     final int done = channelCount;
699     final AtomicInteger count = new AtomicInteger(0);
700     for (ChannelRotater channelRotater : addressChannelMap.values()) {
701       channelRotater.closeChannels(new ChannelFutureListener() {
702         @Override
703         public void operationComplete(ChannelFuture cf) {
704           context.progress();
705           if (count.incrementAndGet() == done) {
706             if (LOG.isInfoEnabled()) {
707               LOG.info("stop: reached wait threshold, " +
708                   done + " connections closed, releasing " +
709                   "resources now.");
710             }
711             workerGroup.shutdownGracefully();
712             if (executionGroup != null) {
713               executionGroup.shutdownGracefully();
714             }
715           }
716         }
717       });
718     }
719     ProgressableUtils.awaitTerminationFuture(workerGroup, context);
720     if (executionGroup != null) {
721       ProgressableUtils.awaitTerminationFuture(executionGroup, context);
722     }
723     if (LOG.isInfoEnabled()) {
724       LOG.info("stop: Netty client halted");
725     }
726   }
727 
728   /**
729    * Get the next available channel, reconnecting if necessary
730    *
731    * @param remoteServer Remote server to get a channel for
732    * @return Available channel for this remote server
733    */
734   private Channel getNextChannel(InetSocketAddress remoteServer) {
735     Channel channel = addressChannelMap.get(remoteServer).nextChannel();
736     if (channel == null) {
737       throw new IllegalStateException(
738           "getNextChannel: No channel exists for " + remoteServer);
739     }
740 
741     // Return this channel if it is connected
742     if (channel.isActive()) {
743       return channel;
744     }
745 
746     // Get rid of the failed channel
747     if (addressChannelMap.get(remoteServer).removeChannel(channel)) {
748       LOG.warn("getNextChannel: Unlikely event that the channel " +
749           channel + " was already removed!");
750     }
751     if (LOG.isInfoEnabled()) {
752       LOG.info("getNextChannel: Fixing disconnected channel to " +
753           remoteServer + ", open = " + channel.isOpen() + ", " +
754           "bound = " + channel.isRegistered());
755     }
756     int reconnectFailures = 0;
757     while (reconnectFailures < maxConnectionFailures) {
758       ChannelFuture connectionFuture = bootstrap.connect(remoteServer);
759       try {
760         ProgressableUtils.awaitChannelFuture(connectionFuture, context);
761       } catch (BlockingOperationException e) {
762         LOG.warn("getNextChannel: Failed connecting to " + remoteServer, e);
763       }
764       if (connectionFuture.isSuccess()) {
765         if (LOG.isInfoEnabled()) {
766           LOG.info("getNextChannel: Connected to " + remoteServer + "!");
767         }
768         addressChannelMap.get(remoteServer).addChannel(
769             connectionFuture.channel());
770         return connectionFuture.channel();
771       }
772       ++reconnectFailures;
773       LOG.warn("getNextChannel: Failed to reconnect to " +  remoteServer +
774           " on attempt " + reconnectFailures + " out of " +
775           maxConnectionFailures + " max attempts, sleeping for 5 secs",
776           connectionFuture.cause());
777       ThreadUtils.trySleep(5000);
778     }
779     throw new IllegalStateException("getNextChannel: Failed to connect " +
780         "to " + remoteServer + " in " + reconnectFailures +
781         " connect attempts");
782   }
783 
784   /**
785    * Send a request to a remote server honoring the flow control mechanism
786    * (should be already connected)
787    *
788    * @param destTaskId Destination task id
789    * @param request Request to send
790    */
791   public void sendWritableRequest(int destTaskId, WritableRequest request) {
792     flowControl.sendRequest(destTaskId, request);
793   }
794 
795   /**
796    * Actual send of a request.
797    *
798    * @param destTaskId destination to send the request to
799    * @param request request itself
800    * @return request id generated for sending the request
801    */
802   public Long doSend(int destTaskId, WritableRequest request) {
803     InetSocketAddress remoteServer = taskIdAddressMap.get(destTaskId);
804     if (clientRequestIdRequestInfoMap.isEmpty()) {
805       inboundByteCounter.resetAll();
806       outboundByteCounter.resetAll();
807     }
808     boolean registerRequest = true;
809     Long requestId = null;
810 /*if_not[HADOOP_NON_SECURE]*/
811     if (request.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) {
812       registerRequest = false;
813     }
814 /*end[HADOOP_NON_SECURE]*/
815 
816     RequestInfo newRequestInfo = new RequestInfo(remoteServer, request);
817     if (registerRequest) {
818       request.setClientId(myTaskInfo.getTaskId());
819       requestId = taskRequestIdGenerator.getNextRequestId(destTaskId);
820       request.setRequestId(requestId);
821       ClientRequestId clientRequestId =
822         new ClientRequestId(destTaskId, request.getRequestId());
823       RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
824         clientRequestId, newRequestInfo);
825       if (oldRequestInfo != null) {
826         throw new IllegalStateException("sendWritableRequest: Impossible to " +
827           "have a previous request id = " + request.getRequestId() + ", " +
828           "request info of " + oldRequestInfo);
829       }
830     }
831     if (request.getSerializedSize() >
832         requestSizeWarningThreshold * sendBufferSize) {
833       LOG.warn("Creating large request of type " + request.getClass() +
834         ", size " + request.getSerializedSize() +
835         " bytes. Check netty buffer size.");
836     }
837     writeRequestToChannel(newRequestInfo);
838     return requestId;
839   }
840 
841   /**
842    * Write request to a channel for its destination
843    *
844    * @param requestInfo Request info
845    */
846   private void writeRequestToChannel(RequestInfo requestInfo) {
847     Channel channel = getNextChannel(requestInfo.getDestinationAddress());
848     ChannelFuture writeFuture = channel.write(requestInfo.getRequest());
849     requestInfo.setWriteFuture(writeFuture);
850     writeFuture.addListener(logErrorListener);
851   }
852 
853   /**
854    * Handle receipt of a message. Called by response handler.
855    *
856    * @param senderId Id of sender of the message
857    * @param requestId Id of the request
858    * @param response Actual response
859    * @param shouldDrop Drop the message?
860    */
861   public void messageReceived(int senderId, long requestId, int response,
862       boolean shouldDrop) {
863     if (shouldDrop) {
864       synchronized (clientRequestIdRequestInfoMap) {
865         clientRequestIdRequestInfoMap.notifyAll();
866       }
867       return;
868     }
869     AckSignalFlag responseFlag = flowControl.getAckSignalFlag(response);
870     if (responseFlag == AckSignalFlag.DUPLICATE_REQUEST) {
871       LOG.info("messageReceived: Already completed request (taskId = " +
872           senderId + ", requestId = " + requestId + ")");
873     } else if (responseFlag != AckSignalFlag.NEW_REQUEST) {
874       throw new IllegalStateException(
875           "messageReceived: Got illegal response " + response);
876     }
877     RequestInfo requestInfo = clientRequestIdRequestInfoMap
878         .remove(new ClientRequestId(senderId, requestId));
879     if (requestInfo == null) {
880       LOG.info("messageReceived: Already received response for (taskId = " +
881           senderId + ", requestId = " + requestId + ")");
882     } else {
883       if (LOG.isDebugEnabled()) {
884         LOG.debug("messageReceived: Completed (taskId = " + senderId + ")" +
885             requestInfo + ".  Waiting on " +
886             clientRequestIdRequestInfoMap.size() + " requests");
887       }
888       flowControl.messageAckReceived(senderId, requestId, response);
889       // Help #waitAllRequests() to finish faster
890       synchronized (clientRequestIdRequestInfoMap) {
891         clientRequestIdRequestInfoMap.notifyAll();
892       }
893     }
894   }
895 
896   /**
897    * Ensure all the request sent so far are complete. Periodically check the
898    * state of current open requests. If there is an issue in any of them,
899    * re-send the request.
900    */
901   public void waitAllRequests() {
902     flowControl.waitAllRequests();
903     checkState(flowControl.getNumberOfUnsentRequests() == 0);
904     while (clientRequestIdRequestInfoMap.size() > 0) {
905       // Wait for requests to complete for some time
906       synchronized (clientRequestIdRequestInfoMap) {
907         if (clientRequestIdRequestInfoMap.size() == 0) {
908           break;
909         }
910         try {
911           clientRequestIdRequestInfoMap.wait(waitingRequestMsecs);
912         } catch (InterruptedException e) {
913           throw new IllegalStateException("waitAllRequests: Got unexpected " +
914               "InterruptedException", e);
915         }
916       }
917       logAndSanityCheck();
918     }
919     if (LOG.isInfoEnabled()) {
920       LOG.info("waitAllRequests: Finished all requests. " +
921           inboundByteCounter.getMetrics() + "\n" + outboundByteCounter
922           .getMetrics());
923     }
924   }
925 
926   /**
927    * Log information about the requests and check for problems in requests
928    */
929   public void logAndSanityCheck() {
930     logInfoAboutOpenRequests();
931     // Make sure that waiting doesn't kill the job
932     context.progress();
933   }
934 
935   /**
936    * Log the status of open requests.
937    */
938   private void logInfoAboutOpenRequests() {
939     if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {
940       LOG.info("logInfoAboutOpenRequests: Waiting interval of " +
941           waitingRequestMsecs + " msecs, " +
942           clientRequestIdRequestInfoMap.size() +
943           " open requests, " + inboundByteCounter.getMetrics() + "\n" +
944           outboundByteCounter.getMetrics());
945 
946       if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) {
947         for (Map.Entry<ClientRequestId, RequestInfo> entry :
948             clientRequestIdRequestInfoMap.entrySet()) {
949           LOG.info("logInfoAboutOpenRequests: Waiting for request " +
950               entry.getKey() + " - " + entry.getValue());
951         }
952       }
953 
954       // Count how many open requests each task has
955       Map<Integer, Integer> openRequestCounts = Maps.newHashMap();
956       for (ClientRequestId clientRequestId :
957           clientRequestIdRequestInfoMap.keySet()) {
958         int taskId = clientRequestId.getDestinationTaskId();
959         Integer currentCount = openRequestCounts.get(taskId);
960         openRequestCounts.put(taskId,
961             (currentCount == null ? 0 : currentCount) + 1);
962       }
963       // Sort it in decreasing order of number of open requests
964       List<Map.Entry<Integer, Integer>> sorted =
965           Lists.newArrayList(openRequestCounts.entrySet());
966       Collections.sort(sorted, new Comparator<Map.Entry<Integer, Integer>>() {
967         @Override
968         public int compare(Map.Entry<Integer, Integer> entry1,
969             Map.Entry<Integer, Integer> entry2) {
970           int value1 = entry1.getValue();
971           int value2 = entry2.getValue();
972           return (value1 < value2) ? 1 : ((value1 == value2) ? 0 : -1);
973         }
974       });
975       // Print task ids which have the most open requests
976       StringBuilder message = new StringBuilder();
977       message.append("logInfoAboutOpenRequests: ");
978       int itemsToPrint =
979           Math.min(MAX_DESTINATION_TASK_IDS_TO_LIST, sorted.size());
980       for (int i = 0; i < itemsToPrint; i++) {
981         message.append(sorted.get(i).getValue())
982             .append(" requests for taskId=")
983             .append(sorted.get(i).getKey())
984             .append(", ");
985       }
986       LOG.info(message);
987       flowControl.logInfo();
988     }
989   }
990 
991   /**
992    * Check if there are some open requests which have been sent a long time
993    * ago, and if so resend them.
994    */
995   private void checkRequestsForProblems() {
996     long lastTimeChecked = lastTimeCheckedRequestsForProblems.get();
997     // If not enough time passed from the previous check, return
998     if (System.currentTimeMillis() < lastTimeChecked + waitingRequestMsecs) {
999       return;
1000     }
1001     // If another thread did the check already, return
1002     if (!lastTimeCheckedRequestsForProblems.compareAndSet(lastTimeChecked,
1003         System.currentTimeMillis())) {
1004       return;
1005     }
1006     resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
1007       @Override
1008       public boolean apply(RequestInfo requestInfo) {
1009         // If the request is taking too long, re-establish and resend
1010         return requestInfo.getElapsedMsecs() > maxRequestMilliseconds;
1011       }
1012     }, networkRequestsResentForTimeout, resendTimedOutRequests);
1013     resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
1014       @Override
1015       public boolean apply(RequestInfo requestInfo) {
1016         ChannelFuture writeFuture = requestInfo.getWriteFuture();
1017         // If not connected anymore or request failed re-establish and resend
1018         return writeFuture != null && (!writeFuture.channel().isActive() ||
1019             (writeFuture.isDone() && !writeFuture.isSuccess()));
1020       }
1021     }, networkRequestsResentForConnectionFailure, true);
1022   }
1023 
1024   /**
1025    * Resend requests which satisfy predicate
1026    *  @param shouldResendRequestPredicate Predicate to use to check whether
1027    *                                     request should be resent
1028    * @param counter Counter to increment for every resent network request
1029    * @param resendProblematicRequest Whether to resend problematic request or
1030    *                                fail the job if such request is found
1031    */
1032   private void resendRequestsWhenNeeded(
1033       Predicate<RequestInfo> shouldResendRequestPredicate,
1034       GiraphHadoopCounter counter,
1035       boolean resendProblematicRequest) {
1036     // Check if there are open requests which have been sent a long time ago,
1037     // and if so, resend them.
1038     List<ClientRequestId> addedRequestIds = Lists.newArrayList();
1039     List<RequestInfo> addedRequestInfos = Lists.newArrayList();
1040     // Check all the requests for problems
1041     for (Map.Entry<ClientRequestId, RequestInfo> entry :
1042         clientRequestIdRequestInfoMap.entrySet()) {
1043       RequestInfo requestInfo = entry.getValue();
1044       // If request should be resent
1045       if (shouldResendRequestPredicate.apply(requestInfo)) {
1046         if (!resendProblematicRequest) {
1047           throw new IllegalStateException("Problem with request id " +
1048               entry.getKey() + " for " + requestInfo.getDestinationAddress() +
1049               ", failing the job");
1050         }
1051         ChannelFuture writeFuture = requestInfo.getWriteFuture();
1052         String logMessage;
1053         if (writeFuture == null) {
1054           logMessage = "wasn't sent successfully";
1055         } else {
1056           logMessage = "connected = " +
1057               writeFuture.channel().isActive() +
1058               ", future done = " + writeFuture.isDone() + ", " +
1059               "success = " + writeFuture.isSuccess() + ", " +
1060               "cause = " + writeFuture.cause() + ", " +
1061               "channelId = " + writeFuture.channel().hashCode();
1062         }
1063         LOG.warn("checkRequestsForProblems: Problem with request id " +
1064             entry.getKey() + ", " + logMessage + ", " +
1065             "elapsed time = " + requestInfo.getElapsedMsecs() + ", " +
1066             "destination = " + requestInfo.getDestinationAddress() +
1067             " " + requestInfo);
1068         addedRequestIds.add(entry.getKey());
1069         addedRequestInfos.add(new RequestInfo(
1070             requestInfo.getDestinationAddress(), requestInfo.getRequest()));
1071         counter.increment();
1072       }
1073     }
1074 
1075     // Add any new requests to the system, connect if necessary, and re-send
1076     for (int i = 0; i < addedRequestIds.size(); ++i) {
1077       ClientRequestId requestId = addedRequestIds.get(i);
1078       RequestInfo requestInfo = addedRequestInfos.get(i);
1079 
1080       if (clientRequestIdRequestInfoMap.put(requestId, requestInfo) == null) {
1081         LOG.warn("checkRequestsForProblems: Request " + requestId +
1082             " completed prior to sending the next request");
1083         clientRequestIdRequestInfoMap.remove(requestId);
1084       }
1085       if (LOG.isInfoEnabled()) {
1086         LOG.info("checkRequestsForProblems: Re-issuing request " + requestInfo);
1087       }
1088       writeRequestToChannel(requestInfo);
1089       if (LOG.isInfoEnabled()) {
1090         LOG.info("checkRequestsForProblems: Request " + requestId +
1091             " was resent through channelId=" +
1092             requestInfo.getWriteFuture().channel().hashCode());
1093       }
1094     }
1095     addedRequestIds.clear();
1096     addedRequestInfos.clear();
1097   }
1098 
1099   /**
1100    * Utility method for resolving addresses
1101    *
1102    * @param maxResolveAddressAttempts Maximum number of attempts to resolve the
1103    *        address
1104    * @param hostOrIp Known IP or host name
1105    * @param port Target port number
1106    * @return The successfully resolved address.
1107    * @throws IllegalStateException if the address is not resolved
1108    *         in <code>maxResolveAddressAttempts</code> tries.
1109    */
1110   private static InetSocketAddress resolveAddress(
1111       int maxResolveAddressAttempts, String hostOrIp, int port) {
1112     int resolveAttempts = 0;
1113     InetSocketAddress address = new InetSocketAddress(hostOrIp, port);
1114     while (address.isUnresolved() &&
1115         resolveAttempts < maxResolveAddressAttempts) {
1116       ++resolveAttempts;
1117       LOG.warn("resolveAddress: Failed to resolve " + address +
1118           " on attempt " + resolveAttempts + " of " +
1119           maxResolveAddressAttempts + " attempts, sleeping for 5 seconds");
1120       ThreadUtils.trySleep(5000);
1121       address = new InetSocketAddress(hostOrIp,
1122           address.getPort());
1123     }
1124     if (resolveAttempts >= maxResolveAddressAttempts) {
1125       throw new IllegalStateException("resolveAddress: Couldn't " +
1126           "resolve " + address + " in " +  resolveAttempts + " tries.");
1127     }
1128     return address;
1129   }
1130 
  /**
   * Get the flow control policy used by this client.
   *
   * @return Flow control mechanism in use
   */
  public FlowControl getFlowControl() {
    return flowControl;
  }
1134 
1135   /**
1136    * Generate and get the next request id to be used for a given worker
1137    *
1138    * @param taskId id of the worker to generate the next request id
1139    * @return request id
1140    */
1141   public Long getNextRequestId(int taskId) {
1142     return taskRequestIdGenerator.getNextRequestId(taskId);
1143   }
1144 
1145   /**
1146    * @return number of open requests
1147    */
1148   public int getNumberOfOpenRequests() {
1149     return clientRequestIdRequestInfoMap.size();
1150   }
1151 
1152   /**
1153    * Resend requests related to channel which failed
1154    *
1155    * @param channel Channel which failed
1156    */
1157   private void checkRequestsAfterChannelFailure(final Channel channel) {
1158     resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
1159       @Override
1160       public boolean apply(RequestInfo requestInfo) {
1161         if (requestInfo.getWriteFuture() == null ||
1162             requestInfo.getWriteFuture().channel() == null) {
1163           return false;
1164         }
1165         return requestInfo.getWriteFuture().channel().equals(channel);
1166       }
1167     }, networkRequestsResentForChannelFailure, true);
1168   }
1169 
1170   /**
1171    * This listener class just dumps exception stack traces if
1172    * something happens.
1173    */
1174   private class LogOnErrorChannelFutureListener
1175       implements ChannelFutureListener {
1176 
1177     @Override
1178     public void operationComplete(ChannelFuture future) throws Exception {
1179       if (future.isDone() && !future.isSuccess()) {
1180         LOG.error("Channel failed channelId=" + future.channel().hashCode(),
1181             future.cause());
1182         checkRequestsAfterChannelFailure(future.channel());
1183       }
1184     }
1185   }
1186 }