/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.io.formats;
2021import java.io.DataOutputStream;
22import java.io.IOException;
2324import org.apache.hadoop.conf.Configuration;
25import org.apache.hadoop.fs.FSDataOutputStream;
26import org.apache.hadoop.fs.FileSystem;
27import org.apache.hadoop.fs.Path;
28import org.apache.hadoop.io.Text;
29import org.apache.hadoop.io.compress.CompressionCodec;
30import org.apache.hadoop.io.compress.GzipCodec;
31import org.apache.hadoop.mapreduce.RecordWriter;
32import org.apache.hadoop.mapreduce.TaskAttemptContext;
33import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
34import org.apache.hadoop.util.ReflectionUtils;
3536/**37 * The text output format used for Giraph text writing.38 */39publicabstractclassGiraphTextOutputFormat40extends TextOutputFormat<Text, Text> {
4142 @Override
43public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
44throws IOException, InterruptedException {
45 String extension = "";
46 CompressionCodec codec = null;
47 Configuration conf = job.getConfiguration();
48boolean isCompressed = getCompressOutput(job);
4950if (isCompressed) {
51 Class<? extends CompressionCodec> codecClass =
52 getOutputCompressorClass(job, GzipCodec.class);
53 codec =
54 (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
55 extension = codec.getDefaultExtension();
56 }
57 Path file = getDefaultWorkFile(job, extension);
5859/* adjust the path */60 FSDataOutputStream fileOut;
61 FileSystem fs = file.getFileSystem(conf);
62 String subdir = getSubdir();
63if (!subdir.isEmpty()) {
64 Path subdirPath = new Path(subdir);
65 Path subdirAbsPath = new Path(file.getParent(), subdirPath);
66 Path vertexFile = new Path(subdirAbsPath, file.getName());
67 fileOut = fs.create(vertexFile, false);
68 } else {
69 fileOut = fs.create(file, false);
70 }
7172 String separator = "\t";
7374if (!isCompressed) {
75returnnew LineRecordWriter<Text, Text>(fileOut, separator);
76 } else {
77 DataOutputStream out =
78new DataOutputStream(codec.createOutputStream(fileOut));
79returnnew LineRecordWriter<Text, Text>(out, separator);
80 }
81 }
8283/**84 * This function is used to provide an additional path level to keep85 * different text outputs into different directories.86 *87 * @return the subdirectory to be created under the output path88 */89protectedabstract String getSubdir();
90 }