View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.hcatalog;
20  
21  import org.apache.giraph.graph.Vertex;
22  import org.apache.giraph.io.VertexOutputFormat;
23  import org.apache.giraph.io.VertexWriter;
24  import org.apache.hadoop.io.Writable;
25  import org.apache.hadoop.io.WritableComparable;
26  import org.apache.hadoop.mapreduce.JobContext;
27  import org.apache.hadoop.mapreduce.OutputCommitter;
28  import org.apache.hadoop.mapreduce.RecordWriter;
29  import org.apache.hadoop.mapreduce.TaskAttemptContext;
30  import org.apache.hcatalog.data.DefaultHCatRecord;
31  import org.apache.hcatalog.data.HCatRecord;
32  import org.apache.hcatalog.mapreduce.HCatOutputFormat;
33  
34  import java.io.IOException;
35  
36  /**
37   * Abstract class that users should subclass to store data to Hive or Pig table.
38   * You can easily implement a {@link HCatalogVertexWriter} by extending
39   * {@link SingleRowHCatalogVertexWriter} or {@link MultiRowHCatalogVertexWriter}
40   * depending on how you want to fit your vertices into the output table.
41   * <p>
42   * The desired database and table name to store to can be specified via
43   * {@link HCatOutputFormat#setOutput(org.apache.hadoop.mapreduce.Job,
44   * org.apache.hcatalog.mapreduce.OutputJobInfo)}
45   * as you setup your vertex output format with
46   * {@link org.apache.giraph.conf.GiraphConfiguration#setVertexOutputFormatClass(Class)}.
47   * You must create the output table.
48   *
49   * @param <I> Vertex id
50   * @param <V> Vertex value
51   * @param <E> Edge value
52   */
53  @SuppressWarnings("rawtypes")
54  public abstract class HCatalogVertexOutputFormat<
55          I extends WritableComparable,
56          V extends Writable,
57          E extends Writable>
58          extends VertexOutputFormat<I, V, E> {
59    /**
60    * Underlying HCatOutputFormat that all output calls are delegated to.
61    */
62    protected HCatOutputFormat hCatOutputFormat = new HCatOutputFormat();
63  
64    @Override
65    public final void checkOutputSpecs(JobContext context) throws IOException,
66        InterruptedException {
67      hCatOutputFormat.checkOutputSpecs(context);
68    }
69  
70    @Override
71    public final OutputCommitter getOutputCommitter(TaskAttemptContext context)
72      throws IOException, InterruptedException {
73      return hCatOutputFormat.getOutputCommitter(context);
74    }
75  
76    /**
77     * Abstract class that users should
78     * subclass based on their specific vertex
79     * output. Users should implement
80     * writeVertex to create a HCatRecord that is
81     * valid for writing by the HCatalog record writer.
82     *
83     * @param <I> Vertex id
84     * @param <V> Vertex value
85     * @param <E> Edge value
86    */
87    protected abstract static class HCatalogVertexWriter<
88        I extends WritableComparable,
89        V extends Writable,
90        E extends Writable>
91        extends VertexWriter<I, V, E> {
92  
93      /** Internal HCatRecordWriter that records are ultimately written to */
94      private RecordWriter<WritableComparable<?>, HCatRecord> hCatRecordWriter;
95      /** Context passed to initialize */
96      private TaskAttemptContext context;
97  
98      /**
99      * Initialize with the HCatRecordWriter
100     * @param hCatRecordWriter
101     *            Internal writer
102     */
103     protected void initialize(
104                     RecordWriter<WritableComparable<?>,
105                     HCatRecord> hCatRecordWriter) {
106       this.hCatRecordWriter = hCatRecordWriter;
107     }
108 
109     /**
110     * Get the record writer.
111     * @return Record writer to be used for writing.
112     */
113     protected RecordWriter<WritableComparable<?>,
114             HCatRecord> getRecordWriter() {
115       return hCatRecordWriter;
116     }
117 
118     /**
119     * Get the context.
120     *
121     * @return Context passed to initialize.
122     */
123     protected TaskAttemptContext getContext() {
124       return context;
125     }
126 
127     @Override
128     public void initialize(TaskAttemptContext context) throws IOException {
129       this.context = context;
130     }
131 
132     @Override
133     public void close(TaskAttemptContext context) throws IOException,
134         InterruptedException {
135       hCatRecordWriter.close(context);
136     }
137 
138   }
139 
140   /**
141   * Create the vertex writer for this task.
142   * @return HCatalogVertexWriter used to write vertices to the table
143   */
144   protected abstract HCatalogVertexWriter<I, V, E> createVertexWriter();
145 
146   @Override
147   public final VertexWriter<I, V, E> createVertexWriter(
148     TaskAttemptContext context) throws IOException,
149     InterruptedException {
150     HCatalogVertexWriter<I, V, E>  writer = createVertexWriter();
151     writer.initialize(hCatOutputFormat.getRecordWriter(context));
152     return writer;
153   }
154 
155   /**
156    * HCatalogVertexWriter that writes each vertex as a single table row.
157    *
158    * @param <I> Vertex id
159    * @param <V> Vertex value
160    * @param <E> Edge value
161    */
162   protected abstract static class SingleRowHCatalogVertexWriter<
163       I extends WritableComparable,
164       V extends Writable,
165       E extends Writable>
166       extends HCatalogVertexWriter<I, V, E> {
167     /**
168     * Get the number of columns in the output table.
169     * @return Number of columns each output record should contain
170     */
171     protected abstract int getNumColumns();
172 
173     /**
174     * Fill a record from a vertex.
175     * @param record to fill
176     * @param vertex to populate record
177     */
178     protected abstract void fillRecord(HCatRecord record,
179         Vertex<I, V, E> vertex);
180 
181     /**
182     * Create a record with getNumColumns() columns filled from the vertex.
183     * @param vertex to populate record
184     * @return HCatRecord newly created
185     */
186     protected HCatRecord createRecord(Vertex<I, V, E> vertex) {
187       HCatRecord record = new DefaultHCatRecord(getNumColumns());
188       fillRecord(record, vertex);
189       return record;
190     }
191 
192     @Override
193     public final void writeVertex(Vertex<I, V, E> vertex) throws IOException,
194         InterruptedException {
195       getRecordWriter().write(null, createRecord(vertex));
196     }
197 
198   }
199 
200   /**
201    * HCatalogVertexWriter that writes each vertex as multiple table rows.
202    *
203    * @param <I> Vertex id
204    * @param <V> Vertex value
205    * @param <E> Edge value
206    */
207   public abstract static class MultiRowHCatalogVertexWriter<
208       I extends WritableComparable,
209       V extends Writable,
210       E extends Writable>
211       extends HCatalogVertexWriter<I, V, E> {
212     /**
213     * Create the records representing one vertex.
214     * @param vertex to populate records
215     * @return Iterable of records
216     */
217     protected abstract Iterable<HCatRecord> createRecords(
218         Vertex<I, V, E> vertex);
219 
220     @Override
221     public final void writeVertex(Vertex<I, V, E> vertex) throws IOException,
222         InterruptedException {
223       Iterable<HCatRecord> records = createRecords(vertex);
224       for (HCatRecord record : records) {
225         getRecordWriter().write(null, record);
226       }
227     }
228   }
229 }