This project has retired. For details please refer to its Attic page.
TextVertexOutputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.formats;
20  
21  import java.io.IOException;
22  
23  import org.apache.giraph.graph.Vertex;
24  import org.apache.giraph.io.VertexOutputFormat;
25  import org.apache.giraph.io.VertexWriter;
26  import org.apache.hadoop.io.Text;
27  import org.apache.hadoop.io.Writable;
28  import org.apache.hadoop.io.WritableComparable;
29  import org.apache.hadoop.mapreduce.JobContext;
30  import org.apache.hadoop.mapreduce.OutputCommitter;
31  import org.apache.hadoop.mapreduce.RecordWriter;
32  import org.apache.hadoop.mapreduce.TaskAttemptContext;
33  
34  import static org.apache.giraph.conf.GiraphConstants.VERTEX_OUTPUT_FORMAT_SUBDIR;
35  
36  /**
37   * Abstract class that users should subclass to use their own text based
38   * vertex output format.
39   *
40   * @param <I> Vertex index value
41   * @param <V> Vertex value
42   * @param <E> Edge value
43   */
44  @SuppressWarnings("rawtypes")
45  public abstract class TextVertexOutputFormat<I extends WritableComparable,
46      V extends Writable, E extends Writable>
47      extends VertexOutputFormat<I, V, E> {
48    /** Uses the TextOutputFormat to do everything */
49    protected GiraphTextOutputFormat textOutputFormat =
50      new GiraphTextOutputFormat() {
51        @Override
52        protected String getSubdir() {
53          return VERTEX_OUTPUT_FORMAT_SUBDIR.get(getConf());
54        }
55      };
56  
57    @Override
58    public void checkOutputSpecs(JobContext context)
59      throws IOException, InterruptedException {
60      textOutputFormat.checkOutputSpecs(context);
61    }
62  
63    @Override
64    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
65      throws IOException, InterruptedException {
66      return textOutputFormat.getOutputCommitter(context);
67    }
68  
69    /**
70     * The factory method which produces the {@link TextVertexWriter} used by this
71     * output format.
72     *
73     * @param context
74     *          the information about the task
75     * @return
76     *         the text vertex writer to be used
77     */
78    @Override
79    public abstract TextVertexWriter createVertexWriter(TaskAttemptContext
80        context) throws IOException, InterruptedException;
81  
82    /**
83     * Abstract class to be implemented by the user based on their specific
84     * vertex output.  Easiest to ignore the key value separator and only use
85     * key instead.
86     */
87    protected abstract class TextVertexWriter
88        extends VertexWriter<I, V, E> {
89      /** Internal line record writer */
90      private RecordWriter<Text, Text> lineRecordWriter;
91      /** Context passed to initialize */
92      private TaskAttemptContext context;
93  
94      @Override
95      public void initialize(TaskAttemptContext context) throws IOException,
96             InterruptedException {
97        lineRecordWriter = createLineRecordWriter(context);
98        this.context = context;
99      }
100 
101     /**
102      * Create the line record writer. Override this to use a different
103      * underlying record writer (useful for testing).
104      *
105      * @param context
106      *          the context passed to initialize
107      * @return
108      *         the record writer to be used
109      * @throws IOException
110      *           exception that can be thrown during creation
111      * @throws InterruptedException
112      *           exception that can be thrown during creation
113      */
114     protected RecordWriter<Text, Text> createLineRecordWriter(
115         TaskAttemptContext context) throws IOException, InterruptedException {
116       return textOutputFormat.getRecordWriter(context);
117     }
118 
119     @Override
120     public void close(TaskAttemptContext context) throws IOException,
121         InterruptedException {
122       lineRecordWriter.close(context);
123     }
124 
125     /**
126      * Get the line record writer.
127      *
128      * @return Record writer to be used for writing.
129      */
130     public RecordWriter<Text, Text> getRecordWriter() {
131       return lineRecordWriter;
132     }
133 
134     /**
135      * Get the context.
136      *
137      * @return Context passed to initialize.
138      */
139     public TaskAttemptContext getContext() {
140       return context;
141     }
142   }
143 
144   /**
145    * Abstract class to be implemented by the user to write a line for each
146    * vertex.
147    */
148   protected abstract class TextVertexWriterToEachLine extends TextVertexWriter {
149 
150     @SuppressWarnings("unchecked")
151     @Override
152     public final void writeVertex(Vertex vertex) throws
153       IOException, InterruptedException {
154       // Note we are writing line as key with null value
155       getRecordWriter().write(convertVertexToLine(vertex), null);
156     }
157 
158     /**
159      * Writes a line for the given vertex.
160      *
161      * @param vertex
162      *          the current vertex for writing
163      * @return the text line to be written
164      * @throws IOException
165      *           exception that can be thrown while writing
166      */
167     protected abstract Text convertVertexToLine(Vertex<I, V, E> vertex)
168       throws IOException;
169   }
170 }