This project has been retired. For details, please refer to its Attic page.
PartitionStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.partition;
20  
21  import org.apache.giraph.utils.ExtendedDataOutput;
22  import org.apache.hadoop.io.Writable;
23  import org.apache.hadoop.io.WritableComparable;
24  
25  /**
26   * Structure that stores partitions for a worker. PartitionStore does not allow
27   * random accesses to partitions except upon removal.
28   * This structure is thread-safe, unless otherwise specified.
29   *
30   * @param <I> Vertex id
31   * @param <V> Vertex data
32   * @param <E> Edge data
33   */
34  public interface PartitionStore<I extends WritableComparable,
35      V extends Writable, E extends Writable> {
36    /**
37     * Add a *new* partition to the store. If the partition is already existed,
38     * it does not add the partition and returns false.
39     * Note: this method is not thread-safe and should be called by a single
40     * thread.
41     *
42     * @param partition Partition to add
43     * @return Whether the addition made any change in the partition store
44     */
45    boolean addPartition(Partition<I, V, E> partition);
46  
47    /**
48     * Remove a partition and return it. Called from a single thread, *not* from
49     * within a scheduling cycle. This method should *not* be called in
50     * INPUT_SUPERSTEP.
51     *
52     * @param partitionId Partition id
53     * @return The removed partition
54     */
55    Partition<I, V, E> removePartition(Integer partitionId);
56  
57    /**
58     * Whether a specific partition is present in the store.
59     *
60     * @param partitionId Partition id
61     * @return True iff the partition is present
62     */
63    boolean hasPartition(Integer partitionId);
64  
65    /**
66     * Return the ids of all the stored partitions as an Iterable.
67     *
68     * @return The partition ids
69     */
70    Iterable<Integer> getPartitionIds();
71  
72    /**
73     * Return the number of stored partitions.
74     *
75     * @return The number of partitions
76     */
77    int getNumPartitions();
78  
79    /**
80     * Return the number of vertices in a partition.
81     *
82     * @param partitionId Partition id
83     * @return The number of vertices in the specified partition
84     */
85    long getPartitionVertexCount(Integer partitionId);
86  
87    /**
88     * Return the number of edges in a partition.
89     *
90     * @param partitionId Partition id
91     * @return The number of edges in the specified partition
92     */
93    long getPartitionEdgeCount(Integer partitionId);
94  
95    /**
96     * Whether the partition store is empty.
97     *
98     * @return True iff there are no partitions in the store
99     */
100   boolean isEmpty();
101 
102   /**
103    * Add vertices to a given partition from a given DataOutput instance. This
104    * method is called right after receipt of vertex request in INPUT_SUPERSTEP.
105    *
106    * @param partitionId Partition id
107    * @param extendedDataOutput Output containing serialized vertex data
108    */
109   void addPartitionVertices(Integer partitionId,
110                             ExtendedDataOutput extendedDataOutput);
111 
112   /**
113    * Called at the end of the computation. Called from a single thread.
114    */
115   void shutdown();
116 
117   /**
118    * Called at the beginning of the computation. Called from a single thread.
119    */
120   void initialize();
121 
122   /**
123    * Start the iteration cycle to iterate over partitions. Note that each
124    * iteration cycle *must* iterate over *all* partitions. Usually an iteration
125    * cycle is necessary for
126    *   1) moving edges (from edge store) to vertices after edge input splits are
127    *      loaded in INPUT_SUPERSTEP,
128    *   2) computing partitions in each superstep (called once per superstep),
129    *   3) saving vertices/edges in the output superstep.
130    *   4) any sort of populating a data-structure based on the partitions in
131    *      this store.
132    *
133    * After an iteration is started, multiple threads can access the partition
134    * store using {@link #getNextPartition()} to iterate over the partitions.
135    * Each time {@link #getNextPartition()} is called an unprocessed partition in
136    * the current iteration is returned. After processing of the partition is
137    * done, partition should be put back in the store using
138    * {@link #putPartition(Partition)}. Here is an example of the entire
139    * workflow:
140    *
141    * In the main thread:
142    *   partitionStore.startIteration();
143    *
144    * In multiple threads iterating over the partitions:
145    *   Partition partition = partitionStore.getNextPartition();
146    *   ... do stuff with partition ...
147    *   partitionStore.putPartition(partition);
148    *
149    * Called from a single thread.
150    */
151   void startIteration();
152 
153   /**
154    * Return the next partition in iteration for the current superstep.
155    * Note: user has to put back the partition to the store through
156    * {@link #putPartition(Partition)} after use. Look at comments on
157    * {@link #startIteration()} for more detail.
158    *
159    * @return The next partition to process
160    */
161   Partition<I, V, E> getNextPartition();
162 
163   /**
164    * Put a partition back to the store. Use this method to put a partition
165    * back after it has been retrieved through {@link #getNextPartition()}.
166    * Look at comments on {@link #startIteration()} for more detail.
167    *
168    * @param partition Partition
169    */
170   void putPartition(Partition<I, V, E> partition);
171 }