This project has retired. For details please refer to its Attic page.
NettyWorkerAggregatorRequestProcessor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty;
20  
21  import java.io.IOException;
22  
23  import org.apache.giraph.bsp.CentralizedServiceWorker;
24  import org.apache.giraph.comm.GlobalCommType;
25  import org.apache.giraph.comm.WorkerClient;
26  import org.apache.giraph.comm.aggregators.AggregatorUtils;
27  import org.apache.giraph.comm.aggregators.SendGlobalCommCache;
28  import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
29  import org.apache.giraph.comm.requests.SendAggregatorsToWorkerRequest;
30  import org.apache.giraph.comm.requests.SendReducedToMasterRequest;
31  import org.apache.giraph.comm.requests.SendWorkerAggregatorsRequest;
32  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
33  import org.apache.giraph.worker.WorkerInfo;
34  import org.apache.hadoop.io.Writable;
35  import org.apache.hadoop.util.Progressable;
36  
37  /**
38   * Netty implementation of {@link WorkerAggregatorRequestProcessor}
39   */
40  public class NettyWorkerAggregatorRequestProcessor
41      implements WorkerAggregatorRequestProcessor {
42    /** Progressable used to report progress */
43    private final Progressable progressable;
44    /** NettyClient that could be shared among one or more instances */
45    private final WorkerClient<?, ?, ?> workerClient;
46    /** Service worker */
47    private final CentralizedServiceWorker<?, ?, ?> serviceWorker;
48    /** Cached map of partition ids to serialized aggregator data */
49    private final SendGlobalCommCache sendReducedValuesCache =
50        new SendGlobalCommCache(false);
51    /** How big a single aggregator request can be */
52    private final int maxBytesPerAggregatorRequest;
53  
54    /**
55     * Constructor.
56     *
57     * @param progressable  Progressable used to report progress
58     * @param configuration Configuration
59     * @param serviceWorker Service worker
60     */
61    public NettyWorkerAggregatorRequestProcessor(
62        Progressable progressable,
63        ImmutableClassesGiraphConfiguration<?, ?, ?> configuration,
64        CentralizedServiceWorker<?, ?, ?> serviceWorker) {
65      this.serviceWorker = serviceWorker;
66      this.workerClient = serviceWorker.getWorkerClient();
67      this.progressable = progressable;
68      maxBytesPerAggregatorRequest = configuration.getInt(
69          AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
70          AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT);
71  
72    }
73  
74    @Override
75    public boolean sendReducedValue(String name,
76        Writable reducedValue) throws IOException {
77      WorkerInfo owner =
78          AggregatorUtils.getOwner(name,
79              serviceWorker.getWorkerInfoList());
80      if (isThisWorker(owner)) {
81        return false;
82      } else {
83        int currentSize = sendReducedValuesCache.addValue(owner.getTaskId(),
84            name, GlobalCommType.REDUCED_VALUE, reducedValue);
85        if (currentSize >= maxBytesPerAggregatorRequest) {
86          flushAggregatorsToWorker(owner);
87        }
88        return true;
89      }
90    }
91  
92    @Override
93    public void flush() throws IOException {
94      for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
95        if (!isThisWorker(workerInfo)) {
96          sendReducedValuesCache.addSpecialCount(workerInfo.getTaskId());
97          flushAggregatorsToWorker(workerInfo);
98          progressable.progress();
99        }
100     }
101     sendReducedValuesCache.reset();
102   }
103 
104   /**
105    * Send aggregators from cache to worker.
106    *
107    * @param worker Worker which we want to send aggregators to
108    */
109   private void flushAggregatorsToWorker(WorkerInfo worker) {
110     byte[] data =
111         sendReducedValuesCache.removeSerialized(worker.getTaskId());
112     workerClient.sendWritableRequest(worker.getTaskId(),
113         new SendWorkerAggregatorsRequest(data,
114             serviceWorker.getWorkerInfo().getTaskId()));
115   }
116 
117   @Override
118   public void sendReducedValuesToMaster(byte[] data) throws IOException {
119     workerClient.sendWritableRequest(serviceWorker.getMasterInfo().getTaskId(),
120         new SendReducedToMasterRequest(data));
121   }
122 
123   @Override
124   public void distributeReducedValues(
125       Iterable<byte[]> aggregatorDataList) throws IOException {
126     for (byte[] aggregatorData : aggregatorDataList) {
127       for (WorkerInfo worker : serviceWorker.getWorkerInfoList()) {
128         if (!isThisWorker(worker)) {
129           SendAggregatorsToWorkerRequest request =
130               new SendAggregatorsToWorkerRequest(aggregatorData,
131                   serviceWorker.getWorkerInfo().getTaskId());
132           workerClient.sendWritableRequest(worker.getTaskId(), request);
133         }
134         progressable.progress();
135       }
136     }
137   }
138 
139   /**
140    * Check if workerInfo describes current worker.
141    *
142    * @param workerInfo Worker to check
143    * @return True iff workerInfo corresponds to current worker.
144    */
145   private boolean isThisWorker(WorkerInfo workerInfo) {
146     return serviceWorker.getWorkerInfo().getTaskId() == workerInfo.getTaskId();
147   }
148 }