This project has retired. For details, please refer to its Attic page.
      
 
NettyWorkerAggregatorRequestProcessor xref
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.giraph.comm.netty;
20  
21  import java.io.IOException;
22  
23  import org.apache.giraph.bsp.CentralizedServiceWorker;
24  import org.apache.giraph.comm.GlobalCommType;
25  import org.apache.giraph.comm.WorkerClient;
26  import org.apache.giraph.comm.aggregators.AggregatorUtils;
27  import org.apache.giraph.comm.aggregators.SendGlobalCommCache;
28  import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
29  import org.apache.giraph.comm.requests.SendAggregatorsToWorkerRequest;
30  import org.apache.giraph.comm.requests.SendReducedToMasterRequest;
31  import org.apache.giraph.comm.requests.SendWorkerAggregatorsRequest;
32  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
33  import org.apache.giraph.worker.WorkerInfo;
34  import org.apache.hadoop.io.Writable;
35  import org.apache.hadoop.util.Progressable;
36  
37  
38  
39  
40  public class NettyWorkerAggregatorRequestProcessor
41      implements WorkerAggregatorRequestProcessor {
42    
43    private final Progressable progressable;
44    
45    private final WorkerClient<?, ?, ?> workerClient;
46    
47    private final CentralizedServiceWorker<?, ?, ?> serviceWorker;
48    
49    private final SendGlobalCommCache sendReducedValuesCache =
50        new SendGlobalCommCache(false);
51    
52    private final int maxBytesPerAggregatorRequest;
53  
54    
55  
56  
57  
58  
59  
60  
61    public NettyWorkerAggregatorRequestProcessor(
62        Progressable progressable,
63        ImmutableClassesGiraphConfiguration<?, ?, ?> configuration,
64        CentralizedServiceWorker<?, ?, ?> serviceWorker) {
65      this.serviceWorker = serviceWorker;
66      this.workerClient = serviceWorker.getWorkerClient();
67      this.progressable = progressable;
68      maxBytesPerAggregatorRequest = configuration.getInt(
69          AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
70          AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT);
71  
72    }
73  
74    @Override
75    public boolean sendReducedValue(String name,
76        Writable reducedValue) throws IOException {
77      WorkerInfo owner =
78          AggregatorUtils.getOwner(name,
79              serviceWorker.getWorkerInfoList());
80      if (isThisWorker(owner)) {
81        return false;
82      } else {
83        int currentSize = sendReducedValuesCache.addValue(owner.getTaskId(),
84            name, GlobalCommType.REDUCED_VALUE, reducedValue);
85        if (currentSize >= maxBytesPerAggregatorRequest) {
86          flushAggregatorsToWorker(owner);
87        }
88        return true;
89      }
90    }
91  
92    @Override
93    public void flush() throws IOException {
94      for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
95        if (!isThisWorker(workerInfo)) {
96          sendReducedValuesCache.addSpecialCount(workerInfo.getTaskId());
97          flushAggregatorsToWorker(workerInfo);
98          progressable.progress();
99        }
100     }
101     sendReducedValuesCache.reset();
102   }
103 
104   
105 
106 
107 
108 
109   private void flushAggregatorsToWorker(WorkerInfo worker) {
110     byte[] data =
111         sendReducedValuesCache.removeSerialized(worker.getTaskId());
112     workerClient.sendWritableRequest(worker.getTaskId(),
113         new SendWorkerAggregatorsRequest(data,
114             serviceWorker.getWorkerInfo().getTaskId()));
115   }
116 
117   @Override
118   public void sendReducedValuesToMaster(byte[] data) throws IOException {
119     workerClient.sendWritableRequest(serviceWorker.getMasterInfo().getTaskId(),
120         new SendReducedToMasterRequest(data));
121   }
122 
123   @Override
124   public void distributeReducedValues(
125       Iterable<byte[]> aggregatorDataList) throws IOException {
126     for (byte[] aggregatorData : aggregatorDataList) {
127       for (WorkerInfo worker : serviceWorker.getWorkerInfoList()) {
128         if (!isThisWorker(worker)) {
129           SendAggregatorsToWorkerRequest request =
130               new SendAggregatorsToWorkerRequest(aggregatorData,
131                   serviceWorker.getWorkerInfo().getTaskId());
132           workerClient.sendWritableRequest(worker.getTaskId(), request);
133         }
134         progressable.progress();
135       }
136     }
137   }
138 
139   
140 
141 
142 
143 
144 
145   private boolean isThisWorker(WorkerInfo workerInfo) {
146     return serviceWorker.getWorkerInfo().getTaskId() == workerInfo.getTaskId();
147   }
148 }