This project has retired. For details please refer to its Attic page.
ProgressableUtils xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.utils;
20  
21  import org.apache.hadoop.util.Progressable;
22  import org.apache.log4j.Logger;
23  
24  import io.netty.channel.ChannelFuture;
25  import io.netty.channel.group.ChannelGroupFuture;
26  import io.netty.util.concurrent.EventExecutorGroup;
27  
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.HashMap;
31  import java.util.Iterator;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.ExecutionException;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.Executors;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.Semaphore;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.TimeoutException;
42  
43  /** Functions for waiting on some events to happen while reporting progress */
44  public class ProgressableUtils {
  /** Class logger */
  private static final Logger LOG =
      Logger.getLogger(ProgressableUtils.class);
  /**
   * Default period, in milliseconds, between two progress reports (one
   * minute). Used by the waiting methods which take no explicit period.
   */
  private static final int DEFUALT_MSEC_PERIOD = 60 * 1000;
  /**
   * When getting results with many threads, how many milliseconds to wait
   * on each future before moving on to the next one while looping through
   * them
   */
  private static final int MSEC_TO_WAIT_ON_EACH_FUTURE = 10;
55  
56    /** Do not instantiate. */
57    private ProgressableUtils() {
58    }
59  
60    /**
61     * Wait for executor tasks to terminate, while periodically reporting
62     * progress.
63     *
64     * @param executor     Executor which we are waiting for
65     * @param progressable Progressable for reporting progress (Job context)
66     * @param msecsPeriod How often to report progress
67     */
68    public static void awaitExecutorTermination(ExecutorService executor,
69        Progressable progressable, int msecsPeriod) {
70      waitForever(new ExecutorServiceWaitable(executor), progressable,
71          msecsPeriod);
72    }
73  
74    /**
75     * Wait for executor tasks to terminate, while periodically reporting
76     * progress.
77     *
78     * @param executor     Executor which we are waiting for
79     * @param progressable Progressable for reporting progress (Job context)
80     */
81    public static void awaitExecutorTermination(ExecutorService executor,
82        Progressable progressable) {
83      waitForever(new ExecutorServiceWaitable(executor), progressable);
84    }
85  
86    /**
87     * Wait for executorgroup to terminate, while periodically reporting progress
88     *
89     * @param group ExecutorGroup whose termination we are awaiting
90     * @param progressable Progressable for reporting progress (Job context)
91     */
92    public static void awaitTerminationFuture(EventExecutorGroup group,
93                                              Progressable progressable) {
94      waitForever(new FutureWaitable<>(group.terminationFuture()), progressable);
95    }
96  
97    /**
98     * Wait for the result of the future to be ready, while periodically
99     * reporting progress.
100    *
101    * @param <T>          Type of the return value of the future
102    * @param future       Future
103    * @param progressable Progressable for reporting progress (Job context)
104    * @return Computed result of the future.
105    */
106   public static <T> T getFutureResult(Future<T> future,
107       Progressable progressable) {
108     return waitForever(new FutureWaitable<T>(future), progressable);
109   }
110 
111   /**
112    * Wait for {@link ChannelGroupFuture} to finish, while periodically
113    * reporting progress.
114    *
115    * @param future       ChannelGroupFuture
116    * @param progressable Progressable for reporting progress (Job context)
117    */
118   public static void awaitChannelGroupFuture(ChannelGroupFuture future,
119       Progressable progressable) {
120     waitForever(new ChannelGroupFutureWaitable(future), progressable);
121   }
122 
123   /**
124    * Wait for {@link ChannelFuture} to finish, while periodically
125    * reporting progress.
126    *
127    * @param future       ChannelFuture
128    * @param progressable Progressable for reporting progress (Job context)
129    */
130   public static void awaitChannelFuture(ChannelFuture future,
131       Progressable progressable) {
132     waitForever(new ChannelFutureWaitable(future), progressable);
133   }
134 
135   /**
136    * Wait to acquire enough permits from {@link Semaphore}, while periodically
137    * reporting progress.
138    *
139    * @param semaphore    Semaphore
140    * @param permits      How many permits to acquire
141    * @param progressable Progressable for reporting progress (Job context)
142    */
143   public static void awaitSemaphorePermits(final Semaphore semaphore,
144       int permits, Progressable progressable) {
145     while (true) {
146       waitForever(new SemaphoreWaitable(semaphore, permits), progressable);
147       // Verify permits were not taken by another thread,
148       // if they were keep looping
149       if (semaphore.tryAcquire(permits)) {
150         return;
151       }
152     }
153   }
154 
155   /**
156    * Wait forever for waitable to finish. Periodically reports progress.
157    *
158    * @param waitable Waitable which we wait for
159    * @param progressable Progressable for reporting progress (Job context)
160    * @param <T> Result type
161    * @return Result of waitable
162    */
163   private static <T> T waitForever(Waitable<T> waitable,
164       Progressable progressable) {
165     return waitForever(waitable, progressable, DEFUALT_MSEC_PERIOD);
166   }
167 
168   /**
169    * Wait forever for waitable to finish. Periodically reports progress.
170    *
171    * @param waitable Waitable which we wait for
172    * @param progressable Progressable for reporting progress (Job context)
173    * @param msecsPeriod How often to report progress
174    * @param <T> Result type
175    * @return Result of waitable
176    */
177   private static <T> T waitForever(Waitable<T> waitable,
178       Progressable progressable, int msecsPeriod) {
179     while (true) {
180       waitFor(waitable, progressable, msecsPeriod, msecsPeriod);
181       if (waitable.isFinished()) {
182         try {
183           return waitable.getResult();
184         } catch (ExecutionException e) {
185           throw new IllegalStateException("waitForever: " +
186               "ExecutionException occurred while waiting for " + waitable, e);
187         } catch (InterruptedException e) {
188           throw new IllegalStateException("waitForever: " +
189               "InterruptedException occurred while waiting for " + waitable, e);
190         }
191       }
192     }
193   }
194 
195   /**
196    *  Wait for desired number of milliseconds for waitable to finish.
197    *  Periodically reports progress.
198    *
199    * @param waitable Waitable which we wait for
200    * @param progressable Progressable for reporting progress (Job context)
201    * @param msecs Number of milliseconds to wait for
202    * @param msecsPeriod How often to report progress
203    * @param <T> Result type
204    * @return Result of waitable
205    */
206   private static <T> T waitFor(Waitable<T> waitable, Progressable progressable,
207       int msecs, int msecsPeriod) {
208     long timeoutTimeMsecs = System.currentTimeMillis() + msecs;
209     int currentWaitMsecs;
210     while (true) {
211       progressable.progress();
212       currentWaitMsecs = Math.min(msecs, msecsPeriod);
213       try {
214         waitable.waitFor(currentWaitMsecs);
215         if (waitable.isFinished()) {
216           return waitable.getResult();
217         }
218       } catch (InterruptedException e) {
219         throw new IllegalStateException("waitFor: " +
220             "InterruptedException occurred while waiting for " + waitable, e);
221       } catch (ExecutionException e) {
222         throw new IllegalStateException("waitFor: " +
223             "ExecutionException occurred while waiting for " + waitable, e);
224       }
225       if (LOG.isInfoEnabled()) {
226         LOG.info("waitFor: Waiting for " + waitable);
227       }
228       if (System.currentTimeMillis() >= timeoutTimeMsecs) {
229         return waitable.getTimeoutResult();
230       }
231       msecs = Math.max(0, msecs - currentWaitMsecs);
232     }
233   }
234 
235   /**
236    * Create {#link numThreads} callables from {#link callableFactory},
237    * execute them and gather results.
238    *
239    * @param callableFactory Factory for Callables
240    * @param numThreads Number of threads to use
241    * @param threadNameFormat Format for thread name
242    * @param progressable Progressable for reporting progress
243    * @param <R> Type of Callable's results
244    * @return List of results from Callables
245    */
246   public static <R> List<R> getResultsWithNCallables(
247       CallableFactory<R> callableFactory, int numThreads,
248       String threadNameFormat, Progressable progressable) {
249     ExecutorService executorService = Executors.newFixedThreadPool(numThreads,
250         ThreadUtils.createThreadFactory(threadNameFormat));
251     HashMap<Integer, Future<R>> futures = new HashMap<>(numThreads);
252     for (int i = 0; i < numThreads; i++) {
253       Callable<R> callable = callableFactory.newCallable(i);
254       Future<R> future = executorService.submit(
255           new LogStacktraceCallable<R>(callable));
256       futures.put(i, future);
257     }
258     executorService.shutdown();
259     List<R> futureResults =
260         new ArrayList<>(Collections.<R>nCopies(numThreads, null));
261     // Loop through the futures until all are finished
262     // We do this in order to get any exceptions from the futures early
263     while (!futures.isEmpty()) {
264       Iterator<Map.Entry<Integer, Future<R>>> iterator =
265           futures.entrySet().iterator();
266       while (iterator.hasNext()) {
267         Map.Entry<Integer, Future<R>> entry = iterator.next();
268         R result;
269         try {
270           // Try to get result from the future
271           result = entry.getValue().get(
272               MSEC_TO_WAIT_ON_EACH_FUTURE, TimeUnit.MILLISECONDS);
273         } catch (InterruptedException e) {
274           throw new IllegalStateException("Interrupted", e);
275         } catch (ExecutionException e) {
276           // Execution exception wraps the actual cause
277           if (e.getCause() instanceof RuntimeException) {
278             throw (RuntimeException) e.getCause();
279           } else {
280             throw new IllegalStateException("Exception occurred", e.getCause());
281           }
282 
283         } catch (TimeoutException e) {
284           // If result is not ready yet just keep waiting
285           continue;
286         }
287         // Result is ready, put it to final results
288         futureResults.set(entry.getKey(), result);
289         // Remove current future since we are done with it
290         iterator.remove();
291       }
292       progressable.progress();
293     }
294     return futureResults;
295   }
296 
297   /**
298    * Interface for waiting on a result from some operation.
299    *
300    * @param <T> Result type.
301    */
302   private interface Waitable<T> {
303     /**
304      * Wait for desired number of milliseconds for waitable to finish.
305      *
306      * @param msecs Number of milliseconds to wait.
307      */
308     void waitFor(int msecs) throws InterruptedException, ExecutionException;
309 
310     /**
311      * Check if waitable is finished.
312      *
313      * @return True iff waitable finished.
314      */
315     boolean isFinished();
316 
317     /**
318      * Get result of waitable. Call after isFinished() returns true.
319      *
320      * @return Result of waitable.
321      */
322     T getResult() throws ExecutionException, InterruptedException;
323 
324     /**
325      * Get the result which we want to return in case of timeout.
326      *
327      * @return Timeout result.
328      */
329     T getTimeoutResult();
330   }
331 
332   /**
333    * abstract class for waitables which don't have the result.
334    */
335   private abstract static class WaitableWithoutResult
336       implements Waitable<Void> {
337     @Override
338     public Void getResult() throws ExecutionException, InterruptedException {
339       return null;
340     }
341 
342     @Override
343     public Void getTimeoutResult() {
344       return null;
345     }
346   }
347 
348   /**
349    * {@link Waitable} for waiting on a result of a {@link Future}.
350    *
351    * @param <T> Future result type
352    */
353   private static class FutureWaitable<T> implements Waitable<T> {
354     /** Future which we want to wait for */
355     private final Future<T> future;
356 
357     /**
358      * Constructor
359      *
360      * @param future Future which we want to wait for
361      */
362     public FutureWaitable(Future<T> future) {
363       this.future = future;
364     }
365 
366     @Override
367     public void waitFor(int msecs) throws InterruptedException,
368         ExecutionException {
369       try {
370         future.get(msecs, TimeUnit.MILLISECONDS);
371       } catch (TimeoutException e) {
372         if (LOG.isInfoEnabled()) {
373           LOG.info("waitFor: Future result not ready yet " + future);
374         }
375       }
376     }
377 
378     @Override
379     public boolean isFinished() {
380       return future.isDone();
381     }
382 
383     @Override
384     public T getResult() throws ExecutionException, InterruptedException {
385       return future.get();
386     }
387 
388     @Override
389     public T getTimeoutResult() {
390       return null;
391     }
392   }
393 
394   /**
395    * {@link Waitable} for waiting on an {@link ExecutorService} to terminate.
396    */
397   private static class ExecutorServiceWaitable extends WaitableWithoutResult {
398     /** ExecutorService which we want to wait for */
399     private final ExecutorService executorService;
400 
401     /**
402      * Constructor
403      *
404      * @param executorService ExecutorService which we want to wait for
405      */
406     public ExecutorServiceWaitable(ExecutorService executorService) {
407       this.executorService = executorService;
408     }
409 
410     @Override
411     public void waitFor(int msecs) throws InterruptedException {
412       executorService.awaitTermination(msecs, TimeUnit.MILLISECONDS);
413     }
414 
415     @Override
416     public boolean isFinished() {
417       return executorService.isTerminated();
418     }
419   }
420 
421   /**
422    * {@link Waitable} for waiting on a {@link ChannelGroupFuture} to
423    * terminate.
424    */
425   private static class ChannelGroupFutureWaitable extends
426       WaitableWithoutResult {
427     /** ChannelGroupFuture which we want to wait for */
428     private final ChannelGroupFuture future;
429 
430     /**
431      * Constructor
432      *
433      * @param future ChannelGroupFuture which we want to wait for
434      */
435     public ChannelGroupFutureWaitable(ChannelGroupFuture future) {
436       this.future = future;
437     }
438 
439     @Override
440     public void waitFor(int msecs) throws InterruptedException {
441       future.await(msecs, TimeUnit.MILLISECONDS);
442     }
443 
444     @Override
445     public boolean isFinished() {
446       return future.isDone();
447     }
448   }
449 
450   /**
451    * {@link Waitable} for waiting on a {@link ChannelFuture} to
452    * terminate.
453    */
454   private static class ChannelFutureWaitable extends WaitableWithoutResult {
455     /** ChannelGroupFuture which we want to wait for */
456     private final ChannelFuture future;
457 
458     /**
459      * Constructor
460      *
461      * @param future ChannelFuture which we want to wait for
462      */
463     public ChannelFutureWaitable(ChannelFuture future) {
464       this.future = future;
465     }
466 
467     @Override
468     public void waitFor(int msecs) throws InterruptedException {
469       future.await(msecs, TimeUnit.MILLISECONDS);
470     }
471 
472     @Override
473     public boolean isFinished() {
474       return future.isDone();
475     }
476   }
477 
478   /**
479    * {@link Waitable} for waiting on required number of permits in a
480    * {@link Semaphore} to become available.
481    */
482   private static class SemaphoreWaitable extends WaitableWithoutResult {
483     /** Semaphore to wait on */
484     private final Semaphore semaphore;
485     /** How many permits to wait on */
486     private final int permits;
487 
488     /**
489      * Constructor
490      *
491      * @param semaphore Semaphore to wait on
492      * @param permits How many permits to wait on
493      */
494     public SemaphoreWaitable(Semaphore semaphore, int permits) {
495       this.semaphore = semaphore;
496       this.permits = permits;
497     }
498 
499     @Override
500     public void waitFor(int msecs) throws InterruptedException {
501       boolean acquired =
502           semaphore.tryAcquire(permits, msecs, TimeUnit.MILLISECONDS);
503       // Return permits if we managed to acquire them
504       if (acquired) {
505         semaphore.release(permits);
506       }
507     }
508 
509     @Override
510     public boolean isFinished() {
511       return semaphore.availablePermits() >= permits;
512     }
513   }
514 }