This project has retired. For details please refer to its
Attic page.
ProgressableUtils xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.utils;
20
21 import org.apache.hadoop.util.Progressable;
22 import org.apache.log4j.Logger;
23
24 import io.netty.channel.ChannelFuture;
25 import io.netty.channel.group.ChannelGroupFuture;
26 import io.netty.util.concurrent.EventExecutorGroup;
27
28 import java.util.ArrayList;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.Iterator;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.Semaphore;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.TimeoutException;
42
43
44 public class ProgressableUtils {
45
46 private static final Logger LOG =
47 Logger.getLogger(ProgressableUtils.class);
48
49 private static final int DEFUALT_MSEC_PERIOD = 60 * 1000;
50
51
52
53
54 private static final int MSEC_TO_WAIT_ON_EACH_FUTURE = 10;
55
56
57 private ProgressableUtils() {
58 }
59
60
61
62
63
64
65
66
67
68 public static void awaitExecutorTermination(ExecutorService executor,
69 Progressable progressable, int msecsPeriod) {
70 waitForever(new ExecutorServiceWaitable(executor), progressable,
71 msecsPeriod);
72 }
73
74
75
76
77
78
79
80
81 public static void awaitExecutorTermination(ExecutorService executor,
82 Progressable progressable) {
83 waitForever(new ExecutorServiceWaitable(executor), progressable);
84 }
85
86
87
88
89
90
91
92 public static void awaitTerminationFuture(EventExecutorGroup group,
93 Progressable progressable) {
94 waitForever(new FutureWaitable<>(group.terminationFuture()), progressable);
95 }
96
97
98
99
100
101
102
103
104
105
106 public static <T> T getFutureResult(Future<T> future,
107 Progressable progressable) {
108 return waitForever(new FutureWaitable<T>(future), progressable);
109 }
110
111
112
113
114
115
116
117
118 public static void awaitChannelGroupFuture(ChannelGroupFuture future,
119 Progressable progressable) {
120 waitForever(new ChannelGroupFutureWaitable(future), progressable);
121 }
122
123
124
125
126
127
128
129
130 public static void awaitChannelFuture(ChannelFuture future,
131 Progressable progressable) {
132 waitForever(new ChannelFutureWaitable(future), progressable);
133 }
134
135
136
137
138
139
140
141
142
143 public static void awaitSemaphorePermits(final Semaphore semaphore,
144 int permits, Progressable progressable) {
145 while (true) {
146 waitForever(new SemaphoreWaitable(semaphore, permits), progressable);
147
148
149 if (semaphore.tryAcquire(permits)) {
150 return;
151 }
152 }
153 }
154
155
156
157
158
159
160
161
162
163 private static <T> T waitForever(Waitable<T> waitable,
164 Progressable progressable) {
165 return waitForever(waitable, progressable, DEFUALT_MSEC_PERIOD);
166 }
167
168
169
170
171
172
173
174
175
176
177 private static <T> T waitForever(Waitable<T> waitable,
178 Progressable progressable, int msecsPeriod) {
179 while (true) {
180 waitFor(waitable, progressable, msecsPeriod, msecsPeriod);
181 if (waitable.isFinished()) {
182 try {
183 return waitable.getResult();
184 } catch (ExecutionException e) {
185 throw new IllegalStateException("waitForever: " +
186 "ExecutionException occurred while waiting for " + waitable, e);
187 } catch (InterruptedException e) {
188 throw new IllegalStateException("waitForever: " +
189 "InterruptedException occurred while waiting for " + waitable, e);
190 }
191 }
192 }
193 }
194
195
196
197
198
199
200
201
202
203
204
205
206 private static <T> T waitFor(Waitable<T> waitable, Progressable progressable,
207 int msecs, int msecsPeriod) {
208 long timeoutTimeMsecs = System.currentTimeMillis() + msecs;
209 int currentWaitMsecs;
210 while (true) {
211 progressable.progress();
212 currentWaitMsecs = Math.min(msecs, msecsPeriod);
213 try {
214 waitable.waitFor(currentWaitMsecs);
215 if (waitable.isFinished()) {
216 return waitable.getResult();
217 }
218 } catch (InterruptedException e) {
219 throw new IllegalStateException("waitFor: " +
220 "InterruptedException occurred while waiting for " + waitable, e);
221 } catch (ExecutionException e) {
222 throw new IllegalStateException("waitFor: " +
223 "ExecutionException occurred while waiting for " + waitable, e);
224 }
225 if (LOG.isInfoEnabled()) {
226 LOG.info("waitFor: Waiting for " + waitable);
227 }
228 if (System.currentTimeMillis() >= timeoutTimeMsecs) {
229 return waitable.getTimeoutResult();
230 }
231 msecs = Math.max(0, msecs - currentWaitMsecs);
232 }
233 }
234
235
236
237
238
239
240
241
242
243
244
245
246 public static <R> List<R> getResultsWithNCallables(
247 CallableFactory<R> callableFactory, int numThreads,
248 String threadNameFormat, Progressable progressable) {
249 ExecutorService executorService = Executors.newFixedThreadPool(numThreads,
250 ThreadUtils.createThreadFactory(threadNameFormat));
251 HashMap<Integer, Future<R>> futures = new HashMap<>(numThreads);
252 for (int i = 0; i < numThreads; i++) {
253 Callable<R> callable = callableFactory.newCallable(i);
254 Future<R> future = executorService.submit(
255 new LogStacktraceCallable<R>(callable));
256 futures.put(i, future);
257 }
258 executorService.shutdown();
259 List<R> futureResults =
260 new ArrayList<>(Collections.<R>nCopies(numThreads, null));
261
262
263 while (!futures.isEmpty()) {
264 Iterator<Map.Entry<Integer, Future<R>>> iterator =
265 futures.entrySet().iterator();
266 while (iterator.hasNext()) {
267 Map.Entry<Integer, Future<R>> entry = iterator.next();
268 R result;
269 try {
270
271 result = entry.getValue().get(
272 MSEC_TO_WAIT_ON_EACH_FUTURE, TimeUnit.MILLISECONDS);
273 } catch (InterruptedException e) {
274 throw new IllegalStateException("Interrupted", e);
275 } catch (ExecutionException e) {
276
277 if (e.getCause() instanceof RuntimeException) {
278 throw (RuntimeException) e.getCause();
279 } else {
280 throw new IllegalStateException("Exception occurred", e.getCause());
281 }
282
283 } catch (TimeoutException e) {
284
285 continue;
286 }
287
288 futureResults.set(entry.getKey(), result);
289
290 iterator.remove();
291 }
292 progressable.progress();
293 }
294 return futureResults;
295 }
296
297
298
299
300
301
302 private interface Waitable<T> {
303
304
305
306
307
308 void waitFor(int msecs) throws InterruptedException, ExecutionException;
309
310
311
312
313
314
315 boolean isFinished();
316
317
318
319
320
321
322 T getResult() throws ExecutionException, InterruptedException;
323
324
325
326
327
328
329 T getTimeoutResult();
330 }
331
332
333
334
335 private abstract static class WaitableWithoutResult
336 implements Waitable<Void> {
337 @Override
338 public Void getResult() throws ExecutionException, InterruptedException {
339 return null;
340 }
341
342 @Override
343 public Void getTimeoutResult() {
344 return null;
345 }
346 }
347
348
349
350
351
352
353 private static class FutureWaitable<T> implements Waitable<T> {
354
355 private final Future<T> future;
356
357
358
359
360
361
362 public FutureWaitable(Future<T> future) {
363 this.future = future;
364 }
365
366 @Override
367 public void waitFor(int msecs) throws InterruptedException,
368 ExecutionException {
369 try {
370 future.get(msecs, TimeUnit.MILLISECONDS);
371 } catch (TimeoutException e) {
372 if (LOG.isInfoEnabled()) {
373 LOG.info("waitFor: Future result not ready yet " + future);
374 }
375 }
376 }
377
378 @Override
379 public boolean isFinished() {
380 return future.isDone();
381 }
382
383 @Override
384 public T getResult() throws ExecutionException, InterruptedException {
385 return future.get();
386 }
387
388 @Override
389 public T getTimeoutResult() {
390 return null;
391 }
392 }
393
394
395
396
397 private static class ExecutorServiceWaitable extends WaitableWithoutResult {
398
399 private final ExecutorService executorService;
400
401
402
403
404
405
406 public ExecutorServiceWaitable(ExecutorService executorService) {
407 this.executorService = executorService;
408 }
409
410 @Override
411 public void waitFor(int msecs) throws InterruptedException {
412 executorService.awaitTermination(msecs, TimeUnit.MILLISECONDS);
413 }
414
415 @Override
416 public boolean isFinished() {
417 return executorService.isTerminated();
418 }
419 }
420
421
422
423
424
425 private static class ChannelGroupFutureWaitable extends
426 WaitableWithoutResult {
427
428 private final ChannelGroupFuture future;
429
430
431
432
433
434
435 public ChannelGroupFutureWaitable(ChannelGroupFuture future) {
436 this.future = future;
437 }
438
439 @Override
440 public void waitFor(int msecs) throws InterruptedException {
441 future.await(msecs, TimeUnit.MILLISECONDS);
442 }
443
444 @Override
445 public boolean isFinished() {
446 return future.isDone();
447 }
448 }
449
450
451
452
453
454 private static class ChannelFutureWaitable extends WaitableWithoutResult {
455
456 private final ChannelFuture future;
457
458
459
460
461
462
463 public ChannelFutureWaitable(ChannelFuture future) {
464 this.future = future;
465 }
466
467 @Override
468 public void waitFor(int msecs) throws InterruptedException {
469 future.await(msecs, TimeUnit.MILLISECONDS);
470 }
471
472 @Override
473 public boolean isFinished() {
474 return future.isDone();
475 }
476 }
477
478
479
480
481
482 private static class SemaphoreWaitable extends WaitableWithoutResult {
483
484 private final Semaphore semaphore;
485
486 private final int permits;
487
488
489
490
491
492
493
494 public SemaphoreWaitable(Semaphore semaphore, int permits) {
495 this.semaphore = semaphore;
496 this.permits = permits;
497 }
498
499 @Override
500 public void waitFor(int msecs) throws InterruptedException {
501 boolean acquired =
502 semaphore.tryAcquire(permits, msecs, TimeUnit.MILLISECONDS);
503
504 if (acquired) {
505 semaphore.release(permits);
506 }
507 }
508
509 @Override
510 public boolean isFinished() {
511 return semaphore.availablePermits() >= permits;
512 }
513 }
514 }