This project has retired. For details please refer to its Attic page.
GraphvizOutputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.io.formats;
19  
20  import org.apache.giraph.edge.Edge;
21  import org.apache.giraph.graph.Vertex;
22  import org.apache.hadoop.fs.FSDataOutputStream;
23  import org.apache.hadoop.fs.FileSystem;
24  import org.apache.hadoop.fs.Path;
25  import org.apache.hadoop.io.NullWritable;
26  import org.apache.hadoop.io.Text;
27  import org.apache.hadoop.io.Writable;
28  import org.apache.hadoop.io.WritableComparable;
29  import org.apache.hadoop.mapreduce.JobContext;
30  import org.apache.hadoop.mapreduce.JobStatus;
31  import org.apache.hadoop.mapreduce.OutputCommitter;
32  import org.apache.hadoop.mapreduce.TaskAttemptContext;
33  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
34  
35  import java.io.IOException;
36  
37  /**
38   * Writes graph to a dot file (graphviz format). This writes it out in parts. At
39   * the end of the job you can use the following to get a single graphviz file:
40   *
41   * hadoop fs -getmerge /hadoop/output/path data.txt
42   */
43  public class GraphvizOutputFormat extends TextVertexOutputFormat<
44      WritableComparable, Writable, Writable> {
45    /** Color of node text */
46    private static final String NODE_TEXT_COLOR = "blue:orange";
47  
48    @Override
49    public TextVertexWriter createVertexWriter(TaskAttemptContext context)
50      throws IOException, InterruptedException {
51      return new VertexWriter();
52    }
53  
54    @Override
55    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
56      throws IOException, InterruptedException {
57      return new GraphvizOutputCommitter(super.getOutputCommitter(context));
58    }
59  
60    /**
61     * Get output directory job is using
62     * @param context job context
63     * @return Path for output directory
64     */
65  
66    private static Path getOutputDir(JobContext context) {
67      return FileOutputFormat.getOutputPath(context);
68    }
69  
70    /**
71     * Get path which sorts at the beginning of a directory
72     * @param context job context
73     * @return Path at beginning
74     */
75  
76    private static Path getPathAtBeginning(JobContext context) {
77      return new Path(getOutputDir(context), "____" + System.currentTimeMillis());
78    }
79  
80    /**
81     * Get path which sorts at the end of a directory
82     * @param context job context
83     * @return Path at end
84     */
85    private static Path getPathAtEnd(JobContext context) {
86      return new Path(getOutputDir(context), "zzz_" + System.currentTimeMillis());
87    }
88  
89    /**
90     * Write start of graph data
91     * @param context task attempt context
92     * @throws IOException I/O errors
93     */
94    private static void writeStart(JobContext context) throws IOException {
95      Path path = getPathAtBeginning(context);
96      FileSystem fs = path.getFileSystem(context.getConfiguration());
97      FSDataOutputStream file = fs.create(path, false);
98      file.writeBytes("digraph g {\n");
99      file.close();
100   }
101 
102   /**
103    * Write end of graph data
104    * @param context job attempt context
105    * @throws IOException I/O errors
106    */
107   private static void writeEnd(JobContext context) throws IOException {
108     Path path = getPathAtEnd(context);
109     FileSystem fs = path.getFileSystem(context.getConfiguration());
110     FSDataOutputStream file = fs.create(path, false);
111     file.writeBytes("}\n");
112     file.close();
113   }
114 
115   /**
116    * Add node information to output
117    * @param vertex node
118    * @param sb string builder
119    */
120   private static void addNodeInfo(
121       Vertex<WritableComparable, Writable, Writable> vertex, StringBuilder sb) {
122     sb.append('"').append(vertex.getId()).append('"');
123     sb.append(" [").append("label=").append('"').append("<id> ");
124     sb.append(vertex.getId());
125     if (!(vertex.getValue() instanceof NullWritable)) {
126       sb.append("|").append(vertex.getValue());
127     }
128     sb.append('"').append(",shape=record,fillcolor=")
129         .append('"').append(NODE_TEXT_COLOR).append('"')
130         .append("];");
131   }
132 
133   /**
134    * Write an edge
135    * @param sb string builder
136    * @param sourceID source vertex ID
137    * @param edge the edge
138    */
139   private static void addEdge(StringBuilder sb, Writable sourceID,
140       Edge<WritableComparable, Writable> edge) {
141     sb.append(sourceID).append(":id")
142         .append(" -> ")
143         .append(edge.getTargetVertexId()).append(":id");
144     addEdgeInfo(sb, edge);
145     sb.append("\n");
146   }
147 
148   /**
149    * Add edge information to output
150    * @param sb string builder
151    * @param edge the edge
152    */
153   private static void addEdgeInfo(StringBuilder sb,
154     Edge<WritableComparable, Writable> edge) {
155     if (!(edge.getValue() instanceof NullWritable)) {
156       sb.append(" [label=").append(edge.getValue()).append(" ];");
157     }
158   }
159 
160   /**
161    * Wrapper around output committer which writes our begin/end files.
162    */
163   private static class GraphvizOutputCommitter extends OutputCommitter {
164     /** delegate committer */
165     private final OutputCommitter delegate;
166 
167     /**
168      * Constructor with delegate
169      * @param delegate committer to use
170      */
171     private GraphvizOutputCommitter(OutputCommitter delegate) {
172       this.delegate = delegate;
173     }
174 
175     @Override public boolean equals(Object o) {
176       return delegate.equals(o);
177     }
178 
179     @Override public String toString() {
180       return delegate.toString();
181     }
182 
183     @Override public int hashCode() {
184       return delegate.hashCode();
185     }
186 
187     @Override public void abortJob(JobContext jobContext, JobStatus.State state)
188       throws IOException {
189       delegate.abortJob(jobContext, state);
190     }
191 
192     @Override public void abortTask(TaskAttemptContext taskContext)
193       throws IOException {
194       delegate.abortTask(taskContext);
195     }
196 
197     @Override @Deprecated public void cleanupJob(JobContext context)
198       throws IOException {
199       delegate.cleanupJob(context);
200     }
201 
202     @Override public void commitJob(JobContext jobContext) throws IOException {
203       writeEnd(jobContext);
204       delegate.commitJob(jobContext);
205     }
206 
207     @Override public void commitTask(TaskAttemptContext taskContext)
208       throws IOException {
209       delegate.commitTask(taskContext);
210     }
211 
212     @Override public boolean needsTaskCommit(TaskAttemptContext taskContext)
213       throws IOException {
214       return delegate.needsTaskCommit(taskContext);
215     }
216 
217     @Override public void setupJob(JobContext jobContext) throws IOException {
218       delegate.setupJob(jobContext);
219       writeStart(jobContext);
220     }
221 
222     @Override public void setupTask(TaskAttemptContext taskContext)
223       throws IOException {
224       delegate.setupTask(taskContext);
225     }
226   }
227 
228   /**
229    * Writes vertices to graphviz files.
230    */
231   private class VertexWriter extends TextVertexWriter {
232     @Override
233     public void writeVertex(
234       Vertex<WritableComparable, Writable, Writable> vertex)
235       throws IOException, InterruptedException {
236       StringBuilder sb = new StringBuilder(vertex.getNumEdges() * 10);
237       for (Edge<WritableComparable, Writable> edge : vertex.getEdges()) {
238         addEdge(sb, vertex.getId(), edge);
239       }
240       addNodeInfo(vertex, sb);
241       getRecordWriter().write(new Text(sb.toString()), null);
242     }
243   }
244 }