This project has retired. For details please refer to its Attic page.
OutOfCoreIOCallableFactory xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc;
20  
21  import org.apache.giraph.utils.CallableFactory;
22  import org.apache.giraph.utils.ThreadUtils;
23  import org.apache.log4j.Logger;
24  
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.ExecutionException;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.Future;
31  import java.util.concurrent.LinkedBlockingQueue;
32  import java.util.concurrent.ThreadPoolExecutor;
33  import java.util.concurrent.TimeUnit;
34  
35  /**
36   * Factory class to create IO threads for out-of-core engine.
37   */
38  public class OutOfCoreIOCallableFactory {
39    /** Class logger. */
40    private static final Logger LOG =
41        Logger.getLogger(OutOfCoreIOCallableFactory.class);
42    /** Out-of-core engine */
43    private final OutOfCoreEngine oocEngine;
44    /** Result of IO threads at the end of the computation */
45    private final List<Future> results;
46    /** Number of threads used for IO operations */
47    private final int numIOThreads;
48    /** Thread UncaughtExceptionHandler to use */
49    private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
50    /** Executor service for IO threads */
51    private ExecutorService outOfCoreIOExecutor;
52  
53    /**
54     * Constructor
55     * @param oocEngine Out-of-core engine
56     * @param numIOThreads Number of IO threads used
57     * @param uncaughtExceptionHandler Thread UncaughtExceptionHandler to use
58     */
59    public OutOfCoreIOCallableFactory(OutOfCoreEngine oocEngine,
60        int numIOThreads,
61        Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
62      this.oocEngine = oocEngine;
63      this.numIOThreads = numIOThreads;
64      this.results = new ArrayList<>(numIOThreads);
65      this.uncaughtExceptionHandler = uncaughtExceptionHandler;
66    }
67  
68    /**
69     * Creates/Launches IO threads
70     */
71    public void createCallable() {
72      CallableFactory<Void> outOfCoreIOCallableFactory =
73        new CallableFactory<Void>() {
74          @Override
75          public Callable<Void> newCallable(int callableId) {
76            return new OutOfCoreIOCallable(oocEngine, callableId);
77          }
78        };
79      outOfCoreIOExecutor = new ThreadPoolExecutor(numIOThreads, numIOThreads, 0L,
80          TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
81          ThreadUtils.createThreadFactory("ooc-io-%d"));
82  
83      for (int i = 0; i < numIOThreads; ++i) {
84        Future<Void> future = ThreadUtils.submitToExecutor(outOfCoreIOExecutor,
85            outOfCoreIOCallableFactory.newCallable(i), uncaughtExceptionHandler);
86        results.add(future);
87      }
88      // Notify executor to not accept any more tasks
89      outOfCoreIOExecutor.shutdown();
90    }
91  
92    /**
93     * Check whether all IO threads terminated gracefully.
94     */
95    public void shutdown() {
96      boolean threadsTerminated = false;
97      while (!threadsTerminated) {
98        if (LOG.isInfoEnabled()) {
99          LOG.info("shutdown: waiting for IO threads to finish!");
100       }
101       try {
102         threadsTerminated =
103             outOfCoreIOExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
104       } catch (InterruptedException e) {
105         throw new IllegalStateException("shutdown: caught " +
106             "InterruptedException while waiting for IO threads to finish");
107       }
108     }
109     for (int i = 0; i < numIOThreads; ++i) {
110       try {
111         // Check whether the tread terminated gracefully
112         results.get(i).get();
113       } catch (InterruptedException e) {
114         LOG.error("shutdown: IO thread " + i + " was interrupted during its " +
115             "execution");
116         throw new IllegalStateException(e);
117       } catch (ExecutionException e) {
118         LOG.error("shutdown: IO thread " + i + " threw an exception during " +
119             "its execution");
120         throw new IllegalStateException(e);
121       }
122     }
123   }
124 }