This project has retired. For details please refer to its Attic page.
SimplePartitionStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.partition;
20  
21  import com.google.common.collect.Maps;
22  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23  import org.apache.giraph.utils.ExtendedDataOutput;
24  import org.apache.giraph.utils.VertexIterator;
25  import org.apache.hadoop.io.Writable;
26  import org.apache.hadoop.io.WritableComparable;
27  import org.apache.hadoop.mapreduce.Mapper;
28  
29  import java.util.concurrent.ArrayBlockingQueue;
30  import java.util.concurrent.BlockingQueue;
31  import java.util.concurrent.ConcurrentMap;
32  
33  import static com.google.common.base.Preconditions.checkState;
34  
35  /**
36   * A simple in-memory partition store.
37   *
38   * @param <I> Vertex id
39   * @param <V> Vertex data
40   * @param <E> Edge data
41   */
42  public class SimplePartitionStore<I extends WritableComparable,
43      V extends Writable, E extends Writable>
44      implements PartitionStore<I, V, E> {
45    /** Configuration */
46    private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
47    /** Job context (for progress) */
48    private final Mapper<?, ?, ?, ?>.Context context;
49    /** Map of stored partitions. */
50    private final ConcurrentMap<Integer, Partition<I, V, E>> partitions =
51        Maps.newConcurrentMap();
52    /** Queue of partitions to be precessed in a superstep */
53    private BlockingQueue<Partition<I, V, E>> partitionQueue;
54  
55    /**
56     * Constructor.
57     * @param conf Configuration
58     * @param context Mapper context
59     */
60    public SimplePartitionStore(ImmutableClassesGiraphConfiguration<I, V, E> conf,
61        Mapper<?, ?, ?, ?>.Context context) {
62      this.conf = conf;
63      this.context = context;
64    }
65  
66    @Override
67    public boolean addPartition(Partition<I, V, E> partition) {
68      return partitions.putIfAbsent(partition.getId(), partition) == null;
69    }
70  
71    @Override
72    public Partition<I, V, E> removePartition(Integer partitionId) {
73      return partitions.remove(partitionId);
74    }
75  
76    @Override
77    public boolean hasPartition(Integer partitionId) {
78      return partitions.containsKey(partitionId);
79    }
80  
81    @Override
82    public Iterable<Integer> getPartitionIds() {
83      return partitions.keySet();
84    }
85  
86    @Override
87    public int getNumPartitions() {
88      return partitions.size();
89    }
90  
91    @Override
92    public long getPartitionVertexCount(Integer partitionId) {
93      Partition partition = partitions.get(partitionId);
94      if (partition == null) {
95        return 0;
96      } else {
97        return partition.getVertexCount();
98      }
99    }
100 
101   @Override
102   public long getPartitionEdgeCount(Integer partitionId) {
103     Partition partition = partitions.get(partitionId);
104     if (partition == null) {
105       return 0;
106     } else {
107       return partition.getEdgeCount();
108     }
109   }
110 
111   @Override
112   public boolean isEmpty() {
113     return partitions.size() == 0;
114   }
115 
116   @Override
117   public void startIteration() {
118     checkState(partitionQueue == null || partitionQueue.isEmpty(),
119         "startIteration: It seems that some of " +
120           "of the partitions from previous iteration over partition store are" +
121           " not yet processed.");
122 
123     partitionQueue =
124         new ArrayBlockingQueue<Partition<I, V, E>>(getNumPartitions());
125     for (Partition<I, V, E> partition : partitions.values()) {
126       partitionQueue.add(partition);
127     }
128   }
129 
130   @Override
131   public Partition<I, V, E> getNextPartition() {
132     return partitionQueue.poll();
133   }
134 
135   @Override
136   public void putPartition(Partition<I, V, E> partition) { }
137 
138   /**
139    * Get or create a partition.
140    * @param partitionId Partition Id
141    * @return The requested partition (never null)
142    */
143   private Partition<I, V, E> getOrCreatePartition(Integer partitionId) {
144     Partition<I, V, E> oldPartition = partitions.get(partitionId);
145     if (oldPartition == null) {
146       Partition<I, V, E> newPartition =
147           conf.createPartition(partitionId, context);
148       oldPartition = partitions.putIfAbsent(partitionId, newPartition);
149       if (oldPartition == null) {
150         return newPartition;
151       }
152     }
153     return oldPartition;
154   }
155 
156   @Override
157   public void addPartitionVertices(Integer partitionId,
158       ExtendedDataOutput extendedDataOutput) {
159     VertexIterator<I, V, E> vertexIterator =
160         new VertexIterator<I, V, E>(extendedDataOutput, conf);
161 
162     Partition<I, V, E> partition = getOrCreatePartition(partitionId);
163     partition.addPartitionVertices(vertexIterator);
164     putPartition(partition);
165   }
166 
167   @Override
168   public void shutdown() { }
169 
170   @Override
171   public void initialize() { }
172 }