This project has retired. For details please refer to its Attic page.
MultiInputUtils xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.formats.multi;
20  
21  import org.apache.giraph.io.GiraphInputFormat;
22  import org.apache.hadoop.mapreduce.InputSplit;
23  import org.apache.hadoop.mapreduce.JobContext;
24  
25  import com.google.common.collect.Lists;
26  
27  import java.io.DataInput;
28  import java.io.DataOutput;
29  import java.io.IOException;
30  import java.util.List;
31  
32  /**
33   * Utility methods used by {@link MultiVertexInputFormat} and
34   * {@link MultiEdgeInputFormat}
35   */
36  public class MultiInputUtils {
37    /** Do not instantiate */
38    private MultiInputUtils() {
39    }
40  
41    /**
42     * Get the list of input splits for all formats.
43     *
44     * @param context The job context
45     * @param minSplitCountHint Minimum number of splits to create (hint)
46     * @param inputFormats List of input formats
47     * @return The list of input splits
48     */
49    public static List<InputSplit> getSplits(JobContext context,
50        int minSplitCountHint,
51        List<? extends GiraphInputFormat> inputFormats) throws IOException,
52        InterruptedException {
53      List<InputSplit> splits = Lists.newArrayList();
54      for (int index = 0; index < inputFormats.size(); index++) {
55        List<InputSplit> inputFormatSplits =
56            inputFormats.get(index).getSplits(context, minSplitCountHint);
57        for (InputSplit split : inputFormatSplits) {
58          splits.add(new InputSplitWithInputFormatIndex(split, index));
59        }
60      }
61      return splits;
62    }
63  
64    /**
65     * Write input split info to DataOutput. Input split belongs to one of the
66     * formats.
67     *
68     * @param inputSplit InputSplit
69     * @param dataOutput DataOutput
70     * @param inputFormats List of input formats
71     */
72    public static void writeInputSplit(InputSplit inputSplit,
73        DataOutput dataOutput,
74        List<? extends GiraphInputFormat> inputFormats) throws IOException {
75      if (inputSplit instanceof InputSplitWithInputFormatIndex) {
76        InputSplitWithInputFormatIndex split =
77            (InputSplitWithInputFormatIndex) inputSplit;
78        int index = split.getInputFormatIndex();
79        dataOutput.writeInt(index);
80        inputFormats.get(index).writeInputSplit(split.getSplit(), dataOutput);
81      } else {
82        throw new IllegalStateException("writeInputSplit: Got InputSplit which " +
83            "was not created by multi input: " + inputSplit.getClass().getName());
84      }
85    }
86  
87    /**
88     * Read input split info from DataInput. Input split belongs to one of the
89     * formats.
90     *
91     * @param dataInput DataInput
92     * @param inputFormats List of input formats
93     * @return InputSplit
94     */
95    public static InputSplit readInputSplit(
96        DataInput dataInput,
97        List<? extends GiraphInputFormat> inputFormats) throws IOException,
98        ClassNotFoundException {
99      int index = dataInput.readInt();
100     InputSplit split = inputFormats.get(index).readInputSplit(dataInput);
101     return new InputSplitWithInputFormatIndex(split, index);
102   }
103 }