View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.output;
19  
20  import java.util.HashMap;
21  import java.util.Map;
22  import java.util.Queue;
23  import java.util.concurrent.Callable;
24  import java.util.concurrent.ConcurrentLinkedQueue;
25  
26  import org.apache.giraph.block_app.framework.api.BlockOutputApi;
27  import org.apache.giraph.conf.GiraphConstants;
28  import org.apache.giraph.utils.CallableFactory;
29  import org.apache.giraph.utils.ProgressableUtils;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.util.Progressable;
32  
33  /**
34   * Handler for blocks output - keeps track of outputs and writers created
35   */
36  @SuppressWarnings("unchecked")
37  public class BlockOutputHandle implements BlockOutputApi {
38    private transient Configuration conf;
39    private transient Progressable progressable;
40    private final Map<String, BlockOutputDesc> outputDescMap;
41    private final Map<String, Queue<BlockOutputWriter>> freeWriters =
42        new HashMap<>();
43    private final Map<String, Queue<BlockOutputWriter>> occupiedWriters =
44        new HashMap<>();
45  
46    public BlockOutputHandle() {
47      outputDescMap = null;
48    }
49  
50    public BlockOutputHandle(String jobIdentifier, Configuration conf,
51        Progressable hadoopProgressable) {
52      outputDescMap = BlockOutputFormat.createInitAndCheckOutputDescsMap(
53          conf, jobIdentifier);
54      for (String confOption : outputDescMap.keySet()) {
55        freeWriters.put(confOption,
56            new ConcurrentLinkedQueue<BlockOutputWriter>());
57        occupiedWriters.put(confOption,
58            new ConcurrentLinkedQueue<BlockOutputWriter>());
59      }
60      initialize(conf, hadoopProgressable);
61    }
62  
63    public void initialize(Configuration conf, Progressable progressable) {
64      this.conf = conf;
65      this.progressable = progressable;
66    }
67  
68  
69    @Override
70    public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
71    OD getOutputDesc(String confOption) {
72      if (outputDescMap == null) {
73        throw new IllegalArgumentException(
74            "Output cannot be used with checkpointing");
75      }
76      return (OD) outputDescMap.get(confOption);
77    }
78  
79    @Override
80    public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
81      if (outputDescMap == null) {
82        throw new IllegalArgumentException(
83            "Output cannot be used with checkpointing");
84      }
85      OW outputWriter = (OW) freeWriters.get(confOption).poll();
86      if (outputWriter == null) {
87        outputWriter = (OW) outputDescMap.get(confOption).createOutputWriter(
88            conf, progressable);
89      }
90      occupiedWriters.get(confOption).add(outputWriter);
91      return outputWriter;
92    }
93  
94    public void returnAllWriters() {
95      for (Map.Entry<String, Queue<BlockOutputWriter>> entry :
96          occupiedWriters.entrySet()) {
97        freeWriters.get(entry.getKey()).addAll(entry.getValue());
98        entry.getValue().clear();
99      }
100   }
101 
102   public void closeAllWriters() {
103     final Queue<BlockOutputWriter> allWriters = new ConcurrentLinkedQueue<>();
104     for (Queue<BlockOutputWriter> blockOutputWriters : freeWriters.values()) {
105       allWriters.addAll(blockOutputWriters);
106     }
107     if (allWriters.isEmpty()) {
108       return;
109     }
110     // Closing writers can take time - use multiple threads and call progress
111     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
112       @Override
113       public Callable<Void> newCallable(int callableId) {
114         return new Callable<Void>() {
115           @Override
116           public Void call() throws Exception {
117             BlockOutputWriter writer = allWriters.poll();
118             while (writer != null) {
119               writer.close();
120               writer = allWriters.poll();
121             }
122             return null;
123           }
124         };
125       }
126     };
127     ProgressableUtils.getResultsWithNCallables(callableFactory,
128         Math.min(GiraphConstants.NUM_OUTPUT_THREADS.get(conf),
129             allWriters.size()), "close-writers-%d", progressable);
130   }
131 }