This project has retired. For details please refer to its Attic page.
BlockOutputHandle xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.output;
19  
20  import java.util.HashMap;
21  import java.util.Map;
22  import java.util.Queue;
23  import java.util.concurrent.Callable;
24  import java.util.concurrent.ConcurrentLinkedQueue;
25  
26  import org.apache.giraph.block_app.framework.api.BlockOutputApi;
27  import org.apache.giraph.conf.GiraphConstants;
28  import org.apache.giraph.utils.CallableFactory;
29  import org.apache.giraph.utils.ProgressableUtils;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.util.Progressable;
32  
33  /**
34   * Handler for blocks output - keeps track of outputs and writers created
35   */
36  @SuppressWarnings("unchecked")
37  public class BlockOutputHandle implements BlockOutputApi {
38    private transient Configuration conf;
39    private transient Progressable progressable;
40    private final Map<String, BlockOutputDesc> outputDescMap;
41    private final Map<String, Queue<BlockOutputWriter>> freeWriters =
42        new HashMap<>();
43    private final Map<String, Queue<BlockOutputWriter>> occupiedWriters =
44        new HashMap<>();
45  
46    public BlockOutputHandle() {
47      outputDescMap = null;
48    }
49  
50    public BlockOutputHandle(String jobIdentifier, Configuration conf,
51        Progressable hadoopProgressable) {
52      outputDescMap = BlockOutputFormat.createInitAndCheckOutputDescsMap(
53          conf, jobIdentifier);
54      for (Map.Entry<String, BlockOutputDesc> entry : outputDescMap.entrySet()) {
55        entry.getValue().preWriting();
56        freeWriters.put(entry.getKey(),
57            new ConcurrentLinkedQueue<BlockOutputWriter>());
58        occupiedWriters.put(entry.getKey(),
59            new ConcurrentLinkedQueue<BlockOutputWriter>());
60      }
61      initialize(conf, hadoopProgressable);
62    }
63  
64    public void initialize(Configuration conf, Progressable progressable) {
65      this.conf = conf;
66      this.progressable = progressable;
67    }
68  
69  
70    @Override
71    public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
72    OD getOutputDesc(String confOption) {
73      if (outputDescMap == null) {
74        throw new IllegalArgumentException(
75            "Output cannot be used with checkpointing");
76      }
77      return (OD) outputDescMap.get(confOption);
78    }
79  
80    @Override
81    public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
82      if (outputDescMap == null) {
83        throw new IllegalArgumentException(
84            "Output cannot be used with checkpointing");
85      }
86      OW outputWriter = (OW) freeWriters.get(confOption).poll();
87      if (outputWriter == null) {
88        outputWriter = (OW) outputDescMap.get(confOption).createOutputWriter(
89            conf, progressable);
90      }
91      occupiedWriters.get(confOption).add(outputWriter);
92      return outputWriter;
93    }
94  
95    public void returnAllWriters() {
96      for (Map.Entry<String, Queue<BlockOutputWriter>> entry :
97          occupiedWriters.entrySet()) {
98        freeWriters.get(entry.getKey()).addAll(entry.getValue());
99        entry.getValue().clear();
100     }
101   }
102 
103   public void closeAllWriters() {
104     final Queue<BlockOutputWriter> allWriters = new ConcurrentLinkedQueue<>();
105     for (Queue<BlockOutputWriter> blockOutputWriters : freeWriters.values()) {
106       allWriters.addAll(blockOutputWriters);
107     }
108     if (allWriters.isEmpty()) {
109       return;
110     }
111     // Closing writers can take time - use multiple threads and call progress
112     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
113       @Override
114       public Callable<Void> newCallable(int callableId) {
115         return new Callable<Void>() {
116           @Override
117           public Void call() throws Exception {
118             BlockOutputWriter writer = allWriters.poll();
119             while (writer != null) {
120               writer.close();
121               writer = allWriters.poll();
122             }
123             return null;
124           }
125         };
126       }
127     };
128     ProgressableUtils.getResultsWithNCallables(callableFactory,
129         Math.min(GiraphConstants.NUM_OUTPUT_THREADS.get(conf),
130             allWriters.size()), "close-writers-%d", progressable);
131     // Close all output formats
132     for (BlockOutputDesc outputDesc : outputDescMap.values()) {
133       outputDesc.postWriting();
134     }
135   }
136 }