View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.hbase;
20  
21  
22  import org.apache.giraph.BspCase;
23  import org.apache.giraph.conf.GiraphConfiguration;
24  import org.apache.giraph.graph.BasicComputation;
25  import org.apache.giraph.graph.Vertex;
26  import org.apache.giraph.io.hbase.edgemarker.TableEdgeInputFormat;
27  import org.apache.giraph.io.hbase.edgemarker.TableEdgeOutputFormat;
28  import org.apache.giraph.job.GiraphJob;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.fs.FSDataOutputStream;
31  import org.apache.hadoop.fs.FileSystem;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.HBaseTestingUtility;
34  import org.apache.hadoop.hbase.HColumnDescriptor;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.HTableDescriptor;
37  import org.apache.hadoop.hbase.MiniHBaseCluster;
38  import org.apache.hadoop.hbase.client.Get;
39  import org.apache.hadoop.hbase.client.HBaseAdmin;
40  import org.apache.hadoop.hbase.client.HTable;
41  import org.apache.hadoop.hbase.client.Result;
42  import org.apache.hadoop.hbase.mapreduce.ImportTsv;
43  import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
44  import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.io.Text;
47  import org.apache.hadoop.mapreduce.Job;
48  import org.apache.hadoop.util.GenericOptionsParser;
49  import org.apache.log4j.Logger;
50  import org.junit.Test;
51  
52  import java.io.File;
53  import java.io.IOException;
54  import java.util.UUID;
55  
56  import static org.junit.Assert.assertEquals;
57  import static org.junit.Assert.assertNotNull;
58  import static org.junit.Assert.assertTrue;
59  import static org.junit.Assert.fail;
60  
61  /**
62   * Test case for HBase reading/writing vertices from an HBase instance.
63   */
64  public class TestHBaseRootMarkerVertextFormat extends BspCase {
65    private final Logger log = Logger.getLogger(TestHBaseRootMarkerVertextFormat.class);
66  
67    private final String TABLE_NAME = "simple_graph";
68    private final String FAMILY = "cf";
69    private final String QUALIFER = "children";
70    private final String OUTPUT_FIELD = "parent";
71  
72    private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
73  
74    public TestHBaseRootMarkerVertextFormat() {
75      super(TestHBaseRootMarkerVertextFormat.class.getName());
76    }
77  
78    @Test
79    public void testHBaseInputOutput() throws Exception {
80      if (System.getProperty("prop.mapred.job.tracker") != null) {
81        if(log.isInfoEnabled())
82          log.info("testHBaseInputOutput: Ignore this test if not local mode.");
83        return;
84      }
85  
86      File jarTest = new File(System.getProperty("prop.jarLocation"));
87      if(!jarTest.exists()) {
88        fail("Could not find Giraph jar at " +
89            "location specified by 'prop.jarLocation'. " +
90            "Make sure you built the main Giraph artifact?.");
91      }
92  
93      FileSystem fs = null;
94      Path hbaseRootdir = null;
95      try {
96        MiniHBaseCluster cluster = testUtil.startMiniCluster(1);
97        cluster.waitForActiveAndReadyMaster();
98        testUtil.startMiniMapReduceCluster();
99  
100       // Let's set up the hbase root directory.
101       Configuration conf = testUtil.getConfiguration();
102       try {
103         fs = testUtil.getTestFileSystem();
104         String randomStr = UUID.randomUUID().toString();
105         String tmpdir = System.getProperty("java.io.tmpdir") + "/" +
106             randomStr + "/";
107         hbaseRootdir = fs.makeQualified(new Path(tmpdir));
108 
109         conf.set(HConstants.HBASE_DIR, hbaseRootdir.toString());
110         fs.mkdirs(hbaseRootdir);
111       } catch(IOException ioe) {
112         fail("Could not create hbase root directory.");
113       }
114 
115       //First let's load some data using ImportTsv into our mock table.
116       String INPUT_FILE = hbaseRootdir.toString() + "/graph.csv";
117       String[] args = new String[] {
118           "-Dimporttsv.columns=HBASE_ROW_KEY,cf:"+QUALIFER,
119           "-Dimporttsv.separator=" + "\u002c",
120           TABLE_NAME,
121           INPUT_FILE
122       };
123 
124       GenericOptionsParser opts =
125           new GenericOptionsParser(testUtil.getConfiguration(), args);
126       args = opts.getRemainingArgs();
127 
128       fs = FileSystem.get(conf);
129       fs.setConf(conf);
130       Path inputPath = fs.makeQualified(new Path(hbaseRootdir, "graph.csv"));
131       FSDataOutputStream op = fs.create(inputPath, true);
132       String line1 = "0001,0002\n";
133       String line2 = "0002,0004\n";
134       String line3 = "0003,0005\n";
135       String line4 = "0004,-1\n";
136       String line5 = "0005,-1\n";
137       op.write(line1.getBytes());
138       op.write(line2.getBytes());
139       op.write(line3.getBytes());
140       op.write(line4.getBytes());
141       op.write(line5.getBytes());
142       op.close();
143 
144       final byte[] FAM = Bytes.toBytes(FAMILY);
145       final byte[] TAB = Bytes.toBytes(TABLE_NAME);
146 
147       HTableDescriptor desc = new HTableDescriptor(TAB);
148       desc.addFamily(new HColumnDescriptor(FAM));
149       HBaseAdmin hbaseAdmin=new HBaseAdmin(conf);
150       if (hbaseAdmin.isTableAvailable(TABLE_NAME)) {
151         hbaseAdmin.disableTable(TABLE_NAME);
152         hbaseAdmin.deleteTable(TABLE_NAME);
153       }
154       hbaseAdmin.createTable(desc);
155 
156       // Do the import
157       Job job = ImportTsv.createSubmittableJob(conf, args);
158       job.waitForCompletion(false);
159       assertTrue(job.isSuccessful());
160       if(log.isInfoEnabled())
161         log.info("ImportTsv successful. Running HBase Giraph job.");
162 
163       // Now operate over HBase using Vertex I/O formats
164       conf.set(TableInputFormat.INPUT_TABLE, TABLE_NAME);
165       conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);
166 
167       GiraphJob giraphJob = new GiraphJob(conf, BspCase.getCallingMethodName());
168       GiraphConfiguration giraphConf = giraphJob.getConfiguration();
169       setupConfiguration(giraphJob);
170       giraphConf.setComputationClass(EdgeNotification.class);
171       giraphConf.setVertexInputFormatClass(TableEdgeInputFormat.class);
172       giraphConf.setVertexOutputFormatClass(TableEdgeOutputFormat.class);
173 
174       assertTrue(giraphJob.run(true));
175       if(log.isInfoEnabled())
176         log.info("Giraph job successful. Checking output qualifier.");
177 
178       // Do a get on row 0002, it should have a parent of 0001
179       // if the outputFormat worked.
180       HTable table = new HTable(conf, TABLE_NAME);
181       Result result = table.get(new Get("0002".getBytes()));
182       byte[] parentBytes = result.getValue(FAMILY.getBytes(),
183           OUTPUT_FIELD.getBytes());
184       assertNotNull(parentBytes);
185       assertTrue(parentBytes.length > 0);
186       assertEquals("0001", Bytes.toString(parentBytes));
187     } finally {
188       testUtil.shutdownMiniMapReduceCluster();
189       testUtil.shutdownMiniCluster();
190     }
191   }
192 
193   /**
194    * Test compute method that sends each edge a notification of its parents.
195    * The test set only has a 1-1 parent-to-child ratio for this unit test.
196    */
197   public static class EdgeNotification
198       extends BasicComputation<Text, Text, Text, Text> {
199     @Override
200     public void compute(Vertex<Text, Text, Text> vertex,
201         Iterable<Text> messages) throws IOException {
202       for (Text message : messages) {
203         vertex.getValue().set(message);
204       }
205       if(getSuperstep() == 0) {
206         sendMessageToAllEdges(vertex, vertex.getId());
207       }
208       vertex.voteToHalt();
209     }
210   }
211 }