This project has been retired. For details, please refer to its
Attic page.
JsonBase64VertexInputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats;
20
21 import com.google.common.collect.Lists;
22 import net.iharder.Base64;
23 import org.apache.giraph.edge.Edge;
24 import org.apache.giraph.edge.EdgeFactory;
25 import org.apache.hadoop.io.Text;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28 import org.apache.hadoop.mapreduce.InputSplit;
29 import org.apache.hadoop.mapreduce.TaskAttemptContext;
30 import org.json.JSONArray;
31 import org.json.JSONException;
32 import org.json.JSONObject;
33
34 import java.io.ByteArrayInputStream;
35 import java.io.DataInput;
36 import java.io.DataInputStream;
37 import java.io.IOException;
38 import java.util.List;
39
40
41
42
43
44
45
46
47
48
49
50 @SuppressWarnings("rawtypes")
51 public class JsonBase64VertexInputFormat<I extends WritableComparable,
52 V extends Writable, E extends Writable>
53 extends TextVertexInputFormat<I, V, E> {
54
55 @Override
56 public TextVertexReader createVertexReader(InputSplit split,
57 TaskAttemptContext context) {
58 return new JsonBase64VertexReader();
59 }
60
61
62
63
64 protected class JsonBase64VertexReader extends
65 TextVertexReaderFromEachLineProcessed<JSONObject> {
66
67
68 @Override
69 public void initialize(InputSplit inputSplit, TaskAttemptContext context)
70 throws IOException, InterruptedException {
71 super.initialize(inputSplit, context);
72 }
73
74 @Override
75 protected JSONObject preprocessLine(Text line) {
76 try {
77 return new JSONObject(line.toString());
78 } catch (JSONException e) {
79 throw new IllegalArgumentException(
80 "next: Failed to get the vertex", e);
81 }
82 }
83
84 @Override
85 protected I getId(JSONObject vertexObject) throws IOException {
86 try {
87 byte[] decodedWritable = Base64.decode(
88 vertexObject.getString(JsonBase64VertexFormat.VERTEX_ID_KEY));
89 DataInput input = new DataInputStream(
90 new ByteArrayInputStream(decodedWritable));
91 I vertexId = getConf().createVertexId();
92 vertexId.readFields(input);
93 return vertexId;
94 } catch (JSONException e) {
95 throw new IllegalArgumentException(
96 "next: Failed to get vertex id", e);
97 }
98 }
99
100 @Override
101 protected V getValue(JSONObject vertexObject) throws IOException {
102 try {
103 byte[] decodedWritable = Base64.decode(
104 vertexObject.getString(JsonBase64VertexFormat.VERTEX_VALUE_KEY));
105 DataInputStream input = new DataInputStream(
106 new ByteArrayInputStream(decodedWritable));
107 V vertexValue = getConf().createVertexValue();
108 vertexValue.readFields(input);
109 return vertexValue;
110 } catch (JSONException e) {
111 throw new IllegalArgumentException(
112 "next: Failed to get vertex value", e);
113 }
114 }
115
116 @Override
117 protected Iterable<Edge<I, E>> getEdges(JSONObject vertexObject) throws
118 IOException {
119 JSONArray edgeArray = null;
120 try {
121 edgeArray = vertexObject.getJSONArray(
122 JsonBase64VertexFormat.EDGE_ARRAY_KEY);
123 } catch (JSONException e) {
124 throw new IllegalArgumentException(
125 "next: Failed to get edge array", e);
126 }
127 byte[] decodedWritable;
128 List<Edge<I, E>> edges = Lists.newArrayListWithCapacity(
129 edgeArray.length());
130 for (int i = 0; i < edgeArray.length(); ++i) {
131 try {
132 decodedWritable = Base64.decode(edgeArray.getString(i));
133 } catch (JSONException e) {
134 throw new IllegalArgumentException(
135 "next: Failed to get edge value", e);
136 }
137 DataInputStream input = new DataInputStream(
138 new ByteArrayInputStream(decodedWritable));
139 I targetVertexId = getConf().createVertexId();
140 targetVertexId.readFields(input);
141 E edgeValue = getConf().createEdgeValue();
142 edgeValue.readFields(input);
143 edges.add(EdgeFactory.create(targetVertexId, edgeValue));
144 }
145 return edges;
146 }
147
148 }
149
150 }