This project has retired. For details please refer to its Attic page.
BlockOutputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.output;
19  
20  import java.io.IOException;
21  import java.util.HashMap;
22  import java.util.Map;
23  
24  import org.apache.giraph.bsp.BspOutputFormat;
25  import org.apache.giraph.conf.GiraphConfiguration;
26  import org.apache.giraph.conf.GiraphConstants;
27  import org.apache.giraph.conf.StrConfOption;
28  import org.apache.giraph.utils.ConfigurationObjectUtils;
29  import org.apache.giraph.utils.DefaultOutputCommitter;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.mapreduce.JobContext;
32  import org.apache.hadoop.mapreduce.OutputCommitter;
33  import org.apache.hadoop.mapreduce.TaskAttemptContext;
34  
35  /**
36   * Hadoop output format to use with block output.
37   * It keeps track of all registered outputs, and knows how to create them.
38   */
39  public class BlockOutputFormat extends BspOutputFormat {
40    private static final StrConfOption OUTPUT_CONF_OPTIONS = new StrConfOption(
41        "giraph.outputConfOptions", "",
42        "List of conf options for outputs used");
43  
44    public static <OD> void addOutputDesc(OD outputDesc, String confOption,
45        GiraphConfiguration conf) {
46      GiraphConstants.HADOOP_OUTPUT_FORMAT_CLASS.set(conf,
47          BlockOutputFormat.class);
48      String currentOutputs = OUTPUT_CONF_OPTIONS.get(conf);
49      if (!currentOutputs.isEmpty()) {
50        currentOutputs = currentOutputs + ",";
51      }
52      OUTPUT_CONF_OPTIONS.set(conf, currentOutputs + confOption);
53      ConfigurationObjectUtils.setObjectKryo(outputDesc, confOption, conf);
54    }
55  
56    /**
57     * Returns an array of output configuration options set in the input
58     * configuration.
59     *
60     * @param conf Configuration
61     * @return Array of options
62     */
63    public static String[] getOutputConfOptions(Configuration conf) {
64      String outputConfOptions = OUTPUT_CONF_OPTIONS.get(conf);
65      return outputConfOptions.isEmpty() ?
66          new String[0] : outputConfOptions.split(",");
67    }
68  
69    public static <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
70    OD createInitAndCheckOutputDesc(String confOption, Configuration conf,
71        String jobIdentifier) {
72      OD outputDesc = ConfigurationObjectUtils.getObjectKryo(confOption, conf);
73      outputDesc.initializeAndCheck(jobIdentifier, conf);
74      return outputDesc;
75    }
76  
77    public static Map<String, BlockOutputDesc>
78    createInitAndCheckOutputDescsMap(Configuration conf, String jobIdentifier) {
79      String[] outputConfOptions = getOutputConfOptions(conf);
80      Map<String, BlockOutputDesc> ret = new HashMap<>(outputConfOptions.length);
81      for (String outputConfOption : outputConfOptions) {
82        ret.put(outputConfOption,
83            createInitAndCheckOutputDesc(outputConfOption, conf, jobIdentifier));
84      }
85      return ret;
86    }
87  
88    public static Map<String, BlockOutputDesc> createInitAndCheckOutputDescsMap(
89        JobContext jobContext) {
90      return createInitAndCheckOutputDescsMap(jobContext.getConfiguration(),
91          jobContext.getJobID().toString());
92    }
93  
94    @Override
95    public void checkOutputSpecs(JobContext jobContext)
96        throws IOException, InterruptedException {
97      createInitAndCheckOutputDescsMap(jobContext);
98    }
99  
100   @Override
101   public OutputCommitter getOutputCommitter(
102       TaskAttemptContext context) throws IOException, InterruptedException {
103     return new DefaultOutputCommitter() {
104       @Override
105       public void commit(JobContext jobContext) throws IOException {
106         Map<String, BlockOutputDesc> map =
107             createInitAndCheckOutputDescsMap(jobContext);
108         for (BlockOutputDesc outputDesc : map.values()) {
109           outputDesc.commit();
110         }
111       }
112     };
113   }
114 }