This project has retired. For details please refer to its
Attic page.
GraphvizOutputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.io.formats;
19
20 import org.apache.giraph.edge.Edge;
21 import org.apache.giraph.graph.Vertex;
22 import org.apache.hadoop.fs.FSDataOutputStream;
23 import org.apache.hadoop.fs.FileSystem;
24 import org.apache.hadoop.fs.Path;
25 import org.apache.hadoop.io.NullWritable;
26 import org.apache.hadoop.io.Text;
27 import org.apache.hadoop.io.Writable;
28 import org.apache.hadoop.io.WritableComparable;
29 import org.apache.hadoop.mapreduce.JobContext;
30 import org.apache.hadoop.mapreduce.JobStatus;
31 import org.apache.hadoop.mapreduce.OutputCommitter;
32 import org.apache.hadoop.mapreduce.TaskAttemptContext;
33 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
34
35 import java.io.IOException;
36
37
38
39
40
41
42
43 public class GraphvizOutputFormat extends TextVertexOutputFormat<
44 WritableComparable, Writable, Writable> {
45
46 private static final String NODE_TEXT_COLOR = "blue:orange";
47
48 @Override
49 public TextVertexWriter createVertexWriter(TaskAttemptContext context)
50 throws IOException, InterruptedException {
51 return new VertexWriter();
52 }
53
54 @Override
55 public OutputCommitter getOutputCommitter(TaskAttemptContext context)
56 throws IOException, InterruptedException {
57 return new GraphvizOutputCommitter(super.getOutputCommitter(context));
58 }
59
60
61
62
63
64
65
66 private static Path getOutputDir(JobContext context) {
67 return FileOutputFormat.getOutputPath(context);
68 }
69
70
71
72
73
74
75
76 private static Path getPathAtBeginning(JobContext context) {
77 return new Path(getOutputDir(context), "____" + System.currentTimeMillis());
78 }
79
80
81
82
83
84
85 private static Path getPathAtEnd(JobContext context) {
86 return new Path(getOutputDir(context), "zzz_" + System.currentTimeMillis());
87 }
88
89
90
91
92
93
94 private static void writeStart(JobContext context) throws IOException {
95 Path path = getPathAtBeginning(context);
96 FileSystem fs = path.getFileSystem(context.getConfiguration());
97 FSDataOutputStream file = fs.create(path, false);
98 file.writeBytes("digraph g {\n");
99 file.close();
100 }
101
102
103
104
105
106
107 private static void writeEnd(JobContext context) throws IOException {
108 Path path = getPathAtEnd(context);
109 FileSystem fs = path.getFileSystem(context.getConfiguration());
110 FSDataOutputStream file = fs.create(path, false);
111 file.writeBytes("}\n");
112 file.close();
113 }
114
115
116
117
118
119
120 private static void addNodeInfo(
121 Vertex<WritableComparable, Writable, Writable> vertex, StringBuilder sb) {
122 sb.append('"').append(vertex.getId()).append('"');
123 sb.append(" [").append("label=").append('"').append("<id> ");
124 sb.append(vertex.getId());
125 if (!(vertex.getValue() instanceof NullWritable)) {
126 sb.append("|").append(vertex.getValue());
127 }
128 sb.append('"').append(",shape=record,fillcolor=")
129 .append('"').append(NODE_TEXT_COLOR).append('"')
130 .append("];");
131 }
132
133
134
135
136
137
138
139 private static void addEdge(StringBuilder sb, Writable sourceID,
140 Edge<WritableComparable, Writable> edge) {
141 sb.append(sourceID).append(":id")
142 .append(" -> ")
143 .append(edge.getTargetVertexId()).append(":id");
144 addEdgeInfo(sb, edge);
145 sb.append("\n");
146 }
147
148
149
150
151
152
153 private static void addEdgeInfo(StringBuilder sb,
154 Edge<WritableComparable, Writable> edge) {
155 if (!(edge.getValue() instanceof NullWritable)) {
156 sb.append(" [label=").append(edge.getValue()).append(" ];");
157 }
158 }
159
160
161
162
163 private static class GraphvizOutputCommitter extends OutputCommitter {
164
165 private final OutputCommitter delegate;
166
167
168
169
170
171 private GraphvizOutputCommitter(OutputCommitter delegate) {
172 this.delegate = delegate;
173 }
174
175 @Override public boolean equals(Object o) {
176 return delegate.equals(o);
177 }
178
179 @Override public String toString() {
180 return delegate.toString();
181 }
182
183 @Override public int hashCode() {
184 return delegate.hashCode();
185 }
186
187 @Override public void abortJob(JobContext jobContext, JobStatus.State state)
188 throws IOException {
189 delegate.abortJob(jobContext, state);
190 }
191
192 @Override public void abortTask(TaskAttemptContext taskContext)
193 throws IOException {
194 delegate.abortTask(taskContext);
195 }
196
197 @Override @Deprecated public void cleanupJob(JobContext context)
198 throws IOException {
199 delegate.cleanupJob(context);
200 }
201
202 @Override public void commitJob(JobContext jobContext) throws IOException {
203 writeEnd(jobContext);
204 delegate.commitJob(jobContext);
205 }
206
207 @Override public void commitTask(TaskAttemptContext taskContext)
208 throws IOException {
209 delegate.commitTask(taskContext);
210 }
211
212 @Override public boolean needsTaskCommit(TaskAttemptContext taskContext)
213 throws IOException {
214 return delegate.needsTaskCommit(taskContext);
215 }
216
217 @Override public void setupJob(JobContext jobContext) throws IOException {
218 delegate.setupJob(jobContext);
219 writeStart(jobContext);
220 }
221
222 @Override public void setupTask(TaskAttemptContext taskContext)
223 throws IOException {
224 delegate.setupTask(taskContext);
225 }
226 }
227
228
229
230
231 private class VertexWriter extends TextVertexWriter {
232 @Override
233 public void writeVertex(
234 Vertex<WritableComparable, Writable, Writable> vertex)
235 throws IOException, InterruptedException {
236 StringBuilder sb = new StringBuilder(vertex.getNumEdges() * 10);
237 for (Edge<WritableComparable, Writable> edge : vertex.getEdges()) {
238 addEdge(sb, vertex.getId(), edge);
239 }
240 addNodeInfo(vertex, sb);
241 getRecordWriter().write(new Text(sb.toString()), null);
242 }
243 }
244 }