1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.io.accumulo;
1920import java.io.IOException;
21import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
22import org.apache.accumulo.core.data.Mutation;
23import org.apache.giraph.io.VertexOutputFormat;
24import org.apache.giraph.io.VertexWriter;
25import org.apache.hadoop.io.Text;
26import org.apache.hadoop.io.Writable;
27import org.apache.hadoop.io.WritableComparable;
28import org.apache.hadoop.mapreduce.JobContext;
29import org.apache.hadoop.mapreduce.OutputCommitter;
30import org.apache.hadoop.mapreduce.RecordWriter;
31import org.apache.hadoop.mapreduce.TaskAttemptContext;
32/**33 *34 * Class which wraps the AccumuloOutputFormat. It's designed35 * as an extension point to VertexOutputFormat subclasses who wish36 * to write vertices back to an Accumulo table.37 *38 * Works with39 * {@link AccumuloVertexInputFormat}40 *41 *42 * @param <I> vertex id type43 * @param <V> vertex value type44 * @param <E> edge type45 */46publicabstractclass AccumuloVertexOutputFormat<
47 I extends WritableComparable,
48 V extends Writable,
49 E extends Writable>
50extends VertexOutputFormat<I, V, E> {
515253/**54 * Output table parameter55 */56publicstaticfinal String OUTPUT_TABLE = "OUTPUT_TABLE";
5758/**59 * Accumulo delegate for table output60 */61protected AccumuloOutputFormat accumuloOutputFormat =
62new AccumuloOutputFormat();
6364/**65 *66 * Main abstraction point for vertex writers to persist back67 * to Accumulo tables.68 *69 * @param <I> vertex id type70 * @param <V> vertex value type71 * @param <E> edge type72 */73publicabstractstaticclass AccumuloVertexWriter<
74 I extends WritableComparable,
75 V extends Writable,
76 E extends Writable>
77extends VertexWriter<I, V, E> {
7879/**80 * task attempt context.81 */82private TaskAttemptContext context;
8384/**85 * Accumulo record writer86 */87private RecordWriter<Text, Mutation> recordWriter;
8889/**90 * Constructor for use with subclasses91 *92 * @param recordWriter accumulo record writer93 */94publicAccumuloVertexWriter(RecordWriter<Text, Mutation> recordWriter) {
95this.recordWriter = recordWriter;
96 }
9798/**99 * initialize100 *101 * @param context Context used to write the vertices.102 * @throws IOException103 */104publicvoid initialize(TaskAttemptContext context) throws IOException {
105this.context = context;
106 }
107108/**109 * close110 *111 * @param context the context of the task112 * @throws IOException113 * @throws InterruptedException114 */115publicvoid close(TaskAttemptContext context)
116throws IOException, InterruptedException {
117 recordWriter.close(context);
118 }
119120/**121 * Get the table record writer;122 *123 * @return Record writer to be used for writing.124 */125public RecordWriter<Text, Mutation> getRecordWriter() {
126return recordWriter;
127 }
128129/**130 * Get the context.131 *132 * @return Context passed to initialize.133 */134public TaskAttemptContext getContext() {
135return context;
136 }
137138 }
139/**140 *141 * checkOutputSpecs142 *143 * @param context information about the job144 * @throws IOException145 * @throws InterruptedException146 */147 @Override
148publicvoid checkOutputSpecs(JobContext context)
149throws IOException, InterruptedException {
150try {
151 accumuloOutputFormat.checkOutputSpecs(context);
152 } catch (IOException e) {
153if (e.getMessage().contains("Output info has not been set")) {
154thrownew IOException(e.getMessage() + " Make sure you initialized" +
155" AccumuloOutputFormat static setters " +
156"before passing the config to GiraphJob.");
157 }
158 }
159 }
160161/**162 * getOutputCommitter163 *164 * @param context the task context165 * @return OutputCommitter166 * @throws IOException167 * @throws InterruptedException168 */169 @Override
170public OutputCommitter getOutputCommitter(TaskAttemptContext context)
171throws IOException, InterruptedException {
172return accumuloOutputFormat.getOutputCommitter(context);
173 }
174 }