View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.hcatalog;
20  
21  import org.apache.giraph.edge.Edge;
22  import org.apache.giraph.edge.EdgeFactory;
23  import org.apache.giraph.graph.Vertex;
24  import org.apache.giraph.io.VertexInputFormat;
25  import org.apache.giraph.io.VertexReader;
26  import org.apache.giraph.utils.TimedLogger;
27  import org.apache.hadoop.io.Writable;
28  import org.apache.hadoop.io.WritableComparable;
29  import org.apache.hadoop.mapreduce.InputSplit;
30  import org.apache.hadoop.mapreduce.JobContext;
31  import org.apache.hadoop.mapreduce.RecordReader;
32  import org.apache.hadoop.mapreduce.TaskAttemptContext;
33  import org.apache.hcatalog.data.HCatRecord;
34  import org.apache.log4j.Logger;
35  
36  import com.google.common.collect.Lists;
37  
38  import java.io.IOException;
39  import java.util.List;
40  
41  /**
42   * Abstract class that users should subclass to load data from a Hive or Pig
43   * table. You can easily implement a {@link HCatalogVertexReader} by extending
44   * either {@link SingleRowHCatalogVertexReader} or
45   * {@link MultiRowHCatalogVertexReader} depending on how data for each vertex is
46   * stored in the input table.
47   * <p>
48   * The desired database and table name to load from can be specified via
49   * {@link GiraphHCatInputFormat#setVertexInput(org.apache.hadoop.mapreduce.Job,
50   * org.apache.hcatalog.mapreduce.InputJobInfo)}
51   * as you set up your vertex input format with {@link
52   * org.apache.giraph.conf.GiraphConfiguration#setVertexInputFormatClass(Class)
53   * }.
54   *
55   * @param <I> Vertex id
56   * @param <V> Vertex value
57   * @param <E> Edge value
58   */
59  
60  @SuppressWarnings("rawtypes")
61  public abstract class HCatalogVertexInputFormat<
62      I extends WritableComparable,
63      V extends Writable,
64      E extends Writable>
65      extends VertexInputFormat<I, V, E> {
66    /**
67     * HCatalog input format.
68     */
69    private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
70  
71    @Override
72    public final List<InputSplit> getSplits(
73        final JobContext context, final int minSplitCountHint)
74      throws IOException, InterruptedException {
75      return hCatInputFormat.getVertexSplits(context);
76    }
77  
78    /**
79     * Abstract class that users should subclass
80     * based on their specific vertex
81     * input. HCatRecord can be parsed to get the
82     * required data for implementing
83     * getCurrentVertex(). If the vertex spans more
84     * than one HCatRecord,
85     * nextVertex() should be overwritten to handle that logic as well.
86     */
87    protected abstract class HCatalogVertexReader
88        extends VertexReader<I, V, E> {
89      /** Internal HCatRecordReader. */
90      private RecordReader<WritableComparable,
91          HCatRecord> hCatRecordReader;
92      /** Context passed to initialize. */
93      private TaskAttemptContext context;
94  
95      /**
96       * Initialize with the HCatRecordReader.
97       *
98       * @param recordReader internal reader
99       */
100     private void initialize(
101         final RecordReader<
102             WritableComparable, HCatRecord>
103             recordReader) {
104       this.hCatRecordReader = recordReader;
105     }
106 
107     @Override
108     public final void initialize(
109         final InputSplit inputSplit,
110         final TaskAttemptContext ctxt)
111       throws IOException, InterruptedException {
112       hCatRecordReader.initialize(inputSplit, ctxt);
113       this.context = ctxt;
114     }
115 
116     @Override
117     public boolean nextVertex() throws IOException, InterruptedException {
118       // Users can override this if desired,
119       // and a vertex is bigger than
120       // a single row.
121       return hCatRecordReader.nextKeyValue();
122     }
123 
124     @Override
125     public final void close() throws IOException {
126       hCatRecordReader.close();
127     }
128 
129     @Override
130     public final float getProgress() throws IOException, InterruptedException {
131       return hCatRecordReader.getProgress();
132     }
133 
134     /**
135      * Get the record reader.
136      * @return Record reader to be used for reading.
137      */
138     protected final RecordReader<WritableComparable, HCatRecord>
139     getRecordReader() {
140       return hCatRecordReader;
141     }
142 
143     /**
144      * Get the context.
145      *
146      *
147      *
148      * @return Context passed to initialize.
149      */
150     protected final TaskAttemptContext getContext() {
151       return context;
152     }
153   }
154 
155   /**
156    * create vertex reader instance.
157    * @return HCatalogVertexReader
158    */
159   protected abstract HCatalogVertexReader createVertexReader();
160 
161   @Override
162   public final VertexReader<I, V, E>
163   createVertexReader(final InputSplit split,
164                      final TaskAttemptContext context)
165     throws IOException {
166     try {
167       HCatalogVertexReader reader = createVertexReader();
168       reader.initialize(hCatInputFormat.
169           createVertexRecordReader(split, context));
170       return reader;
171     } catch (InterruptedException e) {
172       throw new IllegalStateException(
173           "createVertexReader: " +
174               "Interrupted creating reader.", e);
175     }
176   }
177 
178   /**
179    * HCatalogVertexReader for tables holding
180    * complete vertex info within each
181    * row.
182    */
183   protected abstract class SingleRowHCatalogVertexReader
184       extends HCatalogVertexReader {
185     /**
186      * 1024 const.
187      */
188     private static final int BYTE_CONST = 1024;
189     /**
190      *  logger
191      */
192     private final Logger log =
193         Logger.getLogger(SingleRowHCatalogVertexReader.class);
194     /**
195      * record count.
196      */
197     private int recordCount = 0;
198     /**
199      * modulus check counter.
200      */
201     private final int recordModLimit = 1000;
202     /**
203      * Timed logger to print every 30 seconds
204      */
205     private final TimedLogger timedLogger = new TimedLogger(30 * 1000,
206         log);
207 
208     /**
209      * get vertex id.
210      * @param record hcat record
211      * @return I id
212      */
213     protected abstract I getVertexId(HCatRecord record);
214 
215     /**
216      * get vertex value.
217      * @param record hcat record
218      * @return V value
219      */
220     protected abstract V getVertexValue(HCatRecord record);
221 
222     /**
223      * get edges.
224      * @param record hcat record
225      * @return Edges
226      */
227     protected abstract Iterable<Edge<I, E>> getEdges(HCatRecord record);
228 
229     @Override
230     public final Vertex<I, V, E> getCurrentVertex()
231       throws IOException, InterruptedException {
232       HCatRecord record = getRecordReader().getCurrentValue();
233       Vertex<I, V, E> vertex = getConf().createVertex();
234       vertex.initialize(getVertexId(record), getVertexValue(record),
235           getEdges(record));
236       ++recordCount;
237       if (log.isInfoEnabled() &&
238           ((recordCount % recordModLimit) == 0)) {
239         // memory usage
240         Runtime runtime = Runtime.getRuntime();
241         double gb = BYTE_CONST *
242             BYTE_CONST *
243             BYTE_CONST;
244         timedLogger.info(
245             "read " + recordCount + " records. Memory: " +
246             (runtime.totalMemory() / gb) +
247             "GB total = " +
248             ((runtime.totalMemory() - runtime.freeMemory()) / gb) +
249             "GB used + " + (runtime.freeMemory() / gb) +
250             "GB free, " + (runtime.maxMemory() / gb) + "GB max");
251       }
252       return vertex;
253     }
254   }
255   /**
256    * HCatalogVertexReader for tables
257    * holding vertex info across multiple rows
258    * sorted by vertex id column,
259    * so that they appear consecutively to the
260    * RecordReader.
261    */
262   protected abstract class MultiRowHCatalogVertexReader extends
263       HCatalogVertexReader {
264     /**
265      * modulus check counter.
266      */
267     private static final int RECORD_MOD_LIMIT = 1000;
268     /**
269      *  logger
270      */
271     private final Logger log =
272         Logger.getLogger(MultiRowHCatalogVertexReader.class);
273     /**
274      * current vertex id.
275      */
276     private I currentVertexId = null;
277     /**
278      * current vertex edges.
279      */
280     private List<Edge<I, E>> currentEdges = Lists.newLinkedList();
281     /**
282      * record for vertex.
283      */
284     private List<HCatRecord> recordsForVertex = Lists.newArrayList();
285     /**
286      * record count.
287      */
288     private int recordCount = 0;
289     /**
290      * vertex.
291      */
292     private Vertex<I, V, E> vertex = null;
293     /**
294      * Timed logger to print every 30 seconds
295      */
296     private final TimedLogger timedLogger = new TimedLogger(30 * 1000,
297         log);
298 
299 
300     /**
301      * get vertex id from record.
302      *
303      * @param record hcat
304      * @return I vertex id
305      */
306     protected abstract I getVertexId(HCatRecord record);
307 
308     /**
309      * get vertex value from record.
310      * @param records all vertex values
311      * @return V iterable of record values
312      */
313     protected abstract V getVertexValue(
314         Iterable<HCatRecord> records);
315 
316     /**
317      * get target vertex id from record.
318      *
319      * @param record hcat
320      * @return I vertex id of target.
321      */
322     protected abstract I getTargetVertexId(HCatRecord record);
323 
324     /**
325      * get edge value from record.
326      *
327      * @param record hcat.
328      * @return E edge value.
329      */
330     protected abstract E getEdgeValue(HCatRecord record);
331 
332     @Override
333     public final Vertex<I, V, E>
334     getCurrentVertex() throws IOException, InterruptedException {
335       return vertex;
336     }
337 
338     @Override
339     public boolean nextVertex() throws IOException, InterruptedException {
340       while (getRecordReader().nextKeyValue()) {
341         HCatRecord record = getRecordReader().getCurrentValue();
342         if (currentVertexId == null) {
343           currentVertexId = getVertexId(record);
344         }
345         if (currentVertexId.equals(getVertexId(record))) {
346           currentEdges.add(EdgeFactory.create(getTargetVertexId(record),
347               getEdgeValue(record)));
348           recordsForVertex.add(record);
349         } else {
350           createCurrentVertex();
351           if (log.isInfoEnabled() && (recordCount % RECORD_MOD_LIMIT) == 0) {
352             timedLogger.info("read " + recordCount);
353           }
354           currentVertexId = getVertexId(record);
355           recordsForVertex.add(record);
356           return true;
357         }
358       }
359 
360       if (currentEdges.isEmpty()) {
361         return false;
362       } else {
363         createCurrentVertex();
364         return true;
365       }
366     }
367 
368     /**
369      * create current vertex.
370      */
371     private void createCurrentVertex() {
372       vertex = getConf().createVertex();
373       vertex.initialize(currentVertexId, getVertexValue(recordsForVertex),
374           currentEdges);
375       currentEdges.clear();
376       recordsForVertex.clear();
377       ++recordCount;
378     }
379   }
380 }