This project has retired. For details please refer to its
Attic page.
ByteArrayPartition xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.partition;
19
20 import com.google.common.collect.MapMaker;
21 import com.google.common.primitives.Ints;
22 import org.apache.giraph.edge.Edge;
23 import org.apache.giraph.graph.Vertex;
24 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
25 import org.apache.giraph.utils.WritableUtils;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28 import org.apache.hadoop.util.Progressable;
29
30 import javax.annotation.concurrent.NotThreadSafe;
31 import java.io.DataInput;
32 import java.io.DataOutput;
33 import java.io.IOException;
34 import java.util.Iterator;
35 import java.util.Map;
36 import java.util.concurrent.ConcurrentMap;
37
38
39
40
41
42
43
44
45
46
47 @NotThreadSafe
48 public class ByteArrayPartition<I extends WritableComparable,
49 V extends Writable, E extends Writable>
50 extends BasicPartition<I, V, E>
51 implements ReusesObjectsPartition<I, V, E> {
52
53
54
55
56
57 private ConcurrentMap<I, byte[]> vertexMap;
58
59 private Vertex<I, V, E> representativeVertex;
60
61 private Vertex<I, V, E> representativeCombinerVertex;
62
63 private boolean useUnsafeSerialization;
64
65
66
67
68 public ByteArrayPartition() { }
69
70 @Override
71 public void initialize(int partitionId, Progressable progressable) {
72 super.initialize(partitionId, progressable);
73 vertexMap = new MapMaker().concurrencyLevel(
74 getConf().getNettyServerExecutionConcurrency()).makeMap();
75 representativeVertex = getConf().createVertex();
76 representativeVertex.initialize(
77 getConf().createVertexId(),
78 getConf().createVertexValue(),
79 getConf().createOutEdges());
80 representativeCombinerVertex = getConf().createVertex();
81 representativeCombinerVertex.initialize(
82 getConf().createVertexId(),
83 getConf().createVertexValue(),
84 getConf().createOutEdges());
85 useUnsafeSerialization = getConf().useUnsafeSerialization();
86 }
87
88 @Override
89 public Vertex<I, V, E> getVertex(I vertexIndex) {
90 byte[] vertexData = vertexMap.get(vertexIndex);
91 if (vertexData == null) {
92 return null;
93 }
94 WritableUtils.reinitializeVertexFromByteArray(
95 vertexData, representativeVertex, useUnsafeSerialization, getConf());
96 return representativeVertex;
97 }
98
99 @Override
100 public Vertex<I, V, E> putVertex(Vertex<I, V, E> vertex) {
101 byte[] vertexData =
102 WritableUtils.writeVertexToByteArray(
103 vertex, useUnsafeSerialization, getConf());
104 byte[] oldVertexBytes = vertexMap.put(vertex.getId(), vertexData);
105 if (oldVertexBytes == null) {
106 return null;
107 } else {
108 WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes,
109 representativeVertex, useUnsafeSerialization, getConf());
110 return representativeVertex;
111 }
112 }
113
114 @Override
115 public Vertex<I, V, E> removeVertex(I vertexIndex) {
116 byte[] vertexBytes = vertexMap.remove(vertexIndex);
117 if (vertexBytes == null) {
118 return null;
119 }
120 WritableUtils.reinitializeVertexFromByteArray(vertexBytes,
121 representativeVertex, useUnsafeSerialization, getConf());
122 return representativeVertex;
123 }
124
125 @Override
126 public void addPartition(Partition<I, V, E> partition) {
127
128 if (!(partition instanceof ByteArrayPartition)) {
129 throw new IllegalStateException("addPartition: Cannot add partition " +
130 "of type " + partition.getClass());
131 }
132
133 ByteArrayPartition<I, V, E> byteArrayPartition =
134 (ByteArrayPartition<I, V, E>) partition;
135 for (Map.Entry<I, byte[]> entry :
136 byteArrayPartition.vertexMap.entrySet()) {
137
138 byte[] oldVertexBytes =
139 vertexMap.putIfAbsent(entry.getKey(), entry.getValue());
140 if (oldVertexBytes == null) {
141 continue;
142 }
143
144
145
146
147
148 synchronized (this) {
149
150 WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes,
151 representativeVertex, useUnsafeSerialization, getConf());
152 WritableUtils.reinitializeVertexFromByteArray(entry.getValue(),
153 representativeCombinerVertex, useUnsafeSerialization, getConf());
154 combine(representativeVertex, representativeCombinerVertex);
155 }
156 }
157 }
158
159 @Override
160 public synchronized boolean putOrCombine(Vertex<I, V, E> vertex) {
161
162 byte[] vertexData =
163 WritableUtils.writeVertexToByteArray(
164 vertex, useUnsafeSerialization, getConf());
165 byte[] oldVertexBytes = vertexMap.putIfAbsent(vertex.getId(), vertexData);
166 if (oldVertexBytes == null) {
167 return true;
168 }
169
170 WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes,
171 representativeVertex, useUnsafeSerialization, getConf());
172 combine(representativeVertex, vertex);
173 return false;
174 }
175
176
177
178
179
180
181
182
183 private void combine(Vertex<I, V, E> representativeVertex,
184 Vertex<I, V, E> representativeCombinerVertex) {
185 getVertexValueCombiner().combine(representativeVertex.getValue(),
186 representativeCombinerVertex.getValue());
187
188 for (Edge<I, E> edge : representativeCombinerVertex.getEdges()) {
189 representativeVertex.addEdge(edge);
190 }
191 vertexMap.put(representativeCombinerVertex.getId(),
192 WritableUtils.writeVertexToByteArray(
193 representativeVertex, useUnsafeSerialization, getConf()));
194 }
195
196 @Override
197 public long getVertexCount() {
198 return vertexMap.size();
199 }
200
201 @Override
202 public long getEdgeCount() {
203 long edges = 0;
204 for (byte[] vertexBytes : vertexMap.values()) {
205 WritableUtils.reinitializeVertexFromByteArray(vertexBytes,
206 representativeVertex, useUnsafeSerialization, getConf());
207 edges += representativeVertex.getNumEdges();
208 }
209 return edges;
210 }
211
212 @Override
213 public void saveVertex(Vertex<I, V, E> vertex) {
214
215 byte[] oldVertexData = vertexMap.get(vertex.getId());
216 if (oldVertexData != null) {
217 vertexMap.put(vertex.getId(),
218 WritableUtils.writeVertexToByteArray(
219 vertex, oldVertexData, useUnsafeSerialization, getConf()));
220 } else {
221 vertexMap.put(vertex.getId(),
222 WritableUtils.writeVertexToByteArray(
223 vertex, useUnsafeSerialization, getConf()));
224 }
225 }
226
227 @Override
228 public void write(DataOutput output) throws IOException {
229 super.write(output);
230 output.writeInt(vertexMap.size());
231 for (Map.Entry<I, byte[]> entry : vertexMap.entrySet()) {
232 progress();
233 entry.getKey().write(output);
234
235
236 int vertexDataSize;
237 if (useUnsafeSerialization) {
238 vertexDataSize = UnsafeByteArrayInputStream.getInt(entry.getValue(),
239 0);
240 } else {
241 vertexDataSize = Ints.fromByteArray(entry.getValue());
242 }
243
244 output.writeInt(vertexDataSize);
245 output.write(entry.getValue(), 0, vertexDataSize);
246 }
247 }
248
249 @Override
250 public void readFields(DataInput input) throws IOException {
251 super.readFields(input);
252 int size = input.readInt();
253 vertexMap = new MapMaker().concurrencyLevel(
254 getConf().getNettyServerExecutionConcurrency()).initialCapacity(
255 size).makeMap();
256 representativeVertex = getConf().createVertex();
257 representativeVertex.initialize(
258 getConf().createVertexId(),
259 getConf().createVertexValue(),
260 getConf().createOutEdges());
261 useUnsafeSerialization = getConf().useUnsafeSerialization();
262 for (int i = 0; i < size; ++i) {
263 progress();
264 I vertexId = getConf().createVertexId();
265 vertexId.readFields(input);
266 int vertexDataSize = input.readInt();
267 byte[] vertexData = new byte[vertexDataSize];
268 input.readFully(vertexData);
269 if (vertexMap.put(vertexId, vertexData) != null) {
270 throw new IllegalStateException("readFields: Already saw vertex " +
271 vertexId);
272 }
273 }
274 }
275
276 @Override
277 public Iterator<Vertex<I, V, E>> iterator() {
278 return new RepresentativeVertexIterator();
279 }
280
281
282
283
284
285 private class RepresentativeVertexIterator implements
286 Iterator<Vertex<I, V, E>> {
287
288 private Iterator<byte[]> vertexDataIterator =
289 vertexMap.values().iterator();
290
291 @Override
292 public boolean hasNext() {
293 return vertexDataIterator.hasNext();
294 }
295
296 @Override
297 public Vertex<I, V, E> next() {
298 WritableUtils.reinitializeVertexFromByteArray(
299 vertexDataIterator.next(), representativeVertex,
300 useUnsafeSerialization, getConf());
301 return representativeVertex;
302 }
303
304 @Override
305 public void remove() {
306 throw new IllegalAccessError("remove: This method is not supported.");
307 }
308 }
309 }