/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.io.hbase.edgemarker;
1920import org.apache.giraph.edge.Edge;
21import org.apache.giraph.edge.EdgeFactory;
22import org.apache.giraph.graph.Vertex;
23import org.apache.giraph.io.VertexReader;
24import org.apache.giraph.io.hbase.HBaseVertexInputFormat;
25import org.apache.hadoop.conf.Configuration;
26import org.apache.hadoop.hbase.client.Result;
27import org.apache.hadoop.hbase.util.Bytes;
28import org.apache.hadoop.io.Text;
29import org.apache.hadoop.mapreduce.InputSplit;
30import org.apache.hadoop.mapreduce.TaskAttemptContext;
31import org.apache.log4j.Logger;
3233import com.google.common.collect.Lists;
3435import java.io.IOException;
36import java.util.List;
3738/**39 * Test subclass for HBaseVertexInputFormat. Reads a simple40 * children qualifier to create an edge.41 */42publicclassTableEdgeInputFormatextends43 HBaseVertexInputFormat<Text, Text, Text> {
4445privatestaticfinal Logger LOG =
46 Logger.getLogger(TableEdgeInputFormat.class);
47privatestaticfinal Text uselessEdgeValue = new Text();
4849 @Override publicvoid checkInputSpecs(Configuration conf) { }
5051public VertexReader<Text, Text, Text>
52 createVertexReader(InputSplit split,
53 TaskAttemptContext context) throws IOException {
5455returnnewTableEdgeVertexReader(split, context);
5657 }
5859/**60 * Uses the RecordReader to return Hbase rows61 */62publicstaticclassTableEdgeVertexReader63extends HBaseVertexReader<Text, Text, Text> {
6465privatefinal byte[] CF = Bytes.toBytes("cf");
66privatefinal byte[] CHILDREN = Bytes.toBytes("children");
6768publicTableEdgeVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
69super(split, context);
70 }
7172 @Override
73publicboolean nextVertex() throws IOException,
74 InterruptedException {
75return getRecordReader().nextKeyValue();
76 }
7778/**79 * For each row, create a vertex with the row ID as a text,80 * and it's 'children' qualifier as a single edge.81 */82 @Override
83public Vertex<Text, Text, Text>
84 getCurrentVertex()
85throws IOException, InterruptedException {
86 Result row = getRecordReader().getCurrentValue();
87 Vertex<Text, Text, Text> vertex =
88 getConf().createVertex();
89 Text vertexId = new Text(Bytes.toString(row.getRow()));
90 List<Edge<Text, Text>> edges = Lists.newLinkedList();
91 String edge = Bytes.toString(row.getValue(CF, CHILDREN));
92 Text vertexValue = new Text();
93 Text edgeId = new Text(edge);
94 edges.add(EdgeFactory.create(edgeId, uselessEdgeValue));
95 vertex.initialize(vertexId, vertexValue, edges);
9697return vertex;
98 }
99 }
100 }