BlockOutputFormat xref
package org.apache.giraph.block_app.framework.output;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.giraph.bsp.BspOutputFormat;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.StrConfOption;
import org.apache.giraph.utils.ConfigurationObjectUtils;
import org.apache.giraph.utils.DefaultOutputCommitter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Hadoop output format used by the block application framework. Output
 * descriptions are serialized into the job configuration under per-output
 * conf options, recreated and checked on the cluster, and committed when
 * the job commits.
 */
public class BlockOutputFormat extends BspOutputFormat {
  /** Comma-separated list of conf options under which outputs are stored */
  private static final StrConfOption OUTPUT_CONF_OPTIONS = new StrConfOption(
      "giraph.outputConfOptions", "",
      "List of conf options for outputs used");

  /**
   * Registers an output description under the given conf option and makes
   * BlockOutputFormat the job's Hadoop output format.
   */
  public static <OD> void addOutputDesc(OD outputDesc, String confOption,
      GiraphConfiguration conf) {
    GiraphConstants.HADOOP_OUTPUT_FORMAT_CLASS.set(conf,
        BlockOutputFormat.class);
    String currentOutputs = OUTPUT_CONF_OPTIONS.get(conf);
    if (!currentOutputs.isEmpty()) {
      currentOutputs = currentOutputs + ",";
    }
    OUTPUT_CONF_OPTIONS.set(conf, currentOutputs + confOption);
    ConfigurationObjectUtils.setObjectKryo(outputDesc, confOption, conf);
  }

  /**
   * Returns the conf options of all registered outputs, or an empty array
   * if no outputs were registered.
   */
  public static String[] getOutputConfOptions(Configuration conf) {
    String outputConfOptions = OUTPUT_CONF_OPTIONS.get(conf);
    return outputConfOptions.isEmpty() ?
        new String[0] : outputConfOptions.split(",");
  }

  /**
   * Recreates a single output description from the configuration, then
   * initializes and checks it for the given job.
   */
  public static <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
  OD createInitAndCheckOutputDesc(String confOption, Configuration conf,
      String jobIdentifier) {
    OD outputDesc = ConfigurationObjectUtils.getObjectKryo(confOption, conf);
    outputDesc.initializeAndCheck(jobIdentifier, conf);
    return outputDesc;
  }

  /**
   * Recreates, initializes and checks all registered output descriptions,
   * keyed by their conf option.
   */
  public static Map<String, BlockOutputDesc>
  createInitAndCheckOutputDescsMap(Configuration conf, String jobIdentifier) {
    String[] outputConfOptions = getOutputConfOptions(conf);
    Map<String, BlockOutputDesc> ret = new HashMap<>(outputConfOptions.length);
    for (String outputConfOption : outputConfOptions) {
      ret.put(outputConfOption,
          createInitAndCheckOutputDesc(outputConfOption, conf, jobIdentifier));
    }
    return ret;
  }

  public static Map<String, BlockOutputDesc> createInitAndCheckOutputDescsMap(
      JobContext jobContext) {
    return createInitAndCheckOutputDescsMap(jobContext.getConfiguration(),
        jobContext.getJobID().toString());
  }

  @Override
  public void checkOutputSpecs(JobContext jobContext)
      throws IOException, InterruptedException {
    createInitAndCheckOutputDescsMap(jobContext);
  }

  @Override
  public OutputCommitter getOutputCommitter(
      TaskAttemptContext context) throws IOException, InterruptedException {
    return new DefaultOutputCommitter() {
      @Override
      public void commit(JobContext jobContext) throws IOException {
        Map<String, BlockOutputDesc> map =
            createInitAndCheckOutputDescsMap(jobContext);
        for (BlockOutputDesc outputDesc : map.values()) {
          outputDesc.commit();
        }
      }
    };
  }
}
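
For orientation, below is a minimal wiring sketch of how these static helpers fit together. It is not part of Giraph itself: the class name BlockOutputWiringSketch and the conf option key "my.app.output" are made up for illustration, and the actual framework may drive this plumbing elsewhere. Only methods visible in the listing above are used: addOutputDesc on the job-setup side, createInitAndCheckOutputDescsMap on the cluster side.

// Hypothetical usage sketch; names marked below are illustrative only.
import java.util.Map;

import org.apache.giraph.block_app.framework.output.BlockOutputDesc;
import org.apache.giraph.block_app.framework.output.BlockOutputFormat;
import org.apache.giraph.conf.GiraphConfiguration;

public class BlockOutputWiringSketch {
  /** Made-up conf option key under which the descriptor is stored. */
  private static final String MY_OUTPUT_OPTION = "my.app.output";

  /** Job-setup side: serialize a descriptor into the job configuration. */
  public static void registerOutput(
      GiraphConfiguration conf, BlockOutputDesc<?> outputDesc) {
    // Also points the job's Hadoop output format at BlockOutputFormat and
    // appends MY_OUTPUT_OPTION to giraph.outputConfOptions.
    BlockOutputFormat.addOutputDesc(outputDesc, MY_OUTPUT_OPTION, conf);
  }

  /** Cluster side: recreate, initialize and check every registered output. */
  public static Map<String, BlockOutputDesc> loadOutputs(
      GiraphConfiguration conf, String jobIdentifier) {
    return BlockOutputFormat.createInitAndCheckOutputDescsMap(
        conf, jobIdentifier);
  }
}

The descriptor object itself travels through the Configuration as a Kryo-serialized blob (setObjectKryo / getObjectKryo), so the two sides only need to agree on the conf option key that OUTPUT_CONF_OPTIONS tracks.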