View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.utils;
20  
21  import org.apache.hadoop.util.Progressable;
22  import org.apache.log4j.Logger;
23  
24  import io.netty.channel.ChannelFuture;
25  import io.netty.channel.group.ChannelGroupFuture;
26  import io.netty.util.concurrent.EventExecutorGroup;
27  
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.HashMap;
31  import java.util.Iterator;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.ExecutionException;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.Executors;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.Semaphore;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.TimeoutException;
42  
43  /** Functions for waiting on some events to happen while reporting progress */
44  public class ProgressableUtils {
45    /** Class logger */
46    private static final Logger LOG =
47        Logger.getLogger(ProgressableUtils.class);
48    /** Msecs to refresh the progress meter (one minute) */
49    private static final int DEFUALT_MSEC_PERIOD = 60 * 1000;
50    /**
51     * When getting results with many threads, how many milliseconds to wait
52     * on each when looping through them
53     */
54    private static final int MSEC_TO_WAIT_ON_EACH_FUTURE = 10;
55  
56    /** Do not instantiate. */
57    private ProgressableUtils() {
58    }
59  
60    /**
61     * Wait for executor tasks to terminate, while periodically reporting
62     * progress.
63     *
64     * @param executor     Executor which we are waiting for
65     * @param progressable Progressable for reporting progress (Job context)
66     * @param msecsPeriod How often to report progress
67     */
68    public static void awaitExecutorTermination(ExecutorService executor,
69        Progressable progressable, int msecsPeriod) {
70      waitForever(new ExecutorServiceWaitable(executor), progressable,
71          msecsPeriod);
72    }
73  
74    /**
75     * Wait for executor tasks to terminate, while periodically reporting
76     * progress.
77     *
78     * @param executor     Executor which we are waiting for
79     * @param progressable Progressable for reporting progress (Job context)
80     */
81    public static void awaitExecutorTermination(ExecutorService executor,
82        Progressable progressable) {
83      waitForever(new ExecutorServiceWaitable(executor), progressable);
84    }
85  
86    /**
87     * Wait for executorgroup to terminate, while periodically reporting progress
88     *
89     * @param group ExecutorGroup whose termination we are awaiting
90     * @param progressable Progressable for reporting progress (Job context)
91     */
92    public static void awaitTerminationFuture(EventExecutorGroup group,
93                                              Progressable progressable) {
94      waitForever(new FutureWaitable<>(group.terminationFuture()), progressable);
95    }
96  
97    /**
98     * Wait for the result of the future to be ready, while periodically
99     * reporting progress.
100    *
101    * @param <T>          Type of the return value of the future
102    * @param future       Future
103    * @param progressable Progressable for reporting progress (Job context)
104    * @return Computed result of the future.
105    */
106   public static <T> T getFutureResult(Future<T> future,
107       Progressable progressable) {
108     return waitForever(new FutureWaitable<T>(future), progressable);
109   }
110 
111   /**
112    * Wait for {@link ChannelGroupFuture} to finish, while periodically
113    * reporting progress.
114    *
115    * @param future       ChannelGroupFuture
116    * @param progressable Progressable for reporting progress (Job context)
117    */
118   public static void awaitChannelGroupFuture(ChannelGroupFuture future,
119       Progressable progressable) {
120     waitForever(new ChannelGroupFutureWaitable(future), progressable);
121   }
122 
123   /**
124    * Wait for {@link ChannelFuture} to finish, while periodically
125    * reporting progress.
126    *
127    * @param future       ChannelFuture
128    * @param progressable Progressable for reporting progress (Job context)
129    */
130   public static void awaitChannelFuture(ChannelFuture future,
131       Progressable progressable) {
132     waitForever(new ChannelFutureWaitable(future), progressable);
133   }
134 
135   /**
136    * Wait to acquire enough permits from {@link Semaphore}, while periodically
137    * reporting progress.
138    *
139    * @param semaphore    Semaphore
140    * @param permits      How many permits to acquire
141    * @param progressable Progressable for reporting progress (Job context)
142    */
143   public static void awaitSemaphorePermits(final Semaphore semaphore,
144       int permits, Progressable progressable) {
145     while (true) {
146       waitForever(new SemaphoreWaitable(semaphore, permits), progressable);
147       // Verify permits were not taken by another thread,
148       // if they were keep looping
149       if (semaphore.tryAcquire(permits)) {
150         return;
151       }
152     }
153   }
154 
155   /**
156    * Wait forever for waitable to finish. Periodically reports progress.
157    *
158    * @param waitable Waitable which we wait for
159    * @param progressable Progressable for reporting progress (Job context)
160    * @param <T> Result type
161    * @return Result of waitable
162    */
163   private static <T> T waitForever(Waitable<T> waitable,
164       Progressable progressable) {
165     return waitForever(waitable, progressable, DEFUALT_MSEC_PERIOD);
166   }
167 
168   /**
169    * Wait forever for waitable to finish. Periodically reports progress.
170    *
171    * @param waitable Waitable which we wait for
172    * @param progressable Progressable for reporting progress (Job context)
173    * @param msecsPeriod How often to report progress
174    * @param <T> Result type
175    * @return Result of waitable
176    */
177   private static <T> T waitForever(Waitable<T> waitable,
178       Progressable progressable, int msecsPeriod) {
179     while (true) {
180       waitFor(waitable, progressable, msecsPeriod, msecsPeriod);
181       if (waitable.isFinished()) {
182         try {
183           return waitable.getResult();
184         } catch (ExecutionException e) {
185           throw new IllegalStateException("waitForever: " +
186               "ExecutionException occurred while waiting for " + waitable, e);
187         } catch (InterruptedException e) {
188           throw new IllegalStateException("waitForever: " +
189               "InterruptedException occurred while waiting for " + waitable, e);
190         }
191       }
192     }
193   }
194 
195   /**
196    *  Wait for desired number of milliseconds for waitable to finish.
197    *  Periodically reports progress.
198    *
199    * @param waitable Waitable which we wait for
200    * @param progressable Progressable for reporting progress (Job context)
201    * @param msecs Number of milliseconds to wait for
202    * @param msecsPeriod How often to report progress
203    * @param <T> Result type
204    * @return Result of waitable
205    */
206   private static <T> T waitFor(Waitable<T> waitable, Progressable progressable,
207       int msecs, int msecsPeriod) {
208     long timeoutTimeMsecs = System.currentTimeMillis() + msecs;
209     int currentWaitMsecs;
210     while (true) {
211       progressable.progress();
212       currentWaitMsecs = Math.min(msecs, msecsPeriod);
213       try {
214         waitable.waitFor(currentWaitMsecs);
215         if (waitable.isFinished()) {
216           return waitable.getResult();
217         }
218       } catch (InterruptedException e) {
219         throw new IllegalStateException("waitFor: " +
220             "InterruptedException occurred while waiting for " + waitable, e);
221       } catch (ExecutionException e) {
222         throw new IllegalStateException("waitFor: " +
223             "ExecutionException occurred while waiting for " + waitable, e);
224       }
225       if (LOG.isInfoEnabled()) {
226         LOG.info("waitFor: Waiting for " + waitable);
227       }
228       if (System.currentTimeMillis() >= timeoutTimeMsecs) {
229         return waitable.getTimeoutResult();
230       }
231       msecs = Math.max(0, msecs - currentWaitMsecs);
232     }
233   }
234 
235   /**
236    * Create {#link numThreads} callables from {#link callableFactory},
237    * execute them and gather results.
238    *
239    * @param callableFactory Factory for Callables
240    * @param numThreads Number of threads to use
241    * @param threadNameFormat Format for thread name
242    * @param progressable Progressable for reporting progress
243    * @param <R> Type of Callable's results
244    * @return List of results from Callables
245    */
246   public static <R> List<R> getResultsWithNCallables(
247       CallableFactory<R> callableFactory, int numThreads,
248       String threadNameFormat, Progressable progressable) {
249     ExecutorService executorService = Executors.newFixedThreadPool(numThreads,
250         ThreadUtils.createThreadFactory(threadNameFormat));
251     HashMap<Integer, Future<R>> futures = new HashMap<>(numThreads);
252     for (int i = 0; i < numThreads; i++) {
253       Callable<R> callable = callableFactory.newCallable(i);
254       Future<R> future = executorService.submit(
255           new LogStacktraceCallable<R>(callable));
256       futures.put(i, future);
257     }
258     executorService.shutdown();
259     List<R> futureResults =
260         new ArrayList<>(Collections.<R>nCopies(numThreads, null));
261     // Loop through the futures until all are finished
262     // We do this in order to get any exceptions from the futures early
263     while (!futures.isEmpty()) {
264       Iterator<Map.Entry<Integer, Future<R>>> iterator =
265           futures.entrySet().iterator();
266       while (iterator.hasNext()) {
267         Map.Entry<Integer, Future<R>> entry = iterator.next();
268         R result;
269         try {
270           // Try to get result from the future
271           result = entry.getValue().get(
272               MSEC_TO_WAIT_ON_EACH_FUTURE, TimeUnit.MILLISECONDS);
273         } catch (InterruptedException | ExecutionException e) {
274           throw new IllegalStateException("Exception occurred", e);
275         } catch (TimeoutException e) {
276           // If result is not ready yet just keep waiting
277           continue;
278         }
279         // Result is ready, put it to final results
280         futureResults.set(entry.getKey(), result);
281         // Remove current future since we are done with it
282         iterator.remove();
283       }
284       progressable.progress();
285     }
286     return futureResults;
287   }
288 
289   /**
290    * Interface for waiting on a result from some operation.
291    *
292    * @param <T> Result type.
293    */
294   private interface Waitable<T> {
295     /**
296      * Wait for desired number of milliseconds for waitable to finish.
297      *
298      * @param msecs Number of milliseconds to wait.
299      */
300     void waitFor(int msecs) throws InterruptedException, ExecutionException;
301 
302     /**
303      * Check if waitable is finished.
304      *
305      * @return True iff waitable finished.
306      */
307     boolean isFinished();
308 
309     /**
310      * Get result of waitable. Call after isFinished() returns true.
311      *
312      * @return Result of waitable.
313      */
314     T getResult() throws ExecutionException, InterruptedException;
315 
316     /**
317      * Get the result which we want to return in case of timeout.
318      *
319      * @return Timeout result.
320      */
321     T getTimeoutResult();
322   }
323 
324   /**
325    * abstract class for waitables which don't have the result.
326    */
327   private abstract static class WaitableWithoutResult
328       implements Waitable<Void> {
329     @Override
330     public Void getResult() throws ExecutionException, InterruptedException {
331       return null;
332     }
333 
334     @Override
335     public Void getTimeoutResult() {
336       return null;
337     }
338   }
339 
340   /**
341    * {@link Waitable} for waiting on a result of a {@link Future}.
342    *
343    * @param <T> Future result type
344    */
345   private static class FutureWaitable<T> implements Waitable<T> {
346     /** Future which we want to wait for */
347     private final Future<T> future;
348 
349     /**
350      * Constructor
351      *
352      * @param future Future which we want to wait for
353      */
354     public FutureWaitable(Future<T> future) {
355       this.future = future;
356     }
357 
358     @Override
359     public void waitFor(int msecs) throws InterruptedException,
360         ExecutionException {
361       try {
362         future.get(msecs, TimeUnit.MILLISECONDS);
363       } catch (TimeoutException e) {
364         if (LOG.isInfoEnabled()) {
365           LOG.info("waitFor: Future result not ready yet " + future);
366         }
367       }
368     }
369 
370     @Override
371     public boolean isFinished() {
372       return future.isDone();
373     }
374 
375     @Override
376     public T getResult() throws ExecutionException, InterruptedException {
377       return future.get();
378     }
379 
380     @Override
381     public T getTimeoutResult() {
382       return null;
383     }
384   }
385 
386   /**
387    * {@link Waitable} for waiting on an {@link ExecutorService} to terminate.
388    */
389   private static class ExecutorServiceWaitable extends WaitableWithoutResult {
390     /** ExecutorService which we want to wait for */
391     private final ExecutorService executorService;
392 
393     /**
394      * Constructor
395      *
396      * @param executorService ExecutorService which we want to wait for
397      */
398     public ExecutorServiceWaitable(ExecutorService executorService) {
399       this.executorService = executorService;
400     }
401 
402     @Override
403     public void waitFor(int msecs) throws InterruptedException {
404       executorService.awaitTermination(msecs, TimeUnit.MILLISECONDS);
405     }
406 
407     @Override
408     public boolean isFinished() {
409       return executorService.isTerminated();
410     }
411   }
412 
413   /**
414    * {@link Waitable} for waiting on a {@link ChannelGroupFuture} to
415    * terminate.
416    */
417   private static class ChannelGroupFutureWaitable extends
418       WaitableWithoutResult {
419     /** ChannelGroupFuture which we want to wait for */
420     private final ChannelGroupFuture future;
421 
422     /**
423      * Constructor
424      *
425      * @param future ChannelGroupFuture which we want to wait for
426      */
427     public ChannelGroupFutureWaitable(ChannelGroupFuture future) {
428       this.future = future;
429     }
430 
431     @Override
432     public void waitFor(int msecs) throws InterruptedException {
433       future.await(msecs, TimeUnit.MILLISECONDS);
434     }
435 
436     @Override
437     public boolean isFinished() {
438       return future.isDone();
439     }
440   }
441 
442   /**
443    * {@link Waitable} for waiting on a {@link ChannelFuture} to
444    * terminate.
445    */
446   private static class ChannelFutureWaitable extends WaitableWithoutResult {
447     /** ChannelGroupFuture which we want to wait for */
448     private final ChannelFuture future;
449 
450     /**
451      * Constructor
452      *
453      * @param future ChannelFuture which we want to wait for
454      */
455     public ChannelFutureWaitable(ChannelFuture future) {
456       this.future = future;
457     }
458 
459     @Override
460     public void waitFor(int msecs) throws InterruptedException {
461       future.await(msecs, TimeUnit.MILLISECONDS);
462     }
463 
464     @Override
465     public boolean isFinished() {
466       return future.isDone();
467     }
468   }
469 
470   /**
471    * {@link Waitable} for waiting on required number of permits in a
472    * {@link Semaphore} to become available.
473    */
474   private static class SemaphoreWaitable extends WaitableWithoutResult {
475     /** Semaphore to wait on */
476     private final Semaphore semaphore;
477     /** How many permits to wait on */
478     private final int permits;
479 
480     /**
481      * Constructor
482      *
483      * @param semaphore Semaphore to wait on
484      * @param permits How many permits to wait on
485      */
486     public SemaphoreWaitable(Semaphore semaphore, int permits) {
487       this.semaphore = semaphore;
488       this.permits = permits;
489     }
490 
491     @Override
492     public void waitFor(int msecs) throws InterruptedException {
493       boolean acquired =
494           semaphore.tryAcquire(permits, msecs, TimeUnit.MILLISECONDS);
495       // Return permits if we managed to acquire them
496       if (acquired) {
497         semaphore.release(permits);
498       }
499     }
500 
501     @Override
502     public boolean isFinished() {
503       return semaphore.availablePermits() >= permits;
504     }
505   }
506 }