This project has retired. For details, please refer to its Attic page.
      
 
AggregatorReduceOperation xref
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.giraph.master;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  
24  import org.apache.giraph.aggregators.Aggregator;
25  import org.apache.giraph.conf.GiraphConfigurationSettable;
26  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
27  import org.apache.giraph.reducers.ReduceSameTypeOperation;
28  import org.apache.giraph.utils.ReflectionUtils;
29  import org.apache.giraph.utils.WritableUtils;
30  import org.apache.hadoop.io.Writable;
31  
32  
33  
34  
35  
36  
37  public class AggregatorReduceOperation<A extends Writable>
38      extends ReduceSameTypeOperation<A> implements GiraphConfigurationSettable {
39    
40    private Class<? extends Aggregator<A>> aggregatorClass;
41    
42    private Aggregator<A> aggregator;
43    
44    private ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
45  
46    
47    public AggregatorReduceOperation() {
48    }
49  
50    
51  
52  
53  
54  
55    public AggregatorReduceOperation(
56        Class<? extends Aggregator<A>> aggregatorClass,
57        ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
58      this.aggregatorClass = aggregatorClass;
59      this.conf = conf;
60      initAggregator();
61    }
62  
63    
64    private void initAggregator() {
65      aggregator = ReflectionUtils.newInstance(aggregatorClass, conf);
66      aggregator.setAggregatedValue(null);
67    }
68  
69    @Override
70    public A createInitialValue() {
71      A agg = aggregator.createInitialValue();
72      if (agg == null) {
73        throw new IllegalStateException(
74            "Aggregators initial value must not be null, but is for " +
75            aggregator);
76      }
77      return agg;
78    }
79  
80    
81  
82  
83  
84    public AggregatorReduceOperation<A> createCopy() {
85      return new AggregatorReduceOperation<>(aggregatorClass, conf);
86    }
87  
88    public Class<? extends Aggregator<A>> getAggregatorClass() {
89      return aggregatorClass;
90    }
91  
92    @Override
93    public synchronized A reduce(A curValue, A valueToReduce) {
94      aggregator.setAggregatedValue(curValue);
95      aggregator.aggregate(valueToReduce);
96      A aggregated = aggregator.getAggregatedValue();
97      aggregator.setAggregatedValue(null);
98      return aggregated;
99    }
100 
101   @Override
102   public void setConf(ImmutableClassesGiraphConfiguration conf) {
103     this.conf = conf;
104   }
105 
106   @Override
107   public void write(DataOutput out) throws IOException {
108     WritableUtils.writeClass(aggregatorClass, out);
109   }
110 
111   @Override
112   public void readFields(DataInput in) throws IOException {
113     aggregatorClass = WritableUtils.readClass(in);
114     initAggregator();
115   }
116 
117 
118 }