This project has retired. For details please refer to its Attic page.
InternalAggregators xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.api.local;
19  
20  import java.util.Map;
21  import java.util.Map.Entry;
22  
23  import org.apache.giraph.master.MasterGlobalCommUsage;
24  import org.apache.giraph.reducers.ReduceOperation;
25  import org.apache.giraph.reducers.Reducer;
26  import org.apache.giraph.utils.WritableUtils;
27  import org.apache.giraph.worker.WorkerGlobalCommUsage;
28  import org.apache.hadoop.io.Writable;
29  
30  import com.google.common.collect.Maps;
31  
32  /**
33   * Internal aggregators implementation
34   */
35  @SuppressWarnings("unchecked")
36  class InternalAggregators
37      implements MasterGlobalCommUsage, WorkerGlobalCommUsage {
38    private final boolean runAllChecks;
39  
40    /** Map of reducers registered for the next worker computation */
41    private final Map<String, Reducer<Object, Writable>> reducerMap =
42        Maps.newHashMap();
43    /** Map of values to be sent to workers for next computation */
44    private final Map<String, Writable> broadcastMap =
45        Maps.newHashMap();
46    /** Values reduced from previous computation */
47    private final Map<String, Writable> reducedMap =
48        Maps.newHashMap();
49  
50    public InternalAggregators(boolean runAllChecks) {
51      this.runAllChecks = runAllChecks;
52    }
53  
54    private static <T> T getOrThrow(
55        Map<String, T> map, String mapName, String key) {
56      T value = map.get(key);
57      if (value == null) {
58        throw new IllegalArgumentException(
59            key + " not present in " + mapName);
60      }
61      return value;
62    }
63  
64    @Override
65    public void broadcast(String name, Writable value) {
66      broadcastMap.put(name, value);
67    }
68  
69    @Override
70    public <B extends Writable> B getBroadcast(String name) {
71      return (B) getOrThrow(broadcastMap, "broadcastMap", name);
72    }
73  
74    @Override
75    public <S, R extends Writable> void registerReducer(
76        String name, ReduceOperation<S, R> reduceOp) {
77      registerReducer(name, reduceOp, reduceOp.createInitialValue());
78    }
79  
80    @Override
81    public <S, R extends Writable> void registerReducer(
82        String name, ReduceOperation<S, R> reduceOp,
83        R globalInitialValue) {
84      if (reducerMap.containsKey(name)) {
85        throw new IllegalArgumentException(
86            "Reducer with name " + name + " was already registered, " +
87            " and is " + reducerMap.get(name).getReduceOp() +
88            ", and we are trying to " + " register " + reduceOp);
89      }
90      if (reduceOp == null) {
91        throw new IllegalArgumentException(
92            "null reducer cannot be registered, with name " + name);
93      }
94      if (globalInitialValue == null) {
95        throw new IllegalArgumentException(
96            "global initial value for reducer cannot be null, but is for " +
97            reduceOp + " with naem" + name);
98      }
99  
100     Reducer<S, R> reducer = new Reducer<>(reduceOp, globalInitialValue);
101     reducerMap.put(name, (Reducer<Object, Writable>) reducer);
102   }
103 
104   @Override
105   public void reduce(String name, Object value) {
106     Reducer<Object, Writable> reducer =
107         getOrThrow(reducerMap, "reducerMap", name);
108     synchronized (reducer) {
109       reducer.reduce(value);
110     }
111   }
112 
113   @Override
114   public void reduceMerge(String name, Writable value) {
115     Reducer<Object, Writable> reducer =
116         getOrThrow(reducerMap, "reducerMap", name);
117     synchronized (reducer) {
118       reducer.reduceMerge(value);
119     }
120   }
121 
122   @Override
123   public <R extends Writable> R getReduced(String name) {
124     return (R) getOrThrow(reducedMap, "reducedMap", name);
125   }
126 
127   public synchronized void afterWorkerBeforeMaster() {
128     broadcastMap.clear();
129     reducedMap.clear();
130     for (Entry<String, Reducer<Object, Writable>> entry :
131           reducerMap.entrySet()) {
132       Writable value = entry.getValue().getCurrentValue();
133       if (runAllChecks) {
134         Writable newValue = entry.getValue().createInitialValue();
135         WritableUtils.copyInto(value, newValue);
136         value = newValue;
137       }
138       reducedMap.put(entry.getKey(), value);
139     }
140     reducerMap.clear();
141   }
142 }