This project has retired. For details please refer to its
Attic page.
SynchronizedSuperstepOutput xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.superstep_output;
20
21 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22 import org.apache.giraph.graph.Vertex;
23 import org.apache.giraph.io.SimpleVertexWriter;
24 import org.apache.giraph.io.VertexWriter;
25 import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28 import org.apache.hadoop.mapreduce.Mapper;
29
30 import java.io.IOException;
31
32
33
34
35
36
37
38
39
40 public class SynchronizedSuperstepOutput<I extends WritableComparable,
41 V extends Writable, E extends Writable> implements
42 SuperstepOutput<I, V, E> {
43
44 private final Mapper<?, ?, ?, ?>.Context context;
45
46 private final VertexWriter<I, V, E> vertexWriter;
47
48 private final WrappedVertexOutputFormat<I, V, E> vertexOutputFormat;
49
50
51
52
53 private final SimpleVertexWriter<I, V, E> simpleVertexWriter;
54
55
56
57
58
59
60
61 @SuppressWarnings("unchecked")
62 public SynchronizedSuperstepOutput(
63 ImmutableClassesGiraphConfiguration<I, V, E> conf,
64 Mapper<?, ?, ?, ?>.Context context) {
65 this.context = context;
66 try {
67 vertexOutputFormat = conf.createWrappedVertexOutputFormat();
68 vertexOutputFormat.preWriting(context);
69 vertexWriter = vertexOutputFormat.createVertexWriter(context);
70 vertexWriter.setConf(conf);
71 vertexWriter.initialize(context);
72 } catch (IOException e) {
73 throw new IllegalStateException("SynchronizedSuperstepOutput: " +
74 "IOException occurred", e);
75 } catch (InterruptedException e) {
76 throw new IllegalStateException("SynchronizedSuperstepOutput: " +
77 "InterruptedException occurred", e);
78 }
79 simpleVertexWriter = new SimpleVertexWriter<I, V, E>() {
80 @Override
81 public synchronized void writeVertex(
82 Vertex<I, V, E> vertex) throws IOException, InterruptedException {
83 vertexWriter.writeVertex(vertex);
84 }
85 };
86 }
87
88 @Override
89 public SimpleVertexWriter<I, V, E> getVertexWriter() {
90 return simpleVertexWriter;
91 }
92
93 @Override
94 public void returnVertexWriter(SimpleVertexWriter<I, V, E> vertexWriter) {
95 }
96
97 @Override
98 public void postApplication() throws IOException, InterruptedException {
99 vertexWriter.close(context);
100 vertexOutputFormat.postWriting(context);
101 }
102 }