/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.io.accumulo.edgemarker;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexWriter;
import org.apache.giraph.io.accumulo.AccumuloVertexOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
2930/*31 Example subclass for writing vertices back to Accumulo.32 */33publicclassAccumuloEdgeOutputFormat34extends AccumuloVertexOutputFormat<Text, Text, Text> {
3536public VertexWriter<Text, Text, Text>
37 createVertexWriter(TaskAttemptContext context)
38throws IOException, InterruptedException {
39 RecordWriter<Text, Mutation> writer =
40 accumuloOutputFormat.getRecordWriter(context);
41 String tableName = getConf().get(OUTPUT_TABLE);
42if(tableName == null)
43thrownew IOException("Forgot to set table name " +
44"using AccumuloVertexOutputFormat.OUTPUT_TABLE");
45returnnewAccumuloEdgeVertexWriter(writer, tableName);
46 }
4748/*49 Wraps RecordWriter for writing Mutations back to the configured Accumulo Table.50 */51publicstaticclassAccumuloEdgeVertexWriter52extends AccumuloVertexWriter<Text, Text, Text> {
5354privatefinal Text CF = new Text("cf");
55privatefinal Text PARENT = new Text("parent");
56private Text tableName;
5758publicAccumuloEdgeVertexWriter(
59 RecordWriter<Text, Mutation> writer, String tableName) {
60super(writer);
61this.tableName = new Text(tableName);
62 }
63/*64 Write back a mutation that adds a qualifier for 'parent' containing the vertex value65 as the cell value. Assume the vertex ID corresponds to a key.66 */67publicvoid writeVertex(Vertex<Text, Text, Text> vertex)
68throws IOException, InterruptedException {
69 RecordWriter<Text, Mutation> writer = getRecordWriter();
70 Mutation mt = new Mutation(vertex.getId());
71 mt.put(CF, PARENT, new Value(
72 vertex.getValue().toString().getBytes()));
73 writer.write(tableName, mt);
74 }
75 }
76 }