1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.io.internal;
2021import java.io.IOException;
2223import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24import org.apache.giraph.graph.Vertex;
25import org.apache.giraph.io.VertexReader;
26import org.apache.giraph.job.HadoopUtils;
27import org.apache.giraph.worker.WorkerGlobalCommUsage;
28import org.apache.hadoop.io.Writable;
29import org.apache.hadoop.io.WritableComparable;
30import org.apache.hadoop.mapreduce.InputSplit;
31import org.apache.hadoop.mapreduce.TaskAttemptContext;
3233/**34 * For internal use only.35 *36 * Wraps {@link VertexReader} to make sure proper configuration37 * parameters are passed around, that parameters set in original38 * configuration are available in methods of this reader39 *40 * @param <I> Vertex id41 * @param <V> Vertex data42 * @param <E> Edge data43 */44publicclass WrappedVertexReader<I extends WritableComparable,
45 V extends Writable, E extends Writable> extends VertexReader<I, V, E> {
46/**VertexReader to delegate the methods to */47privatefinal VertexReader<I, V, E> baseVertexReader;
4849/**50 * Constructor51 *52 * @param baseVertexReader VertexReader to delegate all the methods to53 * @param conf Configuration54 */55publicWrappedVertexReader(VertexReader<I, V, E> baseVertexReader,
56 ImmutableClassesGiraphConfiguration<I, V, E> conf) {
57this.baseVertexReader = baseVertexReader;
58super.setConf(conf);
59 baseVertexReader.setConf(conf);
60 }
6162 @Override
63publicvoid setConf(
64 ImmutableClassesGiraphConfiguration<I, V, E> conf) {
65// We don't want to use external configuration66 }
6768 @Override
69publicvoid initialize(InputSplit inputSplit,
70 TaskAttemptContext context) throws IOException, InterruptedException {
71 baseVertexReader.initialize(inputSplit,
72 HadoopUtils.makeTaskAttemptContext(getConf(), context));
73 }
7475 @Override
76publicvoid setWorkerGlobalCommUsage(WorkerGlobalCommUsage usage) {
77super.setWorkerGlobalCommUsage(usage);
78// Set aggregator usage for vertex reader79 baseVertexReader.setWorkerGlobalCommUsage(usage);
80 }
8182 @Override
83publicboolean nextVertex() throws IOException, InterruptedException {
84return baseVertexReader.nextVertex();
85 }
8687 @Override
88public Vertex<I, V, E> getCurrentVertex() throws IOException,
89 InterruptedException {
90return baseVertexReader.getCurrentVertex();
91 }
9293 @Override
94publicvoid close() throws IOException {
95 baseVertexReader.close();
96 }
9798 @Override
99publicfloat getProgress() throws IOException, InterruptedException {
100return baseVertexReader.getProgress();
101 }
102 }