View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.hcatalog;
20  
21  import org.apache.hadoop.conf.Configuration;
22  import org.apache.hadoop.fs.FileSystem;
23  import org.apache.hadoop.fs.Path;
24  import org.apache.hadoop.io.WritableComparable;
25  import org.apache.hadoop.mapred.JobConf;
26  import org.apache.hadoop.mapreduce.InputSplit;
27  import org.apache.hadoop.mapreduce.Job;
28  import org.apache.hadoop.mapreduce.JobContext;
29  import org.apache.hadoop.mapreduce.RecordReader;
30  import org.apache.hadoop.mapreduce.TaskAttemptContext;
31  import org.apache.hadoop.util.StringUtils;
32  import org.apache.hcatalog.common.HCatConstants;
33  import org.apache.hcatalog.common.HCatUtil;
34  import org.apache.hcatalog.data.HCatRecord;
35  import org.apache.hcatalog.data.schema.HCatFieldSchema;
36  import org.apache.hcatalog.data.schema.HCatSchema;
37  import org.apache.hcatalog.mapreduce.HCatBaseInputFormat;
38  import org.apache.hcatalog.mapreduce.HCatSplit;
39  import org.apache.hcatalog.mapreduce.HCatStorageHandler;
40  import org.apache.hcatalog.mapreduce.HCatUtils;
41  import org.apache.hcatalog.mapreduce.InputJobInfo;
42  import org.apache.hcatalog.mapreduce.PartInfo;
43  
44  import java.io.IOException;
45  import java.util.ArrayList;
46  import java.util.HashMap;
47  import java.util.LinkedList;
48  import java.util.List;
49  import java.util.Map;
50  
51  /**
52   * Provides functionality similar to
53   * {@link org.apache.hcatalog.mapreduce.HCatInputFormat},
54   * but allows for different data sources (vertex and edge data).
55   */
56  public class GiraphHCatInputFormat extends HCatBaseInputFormat {
57    /** Vertex input job info for HCatalog. */
58    public static final String VERTEX_INPUT_JOB_INFO =
59        "giraph.hcat.vertex.input.job.info";
60    /** Edge input job info for HCatalog. */
61    public static final String EDGE_INPUT_JOB_INFO =
62        "giraph.hcat.edge.input.job.info";
63  
64    /**
65     * Set vertex {@link InputJobInfo}.
66     *
67     * @param job The job
68     * @param inputJobInfo Vertex input job info
69     * @throws IOException
70     */
71    public static void setVertexInput(Job job,
72                                      InputJobInfo inputJobInfo)
73      throws IOException {
74      InputJobInfo vertexInputJobInfo = InputJobInfo.create(
75          inputJobInfo.getDatabaseName(),
76          inputJobInfo.getTableName(),
77          inputJobInfo.getFilter());
78      vertexInputJobInfo.getProperties().putAll(inputJobInfo.getProperties());
79      Configuration conf = job.getConfiguration();
80      conf.set(VERTEX_INPUT_JOB_INFO, HCatUtil.serialize(
81          HCatUtils.getInputJobInfo(conf, vertexInputJobInfo)));
82    }
83  
84    /**
85     * Set edge {@link InputJobInfo}.
86     *
87     * @param job The job
88     * @param inputJobInfo Edge input job info
89     * @throws IOException
90     */
91    public static void setEdgeInput(Job job,
92                                    InputJobInfo inputJobInfo)
93      throws IOException {
94      InputJobInfo edgeInputJobInfo = InputJobInfo.create(
95          inputJobInfo.getDatabaseName(),
96          inputJobInfo.getTableName(),
97          inputJobInfo.getFilter());
98      edgeInputJobInfo.getProperties().putAll(inputJobInfo.getProperties());
99      Configuration conf = job.getConfiguration();
100     conf.set(EDGE_INPUT_JOB_INFO, HCatUtil.serialize(
101         HCatUtils.getInputJobInfo(conf, edgeInputJobInfo)));
102   }
103 
104   /**
105    * Get table schema from input job info.
106    *
107    * @param inputJobInfo Input job info
108    * @return Input table schema
109    * @throws IOException
110    */
111   private static HCatSchema getTableSchema(InputJobInfo inputJobInfo)
112     throws IOException {
113     HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
114     for (HCatFieldSchema field :
115         inputJobInfo.getTableInfo().getDataColumns().getFields()) {
116       allCols.append(field);
117     }
118     for (HCatFieldSchema field :
119         inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
120       allCols.append(field);
121     }
122     return allCols;
123   }
124 
125   /**
126    * Get vertex input table schema.
127    *
128    * @param conf Job configuration
129    * @return Vertex input table schema
130    * @throws IOException
131    */
132   public static HCatSchema getVertexTableSchema(Configuration conf)
133     throws IOException {
134     return getTableSchema(getVertexJobInfo(conf));
135   }
136 
137   /**
138    * Get edge input table schema.
139    *
140    * @param conf Job configuration
141    * @return Edge input table schema
142    * @throws IOException
143    */
144   public static HCatSchema getEdgeTableSchema(Configuration conf)
145     throws IOException {
146     return getTableSchema(getEdgeJobInfo(conf));
147   }
148 
149   /**
150    * Set input path for job.
151    *
152    * @param jobConf Job configuration
153    * @param location Location of input files
154    * @throws IOException
155    */
156   private void setInputPath(JobConf jobConf, String location)
157     throws IOException {
158     int length = location.length();
159     int curlyOpen = 0;
160     int pathStart = 0;
161     boolean globPattern = false;
162     List<String> pathStrings = new ArrayList<String>();
163 
164     for (int i = 0; i < length; i++) {
165       char ch = location.charAt(i);
166       switch (ch) {
167       case '{':
168         curlyOpen++;
169         if (!globPattern) {
170           globPattern = true;
171         }
172         break;
173       case '}':
174         curlyOpen--;
175         if (curlyOpen == 0 && globPattern) {
176           globPattern = false;
177         }
178         break;
179       case ',':
180         if (!globPattern) {
181           pathStrings.add(location.substring(pathStart, i));
182           pathStart = i + 1;
183         }
184         break;
185       default:
186       }
187     }
188     pathStrings.add(location.substring(pathStart, length));
189 
190     Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
191 
192     FileSystem fs = FileSystem.get(jobConf);
193     Path path = paths[0].makeQualified(fs);
194     StringBuilder str = new StringBuilder(StringUtils.escapeString(
195         path.toString()));
196     for (int i = 1; i < paths.length; i++) {
197       str.append(StringUtils.COMMA_STR);
198       path = paths[i].makeQualified(fs);
199       str.append(StringUtils.escapeString(path.toString()));
200     }
201 
202     jobConf.set("mapred.input.dir", str.toString());
203   }
204 
205   /**
206    * Get input splits for job.
207    *
208    * @param jobContext Job context
209    * @param inputJobInfo Input job info
210    * @return MapReduce setting for file input directory
211    * @throws IOException
212    * @throws InterruptedException
213    */
214   private List<InputSplit> getSplits(JobContext jobContext,
215                                      InputJobInfo inputJobInfo)
216     throws IOException, InterruptedException {
217     Configuration conf = jobContext.getConfiguration();
218 
219     List<InputSplit> splits = new ArrayList<InputSplit>();
220     List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
221     if (partitionInfoList == null) {
222       //No partitions match the specified partition filter
223       return splits;
224     }
225 
226     HCatStorageHandler storageHandler;
227     JobConf jobConf;
228     //For each matching partition, call getSplits on the underlying InputFormat
229     for (PartInfo partitionInfo : partitionInfoList) {
230       jobConf = HCatUtil.getJobConfFromContext(jobContext);
231       setInputPath(jobConf, partitionInfo.getLocation());
232       Map<String, String> jobProperties = partitionInfo.getJobProperties();
233 
234       HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
235       for (HCatFieldSchema field :
236           inputJobInfo.getTableInfo().getDataColumns().getFields()) {
237         allCols.append(field);
238       }
239       for (HCatFieldSchema field :
240           inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
241         allCols.append(field);
242       }
243 
244       HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
245 
246       storageHandler = HCatUtil.getStorageHandler(
247           jobConf, partitionInfo);
248 
249       //Get the input format
250       Class inputFormatClass = storageHandler.getInputFormatClass();
251       org.apache.hadoop.mapred.InputFormat inputFormat =
252           getMapRedInputFormat(jobConf, inputFormatClass);
253 
254       //Call getSplit on the InputFormat, create an HCatSplit for each
255       //underlying split. When the desired number of input splits is missing,
256       //use a default number (denoted by zero).
257       //TODO: Currently each partition is split independently into
258       //a desired number. However, we want the union of all partitions to be
259       //split into a desired number while maintaining balanced sizes of input
260       //splits.
261       int desiredNumSplits =
262           conf.getInt(HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, 0);
263       org.apache.hadoop.mapred.InputSplit[] baseSplits =
264           inputFormat.getSplits(jobConf, desiredNumSplits);
265 
266       for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
267         splits.add(new HCatSplit(partitionInfo, split, allCols));
268       }
269     }
270 
271     return splits;
272   }
273 
274   /**
275    * Get vertex {@link InputJobInfo}.
276    *
277    * @param conf Configuration
278    * @return Vertex input job info
279    * @throws IOException
280    */
281   private static InputJobInfo getVertexJobInfo(Configuration conf)
282     throws IOException {
283     String jobString = conf.get(VERTEX_INPUT_JOB_INFO);
284     if (jobString == null) {
285       throw new IOException("Vertex job information not found in JobContext." +
286           " GiraphHCatInputFormat.setVertexInput() not called?");
287     }
288     return (InputJobInfo) HCatUtil.deserialize(jobString);
289   }
290 
291   /**
292    * Get edge {@link InputJobInfo}.
293    *
294    * @param conf Configuration
295    * @return Edge input job info
296    * @throws IOException
297    */
298   private static InputJobInfo getEdgeJobInfo(Configuration conf)
299     throws IOException {
300     String jobString = conf.get(EDGE_INPUT_JOB_INFO);
301     if (jobString == null) {
302       throw new IOException("Edge job information not found in JobContext." +
303           " GiraphHCatInputFormat.setEdgeInput() not called?");
304     }
305     return (InputJobInfo) HCatUtil.deserialize(jobString);
306   }
307 
308   /**
309    * Get vertex input splits.
310    *
311    * @param jobContext Job context
312    * @return List of vertex {@link InputSplit}s
313    * @throws IOException
314    * @throws InterruptedException
315    */
316   public List<InputSplit> getVertexSplits(JobContext jobContext)
317     throws IOException, InterruptedException {
318     return getSplits(jobContext,
319         getVertexJobInfo(jobContext.getConfiguration()));
320   }
321 
322   /**
323    * Get edge input splits.
324    *
325    * @param jobContext Job context
326    * @return List of edge {@link InputSplit}s
327    * @throws IOException
328    * @throws InterruptedException
329    */
330   public List<InputSplit> getEdgeSplits(JobContext jobContext)
331     throws IOException, InterruptedException {
332     return getSplits(jobContext,
333         getEdgeJobInfo(jobContext.getConfiguration()));
334   }
335 
336   /**
337    * Create an {@link org.apache.hcatalog.mapreduce.HCatRecordReader}.
338    *
339    * @param split Input split
340    * @param schema Table schema
341    * @param taskContext Context
342    * @return Record reader
343    * @throws IOException
344    * @throws InterruptedException
345    */
346   private RecordReader<WritableComparable, HCatRecord>
347   createRecordReader(InputSplit split,
348                      HCatSchema schema,
349                      TaskAttemptContext taskContext)
350     throws IOException, InterruptedException {
351     HCatSplit hcatSplit = HCatUtils.castToHCatSplit(split);
352     PartInfo partitionInfo = hcatSplit.getPartitionInfo();
353     JobContext jobContext = taskContext;
354     Configuration conf = jobContext.getConfiguration();
355 
356     HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
357         conf, partitionInfo);
358 
359     JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
360     Map<String, String> jobProperties = partitionInfo.getJobProperties();
361     HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
362 
363     Map<String, String> valuesNotInDataCols = getColValsNotInDataColumns(
364         schema, partitionInfo);
365 
366     return HCatUtils.newHCatReader(storageHandler, valuesNotInDataCols);
367   }
368 
369   /**
370    * Create a {@link RecordReader} for vertices.
371    *
372    * @param split Input split
373    * @param taskContext Context
374    * @return Record reader
375    * @throws IOException
376    * @throws InterruptedException
377    */
378   public RecordReader<WritableComparable, HCatRecord>
379   createVertexRecordReader(InputSplit split, TaskAttemptContext taskContext)
380     throws IOException, InterruptedException {
381     return createRecordReader(split, getVertexTableSchema(
382         taskContext.getConfiguration()), taskContext);
383   }
384 
385   /**
386    * Create a {@link RecordReader} for edges.
387    *
388    * @param split Input split
389    * @param taskContext Context
390    * @return Record reader
391    * @throws IOException
392    * @throws InterruptedException
393    */
394   public RecordReader<WritableComparable, HCatRecord>
395   createEdgeRecordReader(InputSplit split, TaskAttemptContext taskContext)
396     throws IOException, InterruptedException {
397     return createRecordReader(split, getEdgeTableSchema(
398         taskContext.getConfiguration()), taskContext);
399   }
400 
401   /**
402    * Get values for fields requested by output schema which will not be in the
403    * data.
404    *
405    * @param outputSchema Output schema
406    * @param partInfo Partition info
407    * @return Values not in data columns
408    */
409   private static Map<String, String> getColValsNotInDataColumns(
410       HCatSchema outputSchema,
411       PartInfo partInfo) {
412     HCatSchema dataSchema = partInfo.getPartitionSchema();
413     Map<String, String> vals = new HashMap<String, String>();
414     for (String fieldName : outputSchema.getFieldNames()) {
415       if (dataSchema.getPosition(fieldName) == null) {
416         // this entry of output is not present in the output schema
417         // so, we first check the table schema to see if it is a part col
418         if (partInfo.getPartitionValues().containsKey(fieldName)) {
419           vals.put(fieldName, partInfo.getPartitionValues().get(fieldName));
420         } else {
421           vals.put(fieldName, null);
422         }
423       }
424     }
425     return vals;
426   }
427 }