This project has retired. For details please refer to its Attic page.
OwnerAggregatorServerData xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.aggregators;
20  
21  import java.util.AbstractMap;
22  import java.util.Map;
23  import java.util.Set;
24  import java.util.concurrent.ConcurrentMap;
25  
26  import org.apache.giraph.reducers.ReduceOperation;
27  import org.apache.giraph.reducers.Reducer;
28  import org.apache.giraph.utils.TaskIdsPermitsBarrier;
29  import org.apache.hadoop.io.Writable;
30  import org.apache.hadoop.util.Progressable;
31  import org.apache.log4j.Logger;
32  
33  import com.google.common.base.Function;
34  import com.google.common.collect.Iterables;
35  import com.google.common.collect.Maps;
36  
37  /**
38   * Class for holding aggregators which current worker owns,
39   * and aggregating partial aggregator values from workers.
40   *
41   * Protocol:
42   * 1. Before the beginning of superstep, worker receives its aggregators
43   * from master, and these aggregators will be registered to this class.
44   * Multiple registrations can be called concurrently.
45   * 2. During the superstep, whenever a worker finishes computation,
46   * it will send partial aggregated values to worker owner. This class is used
47   * to help deserialize the arriving aggregator values, and aggregate the values
48   * at the destination owner worker; these can happen concurrently.
49   * (we know step 1. is finished before anything from step 2. happens because
50   * other workers can't start computation before they receive aggregators
51   * which this worker owns)
52   * 3. This class also tracks the number of partial aggregator requests which
53   * worker received. In the end of superstep, getMyAggregatorValuesWhenReady
54   * will be called to ensure everything was received and get the values which
55   * need to be sent to master.
56   * Because of this counting, in step 2. even if worker owns no aggregators,
57   * it will still send a message without aggregator data.
58   * 4. In the end we reset to prepare for the next superstep.
59   */
60  public class OwnerAggregatorServerData {
61    /** Class logger */
62    private static final Logger LOG =
63        Logger.getLogger(OwnerAggregatorServerData.class);
64    /** Map of aggregators which current worker owns */
65    private final ConcurrentMap<String, Reducer<Object, Writable>>
66    myReducerMap = Maps.newConcurrentMap();
67    /**
68     * Counts the requests with partial aggregated values from other workers.
69     * It uses GlobalCommType.SPECIAL_COUNT to know how many requests it
70     * has to receive.
71     */
72    private final TaskIdsPermitsBarrier workersBarrier;
73    /** Progressable used to report progress */
74    private final Progressable progressable;
75  
76    /**
77     * Constructor
78     *
79     * @param progressable Progressable used to report progress
80     */
81    public OwnerAggregatorServerData(Progressable progressable) {
82      this.progressable = progressable;
83      workersBarrier = new TaskIdsPermitsBarrier(progressable);
84    }
85  
86    /**
87     * Register a reducer which current worker owns. Thread-safe.
88     *
89     * @param name Name of aggregator
90     * @param reduceOp Reduce operation
91     */
92    public void registerReducer(String name,
93        ReduceOperation<Object, Writable> reduceOp) {
94      if (LOG.isDebugEnabled() && myReducerMap.isEmpty()) {
95        LOG.debug("registerAggregator: The first registration after a reset()");
96      }
97      myReducerMap.putIfAbsent(name, new Reducer<>(reduceOp));
98      progressable.progress();
99    }
100 
101   /**
102    * Reduce partial value of one of current worker's reducers.
103    *
104    * Thread-safe. Call only after reducers have been registered.
105    *
106    * @param name Name of the reducer
107    * @param value Value to reduce to it
108    */
109   public void reduce(String name, Writable value) {
110     Reducer<Object, Writable> reducer = myReducerMap.get(name);
111     synchronized (reducer) {
112       reducer.reduceMerge(value);
113     }
114     progressable.progress();
115   }
116 
117 
118   /**
119    * Create initial value for a reducer. Used so requests
120    * would be able to deserialize data.
121    *
122    * Thread-safe. Call only after reducer has been registered.
123    *
124    * @param name Name of the reducer
125    * @return Empty value
126    */
127   public Writable createInitialValue(String name) {
128     Reducer<Object, Writable> reducer = myReducerMap.get(name);
129     synchronized (reducer) {
130       return reducer.createInitialValue();
131     }
132   }
133 
134   /**
135    * Notify this object that a partial aggregated values request from some
136    * worker have been received. Thread-safe.
137    */
138   public void receivedRequestFromWorker() {
139     workersBarrier.releaseOnePermit();
140   }
141 
142   /**
143    * Notify this object about the total number of requests which should
144    * arrive from one of the workers. Thread-safe.
145    *
146    * @param requestCount Number of requests which should arrive
147    * @param taskId Task id of that worker
148    */
149   public void receivedRequestCountFromWorker(long requestCount, int taskId) {
150     workersBarrier.requirePermits(requestCount, taskId);
151   }
152 
153   /**
154    * This function will wait until all partial aggregated values from all
155    * workers are ready and aggregated, and return final aggregated values
156    * afterwards.
157    *
158    * @param workerIds All workers in the job apart from the current one
159    * @return Iterable through final aggregated values which this worker owns
160    */
161   public Iterable<Map.Entry<String, Writable>>
162   getMyReducedValuesWhenReady(Set<Integer> workerIds) {
163     workersBarrier.waitForRequiredPermits(workerIds);
164     if (LOG.isDebugEnabled()) {
165       LOG.debug("getMyAggregatorValuesWhenReady: Values ready");
166     }
167     return Iterables.transform(myReducerMap.entrySet(),
168         new Function<Map.Entry<String, Reducer<Object, Writable>>,
169             Map.Entry<String, Writable>>() {
170           @Override
171           public Map.Entry<String, Writable> apply(
172               Map.Entry<String, Reducer<Object, Writable>> aggregator) {
173             return new AbstractMap.SimpleEntry<String, Writable>(
174                 aggregator.getKey(),
175                 aggregator.getValue().getCurrentValue());
176           }
177         });
178   }
179 
180   /**
181    * Prepare for next superstep
182    */
183   public void reset() {
184     myReducerMap.clear();
185     if (LOG.isDebugEnabled()) {
186       LOG.debug("reset: Ready for next superstep");
187     }
188   }
189 
190 }