This project has retired. For details please refer to its Attic page.
SequenceFileVertexOutputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.io.formats;
19  
20  import java.io.IOException;
21  import org.apache.giraph.graph.Vertex;
22  import org.apache.giraph.io.VertexOutputFormat;
23  import org.apache.giraph.io.VertexWriter;
24  import org.apache.hadoop.io.Writable;
25  import org.apache.hadoop.io.WritableComparable;
26  import org.apache.hadoop.mapreduce.JobContext;
27  import org.apache.hadoop.mapreduce.OutputCommitter;
28  import org.apache.hadoop.mapreduce.RecordWriter;
29  import org.apache.hadoop.mapreduce.TaskAttemptContext;
30  import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
31  
32  /**
33   * Sequence file vertex output format. It converts a vertex into a key-value
34   * pair of the desired types and writes the pair into a sequence file.
35   * A subclass has to provide the two conversion methods
36   * convertToSequenceFileKey() and convertToSequenceFileValue().
37   *
38   * @param <I> Vertex id type
39   * @param <V> Vertex value type
40   * @param <E> Edge value type
41   * @param <OK> Output key data type for a sequence file
42   * @param <OV> Output value data type for a sequence file
43   */
44  public abstract class SequenceFileVertexOutputFormat<
45    I extends WritableComparable,
46    V extends Writable,
47    E extends Writable,
48    OK extends Writable,
49    OV extends Writable>
50    extends VertexOutputFormat<I, V, E> {
51    /**
52     * Output format of a sequence file that stores key-value pairs of the
53     * desired types.
54     */
55    private SequenceFileOutputFormat<OK, OV> sequenceFileOutputFormat =
56        new SequenceFileOutputFormat<OK, OV>();
57  
58    @Override
59    public void checkOutputSpecs(JobContext context)
60      throws IOException, InterruptedException {
61      sequenceFileOutputFormat.checkOutputSpecs(context);
62    }
63  
64    @Override
65    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
66      throws IOException, InterruptedException {
67      return sequenceFileOutputFormat.getOutputCommitter(context);
68    }
69  
70    @Override
71    public VertexWriter createVertexWriter(TaskAttemptContext
72        context) throws IOException, InterruptedException {
73      return new SequenceFileVertexWriter();
74    }
75  
76    /**
77     * Converts a vertex identifier into a sequence file key.
78     * @param vertexId Vertex identifier.
79     * @return Sequence file key.
80     */
81    protected abstract OK convertToSequenceFileKey(I vertexId);
82  
83    /**
84     * Converts a vertex value into a sequence file value.
85     * @param vertexValue Vertex value.
86     * @return Sequence file value.
87     */
88    protected abstract OV convertToSequenceFileValue(V vertexValue);
89  
90    /**
91     * Vertex writer that converts a vertex into a key-value pair and writes
92     * the result into a sequence file for a context.
93     */
94    private class SequenceFileVertexWriter extends VertexWriter<I, V, E> {
95      /**
96       * A record writer that will write into a sequence file initialized for
97       * a context.
98       */
99      private RecordWriter<OK, OV> recordWriter;
100 
101     @Override
102     public void initialize(TaskAttemptContext context) throws IOException,
103            InterruptedException {
104       recordWriter = sequenceFileOutputFormat.getRecordWriter(context);
105     }
106 
107     @Override
108     public final void writeVertex(Vertex<I, V, E> vertex) throws
109       IOException, InterruptedException {
110       // Convert vertex id to type OK.
111       OK outKey = convertToSequenceFileKey(vertex.getId());
112       // Convert vertex value to type OV.
113       OV outValue = convertToSequenceFileValue(vertex.getValue());
114       recordWriter.write(outKey, outValue);
115     }
116 
117     @Override
118     public void close(TaskAttemptContext context) throws IOException,
119         InterruptedException {
120       recordWriter.close(context);
121     }
122   }
123 }