This project has retired. For details please refer to its Attic page.
CreditBasedFlowControl xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.flow_control;
20  
21  import static com.google.common.base.Preconditions.checkState;
22  import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
23  import com.google.common.collect.Lists;
24  import com.google.common.collect.Maps;
25  import com.google.common.collect.Sets;
26  import org.apache.commons.lang3.tuple.ImmutablePair;
27  import org.apache.commons.lang3.tuple.MutablePair;
28  import org.apache.commons.lang3.tuple.Pair;
29  import org.apache.giraph.comm.netty.NettyClient;
30  import org.apache.giraph.comm.netty.handler.AckSignalFlag;
31  import org.apache.giraph.comm.requests.SendResumeRequest;
32  import org.apache.giraph.comm.requests.WritableRequest;
33  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
34  import org.apache.giraph.conf.IntConfOption;
35  import org.apache.giraph.utils.AdjustableSemaphore;
36  import org.apache.giraph.utils.ThreadUtils;
37  import org.apache.log4j.Logger;
38  
39  import java.util.ArrayDeque;
40  import java.util.ArrayList;
41  import java.util.Collections;
42  import java.util.Comparator;
43  import java.util.Deque;
44  import java.util.Map;
45  import java.util.Set;
46  import java.util.concurrent.ArrayBlockingQueue;
47  import java.util.concurrent.BlockingQueue;
48  import java.util.concurrent.ConcurrentMap;
49  import java.util.concurrent.Semaphore;
50  import java.util.concurrent.TimeUnit;
51  import java.util.concurrent.atomic.AtomicInteger;
52  
53  /**
54   * Representation of credit-based flow control policy. With this policy there
55   * can be limited number of open requests from any worker x to any other worker
56   * y. This number is called 'credit'. Let's denote this number by C{x->y}.
57   * This implementation assumes that for a particular worker W, all values of
58   * C{x->W} are the same. Let's denote this value by CR_W. CR_W may change
59   * due to other reasons (e.g. memory pressure observed in an out-of-core
60   * mechanism). However, CR_W is always in range [0, MAX_CR], where MAX_CR
61   * is a user-defined constant. Note that MAX_CR should be representable by
62   * at most 14 bits.
63   *
64   * In this implementation, the value of CR_W is announced to other workers along
65   * with the ACK response envelope for all ACK response envelope going out of W.
66   * Therefore, for non-zero values of CR_W, other workers know the instant value
67   * of CR_W, hence they can control the number of open requests they have to W.
68   * However, it is possible that W announces 0 as CR_W. In this case, other
69   * workers stop opening more requests to W, hence they will not get any new
70   * response envelope from W. This means other workers should be notified with
71   * a dedicated request to resume sending more requests once CR_W becomes
72   * non-zero. In this implementation, once W_CR is announced as 0 to a particular
73   * worker U, we keep U in a set, so later on we can send 'resume signal' to U
74   * once CR_W becomes non-zero. Sending resume signals are done through a
75   * separate thread.
76   */
77  public class CreditBasedFlowControl implements FlowControl {
78    /**
79     * Maximum number of requests we can have per worker without confirmation
80     * (i.e. open requests)
81     */
82    public static final IntConfOption MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER =
83        new IntConfOption("giraph.maxOpenRequestsPerWorker", 20,
84            "Maximum number of requests without confirmation we can have per " +
85                "worker");
86    /** Aggregate number of in-memory unsent requests */
87    public static final IntConfOption MAX_NUM_OF_UNSENT_REQUESTS =
88        new IntConfOption("giraph.maxNumberOfUnsentRequests", 2000,
89            "Maximum number of unsent requests we can keep in memory");
90    /**
91     * Time interval to wait on unsent requests cahce until we find a spot in it
92     */
93    public static final IntConfOption UNSENT_CACHE_WAIT_INTERVAL =
94        new IntConfOption("giraph.unsentCacheWaitInterval", 1000,
95            "Time interval to wait on unsent requests cache (in milliseconds)");
96    /** Class logger */
97    private static final Logger LOG =
98        Logger.getLogger(CreditBasedFlowControl.class);
99  
100   /** Waiting interval on unsent requests cache until it frees up */
101   private final int unsentWaitMsecs;
102   /** Waiting interval for checking outstanding requests msecs */
103   private final int waitingRequestMsecs;
104   /**
105    * Maximum number of open requests each worker can have to this work at each
106    * moment (CR_W -define above- for this worker)
107    */
108   private volatile short maxOpenRequestsPerWorker;
109   /** Total number of unsent, cached requests */
110   private final AtomicInteger aggregateUnsentRequests = new AtomicInteger(0);
111   /**
112    * Map of requests permits per worker. Keys in the map are worker ids and
113    * values are pairs (X, Y) where:
114    *   X: is the semaphore to control the number of open requests for a
115    *      particular worker. Basically, the number of available permits on a
116    *      semaphore is the credit available for the worker associated with that
117    *      semaphore.
118    *   Y: is the timestamp of the latest message (resume signal or ACK response)
119    *      that changed the number of permits in the semaphore.
120    * The idea behind keeping a timestamp is to avoid any issue that may happen
121    * due to out-of-order message delivery. For example, consider this scenario:
122    * an ACK response is sent to a worker announcing the credit is 0. Later on,
123    * a resume signal announcing a non-zero credit is sent to the same worker.
124    * Now, if the resume signal receives before the ACK message, the worker
125    * would incorrectly assume credit value of 0, and would avoid sending any
126    * messages, which may lead to a live-lock.
127    *
128    * The timestamp value is simply the request id generated by NettyClient.
129    * These ids are generated in consecutive order, hence simulating the concept
130    * of timestamp. However, the timestamp value should be sent along with
131    * any ACK response envelope. The ACK response envelope is already very small
132    * (maybe 10-20 bytes). So, the timestamp value should not add much overhead
133    * to it. Instead of sending the whole long value request id (8 bytes) as the
134    * timestamp, we can simply send the least significant 2 bytes of it. This is
135    * a valid timestamp, as the credit value can be 0x3FFF (=16383) at most. This
136    * means there will be at most 0x3FFF messages on the fly at each moment,
137    * which means that the timestamp value sent by all messages in fly will fall
138    * into a range of size 0x3FFF. So, it is enough to only consider timestamp
139    * values twice as big as the mentioned range to be able to accurately
140    * determine ordering even when values wrap around. This means we only need to
141    * consider 15 least significant bits of request ids as timestamp values.
142    *
143    * The ACK response value contains following information (from least
144    * significant to most significant):
145    *  - 16 bits timestamp
146    *  - 14 bits credit value
147    *  - 1 bit specifying whether one end of communication is master and hence
148    *    credit based flow control should be ignored
149    *  - 1 bit response flag
150    */
151   private final ConcurrentMap<Integer, Pair<AdjustableSemaphore, Integer>>
152       perWorkerOpenRequestMap = Maps.newConcurrentMap();
153   /** Map of unsent cached requests per worker */
154   private final ConcurrentMap<Integer, Deque<WritableRequest>>
155       perWorkerUnsentRequestMap = Maps.newConcurrentMap();
156   /**
157    * Set of workers that should be notified to resume sending more requests if
158    * the credit becomes non-zero
159    */
160   private final Set<Integer> workersToResume = Sets.newHashSet();
161   /**
162    * Resume signals are not using any credit, so they should be treated
163    * differently than normal requests. Resume signals should be ignored in
164    * accounting for credits in credit-based flow control. The following map
165    * keeps sets of request ids, for resume signals sent to other workers that
166    * are still "open". The set of request ids used for resume signals for a
167    * worker is important so we can determine if a received response is for a
168    * resume signal or not.
169    */
170   private final ConcurrentMap<Integer, Set<Long>> resumeRequestsId =
171       Maps.newConcurrentMap();
172   /**
173    * Queue of the cached requests to be sent out. The queue keeps pairs of
174    * (destination id, request). The thread-safe blocking queue is used here for
175    * the sake of simplicity. The blocking queue should be bounded (with bounds
176    * no more than user-defined max number of unsent/cached requests) to avoid
177    * excessive memory footprint.
178    */
179   private final BlockingQueue<Pair<Integer, WritableRequest>> toBeSent;
180   /**
181    * Semaphore to control number of cached unsent requests. Maximum number of
182    * permits of this semaphore should be equal to MAX_NUM_OF_UNSENT_REQUESTS.
183    */
184   private final Semaphore unsentRequestPermit;
185   /** Netty client used for sending requests */
186   private final NettyClient nettyClient;
187 
188   /**
189    * Constructor
190    * @param conf configuration
191    * @param nettyClient netty client
192    * @param exceptionHandler Exception handler
193    */
194   public CreditBasedFlowControl(ImmutableClassesGiraphConfiguration conf,
195                                 final NettyClient nettyClient,
196                                 Thread.UncaughtExceptionHandler
197                                     exceptionHandler) {
198     this.nettyClient = nettyClient;
199     maxOpenRequestsPerWorker =
200         (short) MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
201     checkState(maxOpenRequestsPerWorker < 0x4000 &&
202         maxOpenRequestsPerWorker > 0, "NettyClient: max number of open " +
203         "requests should be in range (0, " + 0x4FFF + ")");
204     int maxUnsentRequests = MAX_NUM_OF_UNSENT_REQUESTS.get(conf);
205     unsentRequestPermit = new Semaphore(maxUnsentRequests);
206     this.toBeSent = new ArrayBlockingQueue<Pair<Integer, WritableRequest>>(
207         maxUnsentRequests);
208     unsentWaitMsecs = UNSENT_CACHE_WAIT_INTERVAL.get(conf);
209     waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
210 
211     // Thread to handle/send resume signals when necessary
212     ThreadUtils.startThread(new Runnable() {
213       @Override
214       public void run() {
215         while (true) {
216           synchronized (workersToResume) {
217             for (Integer workerId : workersToResume) {
218               if (maxOpenRequestsPerWorker != 0) {
219                 sendResumeSignal(workerId);
220               } else {
221                 break;
222               }
223             }
224             try {
225               workersToResume.wait();
226             } catch (InterruptedException e) {
227               throw new IllegalStateException("run: caught exception " +
228                   "while waiting for resume-sender thread to be notified!",
229                   e);
230             }
231           }
232         }
233       }
234     }, "resume-sender", exceptionHandler);
235 
236     // Thread to handle/send cached requests
237     ThreadUtils.startThread(new Runnable() {
238       @Override
239       public void run() {
240         while (true) {
241           Pair<Integer, WritableRequest> pair = null;
242           try {
243             pair = toBeSent.take();
244           } catch (InterruptedException e) {
245             throw new IllegalStateException("run: failed while waiting to " +
246                 "take an element from the request queue!", e);
247           }
248           int taskId = pair.getLeft();
249           WritableRequest request = pair.getRight();
250           nettyClient.doSend(taskId, request);
251           if (aggregateUnsentRequests.decrementAndGet() == 0) {
252             synchronized (aggregateUnsentRequests) {
253               aggregateUnsentRequests.notifyAll();
254             }
255           }
256         }
257       }
258     }, "cached-req-sender", exceptionHandler);
259   }
260 
261   /**
262    * Send resume signal request to a given worker
263    *
264    * @param workerId id of the worker to send the resume signal to
265    */
266   private void sendResumeSignal(int workerId) {
267     if (maxOpenRequestsPerWorker == 0) {
268       LOG.warn("sendResumeSignal: method called while the max open requests " +
269           "for worker " + workerId + " is still 0");
270       return;
271     }
272     WritableRequest request = new SendResumeRequest(maxOpenRequestsPerWorker);
273     Long resumeId = nettyClient.doSend(workerId, request);
274     checkState(resumeId != null);
275     if (LOG.isDebugEnabled()) {
276       LOG.debug("sendResumeSignal: sending signal to worker " + workerId +
277           " with credit=" + maxOpenRequestsPerWorker + ", ID=" +
278           (resumeId & 0xFFFF));
279     }
280     resumeRequestsId.get(workerId).add(resumeId);
281   }
282 
283   @Override
284   public void sendRequest(int destTaskId, WritableRequest request) {
285     Pair<AdjustableSemaphore, Integer> pair =
286         perWorkerOpenRequestMap.get(destTaskId);
287     // Check if this is the first time sending a request to a worker. If so, we
288     // should the worker id to necessary bookkeeping data structure.
289     if (pair == null) {
290       pair = new MutablePair<>(
291           new AdjustableSemaphore(maxOpenRequestsPerWorker), -1);
292       Pair<AdjustableSemaphore, Integer> temp =
293           perWorkerOpenRequestMap.putIfAbsent(destTaskId, pair);
294       perWorkerUnsentRequestMap.putIfAbsent(
295           destTaskId, new ArrayDeque<WritableRequest>());
296       resumeRequestsId.putIfAbsent(
297           destTaskId, Sets.<Long>newConcurrentHashSet());
298       if (temp != null) {
299         pair = temp;
300       }
301     }
302     AdjustableSemaphore openRequestPermit = pair.getLeft();
303     // Try to reserve a spot for the request amongst the open requests of
304     // the destination worker.
305     boolean shouldSend = openRequestPermit.tryAcquire();
306     boolean shouldCache = false;
307     while (!shouldSend) {
308       // We should not send the request, and should cache the request instead.
309       // It may be possible that the unsent message cache is also full, so we
310       // should try to acquire a space on the cache, and if there is no extra
311       // space in unsent request cache, we should wait until some space
312       // become available. However, it is possible that during the time we are
313       // waiting on the unsent messages cache, actual buffer for open requests
314       // frees up space.
315       try {
316         shouldCache = unsentRequestPermit.tryAcquire(unsentWaitMsecs,
317             TimeUnit.MILLISECONDS);
318       } catch (InterruptedException e) {
319         throw new IllegalStateException("shouldSend: failed " +
320             "while waiting on the unsent request cache to have some more " +
321             "room for extra unsent requests!");
322       }
323       if (shouldCache) {
324         break;
325       }
326       // We may have an open spot in the meantime that we were waiting on the
327       // unsent requests.
328       shouldSend = openRequestPermit.tryAcquire();
329       if (shouldSend) {
330         break;
331       }
332       // The current thread will be at this point only if it could not make
333       // space amongst open requests for the destination worker and has been
334       // timed-out in trying to acquire a space amongst unsent messages. So,
335       // we should report logs, report progress, and check for request
336       // failures.
337       nettyClient.logAndSanityCheck();
338     }
339     // Either shouldSend == true or shouldCache == true
340     if (shouldCache) {
341       Deque<WritableRequest> unsentRequests =
342           perWorkerUnsentRequestMap.get(destTaskId);
343       // This synchronize block is necessary for the following reason:
344       // Once we are at this point, it means there was no room for this
345       // request to become an open request, hence we have to put it into
346       // unsent cache. Consider the case that since last time we checked if
347       // there is any room for an additional open request so far, all open
348       // requests are delivered and their acknowledgements are also processed.
349       // Now, if we put this request in the unsent cache, it is not being
350       // considered to become an open request, as the only one who checks
351       // on this matter would be the one who receives an acknowledgment for an
352       // open request for the destination worker. So, a lock is necessary
353       // to forcefully serialize the execution if this scenario is about to
354       // happen.
355       synchronized (unsentRequests) {
356         shouldSend = openRequestPermit.tryAcquire();
357         if (!shouldSend) {
358           aggregateUnsentRequests.getAndIncrement();
359           unsentRequests.add(request);
360           return;
361         }
362       }
363       // We found a spot amongst open requests to send this request. So, this
364       // request won't be cached anymore.
365       unsentRequestPermit.release();
366     }
367     nettyClient.doSend(destTaskId, request);
368   }
369 
370   /**
371    * Whether response specifies that credit should be ignored
372    *
373    * @param response response received
374    * @return true iff credit should be ignored, false otherwise
375    */
376   private boolean shouldIgnoreCredit(int response) {
377     return ((short) ((response >> (14 + 16)) & 1)) == 1;
378   }
379 
380   /**
381    * Get the credit from a response
382    *
383    * @param response response received
384    * @return credit from the received response
385    */
386   private short getCredit(int response) {
387     return (short) ((response >> 16) & 0x3FFF);
388   }
389 
390   /**
391    * Get the timestamp from a response
392    *
393    * @param response response received
394    * @return timestamp from the received response
395    */
396   private int getTimestamp(int response) {
397     return response & 0xFFFF;
398   }
399 
400   /**
401    * Get the response flag from a response
402    *
403    * @param response response received
404    * @return AckSignalFlag coming with the response
405    */
406   @Override
407   public AckSignalFlag getAckSignalFlag(int response) {
408     return AckSignalFlag.values()[(response >> (16 + 14 + 1)) & 1];
409   }
410 
411   @Override
412   public int calculateResponse(AckSignalFlag flag, int taskId) {
413     boolean ignoreCredit = nettyClient.masterInvolved(taskId);
414     if (!ignoreCredit && maxOpenRequestsPerWorker == 0) {
415       synchronized (workersToResume) {
416         workersToResume.add(taskId);
417       }
418     }
419     int timestamp = (int) (nettyClient.getNextRequestId(taskId) & 0xFFFF);
420     return (flag.ordinal() << (16 + 14 + 1)) |
421         ((ignoreCredit ? 1 : 0) << (16 + 14)) |
422         (maxOpenRequestsPerWorker << 16) |
423         timestamp;
424   }
425 
426   @Override
427   public void logInfo() {
428     if (LOG.isInfoEnabled()) {
429       // Count how many unsent requests each task has
430       Map<Integer, Integer> unsentRequestCounts = Maps.newHashMap();
431       for (Map.Entry<Integer, Deque<WritableRequest>> entry :
432           perWorkerUnsentRequestMap.entrySet()) {
433         unsentRequestCounts.put(entry.getKey(), entry.getValue().size());
434       }
435       ArrayList<Map.Entry<Integer, Integer>> sorted =
436           Lists.newArrayList(unsentRequestCounts.entrySet());
437       Collections.sort(sorted, new Comparator<Map.Entry<Integer, Integer>>() {
438         @Override
439         public int compare(Map.Entry<Integer, Integer> entry1,
440                            Map.Entry<Integer, Integer> entry2) {
441           int value1 = entry1.getValue();
442           int value2 = entry2.getValue();
443           return (value1 < value2) ? 1 : ((value1 == value2) ? 0 : -1);
444         }
445       });
446       StringBuilder message = new StringBuilder();
447       message.append("logInfo: ").append(aggregateUnsentRequests.get())
448           .append(" unsent requests in total. ");
449       int itemsToPrint = Math.min(10, sorted.size());
450       for (int i = 0; i < itemsToPrint; ++i) {
451         message.append(sorted.get(i).getValue())
452             .append(" unsent requests for taskId=")
453             .append(sorted.get(i).getKey()).append(" (credit=")
454             .append(perWorkerOpenRequestMap.get(sorted.get(i).getKey())
455                 .getKey().getMaxPermits())
456             .append("), ");
457       }
458       LOG.info(message);
459     }
460   }
461 
462   @Override
463   public void waitAllRequests() {
464     while (true) {
465       synchronized (aggregateUnsentRequests) {
466         if (aggregateUnsentRequests.get() == 0) {
467           break;
468         }
469         try {
470           aggregateUnsentRequests.wait(waitingRequestMsecs);
471         } catch (InterruptedException e) {
472           throw new IllegalStateException("waitAllRequests: failed while " +
473               "waiting on open/cached requests");
474         }
475       }
476       if (aggregateUnsentRequests.get() == 0) {
477         break;
478       }
479       nettyClient.logAndSanityCheck();
480     }
481   }
482 
483   @Override
484   public int getNumberOfUnsentRequests() {
485     return aggregateUnsentRequests.get();
486   }
487 
488   @Override
489   public void messageAckReceived(int taskId, long requestId, int response) {
490     boolean ignoreCredit = shouldIgnoreCredit(response);
491     short credit = getCredit(response);
492     int timestamp = getTimestamp(response);
493     MutablePair<AdjustableSemaphore, Integer> pair =
494         (MutablePair<AdjustableSemaphore, Integer>)
495             perWorkerOpenRequestMap.get(taskId);
496     AdjustableSemaphore openRequestPermit = pair.getLeft();
497     // Release a permit on open requests if we received ACK of a request other
498     // than a Resume request (resume requests are always sent regardless of
499     // number of open requests)
500     if (!resumeRequestsId.get(taskId).remove(requestId)) {
501       openRequestPermit.release();
502     } else if (LOG.isDebugEnabled()) {
503       LOG.debug("messageAckReceived: ACK of resume received from " + taskId +
504           " timestamp=" + timestamp);
505     }
506     if (!ignoreCredit) {
507       synchronized (pair) {
508         if (compareTimestamps(timestamp, pair.getRight()) > 0) {
509           pair.setRight(timestamp);
510           openRequestPermit.setMaxPermits(credit);
511         } else if (LOG.isDebugEnabled()) {
512           LOG.debug("messageAckReceived: received out-of-order messages." +
513               "Received timestamp=" + timestamp + " and current timestamp=" +
514               pair.getRight());
515         }
516       }
517     }
518     // Since we received a response and we changed the credit of the sender
519     // client, we may be able to send some more requests to the sender
520     // client. So, we try to send as much request as we can to the sender
521     // client.
522     trySendCachedRequests(taskId);
523   }
524 
525   /**
526    * Try to send as much as cached requests to a given worker
527    *
528    * @param taskId id of the worker to send cached requests to
529    */
530   private void trySendCachedRequests(int taskId) {
531     Deque<WritableRequest> requestDeque =
532         perWorkerUnsentRequestMap.get(taskId);
533     AdjustableSemaphore openRequestPermit =
534         perWorkerOpenRequestMap.get(taskId).getLeft();
535     while (true) {
536       WritableRequest request;
537       synchronized (requestDeque) {
538         request = requestDeque.pollFirst();
539         if (request == null) {
540           break;
541         }
542         // See whether the sender client has any unused credit
543         if (!openRequestPermit.tryAcquire()) {
544           requestDeque.offerFirst(request);
545           break;
546         }
547       }
548       unsentRequestPermit.release();
549       // At this point, we have a request, and we reserved a credit for the
550       // sender client. So, we put the request in a queue to be sent to the
551       // client.
552       try {
553         toBeSent.put(
554             new ImmutablePair<Integer, WritableRequest>(taskId, request));
555       } catch (InterruptedException e) {
556         throw new IllegalStateException("trySendCachedRequests: failed while" +
557             "waiting to put element in send queue!", e);
558       }
559     }
560   }
561 
562   /**
563    * Update the max credit that is announced to other workers
564    *
565    * @param newCredit new credit
566    */
567   public void updateCredit(short newCredit) {
568     newCredit = (short) Math.max(0, Math.min(0x3FFF, newCredit));
569     // Check whether we should send resume signals to some workers
570     if (maxOpenRequestsPerWorker == 0 && newCredit != 0) {
571       maxOpenRequestsPerWorker = newCredit;
572       synchronized (workersToResume) {
573         workersToResume.notifyAll();
574       }
575     } else {
576       maxOpenRequestsPerWorker = newCredit;
577     }
578   }
579 
580   /**
581    * Compare two timestamps accounting for wrap around. Note that the timestamp
582    * values should be in a range that fits into 14 bits values. This means if
583    * the difference of the two given timestamp is large, we are dealing with one
584    * value being wrapped around.
585    *
586    * @param ts1 first timestamp
587    * @param ts2 second timestamp
588    * @return positive value if first timestamp is later than second timestamp,
589    *         negative otherwise
590    */
591   private int compareTimestamps(int ts1, int ts2) {
592     int diff = ts1 - ts2;
593     if (Math.abs(diff) < 0x7FFF) {
594       return diff;
595     } else {
596       return -diff;
597     }
598   }
599 
600   /**
601    * Process a resume signal came from a given worker
602    *
603    * @param clientId id of the worker that sent the signal
604    * @param credit the credit value sent along with the resume signal
605    * @param requestId timestamp (request id) of the resume signal
606    */
607   public void processResumeSignal(int clientId, short credit, long requestId) {
608     int timestamp = (int) (requestId & 0xFFFF);
609     if (LOG.isDebugEnabled()) {
610       LOG.debug("processResumeSignal: resume signal from " + clientId +
611           " with timestamp=" + timestamp);
612     }
613     MutablePair<AdjustableSemaphore, Integer> pair =
614         (MutablePair<AdjustableSemaphore, Integer>)
615             perWorkerOpenRequestMap.get(clientId);
616     synchronized (pair) {
617       if (compareTimestamps(timestamp, pair.getRight()) > 0) {
618         pair.setRight(timestamp);
619         pair.getLeft().setMaxPermits(credit);
620       } else if (LOG.isDebugEnabled()) {
621         LOG.debug("processResumeSignal: received out-of-order messages. " +
622             "Received timestamp=" + timestamp + " and current timestamp=" +
623             pair.getRight());
624       }
625     }
626     trySendCachedRequests(clientId);
627   }
628 }