This project has retired. For details please refer to its
        
        Attic page.
      
 
GraphvizOutputFormat xref
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.giraph.io.formats;
19  
20  import org.apache.giraph.edge.Edge;
21  import org.apache.giraph.graph.Vertex;
22  import org.apache.hadoop.fs.FSDataOutputStream;
23  import org.apache.hadoop.fs.FileSystem;
24  import org.apache.hadoop.fs.Path;
25  import org.apache.hadoop.io.NullWritable;
26  import org.apache.hadoop.io.Text;
27  import org.apache.hadoop.io.Writable;
28  import org.apache.hadoop.io.WritableComparable;
29  import org.apache.hadoop.mapreduce.JobContext;
30  import org.apache.hadoop.mapreduce.JobStatus;
31  import org.apache.hadoop.mapreduce.OutputCommitter;
32  import org.apache.hadoop.mapreduce.TaskAttemptContext;
33  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
34  
35  import java.io.IOException;
36  
37  
38  
39  
40  
41  
42  
43  public class GraphvizOutputFormat extends TextVertexOutputFormat<
44      WritableComparable, Writable, Writable> {
45    
46    private static final String NODE_TEXT_COLOR = "blue:orange";
47  
48    @Override
49    public TextVertexWriter createVertexWriter(TaskAttemptContext context)
50      throws IOException, InterruptedException {
51      return new VertexWriter();
52    }
53  
54    @Override
55    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
56      throws IOException, InterruptedException {
57      return new GraphvizOutputCommitter(super.getOutputCommitter(context));
58    }
59  
60    
61  
62  
63  
64  
65  
66    private static Path getOutputDir(JobContext context) {
67      return FileOutputFormat.getOutputPath(context);
68    }
69  
70    
71  
72  
73  
74  
75  
76    private static Path getPathAtBeginning(JobContext context) {
77      return new Path(getOutputDir(context), "____" + System.currentTimeMillis());
78    }
79  
80    
81  
82  
83  
84  
85    private static Path getPathAtEnd(JobContext context) {
86      return new Path(getOutputDir(context), "zzz_" + System.currentTimeMillis());
87    }
88  
89    
90  
91  
92  
93  
94    private static void writeStart(JobContext context) throws IOException {
95      Path path = getPathAtBeginning(context);
96      FileSystem fs = path.getFileSystem(context.getConfiguration());
97      FSDataOutputStream file = fs.create(path, false);
98      file.writeBytes("digraph g {\n");
99      file.close();
100   }
101 
102   
103 
104 
105 
106 
107   private static void writeEnd(JobContext context) throws IOException {
108     Path path = getPathAtEnd(context);
109     FileSystem fs = path.getFileSystem(context.getConfiguration());
110     FSDataOutputStream file = fs.create(path, false);
111     file.writeBytes("}\n");
112     file.close();
113   }
114 
115   
116 
117 
118 
119 
120   private static void addNodeInfo(
121       Vertex<WritableComparable, Writable, Writable> vertex, StringBuilder sb) {
122     sb.append('"').append(vertex.getId()).append('"');
123     sb.append(" [").append("label=").append('"').append("<id> ");
124     sb.append(vertex.getId());
125     if (!(vertex.getValue() instanceof NullWritable)) {
126       sb.append("|").append(vertex.getValue());
127     }
128     sb.append('"').append(",shape=record,fillcolor=")
129         .append('"').append(NODE_TEXT_COLOR).append('"')
130         .append("];");
131   }
132 
133   
134 
135 
136 
137 
138 
139   private static void addEdge(StringBuilder sb, Writable sourceID,
140       Edge<WritableComparable, Writable> edge) {
141     sb.append(sourceID).append(":id")
142         .append(" -> ")
143         .append(edge.getTargetVertexId()).append(":id");
144     addEdgeInfo(sb, edge);
145     sb.append("\n");
146   }
147 
148   
149 
150 
151 
152 
153   private static void addEdgeInfo(StringBuilder sb,
154     Edge<WritableComparable, Writable> edge) {
155     if (!(edge.getValue() instanceof NullWritable)) {
156       sb.append(" [label=").append(edge.getValue()).append(" ];");
157     }
158   }
159 
160   
161 
162 
163   private static class GraphvizOutputCommitter extends OutputCommitter {
164     
165     private final OutputCommitter delegate;
166 
167     
168 
169 
170 
171     private GraphvizOutputCommitter(OutputCommitter delegate) {
172       this.delegate = delegate;
173     }
174 
175     @Override public boolean equals(Object o) {
176       return delegate.equals(o);
177     }
178 
179     @Override public String toString() {
180       return delegate.toString();
181     }
182 
183     @Override public int hashCode() {
184       return delegate.hashCode();
185     }
186 
187     @Override public void abortJob(JobContext jobContext, JobStatus.State state)
188       throws IOException {
189       delegate.abortJob(jobContext, state);
190     }
191 
192     @Override public void abortTask(TaskAttemptContext taskContext)
193       throws IOException {
194       delegate.abortTask(taskContext);
195     }
196 
197     @Override @Deprecated public void cleanupJob(JobContext context)
198       throws IOException {
199       delegate.cleanupJob(context);
200     }
201 
202     @Override public void commitJob(JobContext jobContext) throws IOException {
203       writeEnd(jobContext);
204       delegate.commitJob(jobContext);
205     }
206 
207     @Override public void commitTask(TaskAttemptContext taskContext)
208       throws IOException {
209       delegate.commitTask(taskContext);
210     }
211 
212     @Override public boolean needsTaskCommit(TaskAttemptContext taskContext)
213       throws IOException {
214       return delegate.needsTaskCommit(taskContext);
215     }
216 
217     @Override public void setupJob(JobContext jobContext) throws IOException {
218       delegate.setupJob(jobContext);
219       writeStart(jobContext);
220     }
221 
222     @Override public void setupTask(TaskAttemptContext taskContext)
223       throws IOException {
224       delegate.setupTask(taskContext);
225     }
226   }
227 
228   
229 
230 
231   private class VertexWriter extends TextVertexWriter {
232     @Override
233     public void writeVertex(
234       Vertex<WritableComparable, Writable, Writable> vertex)
235       throws IOException, InterruptedException {
236       StringBuilder sb = new StringBuilder(vertex.getNumEdges() * 10);
237       for (Edge<WritableComparable, Writable> edge : vertex.getEdges()) {
238         addEdge(sb, vertex.getId(), edge);
239       }
240       addNodeInfo(vertex, sb);
241       getRecordWriter().write(new Text(sb.toString()), null);
242     }
243   }
244 }