This project has retired. For details please refer to its Attic page.
TestVertexAndEdges xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.graph;
19  
20  import com.google.common.collect.Lists;
21  
22  import org.apache.giraph.conf.GiraphConfiguration;
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.edge.ArrayListEdges;
25  import org.apache.giraph.edge.ByteArrayEdges;
26  import org.apache.giraph.edge.Edge;
27  import org.apache.giraph.edge.EdgeFactory;
28  import org.apache.giraph.edge.HashMapEdges;
29  import org.apache.giraph.edge.HashMultimapEdges;
30  import org.apache.giraph.edge.LongDoubleArrayEdges;
31  import org.apache.giraph.edge.LongDoubleHashMapEdges;
32  import org.apache.giraph.edge.MutableEdge;
33  import org.apache.giraph.edge.OutEdges;
34  import org.apache.giraph.time.SystemTime;
35  import org.apache.giraph.time.Time;
36  import org.apache.giraph.time.Times;
37  import org.apache.giraph.utils.DynamicChannelBufferInputStream;
38  import org.apache.giraph.utils.DynamicChannelBufferOutputStream;
39  import org.apache.giraph.utils.EdgeIterables;
40  import org.apache.giraph.utils.NoOpComputation;
41  import org.apache.giraph.utils.UnsafeByteArrayInputStream;
42  import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
43  import org.apache.giraph.utils.WritableUtils;
44  import org.apache.hadoop.io.DoubleWritable;
45  import org.apache.hadoop.io.FloatWritable;
46  import org.apache.hadoop.io.LongWritable;
47  import org.junit.Before;
48  import org.junit.Test;
49  
50  import java.io.DataInput;
51  import java.io.DataOutput;
52  import java.io.IOException;
53  import java.util.Collection;
54  import java.util.Iterator;
55  import java.util.List;
56  
57  import static org.junit.Assert.assertEquals;
58  import static org.junit.Assert.assertNotNull;
59  import static org.junit.Assert.assertTrue;
60  
61  /**
62   * Test {@link Vertex} functionality across the provided {@link org.apache.giraph.edge.OutEdges}
63   * classes.
64   */
65  public class TestVertexAndEdges {
66    /** Number of repetitions. */
67    public static final int REPS = 100;
68    /** {@link org.apache.giraph.edge.OutEdges} classes to be tested. */
69    private Collection<Class<? extends OutEdges>> edgesClasses =
70        Lists.newArrayList();
71  
72    /**
73     * Dummy concrete vertex.
74     */
75    public static class TestComputation extends NoOpComputation<LongWritable,
76        FloatWritable, DoubleWritable, LongWritable> { }
77  
78    /**
79     * A basic {@link org.apache.giraph.edge.OutEdges} implementation that doesn't provide any
80     * special functionality. Used to test the default implementations of
81     * Vertex#getEdgeValue(), Vertex#getMutableEdges(), etc.
82     */
83    public static class TestOutEdges
84        implements OutEdges<LongWritable, DoubleWritable> {
85      private List<Edge<LongWritable, DoubleWritable>> edgeList;
86  
87  
88      @Override
89      public void initialize(Iterable<Edge<LongWritable, DoubleWritable>> edges) {
90        this.edgeList = Lists.newArrayList(edges);
91      }
92  
93      @Override
94      public void initialize(int capacity) {
95        this.edgeList = Lists.newArrayListWithCapacity(capacity);
96      }
97  
98      @Override
99      public void initialize() {
100       this.edgeList = Lists.newArrayList();
101     }
102 
103     @Override
104     public void add(Edge<LongWritable, DoubleWritable> edge) {
105       edgeList.add(edge);
106     }
107 
108     @Override
109     public void remove(LongWritable targetVertexId) {
110       for (Iterator<Edge<LongWritable, DoubleWritable>> edges =
111                edgeList.iterator(); edges.hasNext();) {
112         Edge<LongWritable, DoubleWritable> edge = edges.next();
113         if (edge.getTargetVertexId().equals(targetVertexId)) {
114           edges.remove();
115         }
116       }
117     }
118 
119     @Override
120     public int size() {
121       return edgeList.size();
122     }
123 
124     @Override
125     public Iterator<Edge<LongWritable, DoubleWritable>> iterator() {
126       return edgeList.iterator();
127     }
128 
129     @Override
130     public void write(DataOutput out) throws IOException {
131       out.writeInt(edgeList.size());
132       for (Edge<LongWritable, DoubleWritable> edge : edgeList) {
133         edge.getTargetVertexId().write(out);
134         edge.getValue().write(out);
135       }
136     }
137 
138     @Override
139     public void readFields(DataInput in) throws IOException {
140       int numEdges = in.readInt();
141       initialize(numEdges);
142       for (int i = 0; i < numEdges; ++i) {
143         Edge<LongWritable, DoubleWritable> edge = EdgeFactory.createReusable(
144             new LongWritable(), new DoubleWritable());
145         WritableUtils.readEdge(in, edge);
146         edgeList.add(edge);
147       }
148     }
149   }
150 
151   @Before
152   public void setUp() {
153     edgesClasses.add(TestOutEdges.class);
154     edgesClasses.add(ByteArrayEdges.class);
155     edgesClasses.add(ArrayListEdges.class);
156     edgesClasses.add(HashMapEdges.class);
157     edgesClasses.add(HashMultimapEdges.class);
158     edgesClasses.add(LongDoubleArrayEdges.class);
159     edgesClasses.add(LongDoubleHashMapEdges.class);
160   }
161 
162   protected Vertex<LongWritable, FloatWritable, DoubleWritable>
163   instantiateVertex(Class<? extends OutEdges> edgesClass) {
164     GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
165     giraphConfiguration.setComputationClass(TestComputation.class);
166     giraphConfiguration.setOutEdgesClass(edgesClass);
167     ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration =
168         new ImmutableClassesGiraphConfiguration(giraphConfiguration);
169     return immutableClassesGiraphConfiguration.createVertex();
170   }
171 
172   /**
173    * Test vertex instantiation, initialization, and updating the vertex value.
174    */
175   @Test
176   public void testVertexIdAndValue() {
177     Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
178         instantiateVertex(ArrayListEdges.class);
179     assertNotNull(vertex);
180     vertex.initialize(new LongWritable(7), new FloatWritable(3.0f));
181     assertEquals(7, vertex.getId().get());
182     assertEquals(3.0f, vertex.getValue().get(), 0d);
183     vertex.setValue(new FloatWritable(5.5f));
184     assertEquals(5.5f, vertex.getValue().get(), 0d);
185   }
186 
187   public static OutEdges
188   instantiateOutEdges(Class<? extends OutEdges> edgesClass) {
189     GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
190     // Needed to extract type arguments in ReflectionUtils.
191     giraphConfiguration.setComputationClass(TestComputation.class);
192     giraphConfiguration.setOutEdgesClass(edgesClass);
193     ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration =
194         new ImmutableClassesGiraphConfiguration(giraphConfiguration);
195     return immutableClassesGiraphConfiguration.createOutEdges();
196   }
197 
198   /**
199    * Test the provided {@link org.apache.giraph.edge.OutEdges} implementations for instantiation,
200    * initialization, edge addition, and edge removal.
201    */
202   @Test
203   public void testEdges() {
204     for (Class<? extends OutEdges> edgesClass : edgesClasses) {
205       testEdgesClass(edgesClass);
206     }
207   }
208 
209   private void testEdgesClass(Class<? extends OutEdges> edgesClass) {
210     Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
211         instantiateVertex(edgesClass);
212     OutEdges<LongWritable, DoubleWritable> outEdges =
213         instantiateOutEdges(edgesClass);
214     assertNotNull(outEdges);
215 
216     List<Edge<LongWritable, DoubleWritable>> edges = Lists.newLinkedList();
217     for (int i = 1000; i > 0; --i) {
218       edges.add(EdgeFactory.create(new LongWritable(i),
219           new DoubleWritable(i * 2.0)));
220     }
221 
222     outEdges.initialize(edges);
223     vertex.initialize(new LongWritable(1), new FloatWritable(1), outEdges);
224 
225     assertEquals(20.0, vertex.getEdgeValue(new LongWritable(10)).get(), 0.0);
226 
227     assertEquals(1000, vertex.getNumEdges());
228     for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
229       assertEquals(edge.getTargetVertexId().get() * 2.0d,
230           edge.getValue().get(), 0d);
231     }
232     vertex.removeEdges(new LongWritable(500));
233     assertEquals(999, vertex.getNumEdges());
234     for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
235       assertTrue(edge.getTargetVertexId().get() != 500);
236     }
237 
238     vertex.setEdgeValue(new LongWritable(10), new DoubleWritable(33.0));
239     assertEquals(33.0, vertex.getEdgeValue(new LongWritable(10)).get(), 0);
240   }
241 
242   /**
243    * Test in-place edge mutations via the iterable returned by {@link
244    * Vertex#getMutableEdges()}.
245    */
246   @Test
247   public void testMutateEdges() {
248     for (Class<? extends OutEdges> edgesClass : edgesClasses) {
249       testMutateEdgesClass(edgesClass);
250     }
251   }
252 
253   private void testMutateEdgesClass(Class<? extends OutEdges> edgesClass) {
254     Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
255         instantiateVertex(edgesClass);
256     OutEdges<LongWritable, DoubleWritable> outEdges =
257         instantiateOutEdges(edgesClass);
258 
259     outEdges.initialize();
260     vertex.initialize(new LongWritable(0), new FloatWritable(0), outEdges);
261 
262     // Add 10 edges with id i, value i for i = 0..9
263     for (int i = 0; i < 10; ++i) {
264       vertex.addEdge(EdgeFactory.create(
265           new LongWritable(i), new DoubleWritable(i)));
266     }
267 
268     // Use the mutable iterable to multiply each edge value by 2
269     for (MutableEdge<LongWritable, DoubleWritable> edge :
270         vertex.getMutableEdges()) {
271       edge.setValue(new DoubleWritable(edge.getValue().get() * 2));
272     }
273 
274     // We should still have 10 edges
275     assertEquals(10, vertex.getNumEdges());
276     // The edge values should now be double the ids
277     for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
278       long id = edge.getTargetVertexId().get();
279       double value = edge.getValue().get();
280       assertEquals(id * 2, value, 0);
281     }
282 
283     // Use the mutable iterator to remove edges with even id
284     Iterator<MutableEdge<LongWritable, DoubleWritable>> edgeIt =
285         vertex.getMutableEdges().iterator();
286     while (edgeIt.hasNext()) {
287       if (edgeIt.next().getTargetVertexId().get() % 2 == 0) {
288         edgeIt.remove();
289       }
290     }
291 
292     // We should now have 5 edges
293     assertEquals(5, vertex.getNumEdges());
294     // The edge ids should be all odd
295     for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
296       assertEquals(1, edge.getTargetVertexId().get() % 2);
297     }
298 
299     // Breaking iteration early should not make us lose edges.
300     // This version uses repeated calls to next():
301     Iterator<MutableEdge<LongWritable, DoubleWritable>> it =
302         vertex.getMutableEdges().iterator();
303     it.next();
304     it.next();
305     assertEquals(5, vertex.getNumEdges());
306 
307     // This version uses a for-each loop, and the break statement:
308     int i = 2;
309     for (MutableEdge<LongWritable, DoubleWritable> edge :
310         vertex.getMutableEdges()) {
311       if (i-- == 0) {
312         break;
313       }
314     }
315     assertEquals(5, vertex.getNumEdges());
316 
317     // This version uses a normal, immutable iterable:
318     i = 2;
319     for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
320       if (i-- == 0) {
321         break;
322       }
323     }
324     assertEquals(5, vertex.getNumEdges());
325 
326     // Calling size() during iteration shouldn't modify the data structure.
327     int iterations = 0;
328     for (MutableEdge<LongWritable, DoubleWritable> edge : vertex.getMutableEdges()) {
329       edge.setValue(new DoubleWritable(3));
330       assertEquals(5, vertex.getNumEdges());
331       ++iterations;
332     }
333     assertEquals(5, vertex.getNumEdges());
334     assertEquals(5, iterations);
335 
336     // If we remove an edge after calling next(), size() should return the
337     // correct number of edges.
338     it = vertex.getMutableEdges().iterator();
339     it.next();
340     it.remove();
341     assertEquals(4, vertex.getNumEdges());
342     it.next();
343     it.remove();
344     assertEquals(3, vertex.getNumEdges());
345   }
346 
347   /**
348    * Test {@link Vertex} and {@link org.apache.giraph.edge.OutEdges} serialization.
349    * @throws IOException
350    */
351   @Test
352   public void testSerialize() throws IOException {
353     for (Class<? extends OutEdges> edgesClass : edgesClasses) {
354       testSerializeOutEdgesClass(edgesClass);
355       testDynamicChannelBufferSerializeOutEdgesClass(edgesClass);
356       testUnsafeSerializeOutEdgesClass(edgesClass);
357     }
358   }
359 
360   protected Vertex<LongWritable, FloatWritable, DoubleWritable>
361   buildVertex(Class<? extends OutEdges> edgesClass) {
362     Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
363         instantiateVertex(edgesClass);
364     OutEdges<LongWritable, DoubleWritable> outEdges =
365         instantiateOutEdges(edgesClass);
366 
367     int edgesCount = 200;
368     List<Edge<LongWritable, DoubleWritable>> edges =
369         Lists.newArrayListWithCapacity(edgesCount);
370     for (int i = edgesCount; i > 0; --i) {
371       edges.add(EdgeFactory.create(new LongWritable(i),
372           new DoubleWritable(i * 2.0)));
373     }
374 
375     outEdges.initialize(edges);
376     vertex.initialize(new LongWritable(2), new FloatWritable(3.0f),
377         outEdges);
378     return vertex;
379   }
380 
381   private void testSerializeOutEdgesClass(
382       Class<? extends OutEdges> edgesClass) {
383     Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
384         buildVertex(edgesClass);
385 
386     long serializeNanosStart;
387     long serializeNanos = 0;
388     byte[] byteArray = null;
389     for (int i = 0; i < REPS; ++i) {
390       serializeNanosStart = SystemTime.get().getNanoseconds();
391       byteArray = WritableUtils.writeVertexToByteArray(
392           vertex, false, vertex.getConf());
393       serializeNanos += Times.getNanosecondsSince(SystemTime.get(),
394           serializeNanosStart);
395     }
396     serializeNanos /= REPS;
397     System.out.println("testSerialize: Serializing took " +
398         serializeNanos + " ns for " + byteArray.length + " bytes " +
399         (byteArray.length * 1f * Time.NS_PER_SECOND / serializeNanos) +
400         " bytes / sec for " + edgesClass.getName());
401 
402     Vertex<LongWritable, FloatWritable, DoubleWritable>
403         readVertex = buildVertex(edgesClass);
404     
405     long deserializeNanosStart;
406     long deserializeNanos = 0;
407     for (int i = 0; i < REPS; ++i) {
408       deserializeNanosStart = SystemTime.get().getNanoseconds();
409       WritableUtils.reinitializeVertexFromByteArray(byteArray, readVertex, false, 
410           readVertex.getConf());
411       deserializeNanos += Times.getNanosecondsSince(SystemTime.get(),
412           deserializeNanosStart);
413     }
414     deserializeNanos /= REPS;
415     System.out.println("testSerialize: Deserializing took " +
416         deserializeNanos + " ns for " + byteArray.length + " bytes " +
417         (byteArray.length * 1f * Time.NS_PER_SECOND / deserializeNanos) +
418         " bytes / sec for " + edgesClass.getName());
419 
420     assertEquals(vertex.getId(), readVertex.getId());
421     assertEquals(vertex.getValue(), readVertex.getValue());
422     assertTrue(EdgeIterables.sameEdges(vertex.getEdges(), readVertex.getEdges()));
423   }
424 
425   private void testDynamicChannelBufferSerializeOutEdgesClass(
426       Class<? extends OutEdges> edgesClass)
427       throws IOException {
428     Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
429         buildVertex(edgesClass);
430 
431     long serializeNanosStart;
432     long serializeNanos = 0;
433     DynamicChannelBufferOutputStream outputStream = null;
434     for (int i = 0; i <
435         REPS; ++i) {
436       serializeNanosStart = SystemTime.get().getNanoseconds();
437       outputStream =
438           new DynamicChannelBufferOutputStream(32);
439       WritableUtils.writeVertexToDataOutput(outputStream, vertex, vertex.getConf());
440       serializeNanos += Times.getNanosecondsSince(SystemTime.get(),
441           serializeNanosStart);
442     }
443     serializeNanos /= REPS;
444     System.out.println("testDynamicChannelBufferSerializeOutEdgesClass: " +
445         "Serializing took " + serializeNanos + " ns for " +
446         outputStream.getDynamicChannelBuffer().writerIndex() + " bytes " +
447         (outputStream.getDynamicChannelBuffer().writerIndex() * 1f *
448             Time.NS_PER_SECOND / serializeNanos) +
449         " bytes / sec for " + edgesClass.getName());
450 
451     Vertex<LongWritable, FloatWritable, DoubleWritable>
452         readVertex = buildVertex(edgesClass);
453 
454     long deserializeNanosStart;
455     long deserializeNanos = 0;
456     for (int i = 0; i < REPS; ++i) {
457       deserializeNanosStart = SystemTime.get().getNanoseconds();
458       DynamicChannelBufferInputStream inputStream = new
459           DynamicChannelBufferInputStream(
460           outputStream.getDynamicChannelBuffer());
461       WritableUtils.reinitializeVertexFromDataInput(
462           inputStream, readVertex, readVertex.getConf());
463       deserializeNanos += Times.getNanosecondsSince(SystemTime.get(),
464           deserializeNanosStart);
465       outputStream.getDynamicChannelBuffer().readerIndex(0);
466     }
467     deserializeNanos /= REPS;
468     System.out.println("testDynamicChannelBufferSerializeOutEdgesClass: " +
469         "Deserializing took " + deserializeNanos + " ns for " +
470         outputStream.getDynamicChannelBuffer().writerIndex() + " bytes " +
471         (outputStream.getDynamicChannelBuffer().writerIndex() * 1f *
472             Time.NS_PER_SECOND / deserializeNanos) +
473         " bytes / sec for " + edgesClass.getName());
474 
475     assertEquals(vertex.getId(), readVertex.getId());
476     assertEquals(vertex.getValue(), readVertex.getValue());
477     assertTrue(EdgeIterables.sameEdges(vertex.getEdges(), readVertex.getEdges()));
478   }
479 
480   private void testUnsafeSerializeOutEdgesClass(
481       Class<? extends OutEdges> edgesClass)
482       throws IOException {
483     Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
484         buildVertex(edgesClass);
485 
486     long serializeNanosStart;
487     long serializeNanos = 0;
488     UnsafeByteArrayOutputStream outputStream = null;
489     for (int i = 0; i <
490         REPS; ++i) {
491       serializeNanosStart = SystemTime.get().getNanoseconds();
492       outputStream =
493           new UnsafeByteArrayOutputStream(32);
494       WritableUtils.writeVertexToDataOutput(outputStream, vertex, vertex.getConf());
495       serializeNanos += Times.getNanosecondsSince(SystemTime.get(),
496           serializeNanosStart);
497     }
498     serializeNanos /= REPS;
499     System.out.println("testUnsafeSerializeOutEdgesClass: " +
500         "Serializing took " +
501         serializeNanos +
502         " ns for " + outputStream.getPos()
503         + " bytes " +
504         (outputStream.getPos() * 1f *
505             Time.NS_PER_SECOND / serializeNanos) +
506         " bytes / sec for " + edgesClass.getName());
507 
508     Vertex<LongWritable, FloatWritable, DoubleWritable>
509         readVertex = buildVertex(edgesClass);
510 
511     long deserializeNanosStart;
512     long deserializeNanos = 0;
513     for (int i = 0; i < REPS; ++i) {
514       deserializeNanosStart = SystemTime.get().getNanoseconds();
515       UnsafeByteArrayInputStream inputStream = new
516           UnsafeByteArrayInputStream(
517           outputStream.getByteArray(), 0, outputStream.getPos());
518       WritableUtils.reinitializeVertexFromDataInput(
519           inputStream, readVertex, readVertex.getConf());
520       deserializeNanos += Times.getNanosecondsSince(SystemTime.get(),
521           deserializeNanosStart);
522     }
523     deserializeNanos /= REPS;
524     System.out.println("testUnsafeSerializeOutEdgesClass: " +
525         "Deserializing took " +
526         deserializeNanos +
527         " ns for " + outputStream.getPos() +
528         " bytes " +
529         (outputStream.getPos() * 1f *
530             Time.NS_PER_SECOND / deserializeNanos) +
531         " bytes / sec for " + edgesClass.getName());
532 
533     assertEquals(vertex.getId(), readVertex.getId());
534     assertEquals(vertex.getValue(), readVertex.getValue());
535     assertTrue(EdgeIterables.sameEdges(vertex.getEdges(), readVertex.getEdges()));
536   }
537 }