1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.io.hbase;
20
21 import java.io.IOException;
22 import org.apache.giraph.io.VertexOutputFormat;
23 import org.apache.giraph.io.VertexWriter;
24 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
25 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28 import org.apache.hadoop.mapreduce.JobContext;
29 import org.apache.hadoop.mapreduce.OutputCommitter;
30 import org.apache.hadoop.mapreduce.RecordWriter;
31 import org.apache.hadoop.mapreduce.TaskAttemptContext;
32
33 /**
34 *
35 * Base class for writing Vertex mutations back to specific
36 * rows in an HBase table. This class wraps an instance of TableOutputFormat
37 * for easy configuration with the existing properties.
38 *
39 * Setting conf.set(TableOutputFormat.OUTPUT_TABLE, "out_table");
40 * will properly delegate to the TableOutputFormat instance contained
41 * in this class. The Configurable interface prevents specific
42 * wrapper methods from having to be called.
43 *
44 * Works with {@link HBaseVertexInputFormat}
45 *
46 * @param <I> Vertex index value
47 * @param <V> Vertex value
48 * @param <E> Edge value
49 */
50 @SuppressWarnings("rawtypes")
51 public abstract class HBaseVertexOutputFormat<
52 I extends WritableComparable,
53 V extends Writable,
54 E extends Writable>
55 extends VertexOutputFormat
56 <I, V, E> {
57
58 /**
59 * delegate output format that writes to HBase
60 */
61 protected static final TableOutputFormat<ImmutableBytesWritable>
62 BASE_FORMAT = new TableOutputFormat<ImmutableBytesWritable>();
63
64 /**
65 * Constructor
66 *
67 * Simple class which takes an instance of RecordWriter
68 * over Writable objects. Subclasses are
69 * expected to implement writeVertex()
70 *
71 * @param <I> Vertex index value
72 * @param <V> Vertex value
73 * @param <E> Edge value
74 */
75 public abstract static class HBaseVertexWriter<
76 I extends WritableComparable,
77 V extends Writable,
78 E extends Writable>
79 extends VertexWriter<I, V, E> {
80
81 /** Context */
82 private TaskAttemptContext context;
83 /** Record writer instance */
84 private RecordWriter<ImmutableBytesWritable, Writable> recordWriter;
85
86 /**
87 * Sets up base table output format and creates a record writer.
88 * @param context task attempt context
89 */
90 public HBaseVertexWriter(TaskAttemptContext context)
91 throws IOException, InterruptedException {
92 BASE_FORMAT.setConf(context.getConfiguration());
93 this.recordWriter = BASE_FORMAT.getRecordWriter(context);
94 }
95
96 /**
97 * initialize
98 *
99 * @param context Context used to write the vertices.
100 * @throws IOException
101 */
102 public void initialize(TaskAttemptContext context)
103 throws IOException {
104 this.context = context;
105 }
106
107 /**
108 * close
109 *
110 * @param context the context of the task
111 * @throws IOException
112 * @throws InterruptedException
113 */
114 public void close(TaskAttemptContext context)
115 throws IOException, InterruptedException {
116 recordWriter.close(context);
117 }
118
119 /**
120 * Get the table record writer;
121 *
122 * @return Record writer to be used for writing.
123 */
124 public RecordWriter<ImmutableBytesWritable,
125 Writable> getRecordWriter() {
126 return recordWriter;
127 }
128
129 /**
130 * getContext
131 *
132 * @return Context passed to initialize.
133 */
134 public TaskAttemptContext getContext() {
135 return context;
136 }
137 }
138
139 /**
140 * checkOutputSpecs
141 *
142 * @param context information about the job
143 * @throws IOException
144 * @throws InterruptedException
145 */
146 public void checkOutputSpecs(JobContext context)
147 throws IOException, InterruptedException {
148 BASE_FORMAT.checkOutputSpecs(context);
149 }
150
151 /**
152 * getOutputCommitter
153 *
154 * @param context the task context
155 * @return OutputCommitter ouputCommitter
156 * @throws IOException
157 * @throws InterruptedException
158 */
159 public OutputCommitter getOutputCommitter(
160 TaskAttemptContext context)
161 throws IOException, InterruptedException {
162 BASE_FORMAT.setConf(getConf());
163 return BASE_FORMAT.getOutputCommitter(context);
164 }
165 }