1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.giraph.partition; 20 21 import org.apache.giraph.utils.ExtendedDataOutput; 22 import org.apache.hadoop.io.Writable; 23 import org.apache.hadoop.io.WritableComparable; 24 25 /** 26 * Structure that stores partitions for a worker. PartitionStore does not allow 27 * random accesses to partitions except upon removal. 28 * This structure is thread-safe, unless otherwise specified. 29 * 30 * @param <I> Vertex id 31 * @param <V> Vertex data 32 * @param <E> Edge data 33 */ 34 public interface PartitionStore<I extends WritableComparable, 35 V extends Writable, E extends Writable> { 36 /** 37 * Add a *new* partition to the store. If the partition is already existed, 38 * it does not add the partition and returns false. 39 * Note: this method is not thread-safe and should be called by a single 40 * thread. 41 * 42 * @param partition Partition to add 43 * @return Whether the addition made any change in the partition store 44 */ 45 boolean addPartition(Partition<I, V, E> partition); 46 47 /** 48 * Remove a partition and return it. Called from a single thread, *not* from 49 * within a scheduling cycle. This method should *not* be called in 50 * INPUT_SUPERSTEP. 51 * 52 * @param partitionId Partition id 53 * @return The removed partition 54 */ 55 Partition<I, V, E> removePartition(Integer partitionId); 56 57 /** 58 * Whether a specific partition is present in the store. 59 * 60 * @param partitionId Partition id 61 * @return True iff the partition is present 62 */ 63 boolean hasPartition(Integer partitionId); 64 65 /** 66 * Return the ids of all the stored partitions as an Iterable. 67 * 68 * @return The partition ids 69 */ 70 Iterable<Integer> getPartitionIds(); 71 72 /** 73 * Return the number of stored partitions. 74 * 75 * @return The number of partitions 76 */ 77 int getNumPartitions(); 78 79 /** 80 * Return the number of vertices in a partition. 81 * 82 * @param partitionId Partition id 83 * @return The number of vertices in the specified partition 84 */ 85 long getPartitionVertexCount(Integer partitionId); 86 87 /** 88 * Return the number of edges in a partition. 89 * 90 * @param partitionId Partition id 91 * @return The number of edges in the specified partition 92 */ 93 long getPartitionEdgeCount(Integer partitionId); 94 95 /** 96 * Whether the partition store is empty. 97 * 98 * @return True iff there are no partitions in the store 99 */ 100 boolean isEmpty(); 101 102 /** 103 * Add vertices to a given partition from a given DataOutput instance. This 104 * method is called right after receipt of vertex request in INPUT_SUPERSTEP. 105 * 106 * @param partitionId Partition id 107 * @param extendedDataOutput Output containing serialized vertex data 108 */ 109 void addPartitionVertices(Integer partitionId, 110 ExtendedDataOutput extendedDataOutput); 111 112 /** 113 * Called at the end of the computation. Called from a single thread. 114 */ 115 void shutdown(); 116 117 /** 118 * Called at the beginning of the computation. Called from a single thread. 119 */ 120 void initialize(); 121 122 /** 123 * Start the iteration cycle to iterate over partitions. Note that each 124 * iteration cycle *must* iterate over *all* partitions. Usually an iteration 125 * cycle is necessary for 126 * 1) moving edges (from edge store) to vertices after edge input splits are 127 * loaded in INPUT_SUPERSTEP, 128 * 2) computing partitions in each superstep (called once per superstep), 129 * 3) saving vertices/edges in the output superstep. 130 * 4) any sort of populating a data-structure based on the partitions in 131 * this store. 132 * 133 * After an iteration is started, multiple threads can access the partition 134 * store using {@link #getNextPartition()} to iterate over the partitions. 135 * Each time {@link #getNextPartition()} is called an unprocessed partition in 136 * the current iteration is returned. After processing of the partition is 137 * done, partition should be put back in the store using 138 * {@link #putPartition(Partition)}. Here is an example of the entire 139 * workflow: 140 * 141 * In the main thread: 142 * partitionStore.startIteration(); 143 * 144 * In multiple threads iterating over the partitions: 145 * Partition partition = partitionStore.getNextPartition(); 146 * ... do stuff with partition ... 147 * partitionStore.putPartition(partition); 148 * 149 * Called from a single thread. 150 */ 151 void startIteration(); 152 153 /** 154 * Return the next partition in iteration for the current superstep. 155 * Note: user has to put back the partition to the store through 156 * {@link #putPartition(Partition)} after use. Look at comments on 157 * {@link #startIteration()} for more detail. 158 * 159 * @return The next partition to process 160 */ 161 Partition<I, V, E> getNextPartition(); 162 163 /** 164 * Put a partition back to the store. Use this method to put a partition 165 * back after it has been retrieved through {@link #getNextPartition()}. 166 * Look at comments on {@link #startIteration()} for more detail. 167 * 168 * @param partition Partition 169 */ 170 void putPartition(Partition<I, V, E> partition); 171 }