View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.io.hbase.edgemarker;
19  
20  import org.apache.giraph.edge.Edge;
21  import org.apache.giraph.edge.EdgeFactory;
22  import org.apache.giraph.graph.Vertex;
23  import org.apache.giraph.io.VertexReader;
24  import org.apache.giraph.io.hbase.HBaseVertexInputFormat;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.hbase.client.Result;
27  import org.apache.hadoop.hbase.util.Bytes;
28  import org.apache.hadoop.io.Text;
29  import org.apache.hadoop.mapreduce.InputSplit;
30  import org.apache.hadoop.mapreduce.TaskAttemptContext;
31  import org.apache.log4j.Logger;
32  
33  import com.google.common.collect.Lists;
34  
35  import java.io.IOException;
36  import java.util.List;
37  
38  /**
39   *  Test subclass for HBaseVertexInputFormat. Reads a simple
40   *  children qualifier to create an edge.
41   */
42  public class TableEdgeInputFormat extends
43      HBaseVertexInputFormat<Text, Text, Text> {
44  
45    private static final Logger LOG =
46        Logger.getLogger(TableEdgeInputFormat.class);
47    private static final Text uselessEdgeValue = new Text();
48  
49    @Override public void checkInputSpecs(Configuration conf) { }
50  
51    public VertexReader<Text, Text, Text>
52    createVertexReader(InputSplit split,
53                       TaskAttemptContext context) throws IOException {
54  
55      return new TableEdgeVertexReader(split, context);
56  
57    }
58  
59    /**
60     * Uses the RecordReader to return Hbase rows
61     */
62    public static class TableEdgeVertexReader
63        extends HBaseVertexReader<Text, Text, Text> {
64  
65      private final byte[] CF = Bytes.toBytes("cf");
66      private final byte[] CHILDREN = Bytes.toBytes("children");
67  
68      public TableEdgeVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
69        super(split, context);
70      }
71  
72      @Override
73      public boolean nextVertex() throws IOException,
74          InterruptedException {
75        return getRecordReader().nextKeyValue();
76      }
77  
78      /**
79       * For each row, create a vertex with the row ID as a text,
80       * and it's 'children' qualifier as a single edge.
81       */
82      @Override
83      public Vertex<Text, Text, Text>
84      getCurrentVertex()
85          throws IOException, InterruptedException {
86        Result row = getRecordReader().getCurrentValue();
87        Vertex<Text, Text, Text> vertex =
88            getConf().createVertex();
89        Text vertexId = new Text(Bytes.toString(row.getRow()));
90        List<Edge<Text, Text>> edges = Lists.newLinkedList();
91        String edge = Bytes.toString(row.getValue(CF, CHILDREN));
92        Text vertexValue = new Text();
93        Text edgeId = new Text(edge);
94        edges.add(EdgeFactory.create(edgeId, uselessEdgeValue));
95        vertex.initialize(vertexId, vertexValue, edges);
96  
97        return vertex;
98      }
99    }
100 }