This project has retired. For details please refer to its Attic page.
MasterGlobalCommHandler xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.master;
20  
21  import org.apache.giraph.master.input.MasterInputSplitsHandler;
22  import org.apache.giraph.partition.PartitionStats;
23  import org.apache.giraph.reducers.ReduceOperation;
24  import org.apache.giraph.utils.BlockingElementsSet;
25  import org.apache.hadoop.io.Writable;
26  import org.apache.hadoop.util.Progressable;
27  
28  import com.google.common.collect.Iterables;
29  
30  import java.util.List;
31  
32  /**
33   * Handler for all master communications
34   */
35  public class MasterGlobalCommHandler implements MasterGlobalCommUsage {
36    /** Aggregator handler */
37    private final MasterAggregatorHandler aggregatorHandler;
38    /** Input splits handler*/
39    private final MasterInputSplitsHandler inputSplitsHandler;
40    /** Partition stats received from workers */
41    private final BlockingElementsSet<List<PartitionStats>> partitionStats =
42        new BlockingElementsSet<>();
43  
44    /**
45     * Constructor
46     *
47     * @param aggregatorHandler Aggregator handler
48     * @param inputSplitsHandler Input splits handler
49     */
50    public MasterGlobalCommHandler(
51        MasterAggregatorHandler aggregatorHandler,
52        MasterInputSplitsHandler inputSplitsHandler) {
53      this.aggregatorHandler = aggregatorHandler;
54      this.inputSplitsHandler = inputSplitsHandler;
55    }
56  
57    public MasterAggregatorHandler getAggregatorHandler() {
58      return aggregatorHandler;
59    }
60  
61    public MasterInputSplitsHandler getInputSplitsHandler() {
62      return inputSplitsHandler;
63    }
64  
65    @Override
66    public <S, R extends Writable> void registerReducer(String name,
67        ReduceOperation<S, R> reduceOp) {
68      aggregatorHandler.registerReducer(name, reduceOp);
69    }
70  
71    @Override
72    public <S, R extends Writable> void registerReducer(String name,
73        ReduceOperation<S, R> reduceOp, R globalInitialValue) {
74      aggregatorHandler.registerReducer(name, reduceOp, globalInitialValue);
75    }
76  
77    @Override
78    public <R extends Writable> R getReduced(String name) {
79      return aggregatorHandler.getReduced(name);
80    }
81  
82    @Override
83    public void broadcast(String name, Writable value) {
84      aggregatorHandler.broadcast(name, value);
85    }
86  
87    /**
88     * Received partition stats from a worker
89     *
90     * @param partitionStats Partition stats
91     */
92    public void receivedPartitionStats(List<PartitionStats> partitionStats) {
93      this.partitionStats.offer(partitionStats);
94    }
95  
96    /**
97     * Get all partition stats. Blocks until all workers have sent their stats
98     *
99     * @param numWorkers Number of workers to wait for
100    * @param progressable Progressable to report progress to
101    * @return All partition stats
102    */
103   public Iterable<PartitionStats> getAllPartitionStats(int numWorkers,
104       Progressable progressable) {
105     return Iterables.concat(
106         partitionStats.getElements(numWorkers, progressable));
107   }
108 }