
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.comm.aggregators;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

import org.apache.giraph.comm.GlobalCommType;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.master.MasterInfo;
import org.apache.giraph.reducers.ReduceOperation;
import org.apache.giraph.reducers.Reducer;
import org.apache.giraph.utils.TaskIdsPermitsBarrier;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

/**
 * Accepts aggregators and their values from the previous superstep, sent by
 * the master and by the workers which own aggregators. Keeps the data
 * received from the master so it can be distributed later, and counts the
 * incoming requests so we know when we are done receiving them.
 *
 * The only restriction is that registerAggregatorClass needs to be called
 * before createAggregatorInitialValue; apart from that, the methods of this
 * class are thread-safe.
 */
public class AllAggregatorServerData {
  /** Class logger */
  private static final Logger LOG =
      Logger.getLogger(AllAggregatorServerData.class);
  /** Map of broadcasted values from master */
  private final ConcurrentMap<String, Writable> broadcastedMap =
      Maps.newConcurrentMap();
  /** Map of registered reducers for current superstep */
  private final ConcurrentMap<String, ReduceOperation<Object, Writable>>
      reduceOpMap = Maps.newConcurrentMap();
  /**
   * Counts the requests with final aggregators from master.
   * It uses values from special aggregators
   * (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)
   * to know how many requests it has to receive.
   */
  private final TaskIdsPermitsBarrier masterBarrier;
  /**
   * Aggregator data which this worker received from master and which it is
   * going to distribute before starting the next superstep. Thread-safe.
   */
  private final List<byte[]> masterData =
      Collections.synchronizedList(Lists.<byte[]>newArrayList());
  /**
   * Counts the requests with final aggregators from other workers.
   * It uses values from special aggregators
   * (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)
   * to know how many requests it has to receive.
   */
  private final TaskIdsPermitsBarrier workersBarrier;
  /** Progressable used to report progress */
  private final Progressable progressable;
  /** Configuration */
  private final ImmutableClassesGiraphConfiguration conf;

  /**
   * Constructor
   *
   * @param progressable Progressable used to report progress
   * @param conf Configuration
   */
  public AllAggregatorServerData(Progressable progressable,
      ImmutableClassesGiraphConfiguration conf) {
    this.progressable = progressable;
    this.conf = conf;
    workersBarrier = new TaskIdsPermitsBarrier(progressable);
    masterBarrier = new TaskIdsPermitsBarrier(progressable);
  }

  /**
   * Receive a value sent through global communication from the master.
   *
   * @param name Name
   * @param type Global communication type
   * @param value Object value
   */
  public void receiveValueFromMaster(
      String name, GlobalCommType type, Writable value) {
    switch (type) {
    case BROADCAST:
      broadcastedMap.put(name, value);
      break;

    case REDUCE_OPERATIONS:
      reduceOpMap.put(name, (ReduceOperation<Object, Writable>) value);
      break;

    default:
      throw new IllegalArgumentException("Unknown request type " + type);
    }
    progressable.progress();
  }

  /**
   * Notify this object that an aggregator request from master has been
   * received.
   *
   * @param data Byte request with data received from master
   */
  public void receivedRequestFromMaster(byte[] data) {
    masterData.add(data);
    masterBarrier.releaseOnePermit();
  }

  /**
   * Notify this object about the total number of requests which should
   * arrive from master.
   *
   * @param requestCount Number of requests which should arrive
   * @param taskId Task id of master
   */
  public void receivedRequestCountFromMaster(long requestCount, int taskId) {
    masterBarrier.requirePermits(requestCount, taskId);
  }

  /**
   * Notify this object that an aggregator request from some worker has been
   * received.
   */
  public void receivedRequestFromWorker() {
    workersBarrier.releaseOnePermit();
  }

  /**
   * Notify this object about the total number of requests which should
   * arrive from one of the workers.
   *
   * @param requestCount Number of requests which should arrive
   * @param taskId Task id of that worker
   */
  public void receivedRequestCountFromWorker(long requestCount, int taskId) {
    workersBarrier.requirePermits(requestCount, taskId);
  }

  /**
   * This function will wait until all aggregator requests from master have
   * arrived, and return that data afterwards.
   *
   * @param masterInfo Master info
   * @return Iterable through data received from master
   */
  public Iterable<byte[]> getDataFromMasterWhenReady(MasterInfo masterInfo) {
    masterBarrier.waitForRequiredPermits(
        Collections.singleton(masterInfo.getTaskId()));
    if (LOG.isDebugEnabled()) {
      LOG.debug("getDataFromMasterWhenReady: " +
          "Aggregator data for distribution ready");
    }
    return masterData;
  }

  /**
   * This function will wait until all aggregator requests from workers have
   * arrived, and fill the maps for the next superstep when ready.
   *
   * @param workerIds All workers in the job apart from the current one
   * @param broadcastedMapToFill Broadcast map to fill out
   * @param reducerMapToFill Registered reducer map to fill out
   */
  public void fillNextSuperstepMapsWhenReady(
      Set<Integer> workerIds,
      Map<String, Writable> broadcastedMapToFill,
      Map<String, Reducer<Object, Writable>> reducerMapToFill) {
    workersBarrier.waitForRequiredPermits(workerIds);
    if (LOG.isDebugEnabled()) {
      LOG.debug("fillNextSuperstepMapsWhenReady: Global data ready");
    }

    Preconditions.checkArgument(broadcastedMapToFill.isEmpty(),
        "broadcastedMap needs to be empty for filling");
    Preconditions.checkArgument(reducerMapToFill.isEmpty(),
        "reducerMap needs to be empty for filling");

    broadcastedMapToFill.putAll(broadcastedMap);

    for (Entry<String, ReduceOperation<Object, Writable>> entry :
        reduceOpMap.entrySet()) {
      reducerMapToFill.put(entry.getKey(), new Reducer<>(entry.getValue()));
    }

    broadcastedMap.clear();
    reduceOpMap.clear();
    masterData.clear();
    if (LOG.isDebugEnabled()) {
      LOG.debug("reset: Ready for next superstep");
    }
  }
}
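
The class above only reacts to incoming requests; in a running job the calls are made from the worker's request handlers. The sketch below is a hypothetical, single-threaded illustration (not part of the Giraph sources) of the order in which those calls fit together: the master's broadcast values, request payloads and request count arrive and open the master barrier, the received data is handed back for redistribution, and once every other worker has announced its request count the maps for the next superstep are filled. The class name AllAggregatorServerDataSketch, the broadcast key and the zero worker request counts are invented for the example; only the AllAggregatorServerData methods themselves come from the class above.

package org.apache.giraph.comm.aggregators;

import java.util.Map;
import java.util.Set;

import org.apache.giraph.comm.GlobalCommType;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.master.MasterInfo;
import org.apache.giraph.reducers.Reducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;

import com.google.common.collect.Maps;

/**
 * Hypothetical usage sketch, not part of Giraph: it only strings together
 * the AllAggregatorServerData calls in the order the request handlers
 * would normally make them.
 */
public class AllAggregatorServerDataSketch {
  /** Do not instantiate */
  private AllAggregatorServerDataSketch() { }

  /**
   * Simulate one superstep boundary on a single thread. The broadcast name,
   * the empty payload and the zero worker request counts are made up for
   * the example.
   *
   * @param conf Configuration (provided by the worker)
   * @param masterInfo Info about the master task (provided by the worker)
   * @param otherWorkerIds Task ids of all other workers
   * @return Data received from master, ready to be redistributed
   */
  public static Iterable<byte[]> exchangeGlobalData(
      ImmutableClassesGiraphConfiguration conf,
      MasterInfo masterInfo,
      Set<Integer> otherWorkerIds) {
    // The progressable is only used to report liveness; a no-op suffices.
    AllAggregatorServerData serverData =
        new AllAggregatorServerData(() -> { }, conf);

    // Requests from the master arrive (normally through RPC handlers):
    // individual broadcast values, the raw request bytes kept for
    // redistribution, and finally the number of requests that were sent.
    serverData.receiveValueFromMaster(
        "exampleBroadcast", GlobalCommType.BROADCAST, new LongWritable(0));
    serverData.receivedRequestFromMaster(new byte[0]);
    serverData.receivedRequestCountFromMaster(1, masterInfo.getTaskId());

    // Blocks until everything the master announced has been received.
    Iterable<byte[]> masterData =
        serverData.getDataFromMasterWhenReady(masterInfo);

    // Each other worker announces how many requests it will send; in this
    // sketch every worker is assumed to send zero, so the barrier opens
    // immediately.
    for (int workerId : otherWorkerIds) {
      serverData.receivedRequestCountFromWorker(0, workerId);
    }

    // Blocks until all worker requests are in, then fills the maps used in
    // the next superstep and resets the internal state.
    Map<String, Writable> broadcastedMap = Maps.newHashMap();
    Map<String, Reducer<Object, Writable>> reducerMap = Maps.newHashMap();
    serverData.fillNextSuperstepMapsWhenReady(
        otherWorkerIds, broadcastedMap, reducerMap);

    return masterData;
  }
}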