/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.io.internal;
2021import java.io.IOException;
2223import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24import org.apache.giraph.io.MappingReader;
25import org.apache.giraph.job.HadoopUtils;
26import org.apache.giraph.mapping.MappingEntry;
27import org.apache.giraph.worker.WorkerGlobalCommUsage;
28import org.apache.hadoop.io.Writable;
29import org.apache.hadoop.io.WritableComparable;
30import org.apache.hadoop.mapreduce.InputSplit;
31import org.apache.hadoop.mapreduce.TaskAttemptContext;
3233/**34 * For internal use only.35 *36 * Wraps {@link org.apache.giraph.io.MappingReader} to make sure proper37 * configuration parameters are passed around, that parameters set in original38 * configuration are available in methods of this reader39 *40 * @param <I> vertexId type41 * @param <V> vertexValue type42 * @param <E> edgeValue type43 * @param <B> mappingTarget type44 */45publicclass WrappedMappingReader<I extends WritableComparable,
46 V extends Writable, E extends Writable, B extends Writable>
47extends MappingReader<I, V, E, B> {
48/** User set baseMappingReader wrapped over */49privatefinal MappingReader<I, V, E, B> baseMappingReader;
5051/**52 * Constructor53 *54 * @param baseMappingReader User set baseMappingReader55 * @param conf configuration56 */57publicWrappedMappingReader(MappingReader<I, V, E, B> baseMappingReader,
58 ImmutableClassesGiraphConfiguration<I, V, E> conf) {
59this.baseMappingReader = baseMappingReader;
60super.setConf(conf);
61 baseMappingReader.setConf(conf);
62 }
6364 @Override
65publicvoid setConf(
66 ImmutableClassesGiraphConfiguration<I, V, E> conf) {
67// We don't want to use external configuration68 }
6970 @Override
71publicvoid initialize(InputSplit inputSplit,
72 TaskAttemptContext context) throws IOException, InterruptedException {
73 baseMappingReader.initialize(inputSplit,
74 HadoopUtils.makeTaskAttemptContext(getConf(), context));
75 }
7677 @Override
78publicvoid setWorkerGlobalCommUsage(WorkerGlobalCommUsage usage) {
79super.setWorkerGlobalCommUsage(usage);
80// Set global communication usage for edge reader81 baseMappingReader.setWorkerGlobalCommUsage(usage);
82 }
8384 @Override
85publicboolean nextEntry() throws IOException, InterruptedException {
86return baseMappingReader.nextEntry();
87 }
8889 @Override
90public MappingEntry<I, B> getCurrentEntry()
91throws IOException, InterruptedException {
92return baseMappingReader.getCurrentEntry();
93 }
949596 @Override
97publicvoid close() throws IOException {
98 baseMappingReader.close();
99 }
100101 @Override
102publicfloat getProgress() throws IOException, InterruptedException {
103return baseMappingReader.getProgress();
104 }
105 }