/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.io.formats;
2021import java.io.IOException;
2223import org.apache.giraph.graph.Vertex;
24import org.apache.giraph.io.VertexOutputFormat;
25import org.apache.giraph.io.VertexWriter;
26import org.apache.hadoop.io.Text;
27import org.apache.hadoop.io.Writable;
28import org.apache.hadoop.io.WritableComparable;
29import org.apache.hadoop.mapreduce.JobContext;
30import org.apache.hadoop.mapreduce.OutputCommitter;
31import org.apache.hadoop.mapreduce.RecordWriter;
32import org.apache.hadoop.mapreduce.TaskAttemptContext;
3334importstatic org.apache.giraph.conf.GiraphConstants.VERTEX_OUTPUT_FORMAT_SUBDIR;
3536/**37 * Abstract class that users should subclass to use their own text based38 * vertex output format.39 *40 * @param <I> Vertex index value41 * @param <V> Vertex value42 * @param <E> Edge value43 */44 @SuppressWarnings("rawtypes")
45publicabstractclass TextVertexOutputFormat<I extends WritableComparable,
46 V extends Writable, E extends Writable>
47extends VertexOutputFormat<I, V, E> {
48/** Uses the TextOutputFormat to do everything */49protectedGiraphTextOutputFormat textOutputFormat =
50newGiraphTextOutputFormat() {
51 @Override
52protected String getSubdir() {
53return VERTEX_OUTPUT_FORMAT_SUBDIR.get(getConf());
54 }
55 };
5657 @Override
58publicvoid checkOutputSpecs(JobContext context)
59throws IOException, InterruptedException {
60 textOutputFormat.checkOutputSpecs(context);
61 }
6263 @Override
64public OutputCommitter getOutputCommitter(TaskAttemptContext context)
65throws IOException, InterruptedException {
66return textOutputFormat.getOutputCommitter(context);
67 }
6869/**70 * The factory method which produces the {@link TextVertexWriter} used by this71 * output format.72 *73 * @param context74 * the information about the task75 * @return76 * the text vertex writer to be used77 */78 @Override
79publicabstractTextVertexWriter createVertexWriter(TaskAttemptContext
80 context) throws IOException, InterruptedException;
8182/**83 * Abstract class to be implemented by the user based on their specific84 * vertex output. Easiest to ignore the key value separator and only use85 * key instead.86 */87protectedabstractclassTextVertexWriter88extends VertexWriter<I, V, E> {
89/** Internal line record writer */90private RecordWriter<Text, Text> lineRecordWriter;
91/** Context passed to initialize */92private TaskAttemptContext context;
9394 @Override
95publicvoid initialize(TaskAttemptContext context) throws IOException,
96 InterruptedException {
97 lineRecordWriter = createLineRecordWriter(context);
98this.context = context;
99 }
100101/**102 * Create the line record writer. Override this to use a different103 * underlying record writer (useful for testing).104 *105 * @param context106 * the context passed to initialize107 * @return108 * the record writer to be used109 * @throws IOException110 * exception that can be thrown during creation111 * @throws InterruptedException112 * exception that can be thrown during creation113 */114protected RecordWriter<Text, Text> createLineRecordWriter(
115 TaskAttemptContext context) throws IOException, InterruptedException {
116return textOutputFormat.getRecordWriter(context);
117 }
118119 @Override
120publicvoid close(TaskAttemptContext context) throws IOException,
121 InterruptedException {
122 lineRecordWriter.close(context);
123 }
124125/**126 * Get the line record writer.127 *128 * @return Record writer to be used for writing.129 */130public RecordWriter<Text, Text> getRecordWriter() {
131return lineRecordWriter;
132 }
133134/**135 * Get the context.136 *137 * @return Context passed to initialize.138 */139public TaskAttemptContext getContext() {
140return context;
141 }
142 }
143144/**145 * Abstract class to be implemented by the user to write a line for each146 * vertex.147 */148protectedabstractclassTextVertexWriterToEachLineextendsTextVertexWriter {
149150 @SuppressWarnings("unchecked")
151 @Override
152publicfinalvoid writeVertex(Vertex vertex) throws153 IOException, InterruptedException {
154// Note we are writing line as key with null value155 getRecordWriter().write(convertVertexToLine(vertex), null);
156 }
157158/**159 * Writes a line for the given vertex.160 *161 * @param vertex162 * the current vertex for writing163 * @return the text line to be written164 * @throws IOException165 * exception that can be thrown while writing166 */167protectedabstract Text convertVertexToLine(Vertex<I, V, E> vertex)
168throws IOException;
169 }
170 }