View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.internal;
20  
21  import java.io.IOException;
22  
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.edge.Edge;
25  import org.apache.giraph.io.EdgeOutputFormat;
26  import org.apache.giraph.io.EdgeWriter;
27  import org.apache.giraph.job.HadoopUtils;
28  import org.apache.hadoop.io.Writable;
29  import org.apache.hadoop.io.WritableComparable;
30  import org.apache.hadoop.mapreduce.JobContext;
31  import org.apache.hadoop.mapreduce.JobStatus;
32  import org.apache.hadoop.mapreduce.OutputCommitter;
33  import org.apache.hadoop.mapreduce.TaskAttemptContext;
34  
35  /**
36   * For internal use only.
37   *
38   * Wraps user set {@link EdgeOutputFormat} to make sure proper configuration
39   * parameters are passed around, that user can set parameters in
40   * configuration and they will be available in other methods related to this
41   * format.
42   *
43   * @param <I> Vertex id
44   * @param <V> Vertex data
45   * @param <E> Edge data
46   */
@SuppressWarnings("rawtypes")
public class WrappedEdgeOutputFormat<I extends WritableComparable,
    V extends Writable, E extends Writable>
    extends EdgeOutputFormat<I, V, E> {

  /** {@link EdgeOutputFormat} which is wrapped */
  private final EdgeOutputFormat<I, V, E> originalOutputFormat;

  /**
   * Constructor
   *
   * @param edgeOutputFormat Edge output format to wrap
   */
  public WrappedEdgeOutputFormat(
      EdgeOutputFormat<I, V, E> edgeOutputFormat) {
    originalOutputFormat = edgeOutputFormat;
  }

  /**
   * Creates an {@link EdgeWriter} by delegating to the wrapped output
   * format, then wraps the returned writer so that every context-taking
   * call re-wraps its {@link TaskAttemptContext} via
   * {@link HadoopUtils#makeTaskAttemptContext} with this format's
   * configuration ({@code getConf()}) before delegating.
   *
   * @param context Task attempt context
   * @return Wrapped edge writer that forwards to the user's writer
   */
  @Override
  public EdgeWriter<I, V, E> createEdgeWriter(
      TaskAttemptContext context) throws IOException, InterruptedException {
    final EdgeWriter<I, V, E> edgeWriter =
        originalOutputFormat.createEdgeWriter(
            HadoopUtils.makeTaskAttemptContext(getConf(), context));
    return new EdgeWriter<I, V, E>() {
      @Override
      public void setConf(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
        // Keep both this wrapper and the wrapped writer configured;
        // super.setConf must run so getConf() on this wrapper works too.
        super.setConf(conf);
        edgeWriter.setConf(conf);
      }

      @Override
      public void initialize(TaskAttemptContext context)
        throws IOException, InterruptedException {
        // Rebuild the context with the Giraph configuration before
        // handing it to the user's writer.
        edgeWriter.initialize(
          HadoopUtils.makeTaskAttemptContext(getConf(), context));
      }

      @Override
      public void close(
          TaskAttemptContext context) throws IOException, InterruptedException {
        edgeWriter.close(
          HadoopUtils.makeTaskAttemptContext(getConf(), context));
      }

      @Override
      public void writeEdge(I sourceId, V sourceValue, Edge<I, E> edge)
        throws IOException, InterruptedException {
        // No context involved, so delegate directly.
        edgeWriter.writeEdge(sourceId, sourceValue, edge);
      }
    };
  }

  /**
   * Delegates the output-spec check to the wrapped format with a
   * {@link JobContext} rebuilt from this format's configuration.
   *
   * @param context Job context
   */
  @Override
  public void checkOutputSpecs(JobContext context)
    throws IOException, InterruptedException {
    originalOutputFormat.checkOutputSpecs(
        HadoopUtils.makeJobContext(getConf(), context));
  }

  /**
   * Obtains the wrapped format's {@link OutputCommitter} and returns a
   * committer that forwards every lifecycle call with a context rebuilt
   * via {@link HadoopUtils} so the Giraph configuration is always passed
   * along.
   *
   * @param context Task attempt context
   * @return Wrapping output committer
   */
  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
    throws IOException, InterruptedException {

    final OutputCommitter outputCommitter =
        originalOutputFormat.getOutputCommitter(
            HadoopUtils.makeTaskAttemptContext(getConf(), context));

    return new OutputCommitter() {
      @Override
      public void setupJob(JobContext context) throws IOException {
        outputCommitter.setupJob(
            HadoopUtils.makeJobContext(getConf(), context));
      }

      @Override
      public void setupTask(TaskAttemptContext context) throws IOException {
        outputCommitter.setupTask(
            HadoopUtils.makeTaskAttemptContext(getConf(), context));
      }

      @Override
      public boolean needsTaskCommit(
          TaskAttemptContext context) throws IOException {
        return outputCommitter.needsTaskCommit(
            HadoopUtils.makeTaskAttemptContext(getConf(), context));
      }

      @Override
      public void commitTask(TaskAttemptContext context) throws IOException {
        outputCommitter.commitTask(
            HadoopUtils.makeTaskAttemptContext(getConf(), context));
      }

      @Override
      public void abortTask(TaskAttemptContext context) throws IOException {
        outputCommitter.abortTask(
            HadoopUtils.makeTaskAttemptContext(getConf(), context));
      }

      @Override
      public void cleanupJob(JobContext context) throws IOException {
        outputCommitter.cleanupJob(
            HadoopUtils.makeJobContext(getConf(), context));
      }

// NOTE(review): the if_not/end comments below look like build-time
// preprocessor directives (munge-style version gating for Hadoop
// variants) — keep them byte-identical; do not remove or reformat.
/*if_not[HADOOP_NON_COMMIT_JOB]*/
      @Override
      public void commitJob(JobContext context) throws IOException {
        outputCommitter.commitJob(
            HadoopUtils.makeJobContext(getConf(), context));
      }

      @Override
      public void abortJob(JobContext context,
          JobStatus.State state) throws IOException {
        outputCommitter.abortJob(
            HadoopUtils.makeJobContext(getConf(), context), state);
      }
/*end[HADOOP_NON_COMMIT_JOB]*/
    };
  }
}