/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.io.internal;

import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.VertexWriter;
import org.apache.giraph.job.HadoopUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
/*if_not[HADOOP_NON_COMMIT_JOB]*/
import org.apache.hadoop.mapreduce.JobStatus;
/*end[HADOOP_NON_COMMIT_JOB]*/
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

/**
 * For internal use only.
 *
 * Wraps the user-set {@link VertexOutputFormat} to make sure proper
 * configuration parameters are passed around: parameters the user sets in
 * the configuration will be available in all other methods related to this
 * format.
 *
 * @param <I> Vertex id
 * @param <V> Vertex data
 * @param <E> Edge data
 */
public class WrappedVertexOutputFormat<I extends WritableComparable,
    V extends Writable, E extends Writable>
    extends VertexOutputFormat<I, V, E> {
  /** {@link VertexOutputFormat} which is wrapped */
  private VertexOutputFormat<I, V, E> originalOutputFormat;

  /**
   * Constructor
   *
   * @param vertexOutputFormat Vertex output format to wrap
   */
  public WrappedVertexOutputFormat(
      VertexOutputFormat<I, V, E> vertexOutputFormat) {
    originalOutputFormat = vertexOutputFormat;
  }

  @Override
  public VertexWriter<I, V, E> createVertexWriter(
      TaskAttemptContext context) throws IOException, InterruptedException {
    final VertexWriter<I, V, E> vertexWriter =
        originalOutputFormat.createVertexWriter(
            HadoopUtils.makeTaskAttemptContext(getConf(), context));
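    // Return an anonymous wrapper around the user's writer: each lifecycle
    // callback re-creates its context so it carries the Giraph configuration.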
    return new VertexWriter<I, V, E>() {
      @Override
      public void setConf(
          ImmutableClassesGiraphConfiguration<I, V, E> conf) {
        super.setConf(conf);
        vertexWriter.setConf(conf);
      }

      @Override
      public void initialize(
          TaskAttemptContext context) throws IOException, InterruptedException {
        vertexWriter.initialize(
            HadoopUtils.makeTaskAttemptContext(getConf(), context));
      }

      @Override
      public void close(
          TaskAttemptContext context) throws IOException, InterruptedException {
        vertexWriter.close(
            HadoopUtils.makeTaskAttemptContext(getConf(), context));
      }

      @Override
      public void writeVertex(
          Vertex<I, V, E> vertex) throws IOException, InterruptedException {
        vertexWriter.writeVertex(vertex);
      }
    };
  }

  @Override
  public void checkOutputSpecs(
      JobContext context) throws IOException, InterruptedException {
    originalOutputFormat.checkOutputSpecs(
        HadoopUtils.makeJobContext(getConf(), context));
  }

  @Override
  public OutputCommitter getOutputCommitter(
      TaskAttemptContext context) throws IOException, InterruptedException {
    final OutputCommitter outputCommitter =
        originalOutputFormat.getOutputCommitter(
            HadoopUtils.makeTaskAttemptContext(getConf(), context));
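    // Same pattern for the committer: delegate every operation, wrapping the
    // incoming job/task contexts with the Giraph configuration first.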
    return new OutputCommitter() {
      @Override
      public void setupJob(JobContext context) throws IOException {
        outputCommitter.setupJob(
            HadoopUtils.makeJobContext(getConf(), context));
      }

      @Override
      public void setupTask(TaskAttemptContext context) throws IOException {
        outputCommitter.setupTask(
            HadoopUtils.makeTaskAttemptContext(getConf(), context));
      }

      @Override
      public boolean needsTaskCommit(
          TaskAttemptContext context) throws IOException {
        return outputCommitter.needsTaskCommit(
            HadoopUtils.makeTaskAttemptContext(getConf(), context));
      }

      @Override
      public void commitTask(TaskAttemptContext context) throws IOException {
        outputCommitter.commitTask(
            HadoopUtils.makeTaskAttemptContext(getConf(), context));
      }

      @Override
      public void abortTask(TaskAttemptContext context) throws IOException {
        outputCommitter.abortTask(
            HadoopUtils.makeTaskAttemptContext(getConf(), context));
      }

      @Override
      public void cleanupJob(JobContext context) throws IOException {
        outputCommitter.cleanupJob(
            HadoopUtils.makeJobContext(getConf(), context));
      }

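      // Munge preprocessor guard (assumed semantics): commitJob/abortJob are
      // compiled in only when the target Hadoop version's OutputCommitter
      // supports them, i.e. when HADOOP_NON_COMMIT_JOB is not set.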
/*if_not[HADOOP_NON_COMMIT_JOB]*/
      @Override
      public void commitJob(JobContext context) throws IOException {
        outputCommitter.commitJob(
            HadoopUtils.makeJobContext(getConf(), context));
      }

      @Override
      public void abortJob(JobContext context,
          JobStatus.State state) throws IOException {
        outputCommitter.abortJob(
            HadoopUtils.makeJobContext(getConf(), context), state);
      }
/*end[HADOOP_NON_COMMIT_JOB]*/
    };
  }
}
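
Below is a short usage sketch, not part of the source file above. It shows how
the wrapper might be applied around a user-supplied VertexOutputFormat: the
userFormat, conf, and context arguments are assumed to be supplied by the
caller, and setConf is assumed to be inherited from Giraph's configurable base
class; only WrappedVertexOutputFormat itself comes from the code above.

import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.VertexWriter;
import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class WrappedFormatSketch {
  /** Wrap a user format and open a vertex writer through the wrapper. */
  public static VertexWriter<LongWritable, DoubleWritable, FloatWritable>
  openWriter(
      VertexOutputFormat<LongWritable, DoubleWritable, FloatWritable>
          userFormat,
      ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
          FloatWritable> conf,
      TaskAttemptContext context) throws IOException, InterruptedException {
    WrappedVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable>
        wrapped =
        new WrappedVertexOutputFormat<LongWritable, DoubleWritable,
            FloatWritable>(userFormat);
    // After setConf, every context the wrapper hands to the user format is
    // re-created via HadoopUtils.makeTaskAttemptContext(getConf(), context),
    // so parameters set on conf are visible inside userFormat's methods.
    wrapped.setConf(conf);
    return wrapped.createVertexWriter(context);
  }
}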