View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.hcatalog;
20  
21  import java.io.IOException;
22  import java.util.List;
23  import org.apache.giraph.edge.Edge;
24  import org.apache.giraph.edge.EdgeFactory;
25  import org.apache.giraph.io.EdgeInputFormat;
26  import org.apache.giraph.io.EdgeReader;
27  import org.apache.hadoop.io.NullWritable;
28  import org.apache.hadoop.io.Writable;
29  import org.apache.hadoop.io.WritableComparable;
30  import org.apache.hadoop.mapreduce.InputSplit;
31  import org.apache.hadoop.mapreduce.JobContext;
32  import org.apache.hadoop.mapreduce.RecordReader;
33  import org.apache.hadoop.mapreduce.TaskAttemptContext;
34  import org.apache.hcatalog.data.HCatRecord;
35  
36  /**
37   * HCatalog {@link EdgeInputFormat} for reading edges from Hive/Pig.
38   *
39   * @param <I> Vertex id
40   * @param <E> Edge value
41   */
42  public abstract class HCatalogEdgeInputFormat<
43      I extends WritableComparable,
44      E extends Writable>
45      extends EdgeInputFormat<I, E> {
46    /**
47     * HCatalog input format.
48     */
49    private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
50  
51    @Override
52    public final List<InputSplit> getSplits(JobContext context,
53                                            int minSplitCountHint)
54      throws IOException, InterruptedException {
55      return hCatInputFormat.getEdgeSplits(context);
56    }
57  
58    /**
59     * Get underlying HCatalog input format. Used for creating readers.
60     *
61     * @return GiraphHCatInputFormat stored.
62     */
63    protected GiraphHCatInputFormat getHCatInputFormat() {
64      return hCatInputFormat;
65    }
66  
67    /**
68     * {@link EdgeReader} for {@link HCatalogEdgeInputFormat}.
69     */
70    protected abstract static class HCatalogEdgeReader<
71        I extends WritableComparable, E extends Writable>
72        extends EdgeReader<I, E> {
73      /** HCatalog input format to use */
74      private final GiraphHCatInputFormat hCatInputFormat;
75      /** Internal {@link RecordReader}. */
76      private RecordReader<WritableComparable, HCatRecord> hCatRecordReader;
77      /** Context passed to initialize. */
78      private TaskAttemptContext context;
79  
80      /**
81       * Constructor taking hcat input format to use.
82       *
83       * @param hCatInputFormat HCatalog input format
84       */
85      public HCatalogEdgeReader(GiraphHCatInputFormat hCatInputFormat) {
86        this.hCatInputFormat = hCatInputFormat;
87      }
88  
89      @Override
90      public final void initialize(InputSplit inputSplit,
91                                   TaskAttemptContext context)
92        throws IOException, InterruptedException {
93        hCatRecordReader =
94            hCatInputFormat.createEdgeRecordReader(inputSplit, context);
95        hCatRecordReader.initialize(inputSplit, context);
96        this.context = context;
97      }
98  
99      @Override
100     public boolean nextEdge() throws IOException, InterruptedException {
101       return hCatRecordReader.nextKeyValue();
102     }
103 
104     @Override
105     public final void close() throws IOException {
106       hCatRecordReader.close();
107     }
108 
109     @Override
110     public final float getProgress() throws IOException, InterruptedException {
111       return hCatRecordReader.getProgress();
112     }
113 
114     /**
115      * Get the record reader.
116      *
117      * @return Record reader to be used for reading.
118      */
119     protected final RecordReader<WritableComparable, HCatRecord>
120     getRecordReader() {
121       return hCatRecordReader;
122     }
123 
124     /**
125      * Get the context.
126      *
127      * @return Context passed to initialize.
128      */
129     protected final TaskAttemptContext getContext() {
130       return context;
131     }
132   }
133 
134   /**
135    * Create {@link EdgeReader}.
136    *
137    * @return {@link HCatalogEdgeReader} instance.
138    */
139   protected abstract HCatalogEdgeReader<I, E> createEdgeReader();
140 
141   @Override
142   public EdgeReader<I, E>
143   createEdgeReader(InputSplit split, TaskAttemptContext context)
144     throws IOException {
145     try {
146       HCatalogEdgeReader reader = createEdgeReader();
147       reader.initialize(split, context);
148       return reader;
149     } catch (InterruptedException e) {
150       throw new IllegalStateException(
151           "createEdgeReader: Interrupted creating reader.", e);
152     }
153   }
154 
155   /**
156    * {@link HCatalogEdgeReader} for tables holding a complete edge
157    * in each row.
158    */
159   protected abstract static class SingleRowHCatalogEdgeReader<
160       I extends WritableComparable, E extends Writable>
161       extends HCatalogEdgeReader<I, E> {
162     /**
163      * Constructor
164      * @param hCatInputFormat giraph input format to use
165      */
166     public SingleRowHCatalogEdgeReader(GiraphHCatInputFormat hCatInputFormat) {
167       super(hCatInputFormat);
168     }
169 
170     /**
171      * Get source vertex id from a record.
172      *
173      * @param record Input record
174      * @return I Source vertex id
175      */
176     protected abstract I getSourceVertexId(HCatRecord record);
177 
178     /**
179      * Get target vertex id from a record.
180      *
181      * @param record Input record
182      * @return I Target vertex id
183      */
184     protected abstract I getTargetVertexId(HCatRecord record);
185 
186     /**
187      * Get edge value from a record.
188      *
189      * @param record Input record
190      * @return E Edge value
191      */
192     protected abstract E getEdgeValue(HCatRecord record);
193 
194     @Override
195     public I getCurrentSourceId() throws IOException, InterruptedException {
196       HCatRecord record = getRecordReader().getCurrentValue();
197       return getSourceVertexId(record);
198     }
199 
200     @Override
201     public Edge<I, E> getCurrentEdge() throws IOException,
202         InterruptedException {
203       HCatRecord record = getRecordReader().getCurrentValue();
204       return EdgeFactory.create(getTargetVertexId(record),
205           getEdgeValue(record));
206     }
207   }
208 
209   /**
210    * {@link HCatalogEdgeReader} for tables holding a complete edge
211    * in each row where the edges contain no data other than IDs they point to.
212    */
213   protected abstract static class SingleRowHCatalogEdgeNoValueReader<
214       I extends WritableComparable>
215       extends HCatalogEdgeReader<I, NullWritable> {
216     /**
217      * Constructor
218      * @param hCatInputFormat giraph input format to use
219      */
220     public SingleRowHCatalogEdgeNoValueReader(
221         GiraphHCatInputFormat hCatInputFormat) {
222       super(hCatInputFormat);
223     }
224 
225     /**
226      * Get source vertex id from a record.
227      *
228      * @param record Input record
229      * @return I Source vertex id
230      */
231     protected abstract I getSourceVertexId(HCatRecord record);
232 
233     /**
234      * Get target vertex id from a record.
235      *
236      * @param record Input record
237      * @return I Target vertex id
238      */
239     protected abstract I getTargetVertexId(HCatRecord record);
240 
241     @Override
242     public I getCurrentSourceId() throws IOException, InterruptedException {
243       HCatRecord record = getRecordReader().getCurrentValue();
244       return getSourceVertexId(record);
245     }
246 
247     @Override
248     public Edge<I, NullWritable> getCurrentEdge() throws IOException,
249         InterruptedException {
250       HCatRecord record = getRecordReader().getCurrentValue();
251       return EdgeFactory.create(getTargetVertexId(record));
252     }
253   }
254 }