View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.io.accumulo.edgemarker;
19  
20  import java.io.IOException;
21  import org.apache.accumulo.core.data.Mutation;
22  import org.apache.accumulo.core.data.Value;
23  import org.apache.giraph.graph.Vertex;
24  import org.apache.giraph.io.VertexWriter;
25  import org.apache.giraph.io.accumulo.AccumuloVertexOutputFormat;
26  import org.apache.hadoop.io.Text;
27  import org.apache.hadoop.mapreduce.RecordWriter;
28  import org.apache.hadoop.mapreduce.TaskAttemptContext;
29  
30  /*
31   Example subclass for writing vertices back to Accumulo.
32   */
33  public class AccumuloEdgeOutputFormat
34      extends AccumuloVertexOutputFormat<Text, Text, Text> {
35  
36    public VertexWriter<Text, Text, Text>
37    createVertexWriter(TaskAttemptContext context)
38        throws IOException, InterruptedException {
39      RecordWriter<Text, Mutation> writer =
40          accumuloOutputFormat.getRecordWriter(context);
41      String tableName = getConf().get(OUTPUT_TABLE);
42      if(tableName == null)
43        throw new IOException("Forgot to set table name " +
44            "using AccumuloVertexOutputFormat.OUTPUT_TABLE");
45      return new AccumuloEdgeVertexWriter(writer, tableName);
46    }
47  
48    /*
49    Wraps RecordWriter for writing Mutations back to the configured Accumulo Table.
50     */
51    public static class AccumuloEdgeVertexWriter
52        extends AccumuloVertexWriter<Text, Text, Text> {
53  
54      private final Text CF = new Text("cf");
55      private final Text PARENT =  new Text("parent");
56      private Text tableName;
57  
58      public AccumuloEdgeVertexWriter(
59          RecordWriter<Text, Mutation> writer, String tableName) {
60        super(writer);
61        this.tableName = new Text(tableName);
62      }
63      /*
64       Write back a mutation that adds a qualifier for 'parent' containing the vertex value
65       as the cell value. Assume the vertex ID corresponds to a key.
66       */
67      public void writeVertex(Vertex<Text, Text, Text> vertex)
68          throws IOException, InterruptedException {
69        RecordWriter<Text, Mutation> writer = getRecordWriter();
70        Mutation mt = new Mutation(vertex.getId());
71        mt.put(CF, PARENT, new Value(
72            vertex.getValue().toString().getBytes()));
73        writer.write(tableName, mt);
74      }
75    }
76  }