1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.ooc.policy;
2021import com.sun.management.GarbageCollectionNotificationInfo;
22import org.apache.giraph.conf.GiraphConstants;
23import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24import org.apache.giraph.ooc.OutOfCoreEngine;
25import org.apache.giraph.ooc.command.IOCommand;
26import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
27import org.apache.giraph.ooc.command.StorePartitionIOCommand;
28import org.apache.log4j.Logger;
2930import java.util.concurrent.atomic.AtomicInteger;
3132importstatic com.google.common.base.Preconditions.checkState;
3334/** Oracle for fixed out-of-core mechanism */35publicclassFixedPartitionsOracleimplementsOutOfCoreOracle {
36/** Class logger */37privatestaticfinal Logger LOG =
38 Logger.getLogger(FixedPartitionsOracle.class);
39/** Maximum number of partitions to be kept in memory */40privatefinalint maxPartitionsInMemory;
41/**42 * Number of partitions to be added (loaded) or removed (stored) to/from43 * memory. Each outstanding load partition counts +1 and each outstanding44 * store partition counts -1 toward this counter.45 */46privatefinal AtomicInteger deltaNumPartitionsInMemory =
47new AtomicInteger(0);
48/** Out-of-core engine */49privatefinalOutOfCoreEngine oocEngine;
5051/**52 * Constructor53 *54 * @param conf configuration55 * @param oocEngine out-of-core engine56 */57publicFixedPartitionsOracle(ImmutableClassesGiraphConfiguration conf,
58OutOfCoreEngine oocEngine) {
59this.maxPartitionsInMemory =
60 GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
61this.oocEngine = oocEngine;
62 }
6364 @Override
65publicIOAction[] getNextIOActions() {
66int numPartitionsInMemory =
67 oocEngine.getMetaPartitionManager().getNumInMemoryPartitions();
68int numPartialPartitionsInMemory =
69 oocEngine.getMetaPartitionManager().getNumPartiallyInMemoryPartitions();
70if (LOG.isDebugEnabled()) {
71 LOG.debug("getNextIOActions: calling with " + numPartitionsInMemory +
72" partitions entirely in memory and " + numPartialPartitionsInMemory +
73" partitions partially in memory, " +
74 deltaNumPartitionsInMemory.get() + " to be loaded");
75 }
76 checkState(numPartitionsInMemory >= 0);
77 checkState(numPartialPartitionsInMemory >= 0);
78int numPartitions =
79 numPartitionsInMemory + deltaNumPartitionsInMemory.get();
80// Fixed out-of-core policy:81// - if the number of partitions in memory is less than the max number of82// partitions in memory, we should load a partition to memory. This83// basically means we are prefetching partition's data either for the84// current superstep, or for the next superstep.85// - if the number of partitions in memory is equal to the the max number86// of partitions in memory, we do a 'soft store', meaning, we store87// processed partition to disk only if there is an unprocessed partition88// on disk. This basically makes room for unprocessed partitions on disk89// to be prefetched.90// - if the number of partitions in memory is more than the max number of91// partitions in memory, we do a 'hard store', meaning we store a92// partition to disk, regardless of its processing state.93if (numPartitions < maxPartitionsInMemory) {
94returnnewIOAction[]{
95 IOAction.LOAD_PARTITION,
96 IOAction.STORE_MESSAGES_AND_BUFFERS};
97 } elseif (numPartitions > maxPartitionsInMemory) {
98if (LOG.isDebugEnabled()) {
99 LOG.debug("getNextIOActions: number of partitions in memory passed " +
100"the specified threshold!");
101 }
102returnnewIOAction[]{
103 IOAction.STORE_PARTITION,
104 IOAction.STORE_MESSAGES_AND_BUFFERS};
105 } else {
106returnnewIOAction[]{
107 IOAction.STORE_MESSAGES_AND_BUFFERS,
108 IOAction.LOAD_TO_SWAP_PARTITION};
109 }
110 }
111112 @Override
113publicboolean approve(IOCommand command) {
114int numPartitionsInMemory = oocEngine.getMetaPartitionManager()
115 .getNumInMemoryPartitions();
116// If loading a partition result in having more partition in memory, the117// command should be denied. Also, if number of partitions in memory is118// already less than the max number of partitions, any command for storing119// a partition should be denied.120if (command instanceof LoadPartitionIOCommand &&
121 numPartitionsInMemory + deltaNumPartitionsInMemory.getAndIncrement() >
122 maxPartitionsInMemory) {
123 deltaNumPartitionsInMemory.getAndDecrement();
124return false;
125126 } elseif (command instanceof StorePartitionIOCommand &&
127 numPartitionsInMemory + deltaNumPartitionsInMemory.getAndDecrement() <
128 maxPartitionsInMemory) {
129 deltaNumPartitionsInMemory.getAndIncrement();
130return false;
131 }
132returntrue;
133 }
134135 @Override
136publicvoid commandCompleted(IOCommand command) {
137if (command instanceof LoadPartitionIOCommand) {
138 deltaNumPartitionsInMemory.getAndDecrement();
139 } elseif (command instanceof StorePartitionIOCommand) {
140 deltaNumPartitionsInMemory.getAndIncrement();
141 }
142 }
143144 @Override
145publicvoid gcCompleted(GarbageCollectionNotificationInfo gcInfo) { }
146147 @Override
148publicvoid startIteration() {
149 }
150 }