1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.io.gora;
1920import java.io.IOException;
2122import org.apache.giraph.edge.Edge;
23import org.apache.giraph.edge.EdgeFactory;
24import org.apache.giraph.io.gora.generated.GEdge;
25import org.apache.hadoop.io.FloatWritable;
26import org.apache.hadoop.io.LongWritable;
27import org.apache.hadoop.mapreduce.InputSplit;
28import org.apache.hadoop.mapreduce.TaskAttemptContext;
2930/**31 * Implementation of a specific reader for a generated data bean.32 */33publicclassGoraTestEdgeInputFormat34extends GoraEdgeInputFormat<LongWritable, FloatWritable> {
3536/**37 * Default constructor38 */39publicGoraTestEdgeInputFormat() {
40 }
4142/**43 * Creates specific vertex reader to be used inside Hadoop.44 * @param split split to be read.45 * @param context JobContext to be used.46 * @return GoraEdgeReader Edge reader to be used by Hadoop.47 */48 @Override
49public GoraEdgeReader createEdgeReader(
50 InputSplit split, TaskAttemptContext context) throws IOException {
51 putArtificialData();
52returnnewGoraGEdgeEdgeReader();
53 }
5455/**56 * Writes data into the data store in order to test it out.57 */58 @SuppressWarnings("unchecked")
59privatestaticvoid putArtificialData() {
60 getDataStore().put("11-22",
61 createEdge("11-22", "11", "22", "11-22", (float)(11+22)));
62 getDataStore().put("22-11",
63 createEdge("22-11", "22", "11", "22-11", (float)(22+11)));
64 getDataStore().put("11-33",
65 createEdge("11-33", "11", "33", "11-33", (float)(11+33)));
66 getDataStore().put("33-11",
67 createEdge("33-11", "33", "11", "33-11", (float)(33+11)));
68 getDataStore().flush();
69 }
7071/**72 * Creates an edge using an id and a set of edges.73 * @param id Vertex id.74 * @param vertexInId Vertex source Id.75 * @param vertexOutId Vertex destination Id.76 * @param edgeLabel Edge label.77 * @param edgeWeight Edge wight.78 * @return GEdge created.79 */80privatestatic GEdge createEdge(String id, String vertexInId,
81 String vertexOutId, String edgeLabel, float edgeWeight) {
82 GEdge newEdge = new GEdge();
83 newEdge.setEdgeId(id);
84 newEdge.setVertexInId(vertexInId);
85 newEdge.setVertexOutId(vertexOutId);
86 newEdge.setLabel(edgeLabel);
87 newEdge.setEdgeWeight(edgeWeight);
88return newEdge;
89 }
9091/**92 * Gora edge reader93 */94protectedclassGoraGEdgeEdgeReaderextends GoraEdgeReader {
9596/** source vertex of the edge */97private LongWritable sourceId;
9899/**100 * Transforms a GoraObject into an Edge object.101 * @param goraObject Object from Gora to be translated.102 * @return Edge Result from transforming the gora object.103 */104 @Override
105protected Edge<LongWritable, FloatWritable> transformEdge
106 (Object goraObject) {
107 Edge<LongWritable, FloatWritable> edge = null;
108 GEdge goraEdge = (GEdge) goraObject;
109 Long dest;
110 Float value;
111 dest = Long.valueOf(goraEdge.getVertexOutId().toString());
112this.sourceId = new LongWritable();
113this.sourceId.set(Long.valueOf(goraEdge.getVertexInId().toString()));
114 value = (float) goraEdge.getEdgeWeight();
115 edge = EdgeFactory.create(new LongWritable(dest),
116new FloatWritable(value));
117return edge;
118 }
119120/**121 * Gets the currentSourceId for the edge.122 * @return LongWritable currentSourceId for the edge.123 */124 @Override
125public LongWritable getCurrentSourceId() throws IOException,
126 InterruptedException {
127returnthis.sourceId;
128 }
129 }
130 }