This project has retired. For details please refer to its
        
        Attic page.
      
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.giraph.io.superstep_output;
20  
21  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22  import org.apache.giraph.graph.Vertex;
23  import org.apache.giraph.io.SimpleVertexWriter;
24  import org.apache.giraph.io.VertexWriter;
25  import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
26  import org.apache.hadoop.io.Writable;
27  import org.apache.hadoop.io.WritableComparable;
28  import org.apache.hadoop.mapreduce.Mapper;
29  
30  import java.io.IOException;
31  
32  
33  
34  
35  
36  
37  
38  
39  
40  public class SynchronizedSuperstepOutput<I extends WritableComparable,
41      V extends Writable, E extends Writable> implements
42      SuperstepOutput<I, V, E> {
43    
44    private final Mapper<?, ?, ?, ?>.Context context;
45    
46    private final VertexWriter<I, V, E> vertexWriter;
47    
48    private final WrappedVertexOutputFormat<I, V, E> vertexOutputFormat;
49    
50  
51  
52  
53    private final SimpleVertexWriter<I, V, E> simpleVertexWriter;
54  
55    
56  
57  
58  
59  
60  
61    @SuppressWarnings("unchecked")
62    public SynchronizedSuperstepOutput(
63        ImmutableClassesGiraphConfiguration<I, V, E> conf,
64        Mapper<?, ?, ?, ?>.Context context) {
65      this.context = context;
66      try {
67        vertexOutputFormat = conf.createWrappedVertexOutputFormat();
68        vertexOutputFormat.preWriting(context);
69        vertexWriter = vertexOutputFormat.createVertexWriter(context);
70        vertexWriter.setConf(conf);
71        vertexWriter.initialize(context);
72      } catch (IOException e) {
73        throw new IllegalStateException("SynchronizedSuperstepOutput: " +
74            "IOException occurred", e);
75      } catch (InterruptedException e) {
76        throw new IllegalStateException("SynchronizedSuperstepOutput: " +
77            "InterruptedException occurred", e);
78      }
79      simpleVertexWriter = new SimpleVertexWriter<I, V, E>() {
80        @Override
81        public synchronized void writeVertex(
82            Vertex<I, V, E> vertex) throws IOException, InterruptedException {
83          vertexWriter.writeVertex(vertex);
84        }
85      };
86    }
87  
88    @Override
89    public SimpleVertexWriter<I, V, E> getVertexWriter() {
90      return simpleVertexWriter;
91    }
92  
93    @Override
94    public void returnVertexWriter(SimpleVertexWriter<I, V, E> vertexWriter) {
95    }
96  
97    @Override
98    public void postApplication() throws IOException, InterruptedException {
99      vertexWriter.close(context);
100     vertexOutputFormat.postWriting(context);
101   }
102 }