This project has retired. For details please refer to its
Attic page.
AggregatorReduceOperation xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.master;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23
24 import org.apache.giraph.aggregators.Aggregator;
25 import org.apache.giraph.conf.GiraphConfigurationSettable;
26 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
27 import org.apache.giraph.reducers.ReduceSameTypeOperation;
28 import org.apache.giraph.utils.ReflectionUtils;
29 import org.apache.giraph.utils.WritableUtils;
30 import org.apache.hadoop.io.Writable;
31
32
33
34
35
36
37 public class AggregatorReduceOperation<A extends Writable>
38 extends ReduceSameTypeOperation<A> implements GiraphConfigurationSettable {
39
40 private Class<? extends Aggregator<A>> aggregatorClass;
41
42 private Aggregator<A> aggregator;
43
44 private ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
45
46
47 public AggregatorReduceOperation() {
48 }
49
50
51
52
53
54
55 public AggregatorReduceOperation(
56 Class<? extends Aggregator<A>> aggregatorClass,
57 ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
58 this.aggregatorClass = aggregatorClass;
59 this.conf = conf;
60 initAggregator();
61 }
62
63
64 private void initAggregator() {
65 aggregator = ReflectionUtils.newInstance(aggregatorClass, conf);
66 aggregator.setAggregatedValue(null);
67 }
68
69 @Override
70 public A createInitialValue() {
71 A agg = aggregator.createInitialValue();
72 if (agg == null) {
73 throw new IllegalStateException(
74 "Aggregators initial value must not be null, but is for " +
75 aggregator);
76 }
77 return agg;
78 }
79
80
81
82
83
84 public AggregatorReduceOperation<A> createCopy() {
85 return new AggregatorReduceOperation<>(aggregatorClass, conf);
86 }
87
88 public Class<? extends Aggregator<A>> getAggregatorClass() {
89 return aggregatorClass;
90 }
91
92 @Override
93 public synchronized A reduce(A curValue, A valueToReduce) {
94 aggregator.setAggregatedValue(curValue);
95 aggregator.aggregate(valueToReduce);
96 A aggregated = aggregator.getAggregatedValue();
97 aggregator.setAggregatedValue(null);
98 return aggregated;
99 }
100
101 @Override
102 public void setConf(ImmutableClassesGiraphConfiguration conf) {
103 this.conf = conf;
104 }
105
106 @Override
107 public void write(DataOutput out) throws IOException {
108 WritableUtils.writeClass(aggregatorClass, out);
109 }
110
111 @Override
112 public void readFields(DataInput in) throws IOException {
113 aggregatorClass = WritableUtils.readClass(in);
114 initAggregator();
115 }
116
117
118 }