View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.accumulo;
20  
21  import org.apache.accumulo.core.client.BatchWriter;
22  import org.apache.accumulo.core.client.Connector;
23  import org.apache.accumulo.core.client.Scanner;
24  import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
25  import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
26  import org.apache.accumulo.core.client.mock.MockInstance;
27  import org.apache.accumulo.core.data.Key;
28  import org.apache.accumulo.core.data.Mutation;
29  import org.apache.accumulo.core.data.Range;
30  import org.apache.accumulo.core.data.Value;
31  import org.apache.accumulo.core.security.Authorizations;
32  import org.apache.accumulo.core.util.ByteBufferUtil;
33  import org.apache.accumulo.core.util.Pair;
34  import org.apache.giraph.BspCase;
35  import org.apache.giraph.graph.BasicComputation;
36  import org.apache.giraph.conf.GiraphConfiguration;
37  import org.apache.giraph.io.accumulo.edgemarker.AccumuloEdgeInputFormat;
38  import org.apache.giraph.io.accumulo.edgemarker.AccumuloEdgeOutputFormat;
39  import org.apache.giraph.job.GiraphJob;
40  import org.apache.giraph.graph.Vertex;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.io.Text;
43  import org.apache.log4j.Logger;
44  import org.junit.Test;
45  
46  import java.io.File;
47  import java.io.IOException;
48  import java.nio.ByteBuffer;
49  import java.util.HashSet;
50  import java.util.Map;
51  
52  import static org.junit.Assert.assertEquals;
53  import static org.junit.Assert.assertTrue;
54  import static org.junit.Assert.fail;
55  
56  /*
57      Test class for Accumulo vertex input/output formats.
58   */
59  public class TestAccumuloVertexFormat extends BspCase{
60  
61      private final String TABLE_NAME = "simple_graph";
62      private final String INSTANCE_NAME = "instance";
63      private final Text FAMILY = new Text("cf");
64      private final Text CHILDREN = new Text("children");
65      private final String USER = "root";
66      private final byte[] PASSWORD = new byte[] {};
67      private final Text OUTPUT_FIELD = new Text("parent");
68  
69  
70      private final Logger log = Logger.getLogger(TestAccumuloVertexFormat.class);
71  
72      /**
73       * Create the test case
74       */
75      public TestAccumuloVertexFormat() {
76          super(TestAccumuloVertexFormat.class.getName());
77      }
78  
79      /*
80       Write a simple parent-child directed graph to Accumulo.
81       Run a job which reads the values
82       into subclasses that extend AccumuloVertex I/O formats.
83       Check the output after the job.
84       */
85      @Test
86      public void testAccumuloInputOutput() throws Exception {
87          if (System.getProperty("prop.mapred.job.tracker") != null) {
88              if(log.isInfoEnabled())
89                  log.info("testAccumuloInputOutput: " +
90                          "Ignore this test if not local mode.");
91              return;
92          }
93  
94          File jarTest = new File(System.getProperty("prop.jarLocation"));
95          if(!jarTest.exists()) {
96              fail("Could not find Giraph jar at " +
97                      "location specified by 'prop.jarLocation'. " +
98                      "Make sure you built the main Giraph artifact?.");
99          }
100 
101         //Write out vertices and edges out to a mock instance.
102         MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
103         Connector c = mockInstance.getConnector("root", new byte[] {});
104         c.tableOperations().create(TABLE_NAME);
105         BatchWriter bw = c.createBatchWriter(TABLE_NAME, 10000L, 1000L, 4);
106 
107         Mutation m1 = new Mutation(new Text("0001"));
108         m1.put(FAMILY, CHILDREN, new Value("0002".getBytes()));
109         bw.addMutation(m1);
110 
111         Mutation m2 = new Mutation(new Text("0002"));
112         m2.put(FAMILY, CHILDREN, new Value("0003".getBytes()));
113         bw.addMutation(m2);
114         if(log.isInfoEnabled())
115             log.info("Writing mutations to Accumulo table");
116         bw.close();
117 
118         Configuration conf = new Configuration();
119         conf.set(AccumuloVertexOutputFormat.OUTPUT_TABLE, TABLE_NAME);
120 
121         /*
122         Very important to initialize the formats before
123         sending configuration to the GiraphJob. Otherwise
124         the internally constructed Job in GiraphJob will
125         not have the proper context initialization.
126          */
127         AccumuloInputFormat.setInputInfo(conf, USER, "".getBytes(),
128                 TABLE_NAME, new Authorizations());
129         AccumuloInputFormat.setMockInstance(conf, INSTANCE_NAME);
130 
131         AccumuloOutputFormat.setOutputInfo(conf, USER, PASSWORD, true, null);
132         AccumuloOutputFormat.setMockInstance(conf, INSTANCE_NAME);
133 
134         GiraphJob job = new GiraphJob(conf, getCallingMethodName());
135         setupConfiguration(job);
136         GiraphConfiguration giraphConf = job.getConfiguration();
137         giraphConf.setComputationClass(EdgeNotification.class);
138         giraphConf.setVertexInputFormatClass(AccumuloEdgeInputFormat.class);
139         giraphConf.setVertexOutputFormatClass(AccumuloEdgeOutputFormat.class);
140 
141         HashSet<Pair<Text, Text>> columnsToFetch = new HashSet<Pair<Text,Text>>();
142         columnsToFetch.add(new Pair<Text, Text>(FAMILY, CHILDREN));
143         AccumuloInputFormat.fetchColumns(job.getConfiguration(), columnsToFetch);
144 
145         if(log.isInfoEnabled())
146             log.info("Running edge notification job using Accumulo input");
147         assertTrue(job.run(true));
148         Scanner scanner = c.createScanner(TABLE_NAME, new Authorizations());
149         scanner.setRange(new Range("0002", "0002"));
150         scanner.fetchColumn(FAMILY, OUTPUT_FIELD);
151         boolean foundColumn = false;
152 
153         if(log.isInfoEnabled())
154             log.info("Verify job output persisted correctly.");
155         //make sure we found the qualifier.
156         assertTrue(scanner.iterator().hasNext());
157 
158 
159         //now we check to make sure the expected value from the job persisted correctly.
160         for(Map.Entry<Key,Value> entry : scanner) {
161             Text row = entry.getKey().getRow();
162             assertEquals("0002", row.toString());
163             Value value = entry.getValue();
164             assertEquals("0001", ByteBufferUtil.toString(
165                     ByteBuffer.wrap(value.get())));
166             foundColumn = true;
167         }
168     }
169     /*
170     Test compute method that sends each edge a notification of its parents.
171     The test set only has a 1-1 parent-to-child ratio for this unit test.
172      */
173     public static class EdgeNotification
174             extends BasicComputation<Text, Text, Text, Text> {
175       @Override
176       public void compute(Vertex<Text, Text, Text> vertex,
177           Iterable<Text> messages) throws IOException {
178           for (Text message : messages) {
179             vertex.getValue().set(message);
180           }
181           if(getSuperstep() == 0) {
182             sendMessageToAllEdges(vertex, vertex.getId());
183           }
184         vertex.voteToHalt();
185       }
186     }
187 }