This project has retired. For details please refer to its Attic page.
AggregatorToGlobalCommTranslation xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.master;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  import java.util.HashMap;
24  import java.util.Map.Entry;
25  
26  import org.apache.giraph.aggregators.Aggregator;
27  import org.apache.giraph.comm.aggregators.AggregatorUtils;
28  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
29  import org.apache.giraph.utils.MasterLoggingAggregator;
30  import org.apache.giraph.utils.WritableUtils;
31  import org.apache.hadoop.io.Writable;
32  import org.apache.log4j.Logger;
33  
34  import com.google.common.base.Preconditions;
35  
36  /**
37   * Class that translates aggregator handling on the master to
38   * reduce and broadcast operations supported by the MasterAggregatorHandler.
39   */
40  public class AggregatorToGlobalCommTranslation
41      implements MasterAggregatorUsage, Writable {
42    /** Class logger */
43    private static final Logger LOG =
44        Logger.getLogger(AggregatorToGlobalCommTranslation.class);
45  
46    /** Class providing reduce and broadcast interface to use */
47    private final MasterGlobalCommUsage globalComm;
48    /** List of registered aggregators */
49    private final HashMap<String, AggregatorWrapper<Writable>>
50    registeredAggregators = new HashMap<>();
51  
52    /**
53     * List of init aggregator values, in case someone tries to
54     * access aggregator immediatelly after registering it.
55     *
56     * Instead of simply returning value, we need to store it during
57     * that superstep, so consecutive calls will return identical object,
58     * which they can modify.
59     */
60    private final HashMap<String, Writable>
61    initAggregatorValues = new HashMap<>();
62  
63    /** Conf */
64    private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
65  
66    /**
67     * Constructor
68     * @param conf Configuration
69     * @param globalComm Global communication interface
70     */
71    public AggregatorToGlobalCommTranslation(
72        ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
73        MasterGlobalCommUsage globalComm) {
74      this.conf = conf;
75      this.globalComm = globalComm;
76      MasterLoggingAggregator.registerAggregator(this, conf);
77    }
78  
79    @Override
80    public <A extends Writable> A getAggregatedValue(String name) {
81      AggregatorWrapper<Writable> agg = registeredAggregators.get(name);
82      if (agg == null) {
83        LOG.warn("getAggregatedValue: " +
84          AggregatorUtils.getUnregisteredAggregatorMessage(name,
85              registeredAggregators.size() != 0, conf));
86        // to make sure we are not accessing reducer of the same name.
87        return null;
88      }
89  
90      A value = globalComm.getReduced(name);
91      if (value == null) {
92        value = (A) initAggregatorValues.get(name);
93      }
94  
95      if (value == null) {
96        value = (A) agg.getReduceOp().createInitialValue();
97        initAggregatorValues.put(name, value);
98      }
99  
100     Preconditions.checkState(value != null);
101     return value;
102   }
103 
104   @Override
105   public <A extends Writable> void setAggregatedValue(String name, A value) {
106     AggregatorWrapper<Writable> aggregator = registeredAggregators.get(name);
107     if (aggregator == null) {
108       throw new IllegalArgumentException("setAggregatedValue: "  +
109           AggregatorUtils.getUnregisteredAggregatorMessage(name,
110               registeredAggregators.size() != 0, conf));
111     }
112     aggregator.setCurrentValue(value);
113   }
114 
115   /**
116    * Called after master compute, to do aggregator-&gt;reduce/broadcast
117    * translation
118    */
119   public void postMasterCompute() {
120     // broadcast what master set, or if it didn't broadcast reduced value
121     // register reduce with the same value
122     for (Entry<String, AggregatorWrapper<Writable>> entry :
123         registeredAggregators.entrySet()) {
124       Writable value = entry.getValue().getCurrentValue();
125       if (value == null) {
126         value = globalComm.getReduced(entry.getKey());
127       }
128       Preconditions.checkState(value != null);
129 
130       globalComm.broadcast(entry.getKey(), new AggregatorBroadcast<>(
131           entry.getValue().getReduceOp().getAggregatorClass(), value));
132 
133       // Always register clean instance of reduceOp, not to conflict with
134       // reduceOp from previous superstep.
135       AggregatorReduceOperation<Writable> cleanReduceOp =
136           entry.getValue().createReduceOp();
137       if (entry.getValue().isPersistent()) {
138         globalComm.registerReducer(
139             entry.getKey(), cleanReduceOp, value);
140       } else {
141         globalComm.registerReducer(
142             entry.getKey(), cleanReduceOp);
143       }
144       entry.getValue().setCurrentValue(null);
145     }
146     initAggregatorValues.clear();
147   }
148 
149   /** Prepare before calling master compute */
150   public void prepareSuperstep() {
151     MasterLoggingAggregator.logAggregatedValue(this, conf);
152   }
153 
154   @Override
155   public <A extends Writable> boolean registerAggregator(String name,
156       Class<? extends Aggregator<A>> aggregatorClass) throws
157       InstantiationException, IllegalAccessException {
158     registerAggregator(name, aggregatorClass, false);
159     return true;
160   }
161 
162   @Override
163   public <A extends Writable> boolean registerPersistentAggregator(String name,
164       Class<? extends Aggregator<A>> aggregatorClass) throws
165       InstantiationException, IllegalAccessException {
166     registerAggregator(name, aggregatorClass, true);
167     return true;
168   }
169 
170   @Override
171   public void write(DataOutput out) throws IOException {
172     out.writeInt(registeredAggregators.size());
173     for (Entry<String, AggregatorWrapper<Writable>> entry :
174         registeredAggregators.entrySet()) {
175       out.writeUTF(entry.getKey());
176       entry.getValue().write(out);
177     }
178   }
179 
180   @Override
181   public void readFields(DataInput in) throws IOException {
182     registeredAggregators.clear();
183     int numAggregators = in.readInt();
184     for (int i = 0; i < numAggregators; i++) {
185       String name = in.readUTF();
186       AggregatorWrapper<Writable> agg = new AggregatorWrapper<>();
187       agg.readFields(in);
188       registeredAggregators.put(name, agg);
189     }
190     initAggregatorValues.clear();
191   }
192 
193   /**
194    * Helper function for registering aggregators.
195    *
196    * @param name            Name of the aggregator
197    * @param aggregatorClass Aggregator class
198    * @param persistent      Whether aggregator is persistent or not
199    * @param <A>             Aggregated value type
200    * @return Newly registered aggregator or aggregator which was previously
201    *         created with selected name, if any
202    */
203   private <A extends Writable> AggregatorWrapper<A> registerAggregator
204   (String name, Class<? extends Aggregator<A>> aggregatorClass,
205       boolean persistent) throws InstantiationException,
206       IllegalAccessException {
207     AggregatorWrapper<A> aggregatorWrapper =
208         (AggregatorWrapper<A>) registeredAggregators.get(name);
209     if (aggregatorWrapper == null) {
210       aggregatorWrapper =
211           new AggregatorWrapper<A>(aggregatorClass, persistent);
212       // postMasterCompute uses previously reduced value to broadcast,
213       // unless current value is set. After aggregator is registered,
214       // there was no previously reduced value, so set current value
215       // to default to avoid calling getReduced() on unregistered reducer.
216       // (which logs unnecessary warnings)
217       aggregatorWrapper.setCurrentValue(
218           aggregatorWrapper.getReduceOp().createInitialValue());
219       registeredAggregators.put(
220           name, (AggregatorWrapper<Writable>) aggregatorWrapper);
221     }
222     return aggregatorWrapper;
223   }
224 
225   /**
226    * Object holding all needed data related to single Aggregator
227    * @param <A> Aggregated value type
228    */
229   private class AggregatorWrapper<A extends Writable>
230       implements Writable {
231     /** False iff aggregator should be reset at the end of each super step */
232     private boolean persistent;
233     /** Translation of aggregator to reduce operations */
234     private AggregatorReduceOperation<A> reduceOp;
235     /** Current value, set by master manually */
236     private A currentValue;
237 
238     /** Constructor */
239     public AggregatorWrapper() {
240     }
241 
242     /**
243      * Constructor
244      * @param aggregatorClass Aggregator class
245      * @param persistent Is persistent
246      */
247     public AggregatorWrapper(
248         Class<? extends Aggregator<A>> aggregatorClass,
249         boolean persistent) {
250       this.persistent = persistent;
251       this.reduceOp = new AggregatorReduceOperation<>(aggregatorClass, conf);
252     }
253 
254     public AggregatorReduceOperation<A> getReduceOp() {
255       return reduceOp;
256     }
257 
258     /**
259      * Create a fresh instance of AggregatorReduceOperation
260      * @return fresh instance of AggregatorReduceOperation
261      */
262     public AggregatorReduceOperation<A> createReduceOp() {
263       return reduceOp.createCopy();
264     }
265 
266     public A getCurrentValue() {
267       return currentValue;
268     }
269 
270     public void setCurrentValue(A currentValue) {
271       this.currentValue = currentValue;
272     }
273 
274     public boolean isPersistent() {
275       return persistent;
276     }
277 
278     @Override
279     public void write(DataOutput out) throws IOException {
280       out.writeBoolean(persistent);
281       reduceOp.write(out);
282 
283       Preconditions.checkState(currentValue == null, "AggregatorWrapper " +
284           "shouldn't have value at the end of the superstep");
285     }
286 
287     @Override
288     public void readFields(DataInput in) throws IOException {
289       persistent = in.readBoolean();
290       reduceOp = WritableUtils.createWritable(
291           AggregatorReduceOperation.class, conf);
292       reduceOp.readFields(in);
293       currentValue = null;
294     }
295   }
296 }