View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.hcatalog;
20  
21  import org.apache.giraph.io.VertexValueInputFormat;
22  import org.apache.giraph.io.VertexValueReader;
23  import org.apache.hadoop.io.Writable;
24  import org.apache.hadoop.io.WritableComparable;
25  import org.apache.hadoop.mapreduce.InputSplit;
26  import org.apache.hadoop.mapreduce.JobContext;
27  import org.apache.hadoop.mapreduce.RecordReader;
28  import org.apache.hadoop.mapreduce.TaskAttemptContext;
29  import org.apache.hcatalog.data.HCatRecord;
30  
31  import java.io.IOException;
32  import java.util.List;
33  
34  /**
35   * HCatalog {@link VertexValueInputFormat} for reading vertex values from
36   * Hive/Pig.
37   *
38   * @param <I> Vertex id
39   * @param <V> Vertex value
40   */
41  public abstract class HCatalogVertexValueInputFormat<I extends
42      WritableComparable, V extends Writable>
43      extends VertexValueInputFormat<I, V> {
44    /**
45     * HCatalog input format.
46     */
47    private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
48  
49    @Override
50    public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
51      throws IOException, InterruptedException {
52      return hCatInputFormat.getVertexSplits(context);
53    }
54  
55    /**
56     * {@link VertexValueReader} for {@link HCatalogVertexValueInputFormat}.
57     */
58    protected abstract class HCatalogVertexValueReader
59        extends VertexValueReader<I, V> {
60      /** Internal {@link RecordReader}. */
61      private RecordReader<WritableComparable, HCatRecord> hCatRecordReader;
62      /** Context passed to initialize. */
63      private TaskAttemptContext context;
64  
65      @Override
66      public final void initialize(InputSplit inputSplit,
67                                   TaskAttemptContext context)
68        throws IOException, InterruptedException {
69        super.initialize(inputSplit, context);
70        hCatRecordReader =
71            hCatInputFormat.createVertexRecordReader(inputSplit, context);
72        hCatRecordReader.initialize(inputSplit, context);
73        this.context = context;
74      }
75  
76      @Override
77      public boolean nextVertex() throws IOException, InterruptedException {
78        return hCatRecordReader.nextKeyValue();
79      }
80  
81      @Override
82      public final void close() throws IOException {
83        hCatRecordReader.close();
84      }
85  
86      @Override
87      public final float getProgress() throws IOException, InterruptedException {
88        return hCatRecordReader.getProgress();
89      }
90  
91      /**
92       * Get the record reader.
93       *
94       * @return Record reader to be used for reading.
95       */
96      protected final RecordReader<WritableComparable, HCatRecord>
97      getRecordReader() {
98        return hCatRecordReader;
99      }
100 
101     /**
102      * Get the context.
103      *
104      * @return Context passed to initialize.
105      */
106     protected final TaskAttemptContext getContext() {
107       return context;
108     }
109   }
110 
111   /**
112    * Create {@link VertexValueReader}.
113 
114    * @return {@link HCatalogVertexValueReader} instance.
115    */
116   protected abstract HCatalogVertexValueReader createVertexValueReader();
117 
118   @Override
119   public final VertexValueReader<I, V>
120   createVertexValueReader(InputSplit split, TaskAttemptContext context)
121     throws IOException {
122     try {
123       HCatalogVertexValueReader reader = createVertexValueReader();
124       reader.initialize(split, context);
125       return reader;
126     } catch (InterruptedException e) {
127       throw new IllegalStateException(
128           "createVertexValueReader: Interrupted creating reader.", e);
129     }
130   }
131 
132   /**
133    * {@link HCatalogVertexValueReader} for tables holding a complete vertex
134    * value in each row.
135    */
136   protected abstract class SingleRowHCatalogVertexValueReader
137       extends HCatalogVertexValueReader {
138     /**
139      * Get vertex id from a record.
140      *
141      * @param record Input record
142      * @return I Vertex id
143      */
144     protected abstract I getVertexId(HCatRecord record);
145 
146     /**
147      * Get vertex value from a record.
148      *
149      * @param record Input record
150      * @return V Vertex value
151      */
152     protected abstract V getVertexValue(HCatRecord record);
153 
154     @Override
155     public final I getCurrentVertexId() throws IOException,
156         InterruptedException {
157       return getVertexId(getRecordReader().getCurrentValue());
158     }
159 
160     @Override
161     public final V getCurrentVertexValue() throws IOException,
162         InterruptedException {
163       return getVertexValue(getRecordReader().getCurrentValue());
164     }
165   }
166 }