This project has retired. For details please refer to its Attic page.
JsonBase64VertexInputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.formats;
20  
21  import com.google.common.collect.Lists;
22  import net.iharder.Base64;
23  import org.apache.giraph.edge.Edge;
24  import org.apache.giraph.edge.EdgeFactory;
25  import org.apache.hadoop.io.Text;
26  import org.apache.hadoop.io.Writable;
27  import org.apache.hadoop.io.WritableComparable;
28  import org.apache.hadoop.mapreduce.InputSplit;
29  import org.apache.hadoop.mapreduce.TaskAttemptContext;
30  import org.json.JSONArray;
31  import org.json.JSONException;
32  import org.json.JSONObject;
33  
34  import java.io.ByteArrayInputStream;
35  import java.io.DataInput;
36  import java.io.DataInputStream;
37  import java.io.IOException;
38  import java.util.List;
39  
40  /**
41   * Simple way to represent the structure of the graph with a JSON object.
42   * The actual vertex ids, values, edges are stored by the
43   * Writable serialized bytes that are Byte64 encoded.
44   * Works with {@link JsonBase64VertexOutputFormat}
45   *
46   * @param <I> Vertex index value
47   * @param <V> Vertex value
48   * @param <E> Edge value
49   */
50  @SuppressWarnings("rawtypes")
51  public class JsonBase64VertexInputFormat<I extends WritableComparable,
52      V extends Writable, E extends Writable>
53      extends TextVertexInputFormat<I, V, E> {
54  
55    @Override
56    public TextVertexReader createVertexReader(InputSplit split,
57        TaskAttemptContext context) {
58      return new JsonBase64VertexReader();
59    }
60  
61    /**
62     * Simple reader that supports {@link JsonBase64VertexInputFormat}
63     */
64    protected class JsonBase64VertexReader extends
65      TextVertexReaderFromEachLineProcessed<JSONObject> {
66  
67  
68      @Override
69      public void initialize(InputSplit inputSplit, TaskAttemptContext context)
70        throws IOException, InterruptedException {
71        super.initialize(inputSplit, context);
72      }
73  
74      @Override
75      protected JSONObject preprocessLine(Text line) {
76        try {
77          return new JSONObject(line.toString());
78        } catch (JSONException e) {
79          throw new IllegalArgumentException(
80            "next: Failed to get the vertex", e);
81        }
82      }
83  
84      @Override
85      protected I getId(JSONObject vertexObject) throws IOException {
86        try {
87          byte[] decodedWritable = Base64.decode(
88              vertexObject.getString(JsonBase64VertexFormat.VERTEX_ID_KEY));
89          DataInput input = new DataInputStream(
90              new ByteArrayInputStream(decodedWritable));
91          I vertexId = getConf().createVertexId();
92          vertexId.readFields(input);
93          return vertexId;
94        } catch (JSONException e) {
95          throw new IllegalArgumentException(
96            "next: Failed to get vertex id", e);
97        }
98      }
99  
100     @Override
101     protected V getValue(JSONObject vertexObject) throws IOException {
102       try {
103         byte[] decodedWritable = Base64.decode(
104             vertexObject.getString(JsonBase64VertexFormat.VERTEX_VALUE_KEY));
105         DataInputStream input = new DataInputStream(
106             new ByteArrayInputStream(decodedWritable));
107         V vertexValue = getConf().createVertexValue();
108         vertexValue.readFields(input);
109         return vertexValue;
110       } catch (JSONException e) {
111         throw new IllegalArgumentException(
112           "next: Failed to get vertex value", e);
113       }
114     }
115 
116     @Override
117     protected Iterable<Edge<I, E>> getEdges(JSONObject vertexObject) throws
118     IOException {
119       JSONArray edgeArray = null;
120       try {
121         edgeArray = vertexObject.getJSONArray(
122           JsonBase64VertexFormat.EDGE_ARRAY_KEY);
123       } catch (JSONException e) {
124         throw new IllegalArgumentException(
125           "next: Failed to get edge array", e);
126       }
127       byte[] decodedWritable;
128       List<Edge<I, E>> edges = Lists.newArrayListWithCapacity(
129           edgeArray.length());
130       for (int i = 0; i < edgeArray.length(); ++i) {
131         try {
132           decodedWritable = Base64.decode(edgeArray.getString(i));
133         } catch (JSONException e) {
134           throw new IllegalArgumentException(
135             "next: Failed to get edge value", e);
136         }
137         DataInputStream input = new DataInputStream(
138             new ByteArrayInputStream(decodedWritable));
139         I targetVertexId = getConf().createVertexId();
140         targetVertexId.readFields(input);
141         E edgeValue = getConf().createEdgeValue();
142         edgeValue.readFields(input);
143         edges.add(EdgeFactory.create(targetVertexId, edgeValue));
144       }
145       return edges;
146     }
147 
148   }
149 
150 }