1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.giraph.io.hbase;
19
20 import java.io.IOException;
21 import java.util.List;
22 import org.apache.giraph.io.VertexInputFormat;
23 import org.apache.giraph.io.VertexReader;
24 import org.apache.hadoop.hbase.client.Result;
25 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
26 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
27 import org.apache.hadoop.io.Writable;
28 import org.apache.hadoop.io.WritableComparable;
29 import org.apache.hadoop.mapreduce.InputSplit;
30 import org.apache.hadoop.mapreduce.JobContext;
31 import org.apache.hadoop.mapreduce.RecordReader;
32 import org.apache.hadoop.mapreduce.TaskAttemptContext;
33 import org.apache.log4j.Logger;
34
35 /**
36 *
37 * Base class that wraps an HBase TableInputFormat and underlying Scan object
38 * to help instantiate vertices from an HBase table. All
39 * the static TableInputFormat properties necessary to configure
40 * an HBase job are available.
41 *
42 * For example, setting conf.set(TableInputFormat.INPUT_TABLE, "in_table");
43 * from the job setup routine will properly delegate to the
44 * TableInputFormat instance. The Configurable interface prevents specific
45 * wrapper methods from having to be called.
46 *
47 * Works with {@link HBaseVertexOutputFormat}
48 *
49 * @param <I> Vertex index value
50 * @param <V> Vertex value
51 * @param <E> Edge value
52 */
53 @SuppressWarnings("rawtypes")
54 public abstract class HBaseVertexInputFormat<
55 I extends WritableComparable,
56 V extends Writable,
57 E extends Writable>
58 extends VertexInputFormat<I, V, E> {
59
60
61 /**
62 * delegate HBase table input format
63 */
64 protected static final TableInputFormat BASE_FORMAT =
65 new TableInputFormat();
66 /**
67 * logger
68 */
69 private static final Logger LOG =
70 Logger.getLogger(HBaseVertexInputFormat.class);
71
72 /**
73 * Takes an instance of RecordReader that supports
74 * HBase row-key, result records. Subclasses can focus on
75 * vertex instantiation details without worrying about connection
76 * semantics. Subclasses are expected to implement nextVertex() and
77 * getCurrentVertex()
78 *
79 *
80 *
81 * @param <I> Vertex index value
82 * @param <V> Vertex value
83 * @param <E> Edge value
84 */
85 public abstract static class HBaseVertexReader<
86 I extends WritableComparable,
87 V extends Writable,
88 E extends Writable>
89 extends VertexReader<I, V, E> {
90
91 /** Reader instance */
92 private final RecordReader<ImmutableBytesWritable, Result> reader;
93 /** Context passed to initialize */
94 private TaskAttemptContext context;
95
96 /**
97 * Sets the base TableInputFormat and creates a record reader.
98 *
99 * @param split InputSplit
100 * @param context Context
101 * @throws IOException
102 */
103 public HBaseVertexReader(InputSplit split, TaskAttemptContext context)
104 throws IOException {
105 BASE_FORMAT.setConf(context.getConfiguration());
106 this.reader = BASE_FORMAT.createRecordReader(split, context);
107 }
108
109 /**
110 * initialize
111 *
112 * @param inputSplit Input split to be used for reading vertices.
113 * @param context Context from the task.
114 * @throws IOException
115 * @throws InterruptedException
116 */
117 public void initialize(InputSplit inputSplit,
118 TaskAttemptContext context)
119 throws IOException, InterruptedException {
120 reader.initialize(inputSplit, context);
121 this.context = context;
122 }
123
124 /**
125 * close
126 * @throws IOException
127 */
128 public void close() throws IOException {
129 reader.close();
130 }
131
132 /**
133 * getProgress
134 *
135 * @return progress
136 * @throws IOException
137 * @throws InterruptedException
138 */
139 public float getProgress() throws
140 IOException, InterruptedException {
141 return reader.getProgress();
142 }
143
144 /**
145 * getRecordReader
146 *
147 * @return Record reader to be used for reading.
148 */
149 protected RecordReader<ImmutableBytesWritable,
150 Result> getRecordReader() {
151 return reader;
152 }
153
154 /**
155 * getContext
156 *
157 * @return Context passed to initialize.
158 */
159 protected TaskAttemptContext getContext() {
160 return context;
161 }
162
163 }
164
165 @Override
166 public List<InputSplit> getSplits(
167 JobContext context, int minSplitCountHint)
168 throws IOException, InterruptedException {
169 BASE_FORMAT.setConf(getConf());
170 return BASE_FORMAT.getSplits(context);
171 }
172 }