View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.superstep_output;
20  
21  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22  import org.apache.giraph.io.SimpleVertexWriter;
23  import org.apache.giraph.io.VertexOutputFormat;
24  import org.apache.giraph.io.VertexWriter;
25  import org.apache.giraph.utils.CallableFactory;
26  import org.apache.giraph.utils.ProgressableUtils;
27  import org.apache.hadoop.io.Writable;
28  import org.apache.hadoop.io.WritableComparable;
29  import org.apache.hadoop.mapreduce.Mapper;
30  
31  import com.google.common.collect.Lists;
32  import com.google.common.collect.Sets;
33  
34  import java.io.IOException;
35  import java.util.List;
36  import java.util.Set;
37  import java.util.concurrent.Callable;
38  
39  /**
40   * Class to use as {@link SuperstepOutput} when chosen VertexOutputFormat is
41   * thread-safe.
42   *
43   * @param <I> Vertex id
44   * @param <V> Vertex value
45   * @param <E> Edge value
46   */
47  public class MultiThreadedSuperstepOutput<I extends WritableComparable,
48      V extends Writable, E extends Writable> implements
49      SuperstepOutput<I, V, E> {
50    /** Mapper context */
51    private final Mapper<?, ?, ?, ?>.Context context;
52    /** Configuration */
53    private ImmutableClassesGiraphConfiguration<I, V, E> configuration;
54    /** Vertex output format, used to get new vertex writers */
55    private final VertexOutputFormat<I, V, E> vertexOutputFormat;
56    /**
57     * List of returned vertex writers, these can be reused and will all be
58     * closed in the end of the application
59     */
60    private final List<VertexWriter<I, V, E>> availableVertexWriters;
61    /** Vertex writes which were created by this class and are currently used */
62    private final Set<VertexWriter<I, V, E>> occupiedVertexWriters;
63  
64    /**
65     * Constructor
66     *
67     * @param conf    Configuration
68     * @param context Mapper context
69     */
70    public MultiThreadedSuperstepOutput(
71        ImmutableClassesGiraphConfiguration<I, V, E> conf,
72        Mapper<?, ?, ?, ?>.Context context) {
73      this.configuration = conf;
74      vertexOutputFormat = conf.createWrappedVertexOutputFormat();
75      this.context = context;
76      availableVertexWriters = Lists.newArrayList();
77      occupiedVertexWriters = Sets.newHashSet();
78    }
79  
80    @Override
81    public synchronized SimpleVertexWriter<I, V, E> getVertexWriter() {
82      VertexWriter<I, V, E> vertexWriter;
83      if (availableVertexWriters.isEmpty()) {
84        try {
85          vertexWriter = vertexOutputFormat.createVertexWriter(context);
86          vertexWriter.setConf(configuration);
87          vertexWriter.initialize(context);
88        } catch (IOException e) {
89          throw new IllegalStateException("getVertexWriter: " +
90              "IOException occurred", e);
91        } catch (InterruptedException e) {
92          throw new IllegalStateException("getVertexWriter: " +
93              "InterruptedException occurred", e);
94        }
95      } else {
96        vertexWriter =
97            availableVertexWriters.remove(availableVertexWriters.size() - 1);
98      }
99      occupiedVertexWriters.add(vertexWriter);
100     return vertexWriter;
101   }
102 
103   @Override
104   public synchronized void returnVertexWriter(
105       SimpleVertexWriter<I, V, E> vertexWriter) {
106     VertexWriter<I, V, E> returnedWriter = (VertexWriter<I, V, E>) vertexWriter;
107     if (!occupiedVertexWriters.remove(returnedWriter)) {
108       throw new IllegalStateException("returnVertexWriter: " +
109           "Returned vertex writer which is not currently occupied!");
110     }
111     availableVertexWriters.add(returnedWriter);
112   }
113 
114   @Override
115   public synchronized void postApplication() throws IOException,
116       InterruptedException {
117     if (!occupiedVertexWriters.isEmpty()) {
118       throw new IllegalStateException("postApplication: " +
119           occupiedVertexWriters.size() +
120           " vertex writers were not returned!");
121     }
122 
123     // Closing writers can take time - use multiple threads and call progress
124     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
125       @Override
126       public Callable<Void> newCallable(int callableId) {
127         return new Callable<Void>() {
128           @Override
129           public Void call() throws Exception {
130             while (true) {
131               VertexWriter<I, V, E> vertexWriter;
132               synchronized (availableVertexWriters) {
133                 if (availableVertexWriters.isEmpty()) {
134                   return null;
135                 }
136                 vertexWriter = availableVertexWriters.remove(
137                     availableVertexWriters.size() - 1);
138               }
139               vertexWriter.close(context);
140             }
141           }
142         };
143       }
144     };
145     ProgressableUtils.getResultsWithNCallables(callableFactory,
146         Math.min(configuration.getNumOutputThreads(),
147             availableVertexWriters.size()), "close-writers-%d", context);
148   }
149 }