1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.io.formats.multi;
2021import org.apache.giraph.io.GiraphInputFormat;
22import org.apache.hadoop.mapreduce.InputSplit;
23import org.apache.hadoop.mapreduce.JobContext;
2425import com.google.common.collect.Lists;
2627import java.io.DataInput;
28import java.io.DataOutput;
29import java.io.IOException;
30import java.util.List;
3132/**33 * Utility methods used by {@link MultiVertexInputFormat} and34 * {@link MultiEdgeInputFormat}35 */36publicclassMultiInputUtils {
37/** Do not instantiate */38privateMultiInputUtils() {
39 }
4041/**42 * Get the list of input splits for all formats.43 *44 * @param context The job context45 * @param minSplitCountHint Minimum number of splits to create (hint)46 * @param inputFormats List of input formats47 * @return The list of input splits48 */49publicstatic List<InputSplit> getSplits(JobContext context,
50int minSplitCountHint,
51 List<? extends GiraphInputFormat> inputFormats) throws IOException,
52 InterruptedException {
53 List<InputSplit> splits = Lists.newArrayList();
54for (int index = 0; index < inputFormats.size(); index++) {
55 List<InputSplit> inputFormatSplits =
56 inputFormats.get(index).getSplits(context, minSplitCountHint);
57for (InputSplit split : inputFormatSplits) {
58 splits.add(newInputSplitWithInputFormatIndex(split, index));
59 }
60 }
61return splits;
62 }
6364/**65 * Write input split info to DataOutput. Input split belongs to one of the66 * formats.67 *68 * @param inputSplit InputSplit69 * @param dataOutput DataOutput70 * @param inputFormats List of input formats71 */72publicstaticvoid writeInputSplit(InputSplit inputSplit,
73 DataOutput dataOutput,
74 List<? extends GiraphInputFormat> inputFormats) throws IOException {
75if (inputSplit instanceof InputSplitWithInputFormatIndex) {
76InputSplitWithInputFormatIndex split =
77 (InputSplitWithInputFormatIndex) inputSplit;
78int index = split.getInputFormatIndex();
79 dataOutput.writeInt(index);
80 inputFormats.get(index).writeInputSplit(split.getSplit(), dataOutput);
81 } else {
82thrownew IllegalStateException("writeInputSplit: Got InputSplit which " +
83"was not created by multi input: " + inputSplit.getClass().getName());
84 }
85 }
8687/**88 * Read input split info from DataInput. Input split belongs to one of the89 * formats.90 *91 * @param dataInput DataInput92 * @param inputFormats List of input formats93 * @return InputSplit94 */95publicstatic InputSplit readInputSplit(
96 DataInput dataInput,
97 List<? extends GiraphInputFormat> inputFormats) throws IOException,
98 ClassNotFoundException {
99int index = dataInput.readInt();
100 InputSplit split = inputFormats.get(index).readInputSplit(dataInput);
101returnnewInputSplitWithInputFormatIndex(split, index);
102 }
103 }