This project has retired. For details please refer to its Attic page.
MultiThreadedSuperstepOutput xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.superstep_output;
20  
21  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22  import org.apache.giraph.io.SimpleVertexWriter;
23  import org.apache.giraph.io.VertexOutputFormat;
24  import org.apache.giraph.io.VertexWriter;
25  import org.apache.giraph.utils.CallableFactory;
26  import org.apache.giraph.utils.ProgressableUtils;
27  import org.apache.hadoop.io.Writable;
28  import org.apache.hadoop.io.WritableComparable;
29  import org.apache.hadoop.mapreduce.Mapper;
30  
31  import com.google.common.collect.Lists;
32  import com.google.common.collect.Sets;
33  
34  import java.io.IOException;
35  import java.util.List;
36  import java.util.Set;
37  import java.util.concurrent.Callable;
38  
39  /**
40   * Class to use as {@link SuperstepOutput} when chosen VertexOutputFormat is
41   * thread-safe.
42   *
43   * @param <I> Vertex id
44   * @param <V> Vertex value
45   * @param <E> Edge value
46   */
47  public class MultiThreadedSuperstepOutput<I extends WritableComparable,
48      V extends Writable, E extends Writable> implements
49      SuperstepOutput<I, V, E> {
50    /** Mapper context */
51    private final Mapper<?, ?, ?, ?>.Context context;
52    /** Configuration */
53    private ImmutableClassesGiraphConfiguration<I, V, E> configuration;
54    /** Vertex output format, used to get new vertex writers */
55    private final VertexOutputFormat<I, V, E> vertexOutputFormat;
56    /**
57     * List of returned vertex writers, these can be reused and will all be
58     * closed in the end of the application
59     */
60    private final List<VertexWriter<I, V, E>> availableVertexWriters;
61    /** Vertex writes which were created by this class and are currently used */
62    private final Set<VertexWriter<I, V, E>> occupiedVertexWriters;
63  
64    /**
65     * Constructor
66     *
67     * @param conf    Configuration
68     * @param context Mapper context
69     */
70    public MultiThreadedSuperstepOutput(
71        ImmutableClassesGiraphConfiguration<I, V, E> conf,
72        Mapper<?, ?, ?, ?>.Context context) {
73      this.configuration = conf;
74      vertexOutputFormat = conf.createWrappedVertexOutputFormat();
75      this.context = context;
76      availableVertexWriters = Lists.newArrayList();
77      occupiedVertexWriters = Sets.newHashSet();
78      vertexOutputFormat.preWriting(context);
79    }
80  
81    @Override
82    public synchronized SimpleVertexWriter<I, V, E> getVertexWriter() {
83      VertexWriter<I, V, E> vertexWriter;
84      if (availableVertexWriters.isEmpty()) {
85        try {
86          vertexWriter = vertexOutputFormat.createVertexWriter(context);
87          vertexWriter.setConf(configuration);
88          vertexWriter.initialize(context);
89        } catch (IOException e) {
90          throw new IllegalStateException("getVertexWriter: " +
91              "IOException occurred", e);
92        } catch (InterruptedException e) {
93          throw new IllegalStateException("getVertexWriter: " +
94              "InterruptedException occurred", e);
95        }
96      } else {
97        vertexWriter =
98            availableVertexWriters.remove(availableVertexWriters.size() - 1);
99      }
100     occupiedVertexWriters.add(vertexWriter);
101     return vertexWriter;
102   }
103 
104   @Override
105   public synchronized void returnVertexWriter(
106       SimpleVertexWriter<I, V, E> vertexWriter) {
107     VertexWriter<I, V, E> returnedWriter = (VertexWriter<I, V, E>) vertexWriter;
108     if (!occupiedVertexWriters.remove(returnedWriter)) {
109       throw new IllegalStateException("returnVertexWriter: " +
110           "Returned vertex writer which is not currently occupied!");
111     }
112     availableVertexWriters.add(returnedWriter);
113   }
114 
115   @Override
116   public synchronized void postApplication() throws IOException,
117       InterruptedException {
118     if (!occupiedVertexWriters.isEmpty()) {
119       throw new IllegalStateException("postApplication: " +
120           occupiedVertexWriters.size() +
121           " vertex writers were not returned!");
122     }
123 
124     // Closing writers can take time - use multiple threads and call progress
125     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
126       @Override
127       public Callable<Void> newCallable(int callableId) {
128         return new Callable<Void>() {
129           @Override
130           public Void call() throws Exception {
131             while (true) {
132               VertexWriter<I, V, E> vertexWriter;
133               synchronized (availableVertexWriters) {
134                 if (availableVertexWriters.isEmpty()) {
135                   return null;
136                 }
137                 vertexWriter = availableVertexWriters.remove(
138                     availableVertexWriters.size() - 1);
139               }
140               vertexWriter.close(context);
141             }
142           }
143         };
144       }
145     };
146     ProgressableUtils.getResultsWithNCallables(callableFactory,
147         Math.min(configuration.getNumOutputThreads(),
148             availableVertexWriters.size()), "close-writers-%d", context);
149     vertexOutputFormat.postWriting(context);
150   }
151 }