This project has retired. For details please refer to its Attic page.
AggregatorReduceOperation xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.master;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  
24  import org.apache.giraph.aggregators.Aggregator;
25  import org.apache.giraph.conf.GiraphConfigurationSettable;
26  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
27  import org.apache.giraph.reducers.ReduceSameTypeOperation;
28  import org.apache.giraph.utils.ReflectionUtils;
29  import org.apache.giraph.utils.WritableUtils;
30  import org.apache.hadoop.io.Writable;
31  
32  /**
33   * Translates aggregation operation to reduce operations.
34   *
35   * @param <A> Aggregation object type
36   */
37  public class AggregatorReduceOperation<A extends Writable>
38      extends ReduceSameTypeOperation<A> implements GiraphConfigurationSettable {
39    /** Aggregator class */
40    private Class<? extends Aggregator<A>> aggregatorClass;
41    /** Aggregator */
42    private Aggregator<A> aggregator;
43    /** Configuration */
44    private ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
45  
46    /** Constructor */
47    public AggregatorReduceOperation() {
48    }
49  
50    /**
51     * Constructor
52     * @param aggregatorClass Aggregator class
53     * @param conf Configuration
54     */
55    public AggregatorReduceOperation(
56        Class<? extends Aggregator<A>> aggregatorClass,
57        ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
58      this.aggregatorClass = aggregatorClass;
59      this.conf = conf;
60      initAggregator();
61    }
62  
63    /** Initialize aggregator */
64    private void initAggregator() {
65      aggregator = ReflectionUtils.newInstance(aggregatorClass, conf);
66      aggregator.setAggregatedValue(null);
67    }
68  
69    @Override
70    public A createInitialValue() {
71      A agg = aggregator.createInitialValue();
72      if (agg == null) {
73        throw new IllegalStateException(
74            "Aggregators initial value must not be null, but is for " +
75            aggregator);
76      }
77      return agg;
78    }
79  
80    /**
81     * Creates copy of this object
82     * @return copy
83     */
84    public AggregatorReduceOperation<A> createCopy() {
85      return new AggregatorReduceOperation<>(aggregatorClass, conf);
86    }
87  
88    public Class<? extends Aggregator<A>> getAggregatorClass() {
89      return aggregatorClass;
90    }
91  
92    @Override
93    public synchronized A reduce(A curValue, A valueToReduce) {
94      aggregator.setAggregatedValue(curValue);
95      aggregator.aggregate(valueToReduce);
96      A aggregated = aggregator.getAggregatedValue();
97      aggregator.setAggregatedValue(null);
98      return aggregated;
99    }
100 
101   @Override
102   public void setConf(ImmutableClassesGiraphConfiguration conf) {
103     this.conf = conf;
104   }
105 
106   @Override
107   public void write(DataOutput out) throws IOException {
108     WritableUtils.writeClass(aggregatorClass, out);
109   }
110 
111   @Override
112   public void readFields(DataInput in) throws IOException {
113     aggregatorClass = WritableUtils.readClass(in);
114     initAggregator();
115   }
116 
117 
118 }