This project has retired. For details please refer to its Attic page.
BspOutputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.bsp;
20  
21  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22  import org.apache.hadoop.io.Text;
23  import org.apache.hadoop.mapreduce.JobContext;
24  import org.apache.hadoop.mapreduce.OutputCommitter;
25  import org.apache.hadoop.mapreduce.OutputFormat;
26  import org.apache.hadoop.mapreduce.RecordWriter;
27  import org.apache.hadoop.mapreduce.TaskAttemptContext;
28  import org.apache.log4j.Logger;
29  
30  import java.io.IOException;
31  
32  /**
33   * This is for internal use only.  Allows the vertex output format routines
34   * to be called as if a normal Hadoop job.
35   */
36  public class BspOutputFormat extends OutputFormat<Text, Text> {
37    /** Class logger */
38    private static Logger LOG = Logger.getLogger(BspOutputFormat.class);
39  
40    @Override
41    public void checkOutputSpecs(JobContext context)
42      throws IOException, InterruptedException {
43      ImmutableClassesGiraphConfiguration conf =
44          new ImmutableClassesGiraphConfiguration(context.getConfiguration());
45      if (!conf.hasVertexOutputFormat() && !conf.hasEdgeOutputFormat()) {
46        LOG.warn("checkOutputSpecs: ImmutableOutputCommiter" +
47            " will not check anything");
48        return;
49      }
50  
51      if (conf.hasVertexOutputFormat()) {
52        conf.createWrappedVertexOutputFormat().checkOutputSpecs(context);
53      }
54      if (conf.hasEdgeOutputFormat()) {
55        conf.createWrappedEdgeOutputFormat().checkOutputSpecs(context);
56      }
57    }
58  
59    @Override
60    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
61      throws IOException, InterruptedException {
62      ImmutableClassesGiraphConfiguration conf =
63          new ImmutableClassesGiraphConfiguration(context.getConfiguration());
64      if (!conf.hasVertexOutputFormat() && !conf.hasEdgeOutputFormat()) {
65        LOG.warn("getOutputCommitter: Returning " +
66            "ImmutableOutputCommiter (does nothing).");
67        return new ImmutableOutputCommitter();
68      }
69  
70      if (conf.hasVertexOutputFormat()) {
71        return conf.createWrappedVertexOutputFormat().getOutputCommitter(context);
72      } else {
73        return conf.createWrappedEdgeOutputFormat().getOutputCommitter(context);
74      }
75    }
76  
77    @Override
78    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
79      throws IOException, InterruptedException {
80      return new BspRecordWriter();
81    }
82  }