View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.flow_control;
20  
21  import com.google.common.collect.Lists;
22  import com.google.common.collect.Maps;
23  import com.google.common.collect.Sets;
24  import org.apache.commons.lang3.tuple.MutablePair;
25  import org.apache.commons.lang3.tuple.Pair;
26  import org.apache.giraph.comm.netty.NettyClient;
27  import org.apache.giraph.comm.netty.handler.AckSignalFlag;
28  import org.apache.giraph.comm.requests.SendResumeRequest;
29  import org.apache.giraph.comm.requests.WritableRequest;
30  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
31  import org.apache.giraph.conf.IntConfOption;
32  import org.apache.giraph.utils.AdjustableSemaphore;
33  import org.apache.giraph.utils.CallableFactory;
34  import org.apache.giraph.utils.LogStacktraceCallable;
35  import org.apache.giraph.utils.ThreadUtils;
36  import org.apache.log4j.Logger;
37  
38  import java.util.ArrayDeque;
39  import java.util.ArrayList;
40  import java.util.Collections;
41  import java.util.Comparator;
42  import java.util.Deque;
43  import java.util.Map;
44  import java.util.Set;
45  import java.util.concurrent.Callable;
46  import java.util.concurrent.ConcurrentMap;
47  import java.util.concurrent.ExecutionException;
48  import java.util.concurrent.ExecutorService;
49  import java.util.concurrent.Executors;
50  import java.util.concurrent.Future;
51  import java.util.concurrent.Semaphore;
52  import java.util.concurrent.TimeUnit;
53  import java.util.concurrent.atomic.AtomicInteger;
54  
55  import static com.google.common.base.Preconditions.checkState;
56  import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
57  
58  /**
59   * Representation of credit-based flow control policy. With this policy there
60   * can be limited number of open requests from any worker x to any other worker
61   * y. This number is called 'credit'. Let's denote this number by C{x->y}.
62   * This implementation assumes that for a particular worker W, all values of
63   * C{x->W} are the same. Let's denote this value by CR_W. CR_W may change
64   * due to other reasons (e.g. memory pressure observed in an out-of-core
65   * mechanism). However, CR_W is always in range [0, MAX_CR], where MAX_CR
66   * is a user-defined constant. Note that MAX_CR should be representable by
67   * at most 14 bits.
68   *
69   * In this implementation, the value of CR_W is announced to other workers along
70   * with the ACK response envelope for all ACK response envelope going out of W.
71   * Therefore, for non-zero values of CR_W, other workers know the instant value
72   * of CR_W, hence they can control the number of open requests they have to W.
73   * However, it is possible that W announces 0 as CR_W. In this case, other
74   * workers stop opening more requests to W, hence they will not get any new
75   * response envelope from W. This means other workers should be notified with
76   * a dedicated request to resume sending more requests once CR_W becomes
77   * non-zero. In this implementation, once CR_W is announced as 0 to a particular
78   * worker U, we keep U in a set, so later on we can send 'resume signal' to U
79   * once CR_W becomes non-zero. Sending resume signals are done through a
80   * separate thread.
81   */
82  public class CreditBasedFlowControl implements FlowControl {
83    /**
84     * Maximum number of requests we can have per worker without confirmation
85     * (i.e. open requests)
86     */
87    public static final IntConfOption MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER =
88        new IntConfOption("giraph.maxOpenRequestsPerWorker", 20,
89            "Maximum number of requests without confirmation we can have per " +
90                "worker");
91    /** Aggregate number of in-memory unsent requests */
92    public static final IntConfOption MAX_NUM_OF_UNSENT_REQUESTS =
93        new IntConfOption("giraph.maxNumberOfUnsentRequests", 2000,
94            "Maximum number of unsent requests we can keep in memory");
95    /**
96     * Time interval to wait on unsent requests cahce until we find a spot in it
97     */
98    public static final IntConfOption UNSENT_CACHE_WAIT_INTERVAL =
99        new IntConfOption("giraph.unsentCacheWaitInterval", 1000,
100           "Time interval to wait on unsent requests cache (in milliseconds)");
101   /** Class logger */
102   private static final Logger LOG =
103       Logger.getLogger(CreditBasedFlowControl.class);
104 
105   /** Waiting interval on unsent requests cache until it frees up */
106   private final int unsentWaitMsecs;
107   /** Waiting interval for checking outstanding requests msecs */
108   private final int waitingRequestMsecs;
109   /**
110    * Maximum number of open requests each worker can have to this work at each
111    * moment (CR_W -define above- for this worker)
112    */
113   private volatile short maxOpenRequestsPerWorker;
114   /** Total number of unsent, cached requests */
115   private final AtomicInteger aggregateUnsentRequests = new AtomicInteger(0);
116   /**
117    * Map of requests permits per worker. Keys in the map are worker ids and
118    * values are pairs (X, Y) where:
119    *   X: is the semaphore to control the number of open requests for a
120    *      particular worker. Basically, the number of available permits on a
121    *      semaphore is the credit available for the worker associated with that
122    *      semaphore.
123    *   Y: is the timestamp of the latest message (resume signal or ACK response)
124    *      that changed the number of permits in the semaphore.
125    * The idea behind keeping a timestamp is to avoid any issue that may happen
126    * due to out-of-order message delivery. For example, consider this scenario:
127    * an ACK response is sent to a worker announcing the credit is 0. Later on,
128    * a resume signal announcing a non-zero credit is sent to the same worker.
129    * Now, if the resume signal receives before the ACK message, the worker
130    * would incorrectly assume credit value of 0, and would avoid sending any
131    * messages, which may lead to a live-lock.
132    *
133    * The timestamp value is simply the request id generated by NettyClient.
134    * These ids are generated in consecutive order, hence simulating the concept
135    * of timestamp. However, the timestamp value should be sent along with
136    * any ACK response envelope. The ACK response envelope is already very small
137    * (maybe 10-20 bytes). So, the timestamp value should not add much overhead
138    * to it. Instead of sending the whole long value request id (8 bytes) as the
139    * timestamp, we can simply send the least significant 2 bytes of it. This is
140    * a valid timestamp, as the credit value can be 0x3FFF (=16383) at most. This
141    * means there will be at most 0x3FFF messages on the fly at each moment,
142    * which means that the timestamp value sent by all messages in fly will fall
143    * into a range of size 0x3FFF. So, it is enough to only consider timestamp
144    * values twice as big as the mentioned range to be able to accurately
145    * determine ordering even when values wrap around. This means we only need to
146    * consider 15 least significant bits of request ids as timestamp values.
147    *
148    * The ACK response value contains following information (from least
149    * significant to most significant):
150    *  - 16 bits timestamp
151    *  - 14 bits credit value
152    *  - 1 bit specifying whether one end of communication is master and hence
153    *    credit based flow control should be ignored
154    *  - 1 bit response flag
155    */
156   private final ConcurrentMap<Integer, Pair<AdjustableSemaphore, Integer>>
157       perWorkerOpenRequestMap = Maps.newConcurrentMap();
158   /** Map of unsent cached requests per worker */
159   private final ConcurrentMap<Integer, Deque<WritableRequest>>
160       perWorkerUnsentRequestMap = Maps.newConcurrentMap();
161   /**
162    * Set of workers that should be notified to resume sending more requests if
163    * the credit becomes non-zero
164    */
165   private final Set<Integer> workersToResume = Sets.newHashSet();
166   /**
167    * Resume signals are not using any credit, so they should be treated
168    * differently than normal requests. Resume signals should be ignored in
169    * accounting for credits in credit-based flow control. The following map
170    * keeps sets of request ids, for resume signals sent to other workers that
171    * are still "open". The set of request ids used for resume signals for a
172    * worker is important so we can determine if a received response is for a
173    * resume signal or not.
174    */
175   private final ConcurrentMap<Integer, Set<Long>> resumeRequestsId =
176       Maps.newConcurrentMap();
177   /**
178    * Semaphore to control number of cached unsent requests. Maximum number of
179    * permits of this semaphore should be equal to MAX_NUM_OF_UNSENT_REQUESTS.
180    */
181   private final Semaphore unsentRequestPermit;
182   /** Netty client used for sending requests */
183   private final NettyClient nettyClient;
184   /**
185    * Result of execution for the thread responsible for sending resume signals
186    */
187   private final Future<Void> resumeThreadResult;
188   /** Whether we are shutting down the execution */
189   private volatile boolean shouldTerminate;
190 
191   /**
192    * Constructor
193    * @param conf configuration
194    * @param nettyClient netty client
195    */
196   public CreditBasedFlowControl(ImmutableClassesGiraphConfiguration conf,
197                                 NettyClient nettyClient) {
198     this.nettyClient = nettyClient;
199     maxOpenRequestsPerWorker =
200         (short) MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
201     checkState(maxOpenRequestsPerWorker < 0x4000 &&
202         maxOpenRequestsPerWorker > 0, "NettyClient: max number of open " +
203         "requests should be in range (0, " + 0x4FFF + ")");
204     unsentRequestPermit = new Semaphore(MAX_NUM_OF_UNSENT_REQUESTS.get(conf));
205     unsentWaitMsecs = UNSENT_CACHE_WAIT_INTERVAL.get(conf);
206     waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
207     shouldTerminate = false;
208     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
209       @Override
210       public Callable<Void> newCallable(int callableId) {
211         return new Callable<Void>() {
212           @Override
213           public Void call() throws Exception {
214             while (true) {
215               synchronized (workersToResume) {
216                 if (shouldTerminate) {
217                   break;
218                 }
219                 for (Integer workerId : workersToResume) {
220                   if (maxOpenRequestsPerWorker != 0) {
221                     sendResumeSignal(workerId);
222                   } else {
223                     break;
224                   }
225                 }
226                 try {
227                   workersToResume.wait();
228                 } catch (InterruptedException e) {
229                   throw new IllegalStateException("call: caught exception " +
230                       "while waiting for resume-sender thread to be notified!",
231                       e);
232                 }
233               }
234             }
235             return null;
236           }
237         };
238       }
239     };
240 
241     ExecutorService executor = Executors.newSingleThreadExecutor(
242         ThreadUtils.createThreadFactory("resume-sender"));
243     resumeThreadResult = executor.submit(new LogStacktraceCallable<>(
244         callableFactory.newCallable(0)));
245     executor.shutdown();
246   }
247 
248   /**
249    * Send resume signal request to a given worker
250    *
251    * @param workerId id of the worker to send the resume signal to
252    */
253   private void sendResumeSignal(int workerId) {
254     WritableRequest request = new SendResumeRequest(maxOpenRequestsPerWorker);
255     Long resumeId = nettyClient.doSend(workerId, request);
256     checkState(resumeId != null);
257     if (LOG.isDebugEnabled()) {
258       LOG.debug("sendResumeSignal: sending signal to worker " + workerId +
259           " with credit=" + maxOpenRequestsPerWorker + ", ID=" +
260           (resumeId & 0xFFFF));
261     }
262     resumeRequestsId.get(workerId).add(resumeId);
263   }
264 
265   @Override
266   public void sendRequest(int destTaskId, WritableRequest request) {
267     Pair<AdjustableSemaphore, Integer> pair =
268         perWorkerOpenRequestMap.get(destTaskId);
269     // Check if this is the first time sending a request to a worker. If so, we
270     // should the worker id to necessary bookkeeping data structure.
271     if (pair == null) {
272       pair = new MutablePair<>(
273           new AdjustableSemaphore(maxOpenRequestsPerWorker), -1);
274       Pair<AdjustableSemaphore, Integer> temp =
275           perWorkerOpenRequestMap.putIfAbsent(destTaskId, pair);
276       perWorkerUnsentRequestMap.putIfAbsent(
277           destTaskId, new ArrayDeque<WritableRequest>());
278       resumeRequestsId.putIfAbsent(
279           destTaskId, Sets.<Long>newConcurrentHashSet());
280       if (temp != null) {
281         pair = temp;
282       }
283     }
284     AdjustableSemaphore openRequestPermit = pair.getLeft();
285     // Try to reserve a spot for the request amongst the open requests of
286     // the destination worker.
287     boolean shouldSend = openRequestPermit.tryAcquire();
288     boolean shouldCache = false;
289     while (!shouldSend) {
290       // We should not send the request, and should cache the request instead.
291       // It may be possible that the unsent message cache is also full, so we
292       // should try to acquire a space on the cache, and if there is no extra
293       // space in unsent request cache, we should wait until some space
294       // become available. However, it is possible that during the time we are
295       // waiting on the unsent messages cache, actual buffer for open requests
296       // frees up space.
297       try {
298         shouldCache = unsentRequestPermit.tryAcquire(unsentWaitMsecs,
299             TimeUnit.MILLISECONDS);
300       } catch (InterruptedException e) {
301         throw new IllegalStateException("shouldSend: failed " +
302             "while waiting on the unsent request cache to have some more " +
303             "room for extra unsent requests!");
304       }
305       if (shouldCache) {
306         break;
307       }
308       // We may have an open spot in the meantime that we were waiting on the
309       // unsent requests.
310       shouldSend = openRequestPermit.tryAcquire();
311       if (shouldSend) {
312         break;
313       }
314       // The current thread will be at this point only if it could not make
315       // space amongst open requests for the destination worker and has been
316       // timed-out in trying to acquire a space amongst unsent messages. So,
317       // we should report logs, report progress, and check for request
318       // failures.
319       nettyClient.logAndSanityCheck();
320     }
321     // Either shouldSend == true or shouldCache == true
322     if (shouldCache) {
323       Deque<WritableRequest> unsentRequests =
324           perWorkerUnsentRequestMap.get(destTaskId);
325       // This synchronize block is necessary for the following reason:
326       // Once we are at this point, it means there was no room for this
327       // request to become an open request, hence we have to put it into
328       // unsent cache. Consider the case that since last time we checked if
329       // there is any room for an additional open request so far, all open
330       // requests are delivered and their acknowledgements are also processed.
331       // Now, if we put this request in the unsent cache, it is not being
332       // considered to become an open request, as the only one who checks
333       // on this matter would be the one who receives an acknowledgment for an
334       // open request for the destination worker. So, a lock is necessary
335       // to forcefully serialize the execution if this scenario is about to
336       // happen.
337       synchronized (unsentRequests) {
338         shouldSend = openRequestPermit.tryAcquire();
339         if (!shouldSend) {
340           aggregateUnsentRequests.getAndIncrement();
341           unsentRequests.add(request);
342           return;
343         }
344       }
345       // We found a spot amongst open requests to send this request. So, this
346       // request won't be cached anymore.
347       unsentRequestPermit.release();
348     }
349     nettyClient.doSend(destTaskId, request);
350   }
351 
352   /**
353    * Whether response specifies that credit should be ignored
354    *
355    * @param response response received
356    * @return true iff credit should be ignored, false otherwise
357    */
358   private boolean shouldIgnoreCredit(int response) {
359     return ((short) ((response >> (14 + 16)) & 1)) == 1;
360   }
361 
362   /**
363    * Get the credit from a response
364    *
365    * @param response response received
366    * @return credit from the received response
367    */
368   private short getCredit(int response) {
369     return (short) ((response >> 16) & 0x3FFF);
370   }
371 
372   /**
373    * Get the timestamp from a response
374    *
375    * @param response response received
376    * @return timestamp from the received response
377    */
378   private int getTimestamp(int response) {
379     return response & 0xFFFF;
380   }
381 
382   /**
383    * Get the response flag from a response
384    *
385    * @param response response received
386    * @return AckSignalFlag coming with the response
387    */
388   @Override
389   public AckSignalFlag getAckSignalFlag(int response) {
390     return AckSignalFlag.values()[(response >> (16 + 14 + 1)) & 1];
391   }
392 
393   @Override
394   public int calculateResponse(AckSignalFlag flag, int taskId) {
395     boolean ignoreCredit = nettyClient.masterInvolved(taskId);
396     if (!ignoreCredit && maxOpenRequestsPerWorker == 0) {
397       synchronized (workersToResume) {
398         workersToResume.add(taskId);
399       }
400     }
401     int timestamp = (int) (nettyClient.getNextRequestId(taskId) & 0xFFFF);
402     return (flag.ordinal() << (16 + 14 + 1)) |
403         ((ignoreCredit ? 1 : 0) << (16 + 14)) |
404         (maxOpenRequestsPerWorker << 16) |
405         timestamp;
406   }
407 
408   @Override
409   public void shutdown() {
410     synchronized (workersToResume) {
411       shouldTerminate = true;
412       workersToResume.notifyAll();
413     }
414     try {
415       resumeThreadResult.get();
416     } catch (InterruptedException | ExecutionException e) {
417       throw new IllegalStateException("shutdown: caught exception while" +
418           "getting result of resume-sender thread");
419     }
420   }
421 
422   @Override
423   public void logInfo() {
424     if (LOG.isInfoEnabled()) {
425       // Count how many unsent requests each task has
426       Map<Integer, Integer> unsentRequestCounts = Maps.newHashMap();
427       for (Map.Entry<Integer, Deque<WritableRequest>> entry :
428           perWorkerUnsentRequestMap.entrySet()) {
429         unsentRequestCounts.put(entry.getKey(), entry.getValue().size());
430       }
431       ArrayList<Map.Entry<Integer, Integer>> sorted =
432           Lists.newArrayList(unsentRequestCounts.entrySet());
433       Collections.sort(sorted, new Comparator<Map.Entry<Integer, Integer>>() {
434         @Override
435         public int compare(Map.Entry<Integer, Integer> entry1,
436                            Map.Entry<Integer, Integer> entry2) {
437           int value1 = entry1.getValue();
438           int value2 = entry2.getValue();
439           return (value1 < value2) ? 1 : ((value1 == value2) ? 0 : -1);
440         }
441       });
442       StringBuilder message = new StringBuilder();
443       message.append("logInfo: ").append(aggregateUnsentRequests.get())
444           .append(" unsent requests in total. ");
445       int itemsToPrint = Math.min(10, sorted.size());
446       for (int i = 0; i < itemsToPrint; ++i) {
447         message.append(sorted.get(i).getValue())
448             .append(" unsent requests for taskId=")
449             .append(sorted.get(i).getKey()).append(" (credit=")
450             .append(perWorkerOpenRequestMap.get(sorted.get(i).getKey())
451                 .getKey().getMaxPermits())
452             .append("), ");
453       }
454       LOG.info(message);
455     }
456   }
457 
458   @Override
459   public void waitAllRequests() {
460     while (true) {
461       synchronized (aggregateUnsentRequests) {
462         if (aggregateUnsentRequests.get() == 0) {
463           break;
464         }
465         try {
466           aggregateUnsentRequests.wait(waitingRequestMsecs);
467         } catch (InterruptedException e) {
468           throw new IllegalStateException("waitAllRequests: failed while " +
469               "waiting on open/cached requests");
470         }
471       }
472       if (aggregateUnsentRequests.get() == 0) {
473         break;
474       }
475       nettyClient.logAndSanityCheck();
476     }
477   }
478 
479   @Override
480   public int getNumberOfUnsentRequests() {
481     return aggregateUnsentRequests.get();
482   }
483 
484   @Override
485   public void messageAckReceived(int taskId, long requestId, int response) {
486     boolean ignoreCredit = shouldIgnoreCredit(response);
487     short credit = getCredit(response);
488     int timestamp = getTimestamp(response);
489     MutablePair<AdjustableSemaphore, Integer> pair =
490         (MutablePair<AdjustableSemaphore, Integer>)
491             perWorkerOpenRequestMap.get(taskId);
492     AdjustableSemaphore openRequestPermit = pair.getLeft();
493     // Release a permit on open requests if we received ACK of a request other
494     // than a Resume request (resume requests are always sent regardless of
495     // number of open requests)
496     if (!resumeRequestsId.get(taskId).remove(requestId)) {
497       openRequestPermit.release();
498     } else if (LOG.isDebugEnabled()) {
499       LOG.debug("messageAckReceived: ACK of resume received from " + taskId +
500           " timestamp=" + timestamp);
501     }
502     if (!ignoreCredit) {
503       synchronized (pair) {
504         if (compareTimestamps(timestamp, pair.getRight()) > 0) {
505           pair.setRight(timestamp);
506           openRequestPermit.setMaxPermits(credit);
507         } else if (LOG.isDebugEnabled()) {
508           LOG.debug("messageAckReceived: received out-of-order messages." +
509               "Received timestamp=" + timestamp + " and current timestamp=" +
510               pair.getRight());
511         }
512       }
513     }
514     // Since we received a response and we changed the credit of the sender
515     // client, we may be able to send some more requests to the sender
516     // client. So, we try to send as much request as we can to the sender
517     // client.
518     trySendCachedRequests(taskId);
519   }
520 
521   /**
522    * Try to send as much as cached requests to a given worker
523    *
524    * @param taskId id of the worker to send cached requests to
525    */
526   private void trySendCachedRequests(int taskId) {
527     Deque<WritableRequest> requestDeque =
528         perWorkerUnsentRequestMap.get(taskId);
529     AdjustableSemaphore openRequestPermit =
530         perWorkerOpenRequestMap.get(taskId).getLeft();
531     while (true) {
532       WritableRequest request;
533       synchronized (requestDeque) {
534         request = requestDeque.pollFirst();
535         if (request == null) {
536           break;
537         }
538         // See whether the sender client has any unused credit
539         if (!openRequestPermit.tryAcquire()) {
540           requestDeque.offerFirst(request);
541           break;
542         }
543       }
544       unsentRequestPermit.release();
545       // At this point, we have a request, and we reserved a credit for the
546       // sender client. So, we send the request to the client and update
547       // the state.
548       nettyClient.doSend(taskId, request);
549       if (aggregateUnsentRequests.decrementAndGet() == 0) {
550         synchronized (aggregateUnsentRequests) {
551           aggregateUnsentRequests.notifyAll();
552         }
553       }
554     }
555   }
556 
557   /**
558    * Update the max credit that is announced to other workers
559    *
560    * @param newCredit new credit
561    */
562   public void updateCredit(short newCredit) {
563     newCredit = (short) Math.max(0, Math.min(0x3FFF, newCredit));
564     // Check whether we should send resume signals to some workers
565     if (maxOpenRequestsPerWorker == 0 && newCredit != 0) {
566       maxOpenRequestsPerWorker = newCredit;
567       synchronized (workersToResume) {
568         workersToResume.notifyAll();
569       }
570     } else {
571       maxOpenRequestsPerWorker = newCredit;
572     }
573   }
574 
575   /**
576    * Compare two timestamps accounting for wrap around. Note that the timestamp
577    * values should be in a range that fits into 14 bits values. This means if
578    * the difference of the two given timestamp is large, we are dealing with one
579    * value being wrapped around.
580    *
581    * @param ts1 first timestamp
582    * @param ts2 second timestamp
583    * @return positive value if first timestamp is later than second timestamp,
584    *         negative otherwise
585    */
586   private int compareTimestamps(int ts1, int ts2) {
587     int diff = ts1 - ts2;
588     if (Math.abs(diff) < 0x7FFF) {
589       return diff;
590     } else {
591       return -diff;
592     }
593   }
594 
595   /**
596    * Process a resume signal came from a given worker
597    *
598    * @param clientId id of the worker that sent the signal
599    * @param credit the credit value sent along with the resume signal
600    * @param requestId timestamp (request id) of the resume signal
601    */
602   public void processResumeSignal(int clientId, short credit, long requestId) {
603     int timestamp = (int) (requestId & 0xFFFF);
604     if (LOG.isDebugEnabled()) {
605       LOG.debug("processResumeSignal: resume signal from " + clientId +
606           " with timestamp=" + timestamp);
607     }
608     MutablePair<AdjustableSemaphore, Integer> pair =
609         (MutablePair<AdjustableSemaphore, Integer>)
610             perWorkerOpenRequestMap.get(clientId);
611     synchronized (pair) {
612       if (compareTimestamps(timestamp, pair.getRight()) > 0) {
613         pair.setRight(timestamp);
614         pair.getLeft().setMaxPermits(credit);
615       } else if (LOG.isDebugEnabled()) {
616         LOG.debug("processResumeSignal: received out-of-order messages. " +
617             "Received timestamp=" + timestamp + " and current timestamp=" +
618             pair.getRight());
619       }
620     }
621     trySendCachedRequests(clientId);
622   }
623 }