View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.superstep_output;
20  
21  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22  import org.apache.giraph.graph.Vertex;
23  import org.apache.giraph.io.SimpleVertexWriter;
24  import org.apache.giraph.io.VertexWriter;
25  import org.apache.hadoop.io.Writable;
26  import org.apache.hadoop.io.WritableComparable;
27  import org.apache.hadoop.mapreduce.Mapper;
28  
29  import java.io.IOException;
30  
31  /**
32   * Class to use as {@link SuperstepOutput} when chosen VertexOutputFormat is
33   * not thread-safe.
34   *
35   * @param <I> Vertex id
36   * @param <V> Vertex value
37   * @param <E> Edge value
38   */
39  public class SynchronizedSuperstepOutput<I extends WritableComparable,
40      V extends Writable, E extends Writable> implements
41      SuperstepOutput<I, V, E> {
42    /** Mapper context */
43    private final Mapper<?, ?, ?, ?>.Context context;
44    /** Main vertex writer */
45    private final VertexWriter<I, V, E> vertexWriter;
46    /**
47     * Simple vertex writer, wrapper for {@link #vertexWriter}.
48     * Call to writeVertex is thread-safe.
49     */
50    private final SimpleVertexWriter<I, V, E> simpleVertexWriter;
51  
52    /**
53     * Constructor
54     *
55     * @param conf Configuration
56     * @param context Mapper context
57     */
58    @SuppressWarnings("unchecked")
59    public SynchronizedSuperstepOutput(
60        ImmutableClassesGiraphConfiguration<I, V, E> conf,
61        Mapper<?, ?, ?, ?>.Context context) {
62      this.context = context;
63      try {
64        vertexWriter =
65            conf.createWrappedVertexOutputFormat().createVertexWriter(context);
66        vertexWriter.setConf(conf);
67        vertexWriter.initialize(context);
68      } catch (IOException e) {
69        throw new IllegalStateException("SynchronizedSuperstepOutput: " +
70            "IOException occurred", e);
71      } catch (InterruptedException e) {
72        throw new IllegalStateException("SynchronizedSuperstepOutput: " +
73            "InterruptedException occurred", e);
74      }
75      simpleVertexWriter = new SimpleVertexWriter<I, V, E>() {
76        @Override
77        public synchronized void writeVertex(
78            Vertex<I, V, E> vertex) throws IOException, InterruptedException {
79          vertexWriter.writeVertex(vertex);
80        }
81      };
82    }
83  
84    @Override
85    public SimpleVertexWriter<I, V, E> getVertexWriter() {
86      return simpleVertexWriter;
87    }
88  
89    @Override
90    public void returnVertexWriter(SimpleVertexWriter<I, V, E> vertexWriter) {
91    }
92  
93    @Override
94    public void postApplication() throws IOException, InterruptedException {
95      vertexWriter.close(context);
96    }
97  }