View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.io.accumulo;
19  
20  import java.io.IOException;
21  import java.util.List;
22  import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
23  import org.apache.accumulo.core.data.Key;
24  import org.apache.accumulo.core.data.Value;
25  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26  import org.apache.giraph.io.VertexInputFormat;
27  import org.apache.giraph.io.VertexReader;
28  import org.apache.hadoop.io.Writable;
29  import org.apache.hadoop.io.WritableComparable;
30  import org.apache.hadoop.mapreduce.InputSplit;
31  import org.apache.hadoop.mapreduce.JobContext;
32  import org.apache.hadoop.mapreduce.RecordReader;
33  import org.apache.hadoop.mapreduce.TaskAttemptContext;
34  
35  /**
36   *  Class which wraps the AccumuloInputFormat. It's designed
37   *  as an extension point to VertexInputFormat subclasses who wish
38   *  to read from AccumuloTables.
39   *
40   *  Works with
41   *  {@link AccumuloVertexOutputFormat}
42   *
43   * @param <I> vertex id type
44   * @param <V>  vertex value type
45   * @param <E>  edge type
46   */
47  public abstract class AccumuloVertexInputFormat<
48          I extends WritableComparable,
49          V extends Writable,
50          E extends Writable>
51          extends VertexInputFormat<I, V, E> {
52    /**
53     * delegate input format for all accumulo operations.
54     */
55    protected AccumuloInputFormat accumuloInputFormat =
56        new AccumuloInputFormat();
57  
58    /**
59    * Abstract class which provides a template for instantiating vertices
60    * from Accumulo Key/Value pairs.
61    *
62    * @param <I>  vertex id type
63    * @param <V>  vertex value type
64    * @param <E>  edge type
65    */
66    public abstract static class AccumuloVertexReader<
67        I extends WritableComparable,
68        V extends Writable, E extends Writable>
69        extends VertexReader<I, V, E> {
70  
71      /** Giraph configuration */
72      private ImmutableClassesGiraphConfiguration<I, V, E>
73      configuration;
74      /**
75       * Used by subclasses to read key/value pairs.
76       */
77      private final RecordReader<Key, Value> reader;
78      /** Context passed to initialize */
79      private TaskAttemptContext context;
80  
81      /**
82       * Constructor used to pass Record Reader instance
83       * @param reader  Accumulo record reader
84       */
85      public AccumuloVertexReader(RecordReader<Key, Value> reader) {
86        this.reader = reader;
87      }
88  
89      public ImmutableClassesGiraphConfiguration<I, V, E>
90      getConfiguration() {
91        return configuration;
92      }
93  
94      /**
95       * initialize the reader.
96       *
97       * @param inputSplit Input split to be used for reading vertices.
98       * @param context Context from the task.
99       * @throws IOException
100      * @throws InterruptedException
101      */
102     public void initialize(InputSplit inputSplit,
103                            TaskAttemptContext context)
104       throws IOException, InterruptedException {
105       reader.initialize(inputSplit, context);
106       this.context = context;
107       this.configuration =
108           new ImmutableClassesGiraphConfiguration<I, V, E>(
109               context.getConfiguration());
110     }
111 
112     /**
113      * close
114      *
115      * @throws IOException
116      */
117     public void close() throws IOException {
118       reader.close();
119     }
120 
121     /**
122      * getProgress
123      *
124      * @return progress
125      * @throws IOException
126      * @throws InterruptedException
127      */
128     public float getProgress() throws IOException, InterruptedException {
129       return reader.getProgress();
130     }
131 
132     /**
133     * Get the result record reader
134     *
135     * @return Record reader to be used for reading.
136     */
137     protected RecordReader<Key, Value> getRecordReader() {
138       return reader;
139     }
140 
141     /**
142      * getContext
143      *
144      * @return Context passed to initialize.
145      */
146     protected TaskAttemptContext getContext() {
147       return context;
148     }
149 
150   }
151 
152   @Override
153   public List<InputSplit> getSplits(
154     JobContext context, int minSplitCountHint)
155     throws IOException, InterruptedException {
156     List<InputSplit> splits = null;
157     try {
158       splits = accumuloInputFormat.getSplits(context);
159     } catch (IOException e) {
160       if (e.getMessage().contains("Input info has not been set")) {
161         throw new IOException(e.getMessage() +
162                 " Make sure you initialized" +
163                 " AccumuloInputFormat static setters " +
164                 "before passing the config to GiraphJob.");
165       }
166     }
167     return splits;
168   }
169 }