/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.io.formats;
2021importstatic org.apache.giraph.conf.GiraphConstants.EDGE_OUTPUT_FORMAT_SUBDIR;
2223import java.io.IOException;
2425import org.apache.giraph.edge.Edge;
26import org.apache.giraph.io.EdgeOutputFormat;
27import org.apache.giraph.io.EdgeWriter;
28import org.apache.hadoop.io.Text;
29import org.apache.hadoop.io.Writable;
30import org.apache.hadoop.io.WritableComparable;
31import org.apache.hadoop.mapreduce.JobContext;
32import org.apache.hadoop.mapreduce.OutputCommitter;
33import org.apache.hadoop.mapreduce.RecordWriter;
34import org.apache.hadoop.mapreduce.TaskAttemptContext;
3536/**37 * Abstract class that users should subclass to use their own text based38 * edge output format.39 *40 * @param <I> Vertex index value41 * @param <V> Vertex value42 * @param <E> Edge value43 */44 @SuppressWarnings("rawtypes")
45publicabstractclass TextEdgeOutputFormat<I extends WritableComparable,
46 V extends Writable, E extends Writable>
47extends EdgeOutputFormat<I, V, E> {
48/** Uses the TextOutputFormat to do everything */49protectedGiraphTextOutputFormat textOutputFormat =
50newGiraphTextOutputFormat() {
51 @Override
52protected String getSubdir() {
53return EDGE_OUTPUT_FORMAT_SUBDIR.get(getConf());
54 }
55 };
5657 @Override
58publicvoid checkOutputSpecs(JobContext context)
59throws IOException, InterruptedException {
60 textOutputFormat.checkOutputSpecs(context);
61 }
6263 @Override
64public OutputCommitter getOutputCommitter(TaskAttemptContext context)
65throws IOException, InterruptedException {
66return textOutputFormat.getOutputCommitter(context);
67 }
6869/**70 * The factory method which produces the {@link TextEdgeWriter} used by this71 * output format.72 *73 * @param context the information about the task74 * @return the text edge writer to be used75 */76 @Override
77publicabstractTextEdgeWriter createEdgeWriter(TaskAttemptContext
78 context) throws IOException, InterruptedException;
7980/**81 * Abstract class to be implemented by the user based on their specific82 * edge output. Easiest to ignore the key value separator and only use83 * key instead.84 */85protectedabstractclass TextEdgeWriter<I extends WritableComparable,
86 V extends Writable, E extends Writable>
87extends EdgeWriter<I, V, E> {
88/** Internal line record writer */89private RecordWriter<Text, Text> lineRecordWriter;
90/** Context passed to initialize */91private TaskAttemptContext context;
9293 @Override
94publicvoid initialize(TaskAttemptContext context) throws IOException,
95 InterruptedException {
96 lineRecordWriter = createLineRecordWriter(context);
97this.context = context;
98 }
99100/**101 * Create the line record writer. Override this to use a different102 * underlying record writer (useful for testing).103 *104 * @param context the context passed to initialize105 * @return the record writer to be used106 * @throws IOException exception that can be thrown during creation107 * @throws InterruptedException exception that can be thrown during creation108 */109protected RecordWriter<Text, Text> createLineRecordWriter(
110 TaskAttemptContext context) throws IOException, InterruptedException {
111return textOutputFormat.getRecordWriter(context);
112 }
113114 @Override
115publicvoid close(TaskAttemptContext context) throws IOException,
116 InterruptedException {
117 lineRecordWriter.close(context);
118 }
119120/**121 * Get the line record writer.122 *123 * @return Record writer to be used for writing.124 */125public RecordWriter<Text, Text> getRecordWriter() {
126return lineRecordWriter;
127 }
128129/**130 * Get the context.131 *132 * @return Context passed to initialize.133 */134public TaskAttemptContext getContext() {
135return context;
136 }
137 }
138139/**140 * Abstract class to be implemented by the user to write a line for each141 * edge.142 */143protectedabstractclass TextEdgeWriterToEachLine<
144 I extends WritableComparable, V extends Writable, E extends Writable>
145extends TextEdgeWriter<I, V, E> {
146147 @Override
148publicfinalvoid writeEdge(I sourceId, V sourceValue, Edge<I, E> edge)
149throws IOException, InterruptedException {
150151// Note we are writing line as key with null value152 getRecordWriter().write(
153 convertEdgeToLine(sourceId, sourceValue, edge), null);
154 }
155156/**157 * Writes a line for the given edge.158 *159 * @param sourceId the current id of the source vertex160 * @param sourceValue the current value of the source vertex161 * @param edge the current vertex for writing162 * @return the text line to be written163 * @throws IOException exception that can be thrown while writing164 */165protectedabstract Text convertEdgeToLine(I sourceId,
166 V sourceValue, Edge<I, E> edge) throws IOException;
167 }
168 }