/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.io.formats;
1920import java.io.IOException;
21import org.apache.giraph.graph.Vertex;
22import org.apache.giraph.io.VertexOutputFormat;
23import org.apache.giraph.io.VertexWriter;
24import org.apache.hadoop.io.Writable;
25import org.apache.hadoop.io.WritableComparable;
26import org.apache.hadoop.mapreduce.JobContext;
27import org.apache.hadoop.mapreduce.OutputCommitter;
28import org.apache.hadoop.mapreduce.RecordWriter;
29import org.apache.hadoop.mapreduce.TaskAttemptContext;
30import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
3132/**33 * Sequence file vertex output format. It allows to convert a vertex into a key34 * and value pair of desired types, and output the pair into a sequence file.35 * A subclass has to provide two conversion methods convertToSequenceFileKey()36 * and convertToSequenceFileValue().37 *38 * @param <I> Vertex id type39 * @param <V> Vertex value type40 * @param <E> Edge value type41 * @param <OK> Output key data type for a sequence file42 * @param <OV> Output value data type for a sequence file43 */44publicabstractclass SequenceFileVertexOutputFormat<
45 I extends WritableComparable,
46 V extends Writable,
47 E extends Writable,
48 OK extends Writable,
49 OV extends Writable>
50extends VertexOutputFormat<I, V, E> {
51/**52 * Output format of a sequence file that stores key-value pairs of the53 * desired types.54 */55private SequenceFileOutputFormat<OK, OV> sequenceFileOutputFormat =
56new SequenceFileOutputFormat<OK, OV>();
5758 @Override
59publicvoid checkOutputSpecs(JobContext context)
60throws IOException, InterruptedException {
61 sequenceFileOutputFormat.checkOutputSpecs(context);
62 }
6364 @Override
65public OutputCommitter getOutputCommitter(TaskAttemptContext context)
66throws IOException, InterruptedException {
67return sequenceFileOutputFormat.getOutputCommitter(context);
68 }
6970 @Override
71publicVertexWriter createVertexWriter(TaskAttemptContext
72 context) throws IOException, InterruptedException {
73returnnewSequenceFileVertexWriter();
74 }
7576/**77 * Converts a vertex identifier into a sequence file key.78 * @param vertexId Vertex identifier.79 * @return Sequence file key.80 */81protectedabstract OK convertToSequenceFileKey(I vertexId);
8283/**84 * Converts a vertex value into a sequence file value.85 * @param vertexValue Vertex value.86 * @return Sequence file value.87 */88protectedabstract OV convertToSequenceFileValue(V vertexValue);
8990/**91 * Vertex writer that converts a vertex into a key-value pair and writes92 * the result into a sequence file for a context.93 */94privateclassSequenceFileVertexWriterextends VertexWriter<I, V, E> {
95/**96 * A record writer that will write into a sequence file initialized for97 * a context.98 */99private RecordWriter<OK, OV> recordWriter;
100101 @Override
102publicvoid initialize(TaskAttemptContext context) throws IOException,
103 InterruptedException {
104 recordWriter = sequenceFileOutputFormat.getRecordWriter(context);
105 }
106107 @Override
108publicfinalvoid writeVertex(Vertex<I, V, E> vertex) throws109 IOException, InterruptedException {
110// Convert vertex id to type OK.111 OK outKey = convertToSequenceFileKey(vertex.getId());
112// Convert vertex value to type OV.113 OV outValue = convertToSequenceFileValue(vertex.getValue());
114 recordWriter.write(outKey, outValue);
115 }
116117 @Override
118publicvoid close(TaskAttemptContext context) throws IOException,
119 InterruptedException {
120 recordWriter.close(context);
121 }
122 }
123 }