/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.io.internal;
2021import java.io.IOException;
2223import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24import org.apache.giraph.edge.Edge;
25import org.apache.giraph.io.EdgeReader;
26import org.apache.giraph.job.HadoopUtils;
27import org.apache.giraph.worker.WorkerGlobalCommUsage;
28import org.apache.hadoop.io.Writable;
29import org.apache.hadoop.io.WritableComparable;
30import org.apache.hadoop.mapreduce.InputSplit;
31import org.apache.hadoop.mapreduce.TaskAttemptContext;
3233/**34 * For internal use only.35 *36 * Wraps {@link EdgeReader} to make sure proper configuration37 * parameters are passed around, that parameters set in original38 * configuration are available in methods of this reader39 *40 * @param <I> Vertex id41 * @param <E> Edge data42 */43publicclass WrappedEdgeReader<I extends WritableComparable,
44 E extends Writable> extends EdgeReader<I, E> {
45/**EdgeReader to delegate the methods to */46privatefinal EdgeReader<I, E> baseEdgeReader;
4748/**49 * Constructor50 *51 * @param baseEdgeReader EdgeReader to delegate all the methods to52 * @param conf Configuration53 */54publicWrappedEdgeReader(EdgeReader<I, E> baseEdgeReader,
55 ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
56this.baseEdgeReader = baseEdgeReader;
57super.setConf(conf);
58 baseEdgeReader.setConf(conf);
59 }
6061 @Override
62publicvoid setConf(
63 ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
64// We don't want to use external configuration65 }
6667 @Override
68publicvoid initialize(InputSplit inputSplit,
69 TaskAttemptContext context) throws IOException, InterruptedException {
70 baseEdgeReader.initialize(inputSplit,
71 HadoopUtils.makeTaskAttemptContext(getConf(), context));
72 }
7374 @Override
75publicvoid setWorkerGlobalCommUsage(WorkerGlobalCommUsage usage) {
76super.setWorkerGlobalCommUsage(usage);
77// Set global communication usage for edge reader78 baseEdgeReader.setWorkerGlobalCommUsage(usage);
79 }
8081 @Override
82publicboolean nextEdge() throws IOException, InterruptedException {
83return baseEdgeReader.nextEdge();
84 }
8586 @Override
87public I getCurrentSourceId() throws IOException, InterruptedException {
88return baseEdgeReader.getCurrentSourceId();
89 }
9091 @Override
92public Edge<I, E> getCurrentEdge() throws IOException, InterruptedException {
93return baseEdgeReader.getCurrentEdge();
94 }
9596 @Override
97publicvoid close() throws IOException {
98 baseEdgeReader.close();
99 }
100101 @Override
102publicfloat getProgress() throws IOException, InterruptedException {
103return baseEdgeReader.getProgress();
104 }
105 }