1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.io.accumulo;
1920import java.io.IOException;
21import java.util.List;
22import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
23import org.apache.accumulo.core.data.Key;
24import org.apache.accumulo.core.data.Value;
25import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26import org.apache.giraph.io.VertexInputFormat;
27import org.apache.giraph.io.VertexReader;
28import org.apache.hadoop.io.Writable;
29import org.apache.hadoop.io.WritableComparable;
30import org.apache.hadoop.mapreduce.InputSplit;
31import org.apache.hadoop.mapreduce.JobContext;
32import org.apache.hadoop.mapreduce.RecordReader;
33import org.apache.hadoop.mapreduce.TaskAttemptContext;
3435/**36 * Class which wraps the AccumuloInputFormat. It's designed37 * as an extension point to VertexInputFormat subclasses who wish38 * to read from AccumuloTables.39 *40 * Works with41 * {@link AccumuloVertexOutputFormat}42 *43 * @param <I> vertex id type44 * @param <V> vertex value type45 * @param <E> edge type46 */47publicabstractclass AccumuloVertexInputFormat<
48 I extends WritableComparable,
49 V extends Writable,
50 E extends Writable>
51extends VertexInputFormat<I, V, E> {
52/**53 * delegate input format for all accumulo operations.54 */55protected AccumuloInputFormat accumuloInputFormat =
56new AccumuloInputFormat();
5758/**59 * Abstract class which provides a template for instantiating vertices60 * from Accumulo Key/Value pairs.61 *62 * @param <I> vertex id type63 * @param <V> vertex value type64 * @param <E> edge type65 */66publicabstractstaticclass AccumuloVertexReader<
67 I extends WritableComparable,
68 V extends Writable, E extends Writable>
69extends VertexReader<I, V, E> {
7071/** Giraph configuration */72private ImmutableClassesGiraphConfiguration<I, V, E>
73 configuration;
74/**75 * Used by subclasses to read key/value pairs.76 */77privatefinal RecordReader<Key, Value> reader;
78/** Context passed to initialize */79private TaskAttemptContext context;
8081/**82 * Constructor used to pass Record Reader instance83 * @param reader Accumulo record reader84 */85publicAccumuloVertexReader(RecordReader<Key, Value> reader) {
86this.reader = reader;
87 }
8889public ImmutableClassesGiraphConfiguration<I, V, E>
90 getConfiguration() {
91return configuration;
92 }
9394/**95 * initialize the reader.96 *97 * @param inputSplit Input split to be used for reading vertices.98 * @param context Context from the task.99 * @throws IOException100 * @throws InterruptedException101 */102publicvoid initialize(InputSplit inputSplit,
103 TaskAttemptContext context)
104throws IOException, InterruptedException {
105 reader.initialize(inputSplit, context);
106this.context = context;
107this.configuration =
108new ImmutableClassesGiraphConfiguration<I, V, E>(
109 context.getConfiguration());
110 }
111112/**113 * close114 *115 * @throws IOException116 */117publicvoid close() throws IOException {
118 reader.close();
119 }
120121/**122 * getProgress123 *124 * @return progress125 * @throws IOException126 * @throws InterruptedException127 */128publicfloat getProgress() throws IOException, InterruptedException {
129return reader.getProgress();
130 }
131132/**133 * Get the result record reader134 *135 * @return Record reader to be used for reading.136 */137protected RecordReader<Key, Value> getRecordReader() {
138return reader;
139 }
140141/**142 * getContext143 *144 * @return Context passed to initialize.145 */146protected TaskAttemptContext getContext() {
147return context;
148 }
149150 }
151152 @Override
153public List<InputSplit> getSplits(
154 JobContext context, int minSplitCountHint)
155throws IOException, InterruptedException {
156 List<InputSplit> splits = null;
157try {
158 splits = accumuloInputFormat.getSplits(context);
159 } catch (IOException e) {
160if (e.getMessage().contains("Input info has not been set")) {
161thrownew IOException(e.getMessage() +
162" Make sure you initialized" +
163" AccumuloInputFormat static setters " +
164"before passing the config to GiraphJob.");
165 }
166 }
167return splits;
168 }
169 }