This project has been retired. For details, please refer to its Attic page.
AggregatorToGlobalCommTranslation xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.master;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23 import java.util.HashMap;
24 import java.util.Map.Entry;
25
26 import org.apache.giraph.aggregators.Aggregator;
27 import org.apache.giraph.comm.aggregators.AggregatorUtils;
28 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
29 import org.apache.giraph.utils.MasterLoggingAggregator;
30 import org.apache.giraph.utils.WritableUtils;
31 import org.apache.hadoop.io.Writable;
32 import org.apache.log4j.Logger;
33
34 import com.google.common.base.Preconditions;
35
36
37
38
39
40 public class AggregatorToGlobalCommTranslation
41 implements MasterAggregatorUsage, Writable {
42
43 private static final Logger LOG =
44 Logger.getLogger(AggregatorToGlobalCommTranslation.class);
45
46
47 private final MasterGlobalCommUsage globalComm;
48
49 private final HashMap<String, AggregatorWrapper<Writable>>
50 registeredAggregators = new HashMap<>();
51
52
53
54
55
56
57
58
59
60 private final HashMap<String, Writable>
61 initAggregatorValues = new HashMap<>();
62
63
64 private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
65
66
67
68
69
70
71 public AggregatorToGlobalCommTranslation(
72 ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
73 MasterGlobalCommUsage globalComm) {
74 this.conf = conf;
75 this.globalComm = globalComm;
76 MasterLoggingAggregator.registerAggregator(this, conf);
77 }
78
79 @Override
80 public <A extends Writable> A getAggregatedValue(String name) {
81 AggregatorWrapper<Writable> agg = registeredAggregators.get(name);
82 if (agg == null) {
83 LOG.warn("getAggregatedValue: " +
84 AggregatorUtils.getUnregisteredAggregatorMessage(name,
85 registeredAggregators.size() != 0, conf));
86
87 return null;
88 }
89
90 A value = globalComm.getReduced(name);
91 if (value == null) {
92 value = (A) initAggregatorValues.get(name);
93 }
94
95 if (value == null) {
96 value = (A) agg.getReduceOp().createInitialValue();
97 initAggregatorValues.put(name, value);
98 }
99
100 Preconditions.checkState(value != null);
101 return value;
102 }
103
104 @Override
105 public <A extends Writable> void setAggregatedValue(String name, A value) {
106 AggregatorWrapper<Writable> aggregator = registeredAggregators.get(name);
107 if (aggregator == null) {
108 throw new IllegalArgumentException("setAggregatedValue: " +
109 AggregatorUtils.getUnregisteredAggregatorMessage(name,
110 registeredAggregators.size() != 0, conf));
111 }
112 aggregator.setCurrentValue(value);
113 }
114
115
116
117
118
119 public void postMasterCompute() {
120
121
122 for (Entry<String, AggregatorWrapper<Writable>> entry :
123 registeredAggregators.entrySet()) {
124 Writable value = entry.getValue().getCurrentValue();
125 if (value == null) {
126 value = globalComm.getReduced(entry.getKey());
127 }
128 Preconditions.checkState(value != null);
129
130 globalComm.broadcast(entry.getKey(), new AggregatorBroadcast<>(
131 entry.getValue().getReduceOp().getAggregatorClass(), value));
132
133
134
135 AggregatorReduceOperation<Writable> cleanReduceOp =
136 entry.getValue().createReduceOp();
137 if (entry.getValue().isPersistent()) {
138 globalComm.registerReducer(
139 entry.getKey(), cleanReduceOp, value);
140 } else {
141 globalComm.registerReducer(
142 entry.getKey(), cleanReduceOp);
143 }
144 entry.getValue().setCurrentValue(null);
145 }
146 initAggregatorValues.clear();
147 }
148
149
150 public void prepareSuperstep() {
151 MasterLoggingAggregator.logAggregatedValue(this, conf);
152 }
153
154 @Override
155 public <A extends Writable> boolean registerAggregator(String name,
156 Class<? extends Aggregator<A>> aggregatorClass) throws
157 InstantiationException, IllegalAccessException {
158 registerAggregator(name, aggregatorClass, false);
159 return true;
160 }
161
162 @Override
163 public <A extends Writable> boolean registerPersistentAggregator(String name,
164 Class<? extends Aggregator<A>> aggregatorClass) throws
165 InstantiationException, IllegalAccessException {
166 registerAggregator(name, aggregatorClass, true);
167 return true;
168 }
169
170 @Override
171 public void write(DataOutput out) throws IOException {
172 out.writeInt(registeredAggregators.size());
173 for (Entry<String, AggregatorWrapper<Writable>> entry :
174 registeredAggregators.entrySet()) {
175 out.writeUTF(entry.getKey());
176 entry.getValue().write(out);
177 }
178 }
179
180 @Override
181 public void readFields(DataInput in) throws IOException {
182 registeredAggregators.clear();
183 int numAggregators = in.readInt();
184 for (int i = 0; i < numAggregators; i++) {
185 String name = in.readUTF();
186 AggregatorWrapper<Writable> agg = new AggregatorWrapper<>();
187 agg.readFields(in);
188 registeredAggregators.put(name, agg);
189 }
190 initAggregatorValues.clear();
191 }
192
193
194
195
196
197
198
199
200
201
202
203 private <A extends Writable> AggregatorWrapper<A> registerAggregator
204 (String name, Class<? extends Aggregator<A>> aggregatorClass,
205 boolean persistent) throws InstantiationException,
206 IllegalAccessException {
207 AggregatorWrapper<A> aggregatorWrapper =
208 (AggregatorWrapper<A>) registeredAggregators.get(name);
209 if (aggregatorWrapper == null) {
210 aggregatorWrapper =
211 new AggregatorWrapper<A>(aggregatorClass, persistent);
212
213
214
215
216
217 aggregatorWrapper.setCurrentValue(
218 aggregatorWrapper.getReduceOp().createInitialValue());
219 registeredAggregators.put(
220 name, (AggregatorWrapper<Writable>) aggregatorWrapper);
221 }
222 return aggregatorWrapper;
223 }
224
225
226
227
228
229 private class AggregatorWrapper<A extends Writable>
230 implements Writable {
231
232 private boolean persistent;
233
234 private AggregatorReduceOperation<A> reduceOp;
235
236 private A currentValue;
237
238
239 public AggregatorWrapper() {
240 }
241
242
243
244
245
246
247 public AggregatorWrapper(
248 Class<? extends Aggregator<A>> aggregatorClass,
249 boolean persistent) {
250 this.persistent = persistent;
251 this.reduceOp = new AggregatorReduceOperation<>(aggregatorClass, conf);
252 }
253
254 public AggregatorReduceOperation<A> getReduceOp() {
255 return reduceOp;
256 }
257
258
259
260
261
262 public AggregatorReduceOperation<A> createReduceOp() {
263 return reduceOp.createCopy();
264 }
265
266 public A getCurrentValue() {
267 return currentValue;
268 }
269
270 public void setCurrentValue(A currentValue) {
271 this.currentValue = currentValue;
272 }
273
274 public boolean isPersistent() {
275 return persistent;
276 }
277
278 @Override
279 public void write(DataOutput out) throws IOException {
280 out.writeBoolean(persistent);
281 reduceOp.write(out);
282
283 Preconditions.checkState(currentValue == null, "AggregatorWrapper " +
284 "shouldn't have value at the end of the superstep");
285 }
286
287 @Override
288 public void readFields(DataInput in) throws IOException {
289 persistent = in.readBoolean();
290 reduceOp = WritableUtils.createWritable(
291 AggregatorReduceOperation.class, conf);
292 reduceOp.readFields(in);
293 currentValue = null;
294 }
295 }
296 }