1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.partition;
20
21 import org.apache.giraph.utils.ExtendedDataOutput;
22 import org.apache.hadoop.io.Writable;
23 import org.apache.hadoop.io.WritableComparable;
24
25 /**
26 * Structure that stores partitions for a worker. PartitionStore does not allow
27 * random accesses to partitions except upon removal.
28 * This structure is thread-safe, unless otherwise specified.
29 *
30 * @param <I> Vertex id
31 * @param <V> Vertex data
32 * @param <E> Edge data
33 */
34 public interface PartitionStore<I extends WritableComparable,
35 V extends Writable, E extends Writable> {
36 /**
37 * Add a *new* partition to the store. If the partition is already existed,
38 * it does not add the partition and returns false.
39 * Note: this method is not thread-safe and should be called by a single
40 * thread.
41 *
42 * @param partition Partition to add
43 * @return Whether the addition made any change in the partition store
44 */
45 boolean addPartition(Partition<I, V, E> partition);
46
47 /**
48 * Remove a partition and return it. Called from a single thread, *not* from
49 * within a scheduling cycle. This method should *not* be called in
50 * INPUT_SUPERSTEP.
51 *
52 * @param partitionId Partition id
53 * @return The removed partition
54 */
55 Partition<I, V, E> removePartition(Integer partitionId);
56
57 /**
58 * Whether a specific partition is present in the store.
59 *
60 * @param partitionId Partition id
61 * @return True iff the partition is present
62 */
63 boolean hasPartition(Integer partitionId);
64
65 /**
66 * Return the ids of all the stored partitions as an Iterable.
67 *
68 * @return The partition ids
69 */
70 Iterable<Integer> getPartitionIds();
71
72 /**
73 * Return the number of stored partitions.
74 *
75 * @return The number of partitions
76 */
77 int getNumPartitions();
78
79 /**
80 * Return the number of vertices in a partition.
81 *
82 * @param partitionId Partition id
83 * @return The number of vertices in the specified partition
84 */
85 long getPartitionVertexCount(Integer partitionId);
86
87 /**
88 * Return the number of edges in a partition.
89 *
90 * @param partitionId Partition id
91 * @return The number of edges in the specified partition
92 */
93 long getPartitionEdgeCount(Integer partitionId);
94
95 /**
96 * Whether the partition store is empty.
97 *
98 * @return True iff there are no partitions in the store
99 */
100 boolean isEmpty();
101
102 /**
103 * Add vertices to a given partition from a given DataOutput instance. This
104 * method is called right after receipt of vertex request in INPUT_SUPERSTEP.
105 *
106 * @param partitionId Partition id
107 * @param extendedDataOutput Output containing serialized vertex data
108 */
109 void addPartitionVertices(Integer partitionId,
110 ExtendedDataOutput extendedDataOutput);
111
112 /**
113 * Called at the end of the computation. Called from a single thread.
114 */
115 void shutdown();
116
117 /**
118 * Called at the beginning of the computation. Called from a single thread.
119 */
120 void initialize();
121
122 /**
123 * Start the iteration cycle to iterate over partitions. Note that each
124 * iteration cycle *must* iterate over *all* partitions. Usually an iteration
125 * cycle is necessary for
126 * 1) moving edges (from edge store) to vertices after edge input splits are
127 * loaded in INPUT_SUPERSTEP,
128 * 2) computing partitions in each superstep (called once per superstep),
129 * 3) saving vertices/edges in the output superstep.
130 * 4) any sort of populating a data-structure based on the partitions in
131 * this store.
132 *
133 * After an iteration is started, multiple threads can access the partition
134 * store using {@link #getNextPartition()} to iterate over the partitions.
135 * Each time {@link #getNextPartition()} is called an unprocessed partition in
136 * the current iteration is returned. After processing of the partition is
137 * done, partition should be put back in the store using
138 * {@link #putPartition(Partition)}. Here is an example of the entire
139 * workflow:
140 *
141 * In the main thread:
142 * partitionStore.startIteration();
143 *
144 * In multiple threads iterating over the partitions:
145 * Partition partition = partitionStore.getNextPartition();
146 * ... do stuff with partition ...
147 * partitionStore.putPartition(partition);
148 *
149 * Called from a single thread.
150 */
151 void startIteration();
152
153 /**
154 * Return the next partition in iteration for the current superstep.
155 * Note: user has to put back the partition to the store through
156 * {@link #putPartition(Partition)} after use. Look at comments on
157 * {@link #startIteration()} for more detail.
158 *
159 * @return The next partition to process
160 */
161 Partition<I, V, E> getNextPartition();
162
163 /**
164 * Put a partition back to the store. Use this method to put a partition
165 * back after it has been retrieved through {@link #getNextPartition()}.
166 * Look at comments on {@link #startIteration()} for more detail.
167 *
168 * @param partition Partition
169 */
170 void putPartition(Partition<I, V, E> partition);
171 }