This project has retired. For details please refer to its Attic page.
HBaseVertexInputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.io.hbase;
19  
20  import java.io.IOException;
21  import java.util.List;
22  import org.apache.giraph.io.VertexInputFormat;
23  import org.apache.giraph.io.VertexReader;
24  import org.apache.hadoop.hbase.client.Result;
25  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
26  import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
27  import org.apache.hadoop.io.Writable;
28  import org.apache.hadoop.io.WritableComparable;
29  import org.apache.hadoop.mapreduce.InputSplit;
30  import org.apache.hadoop.mapreduce.JobContext;
31  import org.apache.hadoop.mapreduce.RecordReader;
32  import org.apache.hadoop.mapreduce.TaskAttemptContext;
33  import org.apache.log4j.Logger;
34  
35  /**
36   *
37   * Base class that wraps an HBase TableInputFormat and underlying Scan object
38   * to help instantiate vertices from an HBase table. All
39   * the static TableInputFormat properties necessary to configure
40   * an HBase job are available.
41   *
42   * For example, setting conf.set(TableInputFormat.INPUT_TABLE, "in_table");
43   * from the job setup routine will properly delegate to the
44   * TableInputFormat instance. The Configurable interface prevents specific
45   * wrapper methods from having to be called.
46   *
47   * Works with {@link HBaseVertexOutputFormat}
48   *
49   * @param <I> Vertex index value
50   * @param <V> Vertex value
51   * @param <E> Edge value
52   */
53  @SuppressWarnings("rawtypes")
54  public abstract class HBaseVertexInputFormat<
55      I extends WritableComparable,
56      V extends Writable,
57      E extends Writable>
58      extends VertexInputFormat<I, V, E>  {
59  
60  
61     /**
62     * delegate HBase table input format
63     */
64    protected static final TableInputFormat BASE_FORMAT =
65            new TableInputFormat();
66    /**
67    * logger
68    */
69    private static final Logger LOG =
70            Logger.getLogger(HBaseVertexInputFormat.class);
71  
72    /**
73     * Takes an instance of RecordReader that supports
74     * HBase row-key, result records.  Subclasses can focus on
75     * vertex instantiation details without worrying about connection
76     * semantics. Subclasses are expected to implement nextVertex() and
77     * getCurrentVertex()
78     *
79     *
80     *
81     * @param <I> Vertex index value
82     * @param <V> Vertex value
83     * @param <E> Edge value
84     */
85    public abstract static class HBaseVertexReader<
86        I extends WritableComparable,
87        V extends Writable,
88        E extends Writable>
89        extends VertexReader<I, V, E> {
90  
91      /** Reader instance */
92      private final RecordReader<ImmutableBytesWritable, Result> reader;
93      /** Context passed to initialize */
94      private TaskAttemptContext context;
95  
96      /**
97       * Sets the base TableInputFormat and creates a record reader.
98       *
99       * @param split InputSplit
100      * @param context Context
101      * @throws IOException
102      */
103     public HBaseVertexReader(InputSplit split, TaskAttemptContext context)
104       throws IOException {
105       BASE_FORMAT.setConf(context.getConfiguration());
106       this.reader = BASE_FORMAT.createRecordReader(split, context);
107     }
108 
109     /**
110      * initialize
111      *
112      * @param inputSplit Input split to be used for reading vertices.
113      * @param context Context from the task.
114      * @throws IOException
115      * @throws InterruptedException
116      */
117     public void initialize(InputSplit inputSplit,
118                            TaskAttemptContext context)
119       throws IOException, InterruptedException {
120       reader.initialize(inputSplit, context);
121       this.context = context;
122     }
123 
124     /**
125      * close
126      * @throws IOException
127      */
128     public void close() throws IOException {
129       reader.close();
130     }
131 
132     /**
133      * getProgress
134      *
135      * @return progress
136      * @throws IOException
137      * @throws InterruptedException
138      */
139     public float getProgress() throws
140       IOException, InterruptedException {
141       return reader.getProgress();
142     }
143 
144     /**
145      * getRecordReader
146      *
147      * @return Record reader to be used for reading.
148      */
149     protected RecordReader<ImmutableBytesWritable,
150       Result> getRecordReader() {
151       return reader;
152     }
153 
154    /**
155     * getContext
156     *
157     * @return Context passed to initialize.
158     */
159     protected TaskAttemptContext getContext() {
160       return context;
161     }
162 
163   }
164 
165   @Override
166   public List<InputSplit> getSplits(
167   JobContext context, int minSplitCountHint)
168     throws IOException, InterruptedException {
169     BASE_FORMAT.setConf(getConf());
170     return BASE_FORMAT.getSplits(context);
171   }
172 }