This project has retired. For details please refer to its Attic page.
TextEdgeOutputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.formats;
20  
21  import static org.apache.giraph.conf.GiraphConstants.EDGE_OUTPUT_FORMAT_SUBDIR;
22  
23  import java.io.IOException;
24  
25  import org.apache.giraph.edge.Edge;
26  import org.apache.giraph.io.EdgeOutputFormat;
27  import org.apache.giraph.io.EdgeWriter;
28  import org.apache.hadoop.io.Text;
29  import org.apache.hadoop.io.Writable;
30  import org.apache.hadoop.io.WritableComparable;
31  import org.apache.hadoop.mapreduce.JobContext;
32  import org.apache.hadoop.mapreduce.OutputCommitter;
33  import org.apache.hadoop.mapreduce.RecordWriter;
34  import org.apache.hadoop.mapreduce.TaskAttemptContext;
35  
36  /**
37   * Abstract class that users should subclass to use their own text based
38   * edge output format.
39   *
40   * @param <I> Vertex index value
41   * @param <V> Vertex value
42   * @param <E> Edge value
43   */
44  @SuppressWarnings("rawtypes")
45  public abstract class TextEdgeOutputFormat<I extends WritableComparable,
46      V extends Writable, E extends Writable>
47      extends EdgeOutputFormat<I, V, E> {
48    /** Uses the TextOutputFormat to do everything */
49    protected GiraphTextOutputFormat textOutputFormat =
50      new GiraphTextOutputFormat() {
51        @Override
52        protected String getSubdir() {
53          return EDGE_OUTPUT_FORMAT_SUBDIR.get(getConf());
54        }
55      };
56  
57    @Override
58    public void checkOutputSpecs(JobContext context)
59      throws IOException, InterruptedException {
60      textOutputFormat.checkOutputSpecs(context);
61    }
62  
63    @Override
64    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
65      throws IOException, InterruptedException {
66      return textOutputFormat.getOutputCommitter(context);
67    }
68  
69    /**
70     * The factory method which produces the {@link TextEdgeWriter} used by this
71     * output format.
72     *
73     * @param context  the information about the task
74     * @return         the text edge writer to be used
75     */
76    @Override
77    public abstract TextEdgeWriter createEdgeWriter(TaskAttemptContext
78        context) throws IOException, InterruptedException;
79  
80    /**
81     * Abstract class to be implemented by the user based on their specific
82     * edge output.  Easiest to ignore the key value separator and only use
83     * key instead.
84     */
85    protected abstract class TextEdgeWriter<I extends WritableComparable,
86      V extends Writable, E extends Writable>
87        extends EdgeWriter<I, V, E> {
88      /** Internal line record writer */
89      private RecordWriter<Text, Text> lineRecordWriter;
90      /** Context passed to initialize */
91      private TaskAttemptContext context;
92  
93      @Override
94      public void initialize(TaskAttemptContext context) throws IOException,
95             InterruptedException {
96        lineRecordWriter = createLineRecordWriter(context);
97        this.context = context;
98      }
99  
100     /**
101      * Create the line record writer. Override this to use a different
102      * underlying record writer (useful for testing).
103      *
104      * @param  context the context passed to initialize
105      * @return the record writer to be used
106      * @throws IOException          exception that can be thrown during creation
107      * @throws InterruptedException exception that can be thrown during creation
108      */
109     protected RecordWriter<Text, Text> createLineRecordWriter(
110         TaskAttemptContext context) throws IOException, InterruptedException {
111       return textOutputFormat.getRecordWriter(context);
112     }
113 
114     @Override
115     public void close(TaskAttemptContext context) throws IOException,
116         InterruptedException {
117       lineRecordWriter.close(context);
118     }
119 
120     /**
121      * Get the line record writer.
122      *
123      * @return Record writer to be used for writing.
124      */
125     public RecordWriter<Text, Text> getRecordWriter() {
126       return lineRecordWriter;
127     }
128 
129     /**
130      * Get the context.
131      *
132      * @return Context passed to initialize.
133      */
134     public TaskAttemptContext getContext() {
135       return context;
136     }
137   }
138 
139   /**
140    * Abstract class to be implemented by the user to write a line for each
141    * edge.
142    */
143   protected abstract class TextEdgeWriterToEachLine<
144     I extends WritableComparable, V extends Writable, E extends Writable>
145     extends TextEdgeWriter<I, V, E> {
146 
147     @Override
148     public final void writeEdge(I sourceId, V sourceValue, Edge<I, E> edge)
149       throws IOException, InterruptedException {
150 
151       // Note we are writing line as key with null value
152       getRecordWriter().write(
153         convertEdgeToLine(sourceId, sourceValue, edge), null);
154     }
155 
156     /**
157      * Writes a line for the given edge.
158      *
159      * @param sourceId    the current id of the source vertex
160      * @param sourceValue the current value of the source vertex
161      * @param edge        the current vertex for writing
162      * @return the text line to be written
163      * @throws IOException exception that can be thrown while writing
164      */
165     protected abstract Text convertEdgeToLine(I sourceId,
166       V sourceValue, Edge<I, E> edge) throws IOException;
167   }
168 }