View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.policy;
20  
21  import com.sun.management.GarbageCollectionNotificationInfo;
22  import org.apache.giraph.conf.GiraphConstants;
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.ooc.OutOfCoreEngine;
25  import org.apache.giraph.ooc.command.IOCommand;
26  import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
27  import org.apache.giraph.ooc.command.StorePartitionIOCommand;
28  import org.apache.log4j.Logger;
29  
30  import java.util.concurrent.atomic.AtomicInteger;
31  
32  import static com.google.common.base.Preconditions.checkState;
33  
34  /** Oracle for fixed out-of-core mechanism */
35  public class FixedPartitionsOracle implements OutOfCoreOracle {
36    /** Class logger */
37    private static final Logger LOG =
38        Logger.getLogger(FixedPartitionsOracle.class);
39    /** Maximum number of partitions to be kept in memory */
40    private final int maxPartitionsInMemory;
41    /**
42     * Number of partitions to be added (loaded) or removed (stored) to/from
43     * memory. Each outstanding load partition counts +1 and each outstanding
44     * store partition counts -1 toward this counter.
45     */
46    private final AtomicInteger deltaNumPartitionsInMemory =
47        new AtomicInteger(0);
48    /** Out-of-core engine */
49    private final OutOfCoreEngine oocEngine;
50  
51    /**
52     * Constructor
53     *
54     * @param conf configuration
55     * @param oocEngine out-of-core engine
56     */
57    public FixedPartitionsOracle(ImmutableClassesGiraphConfiguration conf,
58                                 OutOfCoreEngine oocEngine) {
59      this.maxPartitionsInMemory =
60          GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
61      this.oocEngine = oocEngine;
62    }
63  
64    @Override
65    public IOAction[] getNextIOActions() {
66      int numPartitionsInMemory =
67          oocEngine.getMetaPartitionManager().getNumInMemoryPartitions();
68      int numPartialPartitionsInMemory =
69          oocEngine.getMetaPartitionManager().getNumPartiallyInMemoryPartitions();
70      if (LOG.isDebugEnabled()) {
71        LOG.debug("getNextIOActions: calling with " + numPartitionsInMemory +
72            " partitions entirely in memory and " + numPartialPartitionsInMemory +
73            " partitions partially in memory, " +
74            deltaNumPartitionsInMemory.get() + " to be loaded");
75      }
76      checkState(numPartitionsInMemory >= 0);
77      checkState(numPartialPartitionsInMemory >= 0);
78      int numPartitions =
79          numPartitionsInMemory + deltaNumPartitionsInMemory.get();
80      // Fixed out-of-core policy:
81      //   - if the number of partitions in memory is less than the max number of
82      //     partitions in memory, we should load a partition to memory. This
83      //     basically means we are prefetching partition's data either for the
84      //     current superstep, or for the next superstep.
85      //   - if the number of partitions in memory is equal to the the max number
86      //     of partitions in memory, we do a 'soft store', meaning, we store
87      //     processed partition to disk only if there is an unprocessed partition
88      //     on disk. This basically makes room for unprocessed partitions on disk
89      //     to be prefetched.
90      //   - if the number of partitions in memory is more than the max number of
91      //     partitions in memory, we do a 'hard store', meaning we store a
92      //     partition to disk, regardless of its processing state.
93      if (numPartitions < maxPartitionsInMemory) {
94        return new IOAction[]{
95          IOAction.LOAD_PARTITION,
96          IOAction.STORE_MESSAGES_AND_BUFFERS};
97      } else if (numPartitions > maxPartitionsInMemory) {
98        LOG.warn("getNextIOActions: number of partitions in memory passed the " +
99            "specified threshold!");
100       return new IOAction[]{
101         IOAction.STORE_PARTITION,
102         IOAction.STORE_MESSAGES_AND_BUFFERS};
103     } else {
104       return new IOAction[]{
105         IOAction.STORE_MESSAGES_AND_BUFFERS,
106         IOAction.LOAD_TO_SWAP_PARTITION};
107     }
108   }
109 
110   @Override
111   public boolean approve(IOCommand command) {
112     int numPartitionsInMemory = oocEngine.getMetaPartitionManager()
113         .getNumInMemoryPartitions();
114     // If loading a partition result in having more partition in memory, the
115     // command should be denied. Also, if number of partitions in memory is
116     // already less than the max number of partitions, any command for storing
117     // a partition should be denied.
118     if (command instanceof LoadPartitionIOCommand &&
119         numPartitionsInMemory + deltaNumPartitionsInMemory.getAndIncrement() >
120             maxPartitionsInMemory) {
121       deltaNumPartitionsInMemory.getAndDecrement();
122       return false;
123 
124     } else if (command instanceof StorePartitionIOCommand &&
125         numPartitionsInMemory + deltaNumPartitionsInMemory.getAndDecrement() <
126             maxPartitionsInMemory) {
127       deltaNumPartitionsInMemory.getAndIncrement();
128       return false;
129     }
130     return true;
131   }
132 
133   @Override
134   public void commandCompleted(IOCommand command) {
135     if (command instanceof LoadPartitionIOCommand) {
136       deltaNumPartitionsInMemory.getAndDecrement();
137     } else if (command instanceof StorePartitionIOCommand) {
138       deltaNumPartitionsInMemory.getAndIncrement();
139     }
140   }
141 
142   @Override
143   public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) { }
144 
145   @Override
146   public void shutdown() { }
147 }