1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.ooc.command;
20
21 import org.apache.giraph.ooc.OutOfCoreEngine;
22 import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
23 import org.apache.giraph.ooc.data.DiskBackedMessageStore;
24 import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
25
26 import java.io.IOException;
27
28 /**
29 * IOCommand to store raw data buffers on disk.
30 */
31 public class StoreDataBufferIOCommand extends IOCommand {
32 /**
33 * Types of raw data buffer to offload to disk (either vertices/edges buffer
34 * in INPUT_SUPERSTEP or incoming message buffer).
35 */
36 public enum DataBufferType { PARTITION, MESSAGE };
37 /**
38 * Type of the buffer to store on disk.
39 */
40 private final DataBufferType type;
41
42 /**
43 * Constructor
44 *
45 * @param oocEngine out-of-core engine
46 * @param partitionId id of the partition to offload its buffers
47 * @param type type of the buffer to store on disk
48 */
49 public StoreDataBufferIOCommand(OutOfCoreEngine oocEngine,
50 int partitionId,
51 DataBufferType type) {
52 super(oocEngine, partitionId);
53 this.type = type;
54 }
55
56 @Override
57 public boolean execute() throws IOException {
58 boolean executed = false;
59 if (oocEngine.getMetaPartitionManager()
60 .startOffloadingBuffer(partitionId)) {
61 switch (type) {
62 case PARTITION:
63 DiskBackedPartitionStore partitionStore =
64 (DiskBackedPartitionStore)
65 oocEngine.getServerData().getPartitionStore();
66 numBytesTransferred +=
67 partitionStore.offloadBuffers(partitionId);
68 DiskBackedEdgeStore edgeStore =
69 (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
70 numBytesTransferred += edgeStore.offloadBuffers(partitionId);
71 break;
72 case MESSAGE:
73 DiskBackedMessageStore messageStore =
74 (DiskBackedMessageStore)
75 oocEngine.getServerData().getIncomingMessageStore();
76 numBytesTransferred +=
77 messageStore.offloadBuffers(partitionId);
78 break;
79 default:
80 throw new IllegalStateException("execute: requested data buffer type " +
81 "does not exist!");
82 }
83 oocEngine.getMetaPartitionManager().doneOffloadingBuffer(partitionId);
84 executed = true;
85 }
86 return executed;
87 }
88
89 @Override
90 public IOCommandType getType() {
91 return IOCommandType.STORE_BUFFER;
92 }
93
94 @Override
95 public String toString() {
96 return "StoreDataBufferIOCommand: (partitionId = " + partitionId + ", " +
97 "type = " + type.name() + ")";
98 }
99 }