This project has retired. For details please refer to its Attic page.
FixedPartitionsOracle xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.policy;
20  
21  import com.sun.management.GarbageCollectionNotificationInfo;
22  import org.apache.giraph.conf.GiraphConstants;
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.ooc.OutOfCoreEngine;
25  import org.apache.giraph.ooc.command.IOCommand;
26  import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
27  import org.apache.giraph.ooc.command.StorePartitionIOCommand;
28  import org.apache.log4j.Logger;
29  
30  import java.util.concurrent.atomic.AtomicInteger;
31  
32  import static com.google.common.base.Preconditions.checkState;
33  
34  /** Oracle for fixed out-of-core mechanism */
35  public class FixedPartitionsOracle implements OutOfCoreOracle {
36    /** Class logger */
37    private static final Logger LOG =
38        Logger.getLogger(FixedPartitionsOracle.class);
39    /** Maximum number of partitions to be kept in memory */
40    private final int maxPartitionsInMemory;
41    /**
42     * Number of partitions to be added (loaded) or removed (stored) to/from
43     * memory. Each outstanding load partition counts +1 and each outstanding
44     * store partition counts -1 toward this counter.
45     */
46    private final AtomicInteger deltaNumPartitionsInMemory =
47        new AtomicInteger(0);
48    /** Out-of-core engine */
49    private final OutOfCoreEngine oocEngine;
50  
51    /**
52     * Constructor
53     *
54     * @param conf configuration
55     * @param oocEngine out-of-core engine
56     */
57    public FixedPartitionsOracle(ImmutableClassesGiraphConfiguration conf,
58                                 OutOfCoreEngine oocEngine) {
59      this.maxPartitionsInMemory =
60          GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
61      this.oocEngine = oocEngine;
62    }
63  
64    @Override
65    public IOAction[] getNextIOActions() {
66      int numPartitionsInMemory =
67          oocEngine.getMetaPartitionManager().getNumInMemoryPartitions();
68      int numPartialPartitionsInMemory =
69          oocEngine.getMetaPartitionManager().getNumPartiallyInMemoryPartitions();
70      if (LOG.isDebugEnabled()) {
71        LOG.debug("getNextIOActions: calling with " + numPartitionsInMemory +
72            " partitions entirely in memory and " + numPartialPartitionsInMemory +
73            " partitions partially in memory, " +
74            deltaNumPartitionsInMemory.get() + " to be loaded");
75      }
76      checkState(numPartitionsInMemory >= 0);
77      checkState(numPartialPartitionsInMemory >= 0);
78      int numPartitions =
79          numPartitionsInMemory + deltaNumPartitionsInMemory.get();
80      // Fixed out-of-core policy:
81      //   - if the number of partitions in memory is less than the max number of
82      //     partitions in memory, we should load a partition to memory. This
83      //     basically means we are prefetching partition's data either for the
84      //     current superstep, or for the next superstep.
85      //   - if the number of partitions in memory is equal to the the max number
86      //     of partitions in memory, we do a 'soft store', meaning, we store
87      //     processed partition to disk only if there is an unprocessed partition
88      //     on disk. This basically makes room for unprocessed partitions on disk
89      //     to be prefetched.
90      //   - if the number of partitions in memory is more than the max number of
91      //     partitions in memory, we do a 'hard store', meaning we store a
92      //     partition to disk, regardless of its processing state.
93      if (numPartitions < maxPartitionsInMemory) {
94        return new IOAction[]{
95          IOAction.LOAD_PARTITION,
96          IOAction.STORE_MESSAGES_AND_BUFFERS};
97      } else if (numPartitions > maxPartitionsInMemory) {
98        if (LOG.isDebugEnabled()) {
99          LOG.debug("getNextIOActions: number of partitions in memory passed " +
100           "the specified threshold!");
101       }
102       return new IOAction[]{
103         IOAction.STORE_PARTITION,
104         IOAction.STORE_MESSAGES_AND_BUFFERS};
105     } else {
106       return new IOAction[]{
107         IOAction.STORE_MESSAGES_AND_BUFFERS,
108         IOAction.LOAD_TO_SWAP_PARTITION};
109     }
110   }
111 
112   @Override
113   public boolean approve(IOCommand command) {
114     int numPartitionsInMemory = oocEngine.getMetaPartitionManager()
115         .getNumInMemoryPartitions();
116     // If loading a partition result in having more partition in memory, the
117     // command should be denied. Also, if number of partitions in memory is
118     // already less than the max number of partitions, any command for storing
119     // a partition should be denied.
120     if (command instanceof LoadPartitionIOCommand &&
121         numPartitionsInMemory + deltaNumPartitionsInMemory.getAndIncrement() >
122             maxPartitionsInMemory) {
123       deltaNumPartitionsInMemory.getAndDecrement();
124       return false;
125 
126     } else if (command instanceof StorePartitionIOCommand &&
127         numPartitionsInMemory + deltaNumPartitionsInMemory.getAndDecrement() <
128             maxPartitionsInMemory) {
129       deltaNumPartitionsInMemory.getAndIncrement();
130       return false;
131     }
132     return true;
133   }
134 
135   @Override
136   public void commandCompleted(IOCommand command) {
137     if (command instanceof LoadPartitionIOCommand) {
138       deltaNumPartitionsInMemory.getAndDecrement();
139     } else if (command instanceof StorePartitionIOCommand) {
140       deltaNumPartitionsInMemory.getAndIncrement();
141     }
142   }
143 
144   @Override
145   public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) { }
146 
147   @Override
148   public void startIteration() {
149   }
150 }