/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.io.gora.utils;
1920import java.io.IOException;
21import java.util.ArrayList;
22import java.util.List;
2324import org.apache.gora.mapreduce.GoraInputSplit;
25import org.apache.gora.mapreduce.GoraMapReduceUtils;
26import org.apache.gora.mapreduce.GoraRecordReader;
27import org.apache.gora.persistency.Persistent;
28import org.apache.gora.persistency.impl.PersistentBase;
29import org.apache.gora.query.PartitionQuery;
30import org.apache.gora.query.Query;
31import org.apache.gora.query.impl.PartitionQueryImpl;
32import org.apache.gora.store.DataStore;
33import org.apache.gora.util.IOUtils;
34import org.apache.hadoop.conf.Configuration;
35import org.apache.hadoop.mapreduce.InputFormat;
36import org.apache.hadoop.mapreduce.InputSplit;
37import org.apache.hadoop.mapreduce.Job;
38import org.apache.hadoop.mapreduce.JobContext;
39import org.apache.hadoop.mapreduce.RecordReader;
40import org.apache.hadoop.mapreduce.TaskAttemptContext;
4142/**43 * InputFormat to fetch the input from Gora data stores. The44 * query to fetch the items from the datastore should be prepared and45 * set via setQuery(Job, Query), before submitting the job.46 *47 * Hadoop jobs can be either configured through static48 *<code>setInput()</code> methods, or from GoraMapper.49 * @param <K> KeyClass.50 * @param <T> PersistentClass.51 */52publicclass ExtraGoraInputFormat<K, T extends PersistentBase>
53extends InputFormat<K, T> {
5455/**56 * String used to map partitioned queries into configuration object.57 */58publicstaticfinal String QUERY_KEY = "gora.inputformat.query";
5960/**61 * Data store to be used.62 */63private DataStore<K, T> dataStore;
6465/**66 * Query to be performed.67 */68private Query<K, T> query;
6970/**71 * @param split InputSplit to be used.72 * @param context JobContext to be used.73 * @return RecordReader record reader used inside Hadoop job.74 */75 @Override
76 @SuppressWarnings("unchecked")
77public RecordReader<K, T> createRecordReader(InputSplit split,
78 TaskAttemptContext context) throws IOException, InterruptedException {
7980 PartitionQuery<K, T> partitionQuery = (PartitionQuery<K, T>)
81 ((GoraInputSplit) split).getQuery();
8283//setInputPath(partitionQuery, context);84returnnew GoraRecordReader<K, T>(partitionQuery, context);
85 }
8687/**88 * Gets splits.89 * @param context for the job.90 * @return splits found91 */92 @Override
93public List<InputSplit> getSplits(JobContext context) throws IOException,
94 InterruptedException {
95 List<PartitionQuery<K, T>> queries =
96 getDataStore().getPartitions(getQuery());
97 List<InputSplit> splits = new ArrayList<InputSplit>(queries.size());
98for (PartitionQuery<K, T> partQuery : queries) {
99 ((PartitionQueryImpl) partQuery).setConf(context.getConfiguration());
100 splits.add(new GoraInputSplit(context.getConfiguration(), partQuery));
101 }
102return splits;
103 }
104105/**106 * @return the dataStore107 */108public DataStore<K, T> getDataStore() {
109return dataStore;
110 }
111112/**113 * @param datStore the dataStore to set114 */115publicvoid setDataStore(DataStore<K, T> datStore) {
116this.dataStore = datStore;
117 }
118119/**120 * @return the query121 */122public Query<K, T> getQuery() {
123return query;
124 }
125126/**127 * @param query the query to set128 */129publicvoid setQuery(Query<K, T> query) {
130this.query = query;
131 }
132133/**134 * Sets the partitioned query inside the job object.135 * @param conf Configuration used.136 * @param query Query to be executed.137 * @param <K> Key class138 * @param <T> Persistent class139 * @throws IOException Exception that be might thrown.140 */141publicstatic <K, T extends Persistent> void setQuery(Configuration conf,
142 Query<K, T> query) throws IOException {
143 IOUtils.storeToConf(query, conf, QUERY_KEY);
144 }
145146/**147 * Gets the partitioned query from the conf object passed.148 * @param conf Configuration object.149 * @return passed inside the configuration object150 * @throws IOException Exception that might be thrown.151 */152public Query<K, T> getQuery(Configuration conf) throws IOException {
153return IOUtils.loadFromConf(conf, QUERY_KEY);
154 }
155156/**157 * Sets the input parameters for the job158 * @param job the job to set the properties for159 * @param query the query to get the inputs from160 * @param dataStore the datastore as the input161 * @param reuseObjects whether to reuse objects in serialization162 * @param <K> Key class163 * @param <V> Persistent class164 * @throws IOException165 */166publicstatic <K, V extends Persistent> void setInput(Job job,
167 Query<K, V> query, DataStore<K, V> dataStore, boolean reuseObjects)
168throws IOException {
169170 Configuration conf = job.getConfiguration();
171172 GoraMapReduceUtils.setIOSerializations(conf, reuseObjects);
173174 job.setInputFormatClass(ExtraGoraInputFormat.class);
175 ExtraGoraInputFormat.setQuery(job.getConfiguration(), query);
176 }
177 }