1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.io.gora;
1920import java.io.IOException;
21import java.util.Iterator;
22import java.util.Map;
2324import org.apache.giraph.edge.Edge;
25import org.apache.giraph.graph.Vertex;
26import org.apache.giraph.io.VertexWriter;
27import org.apache.giraph.io.gora.generated.GVertex;
28import org.apache.giraph.io.gora.generated.GVertexResult;
29import org.apache.gora.persistency.Persistent;
30import org.apache.hadoop.io.DoubleWritable;
31import org.apache.hadoop.io.FloatWritable;
32import org.apache.hadoop.io.LongWritable;
33import org.apache.hadoop.mapreduce.TaskAttemptContext;
34import org.junit.Assert;
3536/**37 * Implementation of a specific reader for a generated data bean.38 */39publicclassGoraTestVertexOutputFormat40extends GoraVertexOutputFormat<LongWritable, DoubleWritable,
41 FloatWritable> {
4243/**44 * DEfault constructor45 */46publicGoraTestVertexOutputFormat() {
47 }
4849 @Override
50public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
51 createVertexWriter(TaskAttemptContext context)
52throws IOException, InterruptedException {
53returnnewGoraGVertexVertexWriter();
54 }
5556/**57 * Gora vertex writer.58 */59protectedclassGoraGVertexVertexWriterextends GoraVertexWriter {
6061 @Override
62protected Persistent getGoraVertex(
63 Vertex<LongWritable, DoubleWritable, FloatWritable> vertex) {
64 GVertexResult tmpGVertex = new GVertexResult();
65 tmpGVertex.setVertexId(vertex.getId().toString());
66 tmpGVertex.setVertexValue(Float.parseFloat(vertex.getValue().toString()));
67 Iterator<Edge<LongWritable, FloatWritable>> it =
68 vertex.getEdges().iterator();
69while (it.hasNext()) {
70 Edge<LongWritable, FloatWritable> edge = it.next();
71 tmpGVertex.getEdges().put(
72 edge.getTargetVertexId().toString(),
73 edge.getValue().toString());
74 }
75 getLogger().debug("GoraObject created: " + tmpGVertex.toString());
76return tmpGVertex;
77 }
7879 @SuppressWarnings("unchecked")
80 @Override
81publicvoid writeVertex(
82 Vertex<LongWritable, DoubleWritable, FloatWritable> vertex)
83throws IOException, InterruptedException {
84super.writeVertex(vertex);
85// Asserting86 Assert.assertEquals(createVertex(vertex.getId().toString(), null),
87 getDataStore().get(vertex.getId().toString()));
88 }
8990/**91 * Creates a vertex using an id and a set of edges.92 * @param id Vertex id.93 * @param edges Set of edges.94 * @return GVertex created.95 */96public GVertexResult createVertex(String id, Map<String, String> edges) {
97 GVertexResult newVrtx = new GVertexResult();
98 newVrtx.setVertexId(id);
99if (edges != null) {
100for (String edgeId : edges.keySet())
101 newVrtx.getEdges().put(edgeId, edges.get(edgeId));
102 }
103return newVrtx;
104 }
105106 @Override
107protected Object getGoraKey(
108 Vertex<LongWritable, DoubleWritable, FloatWritable> vertex) {
109 String goraKey = String.valueOf(vertex.getId());
110return goraKey;
111 }
112 }
113 }