1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.io.formats.multi;
2021import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22import org.apache.giraph.io.VertexInputFormat;
23import org.apache.giraph.io.VertexReader;
24import org.apache.giraph.io.internal.WrappedVertexReader;
25import org.apache.hadoop.conf.Configuration;
26import org.apache.hadoop.io.Writable;
27import org.apache.hadoop.io.WritableComparable;
28import org.apache.hadoop.mapreduce.InputSplit;
29import org.apache.hadoop.mapreduce.JobContext;
30import org.apache.hadoop.mapreduce.TaskAttemptContext;
3132import java.io.DataInput;
33import java.io.DataOutput;
34import java.io.IOException;
35import java.util.List;
3637/**38 * Vertex input format which wraps several vertex input formats.39 * Provides the way to read data from multiple sources,40 * using several different input formats.41 *42 * @param <I> Vertex id43 * @param <V> Vertex data44 * @param <E> Edge data45 */46publicclass MultiVertexInputFormat<I extends WritableComparable,
47 V extends Writable, E extends Writable> extends VertexInputFormat<I, V, E> {
48/** Vertex input formats */49private List<VertexInputFormat<I, V, E>> vertexInputFormats;
5051 @Override publicvoid checkInputSpecs(Configuration conf) {
52for (VertexInputFormat vertexInputFormat : vertexInputFormats) {
53 vertexInputFormat.checkInputSpecs(conf);
54 }
55 }
5657 @Override
58publicvoid setConf(
59 ImmutableClassesGiraphConfiguration<I, V, E> conf) {
60super.setConf(conf);
61 vertexInputFormats =
62 VertexInputFormatDescription.createVertexInputFormats(getConf());
63if (vertexInputFormats.isEmpty()) {
64thrownew IllegalStateException("setConf: Using MultiVertexInputFormat " +
65"without specifying vertex inputs");
66 }
67 }
6869 @Override
70public VertexReader<I, V, E> createVertexReader(InputSplit inputSplit,
71 TaskAttemptContext context) throws IOException {
72if (inputSplit instanceof InputSplitWithInputFormatIndex) {
73// When multithreaded input is used we need to make sure other threads74// don't change context's configuration while we use it75synchronized (context) {
76InputSplitWithInputFormatIndex split =
77 (InputSplitWithInputFormatIndex) inputSplit;
78 VertexInputFormat<I, V, E> vertexInputFormat =
79 vertexInputFormats.get(split.getInputFormatIndex());
80 VertexReader<I, V, E> vertexReader =
81 vertexInputFormat.createVertexReader(split.getSplit(), context);
82returnnew WrappedVertexReader<I, V, E>(
83 vertexReader, vertexInputFormat.getConf()) {
84 @Override
85publicvoid initialize(InputSplit inputSplit,
86 TaskAttemptContext context) throws IOException,
87 InterruptedException {
88// When multithreaded input is used we need to make sure other89// threads don't change context's configuration while we use it90synchronized (context) {
91super.initialize(inputSplit, context);
92 }
93 }
94 };
95 }
96 } else {
97thrownew IllegalStateException("createVertexReader: Got InputSplit " +
98"which was not created by this class: " +
99 inputSplit.getClass().getName());
100 }
101 }
102103 @Override
104public List<InputSplit> getSplits(JobContext context,
105int minSplitCountHint) throws IOException, InterruptedException {
106// When multithreaded input is used we need to make sure other threads don't107// change context's configuration while we use it108synchronized (context) {
109return MultiInputUtils.getSplits(
110 context, minSplitCountHint, vertexInputFormats);
111 }
112 }
113114 @Override
115publicvoid writeInputSplit(InputSplit inputSplit,
116 DataOutput dataOutput) throws IOException {
117 MultiInputUtils.writeInputSplit(inputSplit, dataOutput, vertexInputFormats);
118 }
119120 @Override
121public InputSplit readInputSplit(
122 DataInput dataInput) throws IOException, ClassNotFoundException {
123return MultiInputUtils.readInputSplit(dataInput, vertexInputFormats);
124 }
125 }