View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.io.gora.utils;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.List;
23  
24  import org.apache.gora.mapreduce.GoraInputSplit;
25  import org.apache.gora.mapreduce.GoraMapReduceUtils;
26  import org.apache.gora.mapreduce.GoraRecordReader;
27  import org.apache.gora.persistency.Persistent;
28  import org.apache.gora.persistency.impl.PersistentBase;
29  import org.apache.gora.query.PartitionQuery;
30  import org.apache.gora.query.Query;
31  import org.apache.gora.query.impl.PartitionQueryImpl;
32  import org.apache.gora.store.DataStore;
33  import org.apache.gora.util.IOUtils;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.mapreduce.InputFormat;
36  import org.apache.hadoop.mapreduce.InputSplit;
37  import org.apache.hadoop.mapreduce.Job;
38  import org.apache.hadoop.mapreduce.JobContext;
39  import org.apache.hadoop.mapreduce.RecordReader;
40  import org.apache.hadoop.mapreduce.TaskAttemptContext;
41  
42  /**
43   * InputFormat to fetch the input from Gora data stores. The
44   * query to fetch the items from the datastore should be prepared and
45   * set via setQuery(Job, Query), before submitting the job.
46   *
47   * Hadoop jobs can be either configured through static
48   *<code>setInput()</code> methods, or from GoraMapper.
49   * @param <K> KeyClass.
50   * @param <T> PersistentClass.
51   */
52  public class ExtraGoraInputFormat<K, T extends PersistentBase>
53    extends InputFormat<K, T> {
54  
55    /**
56     * String used to map partitioned queries into configuration object.
57     */
58    public static final String QUERY_KEY = "gora.inputformat.query";
59  
60    /**
61     * Data store to be used.
62     */
63    private DataStore<K, T> dataStore;
64  
65    /**
66     * Query to be performed.
67     */
68    private Query<K, T> query;
69  
70    /**
71     * @param split InputSplit to be used.
72     * @param context JobContext to be used.
73     * @return RecordReader record reader used inside Hadoop job.
74     */
75    @Override
76    @SuppressWarnings("unchecked")
77    public RecordReader<K, T> createRecordReader(InputSplit split,
78        TaskAttemptContext context) throws IOException, InterruptedException {
79  
80      PartitionQuery<K, T> partitionQuery = (PartitionQuery<K, T>)
81          ((GoraInputSplit) split).getQuery();
82  
83      //setInputPath(partitionQuery, context);
84      return new GoraRecordReader<K, T>(partitionQuery, context);
85    }
86  
87    /**
88     * Gets splits.
89     * @param context for the job.
90     * @return splits found
91     */
92    @Override
93    public List<InputSplit> getSplits(JobContext context) throws IOException,
94        InterruptedException {
95      List<PartitionQuery<K, T>> queries =
96          getDataStore().getPartitions(getQuery());
97      List<InputSplit> splits = new ArrayList<InputSplit>(queries.size());
98      for (PartitionQuery<K, T> partQuery : queries) {
99        ((PartitionQueryImpl) partQuery).setConf(context.getConfiguration());
100       splits.add(new GoraInputSplit(context.getConfiguration(), partQuery));
101     }
102     return splits;
103   }
104 
105   /**
106    * @return the dataStore
107    */
108   public DataStore<K, T> getDataStore() {
109     return dataStore;
110   }
111 
112   /**
113    * @param datStore the dataStore to set
114    */
115   public void setDataStore(DataStore<K, T> datStore) {
116     this.dataStore = datStore;
117   }
118 
119   /**
120    * @return the query
121    */
122   public Query<K, T> getQuery() {
123     return query;
124   }
125 
126   /**
127    * @param query the query to set
128    */
129   public void setQuery(Query<K, T> query) {
130     this.query = query;
131   }
132 
133   /**
134    * Sets the partitioned query inside the job object.
135    * @param conf Configuration used.
136    * @param query Query to be executed.
137    * @param <K> Key class
138    * @param <T> Persistent class
139    * @throws IOException Exception that be might thrown.
140    */
141   public static <K, T extends Persistent> void setQuery(Configuration conf,
142       Query<K, T> query) throws IOException {
143     IOUtils.storeToConf(query, conf, QUERY_KEY);
144   }
145 
146   /**
147    * Gets the partitioned query from the conf object passed.
148    * @param conf Configuration object.
149    * @return passed inside the configuration object
150    * @throws IOException Exception that might be thrown.
151    */
152   public Query<K, T> getQuery(Configuration conf) throws IOException {
153     return IOUtils.loadFromConf(conf, QUERY_KEY);
154   }
155 
156   /**
157    * Sets the input parameters for the job
158    * @param job the job to set the properties for
159    * @param query the query to get the inputs from
160    * @param dataStore the datastore as the input
161    * @param reuseObjects whether to reuse objects in serialization
162    * @param <K> Key class
163    * @param <V> Persistent class
164    * @throws IOException
165    */
166   public static <K, V extends Persistent> void setInput(Job job,
167   Query<K, V> query, DataStore<K, V> dataStore, boolean reuseObjects)
168     throws IOException {
169 
170     Configuration conf = job.getConfiguration();
171 
172     GoraMapReduceUtils.setIOSerializations(conf, reuseObjects);
173 
174     job.setInputFormatClass(ExtraGoraInputFormat.class);
175     ExtraGoraInputFormat.setQuery(job.getConfiguration(), query);
176   }
177 }