This project has retired. For details please refer to its Attic page.
GoraTestVertexOutputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.io.gora;
19  
20  import java.io.IOException;
21  import java.util.Iterator;
22  import java.util.Map;
23  
24  import org.apache.giraph.edge.Edge;
25  import org.apache.giraph.graph.Vertex;
26  import org.apache.giraph.io.VertexWriter;
27  import org.apache.giraph.io.gora.generated.GVertex;
28  import org.apache.giraph.io.gora.generated.GVertexResult;
29  import org.apache.gora.persistency.Persistent;
30  import org.apache.hadoop.io.DoubleWritable;
31  import org.apache.hadoop.io.FloatWritable;
32  import org.apache.hadoop.io.LongWritable;
33  import org.apache.hadoop.mapreduce.TaskAttemptContext;
34  import org.junit.Assert;
35  
36  /**
37   * Implementation of a specific reader for a generated data bean.
38   */
39  public class GoraTestVertexOutputFormat
40    extends GoraVertexOutputFormat<LongWritable, DoubleWritable,
41    FloatWritable> {
42  
43    /**
44     * DEfault constructor
45     */
46    public GoraTestVertexOutputFormat() {
47    }
48  
49    @Override
50    public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
51    createVertexWriter(TaskAttemptContext context)
52      throws IOException, InterruptedException {
53      return new GoraGVertexVertexWriter();
54    }
55  
56    /**
57     * Gora vertex writer.
58     */
59    protected class GoraGVertexVertexWriter extends GoraVertexWriter {
60  
61      @Override
62      protected Persistent getGoraVertex(
63          Vertex<LongWritable, DoubleWritable, FloatWritable> vertex) {
64        GVertexResult tmpGVertex = new GVertexResult();
65        tmpGVertex.setVertexId(vertex.getId().toString());
66        tmpGVertex.setVertexValue(Float.parseFloat(vertex.getValue().toString()));
67        Iterator<Edge<LongWritable, FloatWritable>> it =
68            vertex.getEdges().iterator();
69        while (it.hasNext()) {
70          Edge<LongWritable, FloatWritable> edge = it.next();
71          tmpGVertex.getEdges().put(
72              edge.getTargetVertexId().toString(),
73              edge.getValue().toString());
74        }
75        getLogger().debug("GoraObject created: " + tmpGVertex.toString());
76        return tmpGVertex;
77      }
78  
79      @SuppressWarnings("unchecked")
80      @Override
81      public void writeVertex(
82        Vertex<LongWritable, DoubleWritable, FloatWritable> vertex)
83        throws IOException, InterruptedException {
84        super.writeVertex(vertex);
85        // Asserting
86        Assert.assertEquals(createVertex(vertex.getId().toString(), null),
87                getDataStore().get(vertex.getId().toString()));
88      }
89  
90      /**
91       * Creates a vertex using an id and a set of edges.
92       * @param id Vertex id.
93       * @param edges Set of edges.
94       * @return GVertex created.
95       */
96      public GVertexResult createVertex(String id, Map<String, String> edges) {
97        GVertexResult newVrtx = new GVertexResult();
98        newVrtx.setVertexId(id);
99        if (edges != null) {
100         for (String edgeId : edges.keySet())
101           newVrtx.getEdges().put(edgeId, edges.get(edgeId));
102       }
103       return newVrtx;
104     }
105 
106     @Override
107     protected Object getGoraKey(
108         Vertex<LongWritable, DoubleWritable, FloatWritable> vertex) {
109       String goraKey = String.valueOf(vertex.getId());
110       return goraKey;
111     }
112   }
113 }