This project has retired. For details please refer to its Attic page.
GoraTestVertexInputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.io.gora;
19  
20  import java.io.IOException;
21  import java.util.Map;
22  import java.util.Set;
23  
24  import org.apache.giraph.edge.Edge;
25  import org.apache.giraph.edge.EdgeFactory;
26  import org.apache.giraph.graph.Vertex;
27  import org.apache.giraph.io.gora.generated.GVertex;
28  import org.apache.hadoop.io.DoubleWritable;
29  import org.apache.hadoop.io.FloatWritable;
30  import org.apache.hadoop.io.LongWritable;
31  import org.apache.hadoop.mapreduce.InputSplit;
32  import org.apache.hadoop.mapreduce.TaskAttemptContext;
33  
34  /**
35   * Implementation of a specific reader for a generated data bean.
36   */
37  public class GoraTestVertexInputFormat
38    extends GoraVertexInputFormat<LongWritable, DoubleWritable,
39            FloatWritable> {
40  
41    /**
42     * DEfault constructor
43     */
44    public GoraTestVertexInputFormat() {
45    }
46  
47    /**
48     * Creates specific vertex reader to be used inside Hadoop.
49     * @param split split to be read.
50     * @param context JobContext to be used.
51     * @return GoraVertexReader Vertex reader to be used by Hadoop.
52     */
53    @Override
54    public GoraVertexReader createVertexReader(
55        InputSplit split, TaskAttemptContext context) throws IOException {
56      putArtificialData();
57      return new GoraGVertexVertexReader();
58    }
59  
60    /**
61     * Writes data into the data store in order to test it out.
62     */
63    @SuppressWarnings("unchecked")
64    private static void putArtificialData() {
65      getDataStore().put("1", createVertex("1", null));
66      getDataStore().put("10", createVertex("10", null));
67      getDataStore().put("100", createVertex("100", null));
68      getDataStore().flush();
69    }
70  
71    /**
72     * Creates a vertex using an id and a set of edges.
73     * @param id Vertex id.
74     * @param edges Set of edges.
75     * @return GVertex created.
76     */
77    public static GVertex createVertex(String id, Map<String, String> edges) {
78      GVertex newVrtx = new GVertex();
79      newVrtx.setVertexId(id);
80      if (edges != null) {
81        for (String edgeId : edges.keySet())
82          newVrtx.getEdges().put(edgeId, edges.get(edgeId));
83      }
84      return newVrtx;
85    }
86  
87    /**
88     * Gora vertex reader
89     */
90    protected class GoraGVertexVertexReader extends GoraVertexReader {
91  
92      /**
93       * Transforms a GoraObject into a Vertex object.
94       * @param goraObject Object from Gora to be translated.
95       * @return Vertex Result from transforming the gora object.
96       */
97      @Override
98      protected Vertex<LongWritable, DoubleWritable, FloatWritable>
99      transformVertex(Object goraObject) {
100       Vertex<LongWritable, DoubleWritable, FloatWritable> vertex;
101       /* create the actual vertex */
102       vertex = getConf().createVertex();
103       GVertex tmpGVertex = (GVertex) goraObject;
104 
105       LongWritable vrtxId = new LongWritable(
106           Long.parseLong(tmpGVertex.getVertexId().toString()));
107       DoubleWritable vrtxValue = new DoubleWritable(tmpGVertex.getVertexValue());
108       vertex.initialize(vrtxId, vrtxValue);
109       if (tmpGVertex.getEdges() != null && !tmpGVertex.getEdges().isEmpty()) {
110         Set<CharSequence> keyIt = tmpGVertex.getEdges().keySet();
111         for (CharSequence key : keyIt) {
112           String keyVal = key.toString();
113           String valVal = tmpGVertex.getEdges().get(key).toString();
114           Edge<LongWritable, FloatWritable> edge;
115           if (!keyVal.contains("vertexId")) {
116             edge = EdgeFactory.create(
117                 new LongWritable(Long.parseLong(keyVal)),
118                 new FloatWritable(Float.parseFloat(valVal)));
119             vertex.addEdge(edge);
120           }
121         }
122       }
123       return vertex;
124     }
125   }
126 }