1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.io.accumulo.edgemarker;
1920import org.apache.accumulo.core.data.Key;
21import org.apache.accumulo.core.data.Value;
22import org.apache.giraph.edge.Edge;
23import org.apache.giraph.edge.EdgeFactory;
24import org.apache.giraph.graph.Vertex;
25import org.apache.giraph.io.VertexReader;
26import org.apache.giraph.io.accumulo.AccumuloVertexInputFormat;
27import org.apache.hadoop.conf.Configuration;
28import org.apache.hadoop.io.Text;
29import org.apache.hadoop.mapreduce.InputSplit;
30import org.apache.hadoop.mapreduce.RecordReader;
31import org.apache.hadoop.mapreduce.TaskAttemptContext;
3233import com.google.common.collect.Lists;
3435import java.io.IOException;
36import java.util.List;
37import java.util.regex.Pattern;
3839/*40 Example subclass which reads in Key/Value pairs to construct vertex objects.41 */42publicclassAccumuloEdgeInputFormat43extends AccumuloVertexInputFormat<Text, Text, Text> {
44 @Override publicvoid checkInputSpecs(Configuration conf) { }
4546privatestaticfinal Text uselessEdgeValue = new Text();
47public VertexReader<Text, Text, Text>
48 createVertexReader(InputSplit split, TaskAttemptContext context)
49throws IOException {
50try {
5152returnnewAccumuloEdgeVertexReader(
53 accumuloInputFormat.createRecordReader(split, context)) {
54 };
55 } catch (InterruptedException e) {
56thrownew IOException(e);
57 }
5859 }
60/*61 Reader takes Key/Value pairs from the underlying input format.62 */63publicstaticclassAccumuloEdgeVertexReader64extends AccumuloVertexReader<Text, Text, Text> {
6566publicstaticfinal Pattern commaPattern = Pattern.compile("[,]");
6768publicAccumuloEdgeVertexReader(RecordReader<Key, Value> recordReader) {
69super(recordReader);
70 }
717273publicboolean nextVertex() throws IOException, InterruptedException {
74return getRecordReader().nextKeyValue();
75 }
7677/*78 Each Key/Value contains the information needed to construct the vertices.79 */80public Vertex<Text, Text, Text> getCurrentVertex()
81throws IOException, InterruptedException {
82 Key key = getRecordReader().getCurrentKey();
83 Value value = getRecordReader().getCurrentValue();
84 Vertex<Text, Text, Text> vertex =
85 getConfiguration().createVertex();
86 Text vertexId = key.getRow();
87 List<Edge<Text, Text>> edges = Lists.newLinkedList();
88 String edge = new String(value.get());
89 Text edgeId = new Text(edge);
90 edges.add(EdgeFactory.create(edgeId, uselessEdgeValue));
91 vertex.initialize(vertexId, new Text(), edges);
9293return vertex;
94 }
95 }
96 }