This project has been retired. For details, please refer to its Attic page.
WrappedVertexOutputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.internal;
20  
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.VertexWriter;
import org.apache.giraph.job.HadoopUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.util.Objects;
34  
35  /**
36   * For internal use only.
37   *
38   * Wraps user set {@link VertexOutputFormat} to make sure proper configuration
39   * parameters are passed around, that user can set parameters in
40   * configuration and they will be available in other methods related to this
41   * format.
42   *
43   * @param <I> Vertex id
44   * @param <V> Vertex data
45   * @param <E> Edge data
46   */
47  public class WrappedVertexOutputFormat<I extends WritableComparable,
48      V extends Writable, E extends Writable>
49      extends VertexOutputFormat<I, V, E> {
50    /** {@link VertexOutputFormat} which is wrapped */
51    private VertexOutputFormat<I, V, E> originalOutputFormat;
52  
53    /**
54     * Constructor
55     *
56     * @param vertexOutputFormat Vertex output format to wrap
57     */
58    public WrappedVertexOutputFormat(
59        VertexOutputFormat<I, V, E> vertexOutputFormat) {
60      originalOutputFormat = vertexOutputFormat;
61    }
62  
63    @Override
64    public VertexWriter<I, V, E> createVertexWriter(
65        TaskAttemptContext context) throws IOException, InterruptedException {
66      final VertexWriter<I, V, E> vertexWriter =
67          originalOutputFormat.createVertexWriter(
68              HadoopUtils.makeTaskAttemptContext(getConf(), context));
69      return new VertexWriter<I, V, E>() {
70        @Override
71        public void setConf(
72            ImmutableClassesGiraphConfiguration<I, V, E> conf) {
73          super.setConf(conf);
74          vertexWriter.setConf(conf);
75        }
76  
77        @Override
78        public void initialize(
79            TaskAttemptContext context) throws IOException, InterruptedException {
80          vertexWriter.initialize(
81              HadoopUtils.makeTaskAttemptContext(getConf(), context));
82        }
83  
84        @Override
85        public void close(
86            TaskAttemptContext context) throws IOException, InterruptedException {
87          vertexWriter.close(
88              HadoopUtils.makeTaskAttemptContext(getConf(), context));
89        }
90  
91        @Override
92        public void writeVertex(
93            Vertex<I, V, E> vertex) throws IOException, InterruptedException {
94          vertexWriter.writeVertex(vertex);
95        }
96      };
97    }
98  
99    @Override
100   public void checkOutputSpecs(
101       JobContext context) throws IOException, InterruptedException {
102     originalOutputFormat.checkOutputSpecs(
103         HadoopUtils.makeJobContext(getConf(), context));
104   }
105 
106   @Override
107   public OutputCommitter getOutputCommitter(
108       TaskAttemptContext context) throws IOException, InterruptedException {
109     final OutputCommitter outputCommitter =
110         originalOutputFormat.getOutputCommitter(
111             HadoopUtils.makeTaskAttemptContext(getConf(), context));
112     return new OutputCommitter() {
113       @Override
114       public void setupJob(JobContext context) throws IOException {
115         outputCommitter.setupJob(
116             HadoopUtils.makeJobContext(getConf(), context));
117       }
118 
119       @Override
120       public void setupTask(TaskAttemptContext context) throws IOException {
121         outputCommitter.setupTask(
122             HadoopUtils.makeTaskAttemptContext(getConf(), context));
123       }
124 
125       @Override
126       public boolean needsTaskCommit(
127           TaskAttemptContext context) throws IOException {
128         return outputCommitter.needsTaskCommit(
129             HadoopUtils.makeTaskAttemptContext(getConf(), context));
130       }
131 
132       @Override
133       public void commitTask(TaskAttemptContext context) throws IOException {
134         outputCommitter.commitTask(
135             HadoopUtils.makeTaskAttemptContext(getConf(), context));
136       }
137 
138       @Override
139       public void abortTask(TaskAttemptContext context) throws IOException {
140         outputCommitter.abortTask(
141             HadoopUtils.makeTaskAttemptContext(getConf(), context));
142       }
143 
144       @Override
145       public void cleanupJob(JobContext context) throws IOException {
146         outputCommitter.cleanupJob(
147             HadoopUtils.makeJobContext(getConf(), context));
148       }
149 
150       @Override
151       public void commitJob(JobContext context) throws IOException {
152         outputCommitter.commitJob(
153             HadoopUtils.makeJobContext(getConf(), context));
154       }
155 
156       @Override
157       public void abortJob(JobContext context,
158           JobStatus.State state) throws IOException {
159         outputCommitter.abortJob(
160             HadoopUtils.makeJobContext(getConf(), context), state);
161       }
162     };
163   }
164 
165   @Override
166   public void preWriting(TaskAttemptContext context) {
167     originalOutputFormat.preWriting(context);
168   }
169 
170   @Override
171   public void postWriting(TaskAttemptContext context) {
172     originalOutputFormat.postWriting(context);
173   }
174 }