This project has retired. For details please refer to its Attic page.
LoadPartitionIOCommand xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.command;
20  
21  import com.google.common.base.Preconditions;
22  import org.apache.giraph.bsp.BspService;
23  import org.apache.giraph.comm.messages.MessageStore;
24  import org.apache.giraph.ooc.OutOfCoreEngine;
25  import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
26  import org.apache.giraph.ooc.data.DiskBackedMessageStore;
27  import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
28  
29  import java.io.IOException;
30  
31  /**
32   * IOCommand to load partition data, edge data (if in INPUT_SUPERSTEP), and
33   * message data (if in compute supersteps). Also, this command can be used to
34   * prefetch a partition to be processed in the next superstep.
35   */
36  public class LoadPartitionIOCommand extends IOCommand {
37    /**
38     * Which superstep this partition should be loaded for? (can be current
39     * superstep or next superstep -- in case of prefetching).
40     */
41    private final long superstep;
42  
43    /**
44     * Constructor
45     *
46     * @param oocEngine out-of-core engine
47     * @param partitionId id of the partition to be loaded
48     * @param superstep superstep to load the partition for
49     */
50    public LoadPartitionIOCommand(OutOfCoreEngine oocEngine, int partitionId,
51                                  long superstep) {
52      super(oocEngine, partitionId);
53      this.superstep = superstep;
54    }
55  
56    @Override
57    public boolean execute() throws IOException {
58      boolean executed = false;
59      if (oocEngine.getMetaPartitionManager()
60          .startLoadingPartition(partitionId, superstep)) {
61        long currentSuperstep = oocEngine.getSuperstep();
62        DiskBackedPartitionStore partitionStore =
63            (DiskBackedPartitionStore)
64                oocEngine.getServerData().getPartitionStore();
65        numBytesTransferred +=
66            partitionStore.loadPartitionData(partitionId);
67        if (currentSuperstep == BspService.INPUT_SUPERSTEP &&
68            superstep == currentSuperstep) {
69          DiskBackedEdgeStore edgeStore =
70              (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
71          numBytesTransferred +=
72              edgeStore.loadPartitionData(partitionId);
73        }
74        MessageStore messageStore;
75        if (currentSuperstep == superstep) {
76          messageStore = oocEngine.getServerData().getCurrentMessageStore();
77        } else {
78          Preconditions.checkState(superstep == currentSuperstep + 1);
79          messageStore = oocEngine.getServerData().getIncomingMessageStore();
80        }
81        if (messageStore != null) {
82          numBytesTransferred += ((DiskBackedMessageStore) messageStore)
83              .loadPartitionData(partitionId);
84        }
85        oocEngine.getMetaPartitionManager()
86            .doneLoadingPartition(partitionId, superstep);
87        executed = true;
88      }
89      return executed;
90    }
91  
92    @Override
93    public IOCommandType getType() {
94      return IOCommandType.LOAD_PARTITION;
95    }
96  
97    @Override
98    public String toString() {
99      return "LoadPartitionIOCommand: (partitionId = " + partitionId + ", " +
100         "superstep = " + superstep + ")";
101   }
102 }