This project has retired. For details please refer to its Attic page.
StoreDataBufferIOCommand xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.command;
20  
21  import org.apache.giraph.ooc.OutOfCoreEngine;
22  import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
23  import org.apache.giraph.ooc.data.DiskBackedMessageStore;
24  import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
25  
26  import java.io.IOException;
27  
28  /**
29   * IOCommand to store raw data buffers on disk.
30   */
31  public class StoreDataBufferIOCommand extends IOCommand {
32    /**
33     * Types of raw data buffer to offload to disk (either vertices/edges buffer
34     * in INPUT_SUPERSTEP or incoming message buffer).
35     */
36    public enum DataBufferType { PARTITION, MESSAGE };
37    /**
38     * Type of the buffer to store on disk.
39     */
40    private final DataBufferType type;
41  
42    /**
43     * Constructor
44     *
45     * @param oocEngine out-of-core engine
46     * @param partitionId id of the partition to offload its buffers
47     * @param type type of the buffer to store on disk
48     */
49    public StoreDataBufferIOCommand(OutOfCoreEngine oocEngine,
50                                    int partitionId,
51                                    DataBufferType type) {
52      super(oocEngine, partitionId);
53      this.type = type;
54    }
55  
56    @Override
57    public boolean execute() throws IOException {
58      boolean executed = false;
59      if (oocEngine.getMetaPartitionManager()
60          .startOffloadingBuffer(partitionId)) {
61        switch (type) {
62        case PARTITION:
63          DiskBackedPartitionStore partitionStore =
64              (DiskBackedPartitionStore)
65                  oocEngine.getServerData().getPartitionStore();
66          numBytesTransferred +=
67              partitionStore.offloadBuffers(partitionId);
68          DiskBackedEdgeStore edgeStore =
69              (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
70          numBytesTransferred += edgeStore.offloadBuffers(partitionId);
71          break;
72        case MESSAGE:
73          DiskBackedMessageStore messageStore =
74              (DiskBackedMessageStore)
75                  oocEngine.getServerData().getIncomingMessageStore();
76          numBytesTransferred +=
77              messageStore.offloadBuffers(partitionId);
78          break;
79        default:
80          throw new IllegalStateException("execute: requested data buffer type " +
81              "does not exist!");
82        }
83        oocEngine.getMetaPartitionManager().doneOffloadingBuffer(partitionId);
84        executed = true;
85      }
86      return executed;
87    }
88  
89    @Override
90    public IOCommandType getType() {
91      return IOCommandType.STORE_BUFFER;
92    }
93  
94    @Override
95    public String toString() {
96      return "StoreDataBufferIOCommand: (partitionId = " + partitionId + ", " +
97          "type = " + type.name() + ")";
98    }
99  }