This project has retired. For details please refer to its Attic page.
WrappedVertexReader xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.internal;
20  
21  import java.io.IOException;
22  
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.graph.Vertex;
25  import org.apache.giraph.io.VertexReader;
26  import org.apache.giraph.job.HadoopUtils;
27  import org.apache.giraph.worker.WorkerGlobalCommUsage;
28  import org.apache.hadoop.io.Writable;
29  import org.apache.hadoop.io.WritableComparable;
30  import org.apache.hadoop.mapreduce.InputSplit;
31  import org.apache.hadoop.mapreduce.TaskAttemptContext;
32  
33  /**
34   * For internal use only.
35   *
36   * Wraps {@link VertexReader} to make sure proper configuration
37   * parameters are passed around, that parameters set in original
38   * configuration are available in methods of this reader
39   *
40   * @param <I> Vertex id
41   * @param <V> Vertex data
42   * @param <E> Edge data
43   */
44  public class WrappedVertexReader<I extends WritableComparable,
45      V extends Writable, E extends Writable> extends VertexReader<I, V, E> {
46    /** VertexReader to delegate the methods to */
47    private final VertexReader<I, V, E> baseVertexReader;
48  
49    /**
50     * Constructor
51     *
52     * @param baseVertexReader VertexReader to delegate all the methods to
53     * @param conf Configuration
54     */
55    public WrappedVertexReader(VertexReader<I, V, E> baseVertexReader,
56        ImmutableClassesGiraphConfiguration<I, V, E> conf) {
57      this.baseVertexReader = baseVertexReader;
58      super.setConf(conf);
59      baseVertexReader.setConf(conf);
60    }
61  
62    @Override
63    public void setConf(
64        ImmutableClassesGiraphConfiguration<I, V, E> conf) {
65      // We don't want to use external configuration
66    }
67  
68    @Override
69    public void initialize(InputSplit inputSplit,
70        TaskAttemptContext context) throws IOException, InterruptedException {
71      baseVertexReader.initialize(inputSplit,
72          HadoopUtils.makeTaskAttemptContext(getConf(), context));
73    }
74  
75    @Override
76    public void setWorkerGlobalCommUsage(WorkerGlobalCommUsage usage) {
77      super.setWorkerGlobalCommUsage(usage);
78      // Set aggregator usage for vertex reader
79      baseVertexReader.setWorkerGlobalCommUsage(usage);
80    }
81  
82    @Override
83    public boolean nextVertex() throws IOException, InterruptedException {
84      return baseVertexReader.nextVertex();
85    }
86  
87    @Override
88    public Vertex<I, V, E> getCurrentVertex() throws IOException,
89        InterruptedException {
90      return baseVertexReader.getCurrentVertex();
91    }
92  
93    @Override
94    public void close() throws IOException {
95      baseVertexReader.close();
96    }
97  
98    @Override
99    public float getProgress() throws IOException, InterruptedException {
100     return baseVertexReader.getProgress();
101   }
102 }