View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.hbase;
20  
21  import java.io.IOException;
22  import org.apache.giraph.io.VertexOutputFormat;
23  import org.apache.giraph.io.VertexWriter;
24  import org.apache.hadoop.hbase.client.Mutation;
25  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
26  import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
27  import org.apache.hadoop.io.Writable;
28  import org.apache.hadoop.io.WritableComparable;
29  import org.apache.hadoop.mapreduce.JobContext;
30  import org.apache.hadoop.mapreduce.OutputCommitter;
31  import org.apache.hadoop.mapreduce.RecordWriter;
32  import org.apache.hadoop.mapreduce.TaskAttemptContext;
33  
34  /**
35   *
36   * Base class for writing Vertex mutations back to specific
37   * rows in an HBase table. This class wraps an instance of TableOutputFormat
38   * for easy configuration with the existing properties.
39   *
40   * Setting conf.set(TableOutputFormat.OUTPUT_TABLE, "out_table");
41   * will properly delegate to the TableOutputFormat instance contained
42   * in this class. The Configurable interface prevents specific
43   * wrapper methods from having to be called.
44   *
45   * Works with {@link HBaseVertexInputFormat}
46   *
47   * @param <I> Vertex index value
48   * @param <V> Vertex value
49   * @param <E> Edge value
50   */
51  @SuppressWarnings("rawtypes")
52  public abstract class HBaseVertexOutputFormat<
53          I extends WritableComparable,
54          V extends Writable,
55          E extends Writable>
56          extends VertexOutputFormat
57                  <I, V, E> {
58  
59    /**
60     * delegate output format that writes to HBase
61     */
62    protected static final TableOutputFormat<ImmutableBytesWritable>
63    BASE_FORMAT = new TableOutputFormat<ImmutableBytesWritable>();
64  
65    /**
66     *   Constructor
67     *
68     *   Simple class which takes an instance of RecordWriter
69     *   over Writable objects. Subclasses are
70     *   expected to implement writeVertex()
71     *
72     * @param <I> Vertex index value
73     * @param <V> Vertex value
74     * @param <E> Edge value
75     */
76    public abstract static class HBaseVertexWriter<
77        I extends WritableComparable,
78        V extends Writable,
79        E extends Writable>
80        extends VertexWriter<I, V, E> {
81  
82      /** Context */
83      private TaskAttemptContext context;
84      /** Record writer instance */
85      private RecordWriter<ImmutableBytesWritable, Mutation> recordWriter;
86  
87     /**
88      * Sets up base table output format and creates a record writer.
89      * @param context task attempt context
90      */
91      public HBaseVertexWriter(TaskAttemptContext context)
92        throws IOException, InterruptedException {
93        BASE_FORMAT.setConf(context.getConfiguration());
94        this.recordWriter = BASE_FORMAT.getRecordWriter(context);
95      }
96  
97      /**
98       * initialize
99       *
100      * @param context Context used to write the vertices.
101      * @throws IOException
102      */
103     public void initialize(TaskAttemptContext context)
104       throws IOException {
105       this.context = context;
106     }
107 
108     /**
109      * close
110      *
111      * @param context the context of the task
112      * @throws IOException
113      * @throws InterruptedException
114      */
115     public void close(TaskAttemptContext context)
116       throws IOException, InterruptedException {
117       recordWriter.close(context);
118     }
119 
120     /**
121      * Get the table record writer;
122      *
123      * @return Record writer to be used for writing.
124      */
125     public RecordWriter<ImmutableBytesWritable,
126         Mutation> getRecordWriter() {
127       return recordWriter;
128     }
129 
130     /**
131      * getContext
132      *
133      * @return Context passed to initialize.
134      */
135     public TaskAttemptContext getContext() {
136       return context;
137     }
138   }
139 
140   /**
141    * checkOutputSpecs
142    *
143    * @param context information about the job
144    * @throws IOException
145    * @throws InterruptedException
146    */
147   public void checkOutputSpecs(JobContext context)
148     throws IOException, InterruptedException {
149     BASE_FORMAT.checkOutputSpecs(context);
150   }
151 
152   /**
153    * getOutputCommitter
154    *
155    * @param context the task context
156    * @return  OutputCommitter ouputCommitter
157    * @throws IOException
158    * @throws InterruptedException
159    */
160   public OutputCommitter getOutputCommitter(
161     TaskAttemptContext context)
162     throws IOException, InterruptedException {
163     BASE_FORMAT.setConf(getConf());
164     return BASE_FORMAT.getOutputCommitter(context);
165   }
166 }