This project has been retired. For details, please refer to its
Attic page.
TestHBaseRootMarkerVertextFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.hbase;
20
21
22 import org.apache.giraph.BspCase;
23 import org.apache.giraph.conf.GiraphConfiguration;
24 import org.apache.giraph.graph.BasicComputation;
25 import org.apache.giraph.graph.Vertex;
26 import org.apache.giraph.io.hbase.edgemarker.TableEdgeInputFormat;
27 import org.apache.giraph.io.hbase.edgemarker.TableEdgeOutputFormat;
28 import org.apache.giraph.job.GiraphJob;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.FSDataOutputStream;
31 import org.apache.hadoop.fs.FileSystem;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.HBaseTestingUtility;
34 import org.apache.hadoop.hbase.HColumnDescriptor;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.HTableDescriptor;
37 import org.apache.hadoop.hbase.MiniHBaseCluster;
38 import org.apache.hadoop.hbase.client.Get;
39 import org.apache.hadoop.hbase.client.HBaseAdmin;
40 import org.apache.hadoop.hbase.client.HTable;
41 import org.apache.hadoop.hbase.client.Result;
42 import org.apache.hadoop.hbase.mapreduce.ImportTsv;
43 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
44 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
45 import org.apache.hadoop.hbase.util.Bytes;
46 import org.apache.hadoop.io.Text;
47 import org.apache.hadoop.mapreduce.Job;
48 import org.apache.hadoop.util.GenericOptionsParser;
49 import org.apache.log4j.Logger;
50 import org.junit.Test;
51
52 import java.io.File;
53 import java.io.IOException;
54 import java.util.UUID;
55
56 import static org.junit.Assert.assertEquals;
57 import static org.junit.Assert.assertNotNull;
58 import static org.junit.Assert.assertTrue;
59 import static org.junit.Assert.fail;
60
61
62
63
64 public class TestHBaseRootMarkerVertextFormat extends BspCase {
65 private final Logger log = Logger.getLogger(TestHBaseRootMarkerVertextFormat.class);
66
67 private final String TABLE_NAME = "simple_graph";
68 private final String FAMILY = "cf";
69 private final String QUALIFER = "children";
70 private final String OUTPUT_FIELD = "parent";
71
72 private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
73
74 public TestHBaseRootMarkerVertextFormat() {
75 super(TestHBaseRootMarkerVertextFormat.class.getName());
76 }
77
78 @Test
79 public void testHBaseInputOutput() throws Exception {
80 if (System.getProperty("prop.mapred.job.tracker") != null) {
81 if(log.isInfoEnabled())
82 log.info("testHBaseInputOutput: Ignore this test if not local mode.");
83 return;
84 }
85
86 File jarTest = new File(System.getProperty("prop.jarLocation"));
87 if(!jarTest.exists()) {
88 fail("Could not find Giraph jar at " +
89 "location specified by 'prop.jarLocation'. " +
90 "Make sure you built the main Giraph artifact?.");
91 }
92
93 FileSystem fs = null;
94 Path hbaseRootdir = null;
95 try {
96 MiniHBaseCluster cluster = testUtil.startMiniCluster(1);
97 cluster.waitForActiveAndReadyMaster();
98 testUtil.startMiniMapReduceCluster();
99
100
101 Configuration conf = testUtil.getConfiguration();
102 try {
103 fs = testUtil.getTestFileSystem();
104 String randomStr = UUID.randomUUID().toString();
105 String tmpdir = System.getProperty("java.io.tmpdir") + "/" +
106 randomStr + "/";
107 hbaseRootdir = fs.makeQualified(new Path(tmpdir));
108
109 conf.set(HConstants.HBASE_DIR, hbaseRootdir.toString());
110 fs.mkdirs(hbaseRootdir);
111 } catch(IOException ioe) {
112 fail("Could not create hbase root directory.");
113 }
114
115
116 String INPUT_FILE = hbaseRootdir.toString() + "/graph.csv";
117 String[] args = new String[] {
118 "-Dimporttsv.columns=HBASE_ROW_KEY,cf:"+QUALIFER,
119 "-Dimporttsv.separator=" + "\u002c",
120 TABLE_NAME,
121 INPUT_FILE
122 };
123
124 GenericOptionsParser opts =
125 new GenericOptionsParser(testUtil.getConfiguration(), args);
126 args = opts.getRemainingArgs();
127
128 fs = FileSystem.get(conf);
129 fs.setConf(conf);
130 Path inputPath = fs.makeQualified(new Path(hbaseRootdir, "graph.csv"));
131 FSDataOutputStream op = fs.create(inputPath, true);
132 String line1 = "0001,0002\n";
133 String line2 = "0002,0004\n";
134 String line3 = "0003,0005\n";
135 String line4 = "0004,-1\n";
136 String line5 = "0005,-1\n";
137 op.write(line1.getBytes());
138 op.write(line2.getBytes());
139 op.write(line3.getBytes());
140 op.write(line4.getBytes());
141 op.write(line5.getBytes());
142 op.close();
143
144 final byte[] FAM = Bytes.toBytes(FAMILY);
145 final byte[] TAB = Bytes.toBytes(TABLE_NAME);
146
147 HTableDescriptor desc = new HTableDescriptor(TAB);
148 desc.addFamily(new HColumnDescriptor(FAM));
149 HBaseAdmin hbaseAdmin=new HBaseAdmin(conf);
150 if (hbaseAdmin.isTableAvailable(TABLE_NAME)) {
151 hbaseAdmin.disableTable(TABLE_NAME);
152 hbaseAdmin.deleteTable(TABLE_NAME);
153 }
154 hbaseAdmin.createTable(desc);
155
156
157 Job job = ImportTsv.createSubmittableJob(conf, args);
158 job.waitForCompletion(false);
159 assertTrue(job.isSuccessful());
160 if(log.isInfoEnabled())
161 log.info("ImportTsv successful. Running HBase Giraph job.");
162
163
164 conf.set(TableInputFormat.INPUT_TABLE, TABLE_NAME);
165 conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);
166
167 GiraphJob giraphJob = new GiraphJob(conf, BspCase.getCallingMethodName());
168 GiraphConfiguration giraphConf = giraphJob.getConfiguration();
169 setupConfiguration(giraphJob);
170 giraphConf.setComputationClass(EdgeNotification.class);
171 giraphConf.setVertexInputFormatClass(TableEdgeInputFormat.class);
172 giraphConf.setVertexOutputFormatClass(TableEdgeOutputFormat.class);
173
174 assertTrue(giraphJob.run(true));
175 if(log.isInfoEnabled())
176 log.info("Giraph job successful. Checking output qualifier.");
177
178
179
180 HTable table = new HTable(conf, TABLE_NAME);
181 Result result = table.get(new Get("0002".getBytes()));
182 byte[] parentBytes = result.getValue(FAMILY.getBytes(),
183 OUTPUT_FIELD.getBytes());
184 assertNotNull(parentBytes);
185 assertTrue(parentBytes.length > 0);
186 assertEquals("0001", Bytes.toString(parentBytes));
187 } finally {
188 testUtil.shutdownMiniMapReduceCluster();
189 testUtil.shutdownMiniCluster();
190 }
191 }
192
193
194
195
196
197 public static class EdgeNotification
198 extends BasicComputation<Text, Text, Text, Text> {
199 @Override
200 public void compute(Vertex<Text, Text, Text> vertex,
201 Iterable<Text> messages) throws IOException {
202 for (Text message : messages) {
203 vertex.getValue().set(message);
204 }
205 if(getSuperstep() == 0) {
206 sendMessageToAllEdges(vertex, vertex.getId());
207 }
208 vertex.voteToHalt();
209 }
210 }
211 }