This project has retired. For details please refer to its Attic page.
SynchronizedSuperstepOutput xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.superstep_output;
20  
21  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22  import org.apache.giraph.graph.Vertex;
23  import org.apache.giraph.io.SimpleVertexWriter;
24  import org.apache.giraph.io.VertexWriter;
25  import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
26  import org.apache.hadoop.io.Writable;
27  import org.apache.hadoop.io.WritableComparable;
28  import org.apache.hadoop.mapreduce.Mapper;
29  
30  import java.io.IOException;
31  
32  /**
33   * Class to use as {@link SuperstepOutput} when chosen VertexOutputFormat is
34   * not thread-safe.
35   *
36   * @param <I> Vertex id
37   * @param <V> Vertex value
38   * @param <E> Edge value
39   */
40  public class SynchronizedSuperstepOutput<I extends WritableComparable,
41      V extends Writable, E extends Writable> implements
42      SuperstepOutput<I, V, E> {
43    /** Mapper context */
44    private final Mapper<?, ?, ?, ?>.Context context;
45    /** Main vertex writer */
46    private final VertexWriter<I, V, E> vertexWriter;
47    /** Vertex output format */
48    private final WrappedVertexOutputFormat<I, V, E> vertexOutputFormat;
49    /**
50     * Simple vertex writer, wrapper for {@link #vertexWriter}.
51     * Call to writeVertex is thread-safe.
52     */
53    private final SimpleVertexWriter<I, V, E> simpleVertexWriter;
54  
55    /**
56     * Constructor
57     *
58     * @param conf Configuration
59     * @param context Mapper context
60     */
61    @SuppressWarnings("unchecked")
62    public SynchronizedSuperstepOutput(
63        ImmutableClassesGiraphConfiguration<I, V, E> conf,
64        Mapper<?, ?, ?, ?>.Context context) {
65      this.context = context;
66      try {
67        vertexOutputFormat = conf.createWrappedVertexOutputFormat();
68        vertexOutputFormat.preWriting(context);
69        vertexWriter = vertexOutputFormat.createVertexWriter(context);
70        vertexWriter.setConf(conf);
71        vertexWriter.initialize(context);
72      } catch (IOException e) {
73        throw new IllegalStateException("SynchronizedSuperstepOutput: " +
74            "IOException occurred", e);
75      } catch (InterruptedException e) {
76        throw new IllegalStateException("SynchronizedSuperstepOutput: " +
77            "InterruptedException occurred", e);
78      }
79      simpleVertexWriter = new SimpleVertexWriter<I, V, E>() {
80        @Override
81        public synchronized void writeVertex(
82            Vertex<I, V, E> vertex) throws IOException, InterruptedException {
83          vertexWriter.writeVertex(vertex);
84        }
85      };
86    }
87  
88    @Override
89    public SimpleVertexWriter<I, V, E> getVertexWriter() {
90      return simpleVertexWriter;
91    }
92  
93    @Override
94    public void returnVertexWriter(SimpleVertexWriter<I, V, E> vertexWriter) {
95    }
96  
97    @Override
98    public void postApplication() throws IOException, InterruptedException {
99      vertexWriter.close(context);
100     vertexOutputFormat.postWriting(context);
101   }
102 }