This project has retired. For details please refer to its
Attic page.
SimplePartitionStore xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.partition;
20
21 import com.google.common.collect.Maps;
22 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23 import org.apache.giraph.utils.ExtendedDataOutput;
24 import org.apache.giraph.utils.VertexIterator;
25 import org.apache.hadoop.io.Writable;
26 import org.apache.hadoop.io.WritableComparable;
27 import org.apache.hadoop.mapreduce.Mapper;
28
29 import java.util.concurrent.ArrayBlockingQueue;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.ConcurrentMap;
32
33 import static com.google.common.base.Preconditions.checkState;
34
35
36
37
38
39
40
41
42 public class SimplePartitionStore<I extends WritableComparable,
43 V extends Writable, E extends Writable>
44 implements PartitionStore<I, V, E> {
45
46 private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
47
48 private final Mapper<?, ?, ?, ?>.Context context;
49
50 private final ConcurrentMap<Integer, Partition<I, V, E>> partitions =
51 Maps.newConcurrentMap();
52
53 private BlockingQueue<Partition<I, V, E>> partitionQueue;
54
55
56
57
58
59
60 public SimplePartitionStore(ImmutableClassesGiraphConfiguration<I, V, E> conf,
61 Mapper<?, ?, ?, ?>.Context context) {
62 this.conf = conf;
63 this.context = context;
64 }
65
66 @Override
67 public boolean addPartition(Partition<I, V, E> partition) {
68 return partitions.putIfAbsent(partition.getId(), partition) == null;
69 }
70
71 @Override
72 public Partition<I, V, E> removePartition(Integer partitionId) {
73 return partitions.remove(partitionId);
74 }
75
76 @Override
77 public boolean hasPartition(Integer partitionId) {
78 return partitions.containsKey(partitionId);
79 }
80
81 @Override
82 public Iterable<Integer> getPartitionIds() {
83 return partitions.keySet();
84 }
85
86 @Override
87 public int getNumPartitions() {
88 return partitions.size();
89 }
90
91 @Override
92 public long getPartitionVertexCount(Integer partitionId) {
93 Partition partition = partitions.get(partitionId);
94 if (partition == null) {
95 return 0;
96 } else {
97 return partition.getVertexCount();
98 }
99 }
100
101 @Override
102 public long getPartitionEdgeCount(Integer partitionId) {
103 Partition partition = partitions.get(partitionId);
104 if (partition == null) {
105 return 0;
106 } else {
107 return partition.getEdgeCount();
108 }
109 }
110
111 @Override
112 public boolean isEmpty() {
113 return partitions.size() == 0;
114 }
115
116 @Override
117 public void startIteration() {
118 checkState(partitionQueue == null || partitionQueue.isEmpty(),
119 "startIteration: It seems that some of " +
120 "of the partitions from previous iteration over partition store are" +
121 " not yet processed.");
122
123 partitionQueue =
124 new ArrayBlockingQueue<Partition<I, V, E>>(getNumPartitions());
125 for (Partition<I, V, E> partition : partitions.values()) {
126 partitionQueue.add(partition);
127 }
128 }
129
130 @Override
131 public Partition<I, V, E> getNextPartition() {
132 return partitionQueue.poll();
133 }
134
135 @Override
136 public void putPartition(Partition<I, V, E> partition) { }
137
138
139
140
141
142
143 private Partition<I, V, E> getOrCreatePartition(Integer partitionId) {
144 Partition<I, V, E> oldPartition = partitions.get(partitionId);
145 if (oldPartition == null) {
146 Partition<I, V, E> newPartition =
147 conf.createPartition(partitionId, context);
148 oldPartition = partitions.putIfAbsent(partitionId, newPartition);
149 if (oldPartition == null) {
150 return newPartition;
151 }
152 }
153 return oldPartition;
154 }
155
156 @Override
157 public void addPartitionVertices(Integer partitionId,
158 ExtendedDataOutput extendedDataOutput) {
159 VertexIterator<I, V, E> vertexIterator =
160 new VertexIterator<I, V, E>(extendedDataOutput, conf);
161
162 Partition<I, V, E> partition = getOrCreatePartition(partitionId);
163 partition.addPartitionVertices(vertexIterator);
164 putPartition(partition);
165 }
166
167 @Override
168 public void shutdown() { }
169
170 @Override
171 public void initialize() { }
172 }