View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.io.accumulo;
19  
20  import java.io.IOException;
21  import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
22  import org.apache.accumulo.core.data.Mutation;
23  import org.apache.giraph.io.VertexOutputFormat;
24  import org.apache.giraph.io.VertexWriter;
25  import org.apache.hadoop.io.Text;
26  import org.apache.hadoop.io.Writable;
27  import org.apache.hadoop.io.WritableComparable;
28  import org.apache.hadoop.mapreduce.JobContext;
29  import org.apache.hadoop.mapreduce.OutputCommitter;
30  import org.apache.hadoop.mapreduce.RecordWriter;
31  import org.apache.hadoop.mapreduce.TaskAttemptContext;
32  /**
33   *
34   *  Class which wraps the AccumuloOutputFormat. It's designed
35   *  as an extension point to VertexOutputFormat subclasses who wish
36   *  to write vertices back to an Accumulo table.
37   *
38   *  Works with
39   *  {@link AccumuloVertexInputFormat}
40   *
41   *
42   * @param <I> vertex id type
43   * @param <V>  vertex value type
44   * @param <E>  edge type
45   */
46  public abstract class AccumuloVertexOutputFormat<
47          I extends WritableComparable,
48          V extends Writable,
49          E extends Writable>
50          extends VertexOutputFormat<I, V, E> {
51  
52  
53    /**
54     * Output table parameter
55     */
56    public static final String OUTPUT_TABLE = "OUTPUT_TABLE";
57  
58    /**
59     * Accumulo delegate for table output
60     */
61    protected AccumuloOutputFormat accumuloOutputFormat =
62            new AccumuloOutputFormat();
63  
64    /**
65     *
66     * Main abstraction point for vertex writers to persist back
67     * to Accumulo tables.
68     *
69     * @param <I> vertex id type
70     * @param <V> vertex value type
71     * @param <E>  edge type
72     */
73    public abstract static class AccumuloVertexWriter<
74        I extends WritableComparable,
75        V extends Writable,
76        E extends Writable>
77        extends VertexWriter<I, V, E> {
78  
79      /**
80       * task attempt context.
81       */
82      private TaskAttemptContext context;
83  
84      /**
85       * Accumulo record writer
86       */
87      private RecordWriter<Text, Mutation> recordWriter;
88  
89      /**
90       * Constructor for use with subclasses
91       *
92       * @param recordWriter accumulo record writer
93       */
94      public AccumuloVertexWriter(RecordWriter<Text, Mutation> recordWriter) {
95        this.recordWriter = recordWriter;
96      }
97  
98      /**
99       * initialize
100      *
101      * @param context Context used to write the vertices.
102      * @throws IOException
103      */
104     public void initialize(TaskAttemptContext context) throws IOException {
105       this.context = context;
106     }
107 
108     /**
109      *  close
110      *
111      * @param context the context of the task
112      * @throws IOException
113      * @throws InterruptedException
114      */
115     public void close(TaskAttemptContext context)
116       throws IOException, InterruptedException {
117       recordWriter.close(context);
118     }
119 
120     /**
121      * Get the table record writer;
122      *
123      * @return Record writer to be used for writing.
124      */
125     public RecordWriter<Text, Mutation> getRecordWriter() {
126       return recordWriter;
127     }
128 
129     /**
130      * Get the context.
131      *
132      * @return Context passed to initialize.
133      */
134     public TaskAttemptContext getContext() {
135       return context;
136     }
137 
138   }
139   /**
140    *
141    * checkOutputSpecs
142    *
143    * @param context information about the job
144    * @throws IOException
145    * @throws InterruptedException
146    */
147   @Override
148   public void checkOutputSpecs(JobContext context)
149     throws IOException, InterruptedException {
150     try {
151       accumuloOutputFormat.checkOutputSpecs(context);
152     } catch (IOException e) {
153       if (e.getMessage().contains("Output info has not been set")) {
154         throw new IOException(e.getMessage() + " Make sure you initialized" +
155                 " AccumuloOutputFormat static setters " +
156                 "before passing the config to GiraphJob.");
157       }
158     }
159   }
160 
161   /**
162    * getOutputCommitter
163    *
164    * @param context the task context
165    * @return OutputCommitter
166    * @throws IOException
167    * @throws InterruptedException
168    */
169   @Override
170   public OutputCommitter getOutputCommitter(TaskAttemptContext context)
171     throws IOException, InterruptedException {
172     return accumuloOutputFormat.getOutputCommitter(context);
173   }
174 }