This project has retired. For details please refer to its Attic page.
TestFilters xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.io;
19  
20  import org.apache.giraph.BspCase;
21  import org.apache.giraph.conf.GiraphConfiguration;
22  import org.apache.giraph.edge.Edge;
23  import org.apache.giraph.graph.Vertex;
24  import org.apache.giraph.io.filters.EdgeInputFilter;
25  import org.apache.giraph.io.filters.VertexInputFilter;
26  import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
27  import org.apache.giraph.io.formats.IntIntTextVertexValueInputFormat;
28  import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
29  import org.apache.giraph.utils.ComputationCountEdges;
30  import org.apache.giraph.utils.IntNoOpComputation;
31  import org.apache.giraph.utils.InternalVertexRunner;
32  import org.apache.hadoop.io.IntWritable;
33  import org.apache.hadoop.io.NullWritable;
34  import org.junit.Test;
35  
36  import com.google.common.collect.Maps;
37  
38  import java.util.Map;
39  
40  import static org.junit.Assert.assertEquals;
41  
42  public class TestFilters extends BspCase {
43    public TestFilters() {
44      super(TestFilters.class.getName());
45    }
46  
47    public static class EdgeFilter implements EdgeInputFilter<IntWritable, NullWritable> {
48      @Override public boolean dropEdge(IntWritable sourceId, Edge<IntWritable, NullWritable> edge) {
49        return sourceId.get() == 2;
50      }
51    }
52  
53    @Test
54    public void testEdgeFilter() throws Exception {
55      String[] edges = new String[] {
56          "1 2",
57          "2 3",
58          "2 4",
59          "4 1"
60      };
61  
62      GiraphConfiguration conf = new GiraphConfiguration();
63      conf.setComputationClass(ComputationCountEdges.class);
64      conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
65      conf.setEdgeInputFilterClass(EdgeFilter.class);
66      conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
67      Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
68  
69      Map<Integer, Integer> values = parseResults(results);
70  
71      assertEquals(2, values.size());
72      assertEquals(1, (int) values.get(1));
73      assertEquals(1, (int) values.get(4));
74    }
75  
76    public static class VertexFilter implements VertexInputFilter<IntWritable,
77        NullWritable, NullWritable> {
78      @Override
79      public boolean dropVertex(Vertex<IntWritable, NullWritable,
80          NullWritable> vertex) {
81        int id = vertex.getId().get();
82        return id == 2 || id == 3;
83      }
84    }
85  
86    @Test
87    public void testVertexFilter() throws Exception {
88      String[] vertices = new String[] {
89          "1 1",
90          "2 2",
91          "3 3",
92          "4 4"
93      };
94  
95      GiraphConfiguration conf = new GiraphConfiguration();
96      conf.setComputationClass(IntNoOpComputation.class);
97      conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
98      conf.setVertexInputFilterClass(VertexFilter.class);
99      conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
100     Iterable<String> results = InternalVertexRunner.run(conf, vertices);
101 
102     Map<Integer, Integer> values = parseResults(results);
103 
104     assertEquals(2, values.size());
105     assertEquals(1, (int) values.get(1));
106     assertEquals(4, (int) values.get(4));
107   }
108 
109   private static Map<Integer, Integer> parseResults(Iterable<String> results) {
110     Map<Integer, Integer> values = Maps.newHashMap();
111     for (String line : results) {
112       String[] tokens = line.split("\\s+");
113       int id = Integer.valueOf(tokens[0]);
114       int value = Integer.valueOf(tokens[1]);
115       values.put(id, value);
116     }
117     return values;
118   }
119 }