This project has retired. For details please refer to its Attic page.
WrappedEdgeReader xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.internal;
20  
21  import java.io.IOException;
22  
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.edge.Edge;
25  import org.apache.giraph.io.EdgeReader;
26  import org.apache.giraph.job.HadoopUtils;
27  import org.apache.giraph.worker.WorkerGlobalCommUsage;
28  import org.apache.hadoop.io.Writable;
29  import org.apache.hadoop.io.WritableComparable;
30  import org.apache.hadoop.mapreduce.InputSplit;
31  import org.apache.hadoop.mapreduce.TaskAttemptContext;
32  
33  /**
34   * For internal use only.
35   *
36   * Wraps {@link EdgeReader} to make sure proper configuration
37   * parameters are passed around, that parameters set in original
38   * configuration are available in methods of this reader
39   *
40   * @param <I> Vertex id
41   * @param <E> Edge data
42   */
43  public class WrappedEdgeReader<I extends WritableComparable,
44      E extends Writable> extends EdgeReader<I, E> {
45    /** EdgeReader to delegate the methods to */
46    private final EdgeReader<I, E> baseEdgeReader;
47  
48    /**
49     * Constructor
50     *
51     * @param baseEdgeReader EdgeReader to delegate all the methods to
52     * @param conf Configuration
53     */
54    public WrappedEdgeReader(EdgeReader<I, E> baseEdgeReader,
55        ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
56      this.baseEdgeReader = baseEdgeReader;
57      super.setConf(conf);
58      baseEdgeReader.setConf(conf);
59    }
60  
61    @Override
62    public void setConf(
63        ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
64      // We don't want to use external configuration
65    }
66  
67    @Override
68    public void initialize(InputSplit inputSplit,
69        TaskAttemptContext context) throws IOException, InterruptedException {
70      baseEdgeReader.initialize(inputSplit,
71          HadoopUtils.makeTaskAttemptContext(getConf(), context));
72    }
73  
74    @Override
75    public void setWorkerGlobalCommUsage(WorkerGlobalCommUsage usage) {
76      super.setWorkerGlobalCommUsage(usage);
77      // Set global communication usage for edge reader
78      baseEdgeReader.setWorkerGlobalCommUsage(usage);
79    }
80  
81    @Override
82    public boolean nextEdge() throws IOException, InterruptedException {
83      return baseEdgeReader.nextEdge();
84    }
85  
86    @Override
87    public I getCurrentSourceId() throws IOException, InterruptedException {
88      return baseEdgeReader.getCurrentSourceId();
89    }
90  
91    @Override
92    public Edge<I, E> getCurrentEdge() throws IOException, InterruptedException {
93      return baseEdgeReader.getCurrentEdge();
94    }
95  
96    @Override
97    public void close() throws IOException {
98      baseEdgeReader.close();
99    }
100 
101   @Override
102   public float getProgress() throws IOException, InterruptedException {
103     return baseEdgeReader.getProgress();
104   }
105 }