1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.io.accumulo;
2021import org.apache.accumulo.core.client.BatchWriter;
22import org.apache.accumulo.core.client.Connector;
23import org.apache.accumulo.core.client.Scanner;
24import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
25import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
26import org.apache.accumulo.core.client.mock.MockInstance;
27import org.apache.accumulo.core.data.Key;
28import org.apache.accumulo.core.data.Mutation;
29import org.apache.accumulo.core.data.Range;
30import org.apache.accumulo.core.data.Value;
31import org.apache.accumulo.core.security.Authorizations;
32import org.apache.accumulo.core.util.ByteBufferUtil;
33import org.apache.accumulo.core.util.Pair;
34import org.apache.giraph.BspCase;
35import org.apache.giraph.graph.BasicComputation;
36import org.apache.giraph.conf.GiraphConfiguration;
37import org.apache.giraph.io.accumulo.edgemarker.AccumuloEdgeInputFormat;
38import org.apache.giraph.io.accumulo.edgemarker.AccumuloEdgeOutputFormat;
39import org.apache.giraph.job.GiraphJob;
40import org.apache.giraph.graph.Vertex;
41import org.apache.hadoop.conf.Configuration;
42import org.apache.hadoop.io.Text;
43import org.apache.log4j.Logger;
44import org.junit.Test;
4546import java.io.File;
47import java.io.IOException;
48import java.nio.ByteBuffer;
49import java.util.HashSet;
50import java.util.Map;
5152importstatic org.junit.Assert.assertEquals;
53importstatic org.junit.Assert.assertTrue;
54importstatic org.junit.Assert.fail;
5556/*57 Test class for Accumulo vertex input/output formats.58 */59publicclassTestAccumuloVertexFormatextendsBspCase{
6061privatefinal String TABLE_NAME = "simple_graph";
62privatefinal String INSTANCE_NAME = "instance";
63privatefinal Text FAMILY = new Text("cf");
64privatefinal Text CHILDREN = new Text("children");
65privatefinal String USER = "root";
66privatefinal byte[] PASSWORD = new byte[] {};
67privatefinal Text OUTPUT_FIELD = new Text("parent");
686970privatefinal Logger log = Logger.getLogger(TestAccumuloVertexFormat.class);
7172/**73 * Create the test case74 */75publicTestAccumuloVertexFormat() {
76super(TestAccumuloVertexFormat.class.getName());
77 }
7879/*80 Write a simple parent-child directed graph to Accumulo.81 Run a job which reads the values82 into subclasses that extend AccumuloVertex I/O formats.83 Check the output after the job.84 */85 @Test
86publicvoid testAccumuloInputOutput() throws Exception {
87if (System.getProperty("prop.mapred.job.tracker") != null) {
88if(log.isInfoEnabled())
89 log.info("testAccumuloInputOutput: " +
90"Ignore this test if not local mode.");
91return;
92 }
9394 File jarTest = new File(System.getProperty("prop.jarLocation"));
95if(!jarTest.exists()) {
96 fail("Could not find Giraph jar at " +
97"location specified by 'prop.jarLocation'. " +
98"Make sure you built the main Giraph artifact?.");
99 }
100101//Write out vertices and edges out to a mock instance.102 MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
103 Connector c = mockInstance.getConnector("root", new byte[] {});
104 c.tableOperations().create(TABLE_NAME);
105 BatchWriter bw = c.createBatchWriter(TABLE_NAME, 10000L, 1000L, 4);
106107 Mutation m1 = new Mutation(new Text("0001"));
108 m1.put(FAMILY, CHILDREN, new Value("0002".getBytes()));
109 bw.addMutation(m1);
110111 Mutation m2 = new Mutation(new Text("0002"));
112 m2.put(FAMILY, CHILDREN, new Value("0003".getBytes()));
113 bw.addMutation(m2);
114if(log.isInfoEnabled())
115 log.info("Writing mutations to Accumulo table");
116 bw.close();
117118 Configuration conf = new Configuration();
119 conf.set(AccumuloVertexOutputFormat.OUTPUT_TABLE, TABLE_NAME);
120121/*122 Very important to initialize the formats before123 sending configuration to the GiraphJob. Otherwise124 the internally constructed Job in GiraphJob will125 not have the proper context initialization.126 */127 AccumuloInputFormat.setInputInfo(conf, USER, "".getBytes(),
128 TABLE_NAME, new Authorizations());
129 AccumuloInputFormat.setMockInstance(conf, INSTANCE_NAME);
130131 AccumuloOutputFormat.setOutputInfo(conf, USER, PASSWORD, true, null);
132 AccumuloOutputFormat.setMockInstance(conf, INSTANCE_NAME);
133134 GiraphJob job = new GiraphJob(conf, getCallingMethodName());
135 setupConfiguration(job);
136 GiraphConfiguration giraphConf = job.getConfiguration();
137 giraphConf.setComputationClass(EdgeNotification.class);
138 giraphConf.setVertexInputFormatClass(AccumuloEdgeInputFormat.class);
139 giraphConf.setVertexOutputFormatClass(AccumuloEdgeOutputFormat.class);
140141 HashSet<Pair<Text, Text>> columnsToFetch = new HashSet<Pair<Text,Text>>();
142 columnsToFetch.add(new Pair<Text, Text>(FAMILY, CHILDREN));
143 AccumuloInputFormat.fetchColumns(job.getConfiguration(), columnsToFetch);
144145if(log.isInfoEnabled())
146 log.info("Running edge notification job using Accumulo input");
147 assertTrue(job.run(true));
148 Scanner scanner = c.createScanner(TABLE_NAME, new Authorizations());
149 scanner.setRange(new Range("0002", "0002"));
150 scanner.fetchColumn(FAMILY, OUTPUT_FIELD);
151boolean foundColumn = false;
152153if(log.isInfoEnabled())
154 log.info("Verify job output persisted correctly.");
155//make sure we found the qualifier.156 assertTrue(scanner.iterator().hasNext());
157158159//now we check to make sure the expected value from the job persisted correctly.160for(Map.Entry<Key,Value> entry : scanner) {
161 Text row = entry.getKey().getRow();
162 assertEquals("0002", row.toString());
163 Value value = entry.getValue();
164 assertEquals("0001", ByteBufferUtil.toString(
165 ByteBuffer.wrap(value.get())));
166 foundColumn = true;
167 }
168 }
169/*170 Test compute method that sends each edge a notification of its parents.171 The test set only has a 1-1 parent-to-child ratio for this unit test.172 */173publicstaticclassEdgeNotification174extends BasicComputation<Text, Text, Text, Text> {
175 @Override
176publicvoid compute(Vertex<Text, Text, Text> vertex,
177 Iterable<Text> messages) throws IOException {
178for (Text message : messages) {
179 vertex.getValue().set(message);
180 }
181if(getSuperstep() == 0) {
182 sendMessageToAllEdges(vertex, vertex.getId());
183 }
184 vertex.voteToHalt();
185 }
186 }
187 }