View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.output;
19  
20  import java.io.IOException;
21  import java.util.HashMap;
22  import java.util.Map;
23  
24  import org.apache.giraph.bsp.BspOutputFormat;
25  import org.apache.giraph.conf.GiraphConfiguration;
26  import org.apache.giraph.conf.GiraphConstants;
27  import org.apache.giraph.conf.StrConfOption;
28  import org.apache.giraph.utils.ConfigurationObjectUtils;
29  import org.apache.giraph.utils.DefaultOutputCommitter;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.mapreduce.JobContext;
32  import org.apache.hadoop.mapreduce.OutputCommitter;
33  import org.apache.hadoop.mapreduce.TaskAttemptContext;
34  
35  /**
36   * Hadoop output format to use with block output.
37   * It keeps track of all registered outputs, and knows how to create them.
38   */
39  public class BlockOutputFormat extends BspOutputFormat {
40    private static final StrConfOption OUTPUT_CONF_OPTIONS = new StrConfOption(
41        "digraph.outputConfOptions", "",
42        "List of conf options for outputs used");
43  
44    public static <OD> void addOutputDesc(OD outputDesc, String confOption,
45        GiraphConfiguration conf) {
46      GiraphConstants.HADOOP_OUTPUT_FORMAT_CLASS.set(conf,
47          BlockOutputFormat.class);
48      String currentOutputs = OUTPUT_CONF_OPTIONS.get(conf);
49      if (!currentOutputs.isEmpty()) {
50        currentOutputs = currentOutputs + ",";
51      }
52      OUTPUT_CONF_OPTIONS.set(conf, currentOutputs + confOption);
53      ConfigurationObjectUtils.setObjectKryo(outputDesc, confOption, conf);
54    }
55  
56    private static String[] getOutputConfOptions(Configuration conf) {
57      String outputConfOptions = OUTPUT_CONF_OPTIONS.get(conf);
58      return outputConfOptions.isEmpty() ?
59          new String[0] : outputConfOptions.split(",");
60    }
61  
62    public static <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
63    OD createInitAndCheckOutputDesc(String confOption, Configuration conf,
64        String jobIdentifier) {
65      OD outputDesc = ConfigurationObjectUtils.getObjectKryo(confOption, conf);
66      outputDesc.initializeAndCheck(jobIdentifier, conf);
67      return outputDesc;
68    }
69  
70    public static Map<String, BlockOutputDesc>
71    createInitAndCheckOutputDescsMap(Configuration conf, String jobIdentifier) {
72      String[] outputConfOptions = getOutputConfOptions(conf);
73      Map<String, BlockOutputDesc> ret = new HashMap<>(outputConfOptions.length);
74      for (String outputConfOption : outputConfOptions) {
75        ret.put(outputConfOption,
76            createInitAndCheckOutputDesc(outputConfOption, conf, jobIdentifier));
77      }
78      return ret;
79    }
80  
81    public static Map<String, BlockOutputDesc> createInitAndCheckOutputDescsMap(
82        JobContext jobContext) {
83      return createInitAndCheckOutputDescsMap(jobContext.getConfiguration(),
84          jobContext.getJobID().toString());
85    }
86  
87    @Override
88    public void checkOutputSpecs(JobContext jobContext)
89        throws IOException, InterruptedException {
90      createInitAndCheckOutputDescsMap(jobContext);
91    }
92  
93    @Override
94    public OutputCommitter getOutputCommitter(
95        TaskAttemptContext context) throws IOException, InterruptedException {
96      return new DefaultOutputCommitter() {
97        @Override
98        public void commit(JobContext jobContext) throws IOException {
99          Map<String, BlockOutputDesc> map =
100             createInitAndCheckOutputDescsMap(jobContext);
101         for (BlockOutputDesc outputDesc : map.values()) {
102           outputDesc.commit();
103         }
104       }
105     };
106   }
107 }