This project has retired. For details please refer to its
Attic page.
TestVertexAndEdges xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.graph;
19
20 import com.google.common.collect.Lists;
21
22 import org.apache.giraph.conf.GiraphConfiguration;
23 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24 import org.apache.giraph.edge.ArrayListEdges;
25 import org.apache.giraph.edge.ByteArrayEdges;
26 import org.apache.giraph.edge.Edge;
27 import org.apache.giraph.edge.EdgeFactory;
28 import org.apache.giraph.edge.HashMapEdges;
29 import org.apache.giraph.edge.HashMultimapEdges;
30 import org.apache.giraph.edge.LongDoubleArrayEdges;
31 import org.apache.giraph.edge.LongDoubleHashMapEdges;
32 import org.apache.giraph.edge.MutableEdge;
33 import org.apache.giraph.edge.OutEdges;
34 import org.apache.giraph.time.SystemTime;
35 import org.apache.giraph.time.Time;
36 import org.apache.giraph.time.Times;
37 import org.apache.giraph.utils.DynamicChannelBufferInputStream;
38 import org.apache.giraph.utils.DynamicChannelBufferOutputStream;
39 import org.apache.giraph.utils.EdgeIterables;
40 import org.apache.giraph.utils.NoOpComputation;
41 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
42 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
43 import org.apache.giraph.utils.WritableUtils;
44 import org.apache.hadoop.io.DoubleWritable;
45 import org.apache.hadoop.io.FloatWritable;
46 import org.apache.hadoop.io.LongWritable;
47 import org.junit.Before;
48 import org.junit.Test;
49
50 import java.io.DataInput;
51 import java.io.DataOutput;
52 import java.io.IOException;
53 import java.util.Collection;
54 import java.util.Iterator;
55 import java.util.List;
56
57 import static org.junit.Assert.assertEquals;
58 import static org.junit.Assert.assertNotNull;
59 import static org.junit.Assert.assertTrue;
60
61
62
63
64
65 public class TestVertexAndEdges {
66
67 public static final int REPS = 100;
68
69 private Collection<Class<? extends OutEdges>> edgesClasses =
70 Lists.newArrayList();
71
72
73
74
75 public static class TestComputation extends NoOpComputation<LongWritable,
76 FloatWritable, DoubleWritable, LongWritable> { }
77
78
79
80
81
82
83 public static class TestOutEdges
84 implements OutEdges<LongWritable, DoubleWritable> {
85 private List<Edge<LongWritable, DoubleWritable>> edgeList;
86
87
88 @Override
89 public void initialize(Iterable<Edge<LongWritable, DoubleWritable>> edges) {
90 this.edgeList = Lists.newArrayList(edges);
91 }
92
93 @Override
94 public void initialize(int capacity) {
95 this.edgeList = Lists.newArrayListWithCapacity(capacity);
96 }
97
98 @Override
99 public void initialize() {
100 this.edgeList = Lists.newArrayList();
101 }
102
103 @Override
104 public void add(Edge<LongWritable, DoubleWritable> edge) {
105 edgeList.add(edge);
106 }
107
108 @Override
109 public void remove(LongWritable targetVertexId) {
110 for (Iterator<Edge<LongWritable, DoubleWritable>> edges =
111 edgeList.iterator(); edges.hasNext();) {
112 Edge<LongWritable, DoubleWritable> edge = edges.next();
113 if (edge.getTargetVertexId().equals(targetVertexId)) {
114 edges.remove();
115 }
116 }
117 }
118
119 @Override
120 public int size() {
121 return edgeList.size();
122 }
123
124 @Override
125 public Iterator<Edge<LongWritable, DoubleWritable>> iterator() {
126 return edgeList.iterator();
127 }
128
129 @Override
130 public void write(DataOutput out) throws IOException {
131 out.writeInt(edgeList.size());
132 for (Edge<LongWritable, DoubleWritable> edge : edgeList) {
133 edge.getTargetVertexId().write(out);
134 edge.getValue().write(out);
135 }
136 }
137
138 @Override
139 public void readFields(DataInput in) throws IOException {
140 int numEdges = in.readInt();
141 initialize(numEdges);
142 for (int i = 0; i < numEdges; ++i) {
143 Edge<LongWritable, DoubleWritable> edge = EdgeFactory.createReusable(
144 new LongWritable(), new DoubleWritable());
145 WritableUtils.readEdge(in, edge);
146 edgeList.add(edge);
147 }
148 }
149 }
150
151 @Before
152 public void setUp() {
153 edgesClasses.add(TestOutEdges.class);
154 edgesClasses.add(ByteArrayEdges.class);
155 edgesClasses.add(ArrayListEdges.class);
156 edgesClasses.add(HashMapEdges.class);
157 edgesClasses.add(HashMultimapEdges.class);
158 edgesClasses.add(LongDoubleArrayEdges.class);
159 edgesClasses.add(LongDoubleHashMapEdges.class);
160 }
161
162 protected Vertex<LongWritable, FloatWritable, DoubleWritable>
163 instantiateVertex(Class<? extends OutEdges> edgesClass) {
164 GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
165 giraphConfiguration.setComputationClass(TestComputation.class);
166 giraphConfiguration.setOutEdgesClass(edgesClass);
167 ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration =
168 new ImmutableClassesGiraphConfiguration(giraphConfiguration);
169 return immutableClassesGiraphConfiguration.createVertex();
170 }
171
172
173
174
175 @Test
176 public void testVertexIdAndValue() {
177 Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
178 instantiateVertex(ArrayListEdges.class);
179 assertNotNull(vertex);
180 vertex.initialize(new LongWritable(7), new FloatWritable(3.0f));
181 assertEquals(7, vertex.getId().get());
182 assertEquals(3.0f, vertex.getValue().get(), 0d);
183 vertex.setValue(new FloatWritable(5.5f));
184 assertEquals(5.5f, vertex.getValue().get(), 0d);
185 }
186
187 public static OutEdges
188 instantiateOutEdges(Class<? extends OutEdges> edgesClass) {
189 GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
190
191 giraphConfiguration.setComputationClass(TestComputation.class);
192 giraphConfiguration.setOutEdgesClass(edgesClass);
193 ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration =
194 new ImmutableClassesGiraphConfiguration(giraphConfiguration);
195 return immutableClassesGiraphConfiguration.createOutEdges();
196 }
197
198
199
200
201
202 @Test
203 public void testEdges() {
204 for (Class<? extends OutEdges> edgesClass : edgesClasses) {
205 testEdgesClass(edgesClass);
206 }
207 }
208
209 private void testEdgesClass(Class<? extends OutEdges> edgesClass) {
210 Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
211 instantiateVertex(edgesClass);
212 OutEdges<LongWritable, DoubleWritable> outEdges =
213 instantiateOutEdges(edgesClass);
214 assertNotNull(outEdges);
215
216 List<Edge<LongWritable, DoubleWritable>> edges = Lists.newLinkedList();
217 for (int i = 1000; i > 0; --i) {
218 edges.add(EdgeFactory.create(new LongWritable(i),
219 new DoubleWritable(i * 2.0)));
220 }
221
222 outEdges.initialize(edges);
223 vertex.initialize(new LongWritable(1), new FloatWritable(1), outEdges);
224
225 assertEquals(20.0, vertex.getEdgeValue(new LongWritable(10)).get(), 0.0);
226
227 assertEquals(1000, vertex.getNumEdges());
228 for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
229 assertEquals(edge.getTargetVertexId().get() * 2.0d,
230 edge.getValue().get(), 0d);
231 }
232 vertex.removeEdges(new LongWritable(500));
233 assertEquals(999, vertex.getNumEdges());
234 for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
235 assertTrue(edge.getTargetVertexId().get() != 500);
236 }
237
238 vertex.setEdgeValue(new LongWritable(10), new DoubleWritable(33.0));
239 assertEquals(33.0, vertex.getEdgeValue(new LongWritable(10)).get(), 0);
240 }
241
242
243
244
245
246 @Test
247 public void testMutateEdges() {
248 for (Class<? extends OutEdges> edgesClass : edgesClasses) {
249 testMutateEdgesClass(edgesClass);
250 }
251 }
252
253 private void testMutateEdgesClass(Class<? extends OutEdges> edgesClass) {
254 Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
255 instantiateVertex(edgesClass);
256 OutEdges<LongWritable, DoubleWritable> outEdges =
257 instantiateOutEdges(edgesClass);
258
259 outEdges.initialize();
260 vertex.initialize(new LongWritable(0), new FloatWritable(0), outEdges);
261
262
263 for (int i = 0; i < 10; ++i) {
264 vertex.addEdge(EdgeFactory.create(
265 new LongWritable(i), new DoubleWritable(i)));
266 }
267
268
269 for (MutableEdge<LongWritable, DoubleWritable> edge :
270 vertex.getMutableEdges()) {
271 edge.setValue(new DoubleWritable(edge.getValue().get() * 2));
272 }
273
274
275 assertEquals(10, vertex.getNumEdges());
276
277 for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
278 long id = edge.getTargetVertexId().get();
279 double value = edge.getValue().get();
280 assertEquals(id * 2, value, 0);
281 }
282
283
284 Iterator<MutableEdge<LongWritable, DoubleWritable>> edgeIt =
285 vertex.getMutableEdges().iterator();
286 while (edgeIt.hasNext()) {
287 if (edgeIt.next().getTargetVertexId().get() % 2 == 0) {
288 edgeIt.remove();
289 }
290 }
291
292
293 assertEquals(5, vertex.getNumEdges());
294
295 for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
296 assertEquals(1, edge.getTargetVertexId().get() % 2);
297 }
298
299
300
301 Iterator<MutableEdge<LongWritable, DoubleWritable>> it =
302 vertex.getMutableEdges().iterator();
303 it.next();
304 it.next();
305 assertEquals(5, vertex.getNumEdges());
306
307
308 int i = 2;
309 for (MutableEdge<LongWritable, DoubleWritable> edge :
310 vertex.getMutableEdges()) {
311 if (i-- == 0) {
312 break;
313 }
314 }
315 assertEquals(5, vertex.getNumEdges());
316
317
318 i = 2;
319 for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
320 if (i-- == 0) {
321 break;
322 }
323 }
324 assertEquals(5, vertex.getNumEdges());
325
326
327 int iterations = 0;
328 for (MutableEdge<LongWritable, DoubleWritable> edge : vertex.getMutableEdges()) {
329 edge.setValue(new DoubleWritable(3));
330 assertEquals(5, vertex.getNumEdges());
331 ++iterations;
332 }
333 assertEquals(5, vertex.getNumEdges());
334 assertEquals(5, iterations);
335
336
337
338 it = vertex.getMutableEdges().iterator();
339 it.next();
340 it.remove();
341 assertEquals(4, vertex.getNumEdges());
342 it.next();
343 it.remove();
344 assertEquals(3, vertex.getNumEdges());
345 }
346
347
348
349
350
351 @Test
352 public void testSerialize() throws IOException {
353 for (Class<? extends OutEdges> edgesClass : edgesClasses) {
354 testSerializeOutEdgesClass(edgesClass);
355 testDynamicChannelBufferSerializeOutEdgesClass(edgesClass);
356 testUnsafeSerializeOutEdgesClass(edgesClass);
357 }
358 }
359
360 protected Vertex<LongWritable, FloatWritable, DoubleWritable>
361 buildVertex(Class<? extends OutEdges> edgesClass) {
362 Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
363 instantiateVertex(edgesClass);
364 OutEdges<LongWritable, DoubleWritable> outEdges =
365 instantiateOutEdges(edgesClass);
366
367 int edgesCount = 200;
368 List<Edge<LongWritable, DoubleWritable>> edges =
369 Lists.newArrayListWithCapacity(edgesCount);
370 for (int i = edgesCount; i > 0; --i) {
371 edges.add(EdgeFactory.create(new LongWritable(i),
372 new DoubleWritable(i * 2.0)));
373 }
374
375 outEdges.initialize(edges);
376 vertex.initialize(new LongWritable(2), new FloatWritable(3.0f),
377 outEdges);
378 return vertex;
379 }
380
381 private void testSerializeOutEdgesClass(
382 Class<? extends OutEdges> edgesClass) {
383 Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
384 buildVertex(edgesClass);
385
386 long serializeNanosStart;
387 long serializeNanos = 0;
388 byte[] byteArray = null;
389 for (int i = 0; i < REPS; ++i) {
390 serializeNanosStart = SystemTime.get().getNanoseconds();
391 byteArray = WritableUtils.writeVertexToByteArray(
392 vertex, false, vertex.getConf());
393 serializeNanos += Times.getNanosecondsSince(SystemTime.get(),
394 serializeNanosStart);
395 }
396 serializeNanos /= REPS;
397 System.out.println("testSerialize: Serializing took " +
398 serializeNanos + " ns for " + byteArray.length + " bytes " +
399 (byteArray.length * 1f * Time.NS_PER_SECOND / serializeNanos) +
400 " bytes / sec for " + edgesClass.getName());
401
402 Vertex<LongWritable, FloatWritable, DoubleWritable>
403 readVertex = buildVertex(edgesClass);
404
405 long deserializeNanosStart;
406 long deserializeNanos = 0;
407 for (int i = 0; i < REPS; ++i) {
408 deserializeNanosStart = SystemTime.get().getNanoseconds();
409 WritableUtils.reinitializeVertexFromByteArray(byteArray, readVertex, false,
410 readVertex.getConf());
411 deserializeNanos += Times.getNanosecondsSince(SystemTime.get(),
412 deserializeNanosStart);
413 }
414 deserializeNanos /= REPS;
415 System.out.println("testSerialize: Deserializing took " +
416 deserializeNanos + " ns for " + byteArray.length + " bytes " +
417 (byteArray.length * 1f * Time.NS_PER_SECOND / deserializeNanos) +
418 " bytes / sec for " + edgesClass.getName());
419
420 assertEquals(vertex.getId(), readVertex.getId());
421 assertEquals(vertex.getValue(), readVertex.getValue());
422 assertTrue(EdgeIterables.sameEdges(vertex.getEdges(), readVertex.getEdges()));
423 }
424
425 private void testDynamicChannelBufferSerializeOutEdgesClass(
426 Class<? extends OutEdges> edgesClass)
427 throws IOException {
428 Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
429 buildVertex(edgesClass);
430
431 long serializeNanosStart;
432 long serializeNanos = 0;
433 DynamicChannelBufferOutputStream outputStream = null;
434 for (int i = 0; i <
435 REPS; ++i) {
436 serializeNanosStart = SystemTime.get().getNanoseconds();
437 outputStream =
438 new DynamicChannelBufferOutputStream(32);
439 WritableUtils.writeVertexToDataOutput(outputStream, vertex, vertex.getConf());
440 serializeNanos += Times.getNanosecondsSince(SystemTime.get(),
441 serializeNanosStart);
442 }
443 serializeNanos /= REPS;
444 System.out.println("testDynamicChannelBufferSerializeOutEdgesClass: " +
445 "Serializing took " + serializeNanos + " ns for " +
446 outputStream.getDynamicChannelBuffer().writerIndex() + " bytes " +
447 (outputStream.getDynamicChannelBuffer().writerIndex() * 1f *
448 Time.NS_PER_SECOND / serializeNanos) +
449 " bytes / sec for " + edgesClass.getName());
450
451 Vertex<LongWritable, FloatWritable, DoubleWritable>
452 readVertex = buildVertex(edgesClass);
453
454 long deserializeNanosStart;
455 long deserializeNanos = 0;
456 for (int i = 0; i < REPS; ++i) {
457 deserializeNanosStart = SystemTime.get().getNanoseconds();
458 DynamicChannelBufferInputStream inputStream = new
459 DynamicChannelBufferInputStream(
460 outputStream.getDynamicChannelBuffer());
461 WritableUtils.reinitializeVertexFromDataInput(
462 inputStream, readVertex, readVertex.getConf());
463 deserializeNanos += Times.getNanosecondsSince(SystemTime.get(),
464 deserializeNanosStart);
465 outputStream.getDynamicChannelBuffer().readerIndex(0);
466 }
467 deserializeNanos /= REPS;
468 System.out.println("testDynamicChannelBufferSerializeOutEdgesClass: " +
469 "Deserializing took " + deserializeNanos + " ns for " +
470 outputStream.getDynamicChannelBuffer().writerIndex() + " bytes " +
471 (outputStream.getDynamicChannelBuffer().writerIndex() * 1f *
472 Time.NS_PER_SECOND / deserializeNanos) +
473 " bytes / sec for " + edgesClass.getName());
474
475 assertEquals(vertex.getId(), readVertex.getId());
476 assertEquals(vertex.getValue(), readVertex.getValue());
477 assertTrue(EdgeIterables.sameEdges(vertex.getEdges(), readVertex.getEdges()));
478 }
479
480 private void testUnsafeSerializeOutEdgesClass(
481 Class<? extends OutEdges> edgesClass)
482 throws IOException {
483 Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
484 buildVertex(edgesClass);
485
486 long serializeNanosStart;
487 long serializeNanos = 0;
488 UnsafeByteArrayOutputStream outputStream = null;
489 for (int i = 0; i <
490 REPS; ++i) {
491 serializeNanosStart = SystemTime.get().getNanoseconds();
492 outputStream =
493 new UnsafeByteArrayOutputStream(32);
494 WritableUtils.writeVertexToDataOutput(outputStream, vertex, vertex.getConf());
495 serializeNanos += Times.getNanosecondsSince(SystemTime.get(),
496 serializeNanosStart);
497 }
498 serializeNanos /= REPS;
499 System.out.println("testUnsafeSerializeOutEdgesClass: " +
500 "Serializing took " +
501 serializeNanos +
502 " ns for " + outputStream.getPos()
503 + " bytes " +
504 (outputStream.getPos() * 1f *
505 Time.NS_PER_SECOND / serializeNanos) +
506 " bytes / sec for " + edgesClass.getName());
507
508 Vertex<LongWritable, FloatWritable, DoubleWritable>
509 readVertex = buildVertex(edgesClass);
510
511 long deserializeNanosStart;
512 long deserializeNanos = 0;
513 for (int i = 0; i < REPS; ++i) {
514 deserializeNanosStart = SystemTime.get().getNanoseconds();
515 UnsafeByteArrayInputStream inputStream = new
516 UnsafeByteArrayInputStream(
517 outputStream.getByteArray(), 0, outputStream.getPos());
518 WritableUtils.reinitializeVertexFromDataInput(
519 inputStream, readVertex, readVertex.getConf());
520 deserializeNanos += Times.getNanosecondsSince(SystemTime.get(),
521 deserializeNanosStart);
522 }
523 deserializeNanos /= REPS;
524 System.out.println("testUnsafeSerializeOutEdgesClass: " +
525 "Deserializing took " +
526 deserializeNanos +
527 " ns for " + outputStream.getPos() +
528 " bytes " +
529 (outputStream.getPos() * 1f *
530 Time.NS_PER_SECOND / deserializeNanos) +
531 " bytes / sec for " + edgesClass.getName());
532
533 assertEquals(vertex.getId(), readVertex.getId());
534 assertEquals(vertex.getValue(), readVertex.getValue());
535 assertTrue(EdgeIterables.sameEdges(vertex.getEdges(), readVertex.getEdges()));
536 }
537 }