This project has retired. For details please refer to its Attic page.
OutOfCoreIOScheduler xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc;
20  
21  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22  import org.apache.giraph.conf.IntConfOption;
23  import org.apache.giraph.ooc.command.IOCommand;
24  import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
25  import org.apache.giraph.ooc.command.StoreDataBufferIOCommand;
26  import org.apache.giraph.ooc.command.StoreIncomingMessageIOCommand;
27  import org.apache.giraph.ooc.command.StorePartitionIOCommand;
28  import org.apache.giraph.ooc.command.WaitIOCommand;
29  import org.apache.giraph.ooc.policy.OutOfCoreOracle;
30  import org.apache.log4j.Logger;
31  
32  import java.util.ArrayList;
33  import java.util.Arrays;
34  import java.util.List;
35  import java.util.Queue;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  
38  import static com.google.common.base.Preconditions.checkNotNull;
39  
40  /**
41   * Representation of IO thread scheduler for out-of-core mechanism
42   */
43  public class OutOfCoreIOScheduler {
44    /**
45     * If an IO thread does not have any command to do, it waits for certain a
46     * period and check back again to see if there exist any command to perform.
47     * This constant determines this wait period in milliseconds.
48     */
49    public static final IntConfOption OOC_WAIT_INTERVAL =
50        new IntConfOption("giraph.oocWaitInterval", 1000,
51            "Duration (in milliseconds) which IO threads in out-of-core " +
52                "mechanism would wait until a command becomes available");
53    /** Class logger. */
54    private static final Logger LOG =
55        Logger.getLogger(OutOfCoreIOScheduler.class);
56    /** Out-of-core engine */
57    private final OutOfCoreEngine oocEngine;
58    /** How much an IO thread should wait if there is no IO command */
59    private final int waitInterval;
60    /**
61     * Queue of IO commands for loading partitions to memory. Load commands are
62     * urgent and should be done once loading data is a viable IO command.
63     */
64    private final List<Queue<IOCommand>> threadLoadCommandQueue;
65    /** Whether IO threads should terminate */
66    private volatile boolean shouldTerminate;
67  
68    /**
69     * Constructor
70     *
71     * @param conf configuration
72     * @param oocEngine out-of-core engine
73     * @param numDisks number of disks (IO threads)
74     */
75    OutOfCoreIOScheduler(final ImmutableClassesGiraphConfiguration conf,
76                         OutOfCoreEngine oocEngine, int numDisks) {
77      this.oocEngine = oocEngine;
78      this.waitInterval = OOC_WAIT_INTERVAL.get(conf);
79      threadLoadCommandQueue = new ArrayList<>(numDisks);
80      for (int i = 0; i < numDisks; ++i) {
81        threadLoadCommandQueue.add(
82            new ConcurrentLinkedQueue<IOCommand>());
83      }
84      shouldTerminate = false;
85    }
86  
87    /**
88     * Generate and return the next appropriate IO command for a given thread
89     *
90     * @param threadId id of the thread ready to execute the next IO command
91     * @return next IO command to be executed by the given thread
92     */
93    public IOCommand getNextIOCommand(int threadId) {
94      if (shouldTerminate) {
95        return null;
96      }
97      IOCommand command = null;
98      do {
99        if (command != null && LOG.isInfoEnabled()) {
100         LOG.info("getNextIOCommand: command " + command + " was proposed to " +
101             "the oracle, but got denied. Generating another command!");
102       }
103       OutOfCoreOracle.IOAction[] actions =
104           oocEngine.getOracle().getNextIOActions();
105       if (LOG.isDebugEnabled()) {
106         LOG.debug("getNextIOCommand: actions are " + Arrays.toString(actions));
107       }
108       // Check whether there are any urgent outstanding load requests
109       if (!threadLoadCommandQueue.get(threadId).isEmpty()) {
110         // Check whether loading a partition is a viable (allowed) action to do
111         boolean canLoad = false;
112         for (OutOfCoreOracle.IOAction action : actions) {
113           if (action == OutOfCoreOracle.IOAction.LOAD_PARTITION ||
114               action == OutOfCoreOracle.IOAction.LOAD_UNPROCESSED_PARTITION ||
115               action == OutOfCoreOracle.IOAction.LOAD_TO_SWAP_PARTITION ||
116               action == OutOfCoreOracle.IOAction.URGENT_LOAD_PARTITION) {
117             canLoad = true;
118             break;
119           }
120         }
121         if (canLoad) {
122           command = threadLoadCommandQueue.get(threadId).poll();
123           checkNotNull(command);
124           if (oocEngine.getOracle().approve(command)) {
125             return command;
126           } else {
127             // Loading is not viable at this moment. We should put the command
128             // back in the load queue and wait until loading becomes viable.
129             threadLoadCommandQueue.get(threadId).offer(command);
130           }
131         }
132       }
133       command = null;
134       for (OutOfCoreOracle.IOAction action : actions) {
135         Integer partitionId;
136         switch (action) {
137         case STORE_MESSAGES_AND_BUFFERS:
138           partitionId = oocEngine.getMetaPartitionManager()
139               .getOffloadPartitionBufferId(threadId);
140           if (partitionId != null) {
141             command = new StoreDataBufferIOCommand(oocEngine, partitionId,
142                 StoreDataBufferIOCommand.DataBufferType.PARTITION);
143           } else {
144             partitionId = oocEngine.getMetaPartitionManager()
145                 .getOffloadMessageBufferId(threadId);
146             if (partitionId != null) {
147               command = new StoreDataBufferIOCommand(oocEngine, partitionId,
148                   StoreDataBufferIOCommand.DataBufferType.MESSAGE);
149             } else {
150               partitionId = oocEngine.getMetaPartitionManager()
151                   .getOffloadMessageId(threadId);
152               if (partitionId != null) {
153                 command = new StoreIncomingMessageIOCommand(oocEngine,
154                     partitionId);
155               }
156             }
157           }
158           break;
159         case STORE_PROCESSED_PARTITION:
160           partitionId = oocEngine.getMetaPartitionManager()
161               .getOffloadPartitionId(threadId);
162           if (partitionId != null &&
163               oocEngine.getMetaPartitionManager()
164                   .isPartitionProcessed(partitionId)) {
165             command = new StorePartitionIOCommand(oocEngine, partitionId);
166           }
167           break;
168         case STORE_PARTITION:
169           partitionId = oocEngine.getMetaPartitionManager()
170               .getOffloadPartitionId(threadId);
171           if (partitionId != null) {
172             command = new StorePartitionIOCommand(oocEngine, partitionId);
173           }
174           break;
175         case LOAD_UNPROCESSED_PARTITION:
176           partitionId = oocEngine.getMetaPartitionManager()
177               .getLoadPartitionId(threadId);
178           if (partitionId != null &&
179               !oocEngine.getMetaPartitionManager()
180                   .isPartitionProcessed(partitionId)) {
181             command = new LoadPartitionIOCommand(oocEngine, partitionId,
182                 oocEngine.getSuperstep());
183           }
184           break;
185         case LOAD_TO_SWAP_PARTITION:
186           partitionId = oocEngine.getMetaPartitionManager()
187               .getLoadPartitionId(threadId);
188           if (partitionId != null &&
189               !oocEngine.getMetaPartitionManager()
190                   .isPartitionProcessed(partitionId) &&
191               oocEngine.getMetaPartitionManager().hasProcessedOnMemory()) {
192             command = new LoadPartitionIOCommand(oocEngine, partitionId,
193                 oocEngine.getSuperstep());
194           }
195           break;
196         case LOAD_PARTITION:
197           partitionId = oocEngine.getMetaPartitionManager()
198               .getLoadPartitionId(threadId);
199           if (partitionId != null) {
200             if (oocEngine.getMetaPartitionManager()
201                 .isPartitionProcessed(partitionId)) {
202               command = new LoadPartitionIOCommand(oocEngine, partitionId,
203                   oocEngine.getSuperstep() + 1);
204             } else {
205               command = new LoadPartitionIOCommand(oocEngine, partitionId,
206                   oocEngine.getSuperstep());
207             }
208           }
209           break;
210         case URGENT_LOAD_PARTITION:
211           // Do nothing
212           break;
213         default:
214           throw new IllegalStateException("getNextIOCommand: the IO action " +
215               "is not defined!");
216         }
217         if (command != null) {
218           break;
219         }
220       }
221       if (command == null) {
222         command = new WaitIOCommand(oocEngine, waitInterval);
223       }
224     } while (!oocEngine.getOracle().approve(command));
225     return command;
226   }
227 
228   /**
229    * Notify IO scheduler that the IO command is completed
230    *
231    * @param command completed command
232    */
233   public void ioCommandCompleted(IOCommand command) {
234     oocEngine.ioCommandCompleted(command);
235   }
236 
237   /**
238    * Add an IO command to the scheduling queue of the IO scheduler
239    *
240    * @param ioCommand IO command to add to the scheduler
241    */
242   public void addIOCommand(IOCommand ioCommand) {
243     if (ioCommand instanceof LoadPartitionIOCommand) {
244       int ownerThread = oocEngine.getMetaPartitionManager()
245           .getOwnerThreadId(ioCommand.getPartitionId());
246       threadLoadCommandQueue.get(ownerThread).offer(ioCommand);
247     } else {
248       throw new IllegalStateException("addIOCommand: IO command type is not " +
249           "supported for addition");
250     }
251   }
252 
253   /**
254    * Shutdown/Terminate the IO scheduler, and notify all IO threads to halt
255    */
256   public void shutdown() {
257     shouldTerminate = true;
258     if (LOG.isInfoEnabled()) {
259       LOG.info("shutdown: OutOfCoreIOScheduler shutting down!");
260     }
261   }
262 }