This project has retired. For details, please refer to its Attic page.
      
 
NettyWorkerClientRequestProcessor xref
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.giraph.comm.netty;
19  
20  import java.io.IOException;
21  import java.util.Iterator;
22  import java.util.Map;
23  import java.util.concurrent.ConcurrentMap;
24  
25  import org.apache.giraph.bsp.BspService;
26  import org.apache.giraph.bsp.CentralizedServiceWorker;
27  import org.apache.giraph.comm.SendEdgeCache;
28  import org.apache.giraph.comm.SendMessageCache;
29  import org.apache.giraph.comm.SendMutationsCache;
30  import org.apache.giraph.comm.SendOneMessageToManyCache;
31  import org.apache.giraph.comm.SendPartitionCache;
32  import org.apache.giraph.comm.ServerData;
33  import org.apache.giraph.comm.WorkerClient;
34  import org.apache.giraph.comm.WorkerClientRequestProcessor;
35  import org.apache.giraph.comm.messages.MessageStore;
36  import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
37  import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
38  import org.apache.giraph.comm.requests.SendVertexRequest;
39  import org.apache.giraph.comm.requests.SendWorkerEdgesRequest;
40  import org.apache.giraph.comm.requests.SendWorkerVerticesRequest;
41  import org.apache.giraph.comm.requests.WorkerRequest;
42  import org.apache.giraph.comm.requests.WritableRequest;
43  import org.apache.giraph.conf.GiraphConfiguration;
44  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
45  import org.apache.giraph.edge.Edge;
46  import org.apache.giraph.factories.MessageValueFactory;
47  import org.apache.giraph.graph.Vertex;
48  import org.apache.giraph.graph.VertexMutations;
49  import org.apache.giraph.metrics.GiraphMetrics;
50  import org.apache.giraph.metrics.MetricNames;
51  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
52  import org.apache.giraph.partition.Partition;
53  import org.apache.giraph.partition.PartitionOwner;
54  import org.apache.giraph.utils.ByteArrayVertexIdMessages;
55  import org.apache.giraph.utils.ExtendedDataOutput;
56  import org.apache.giraph.utils.PairList;
57  import org.apache.giraph.utils.VertexIdEdges;
58  import org.apache.giraph.worker.WorkerInfo;
59  import org.apache.hadoop.io.Writable;
60  import org.apache.hadoop.io.WritableComparable;
61  import org.apache.hadoop.mapreduce.Mapper;
62  import org.apache.log4j.Logger;
63  
64  import com.yammer.metrics.core.Counter;
65  import com.yammer.metrics.core.Gauge;
66  import com.yammer.metrics.util.PercentGauge;
67  
68  
69  
70  
71  
72  
73  
74  
75  
76  
77  @SuppressWarnings("unchecked")
78  public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
79      V extends Writable, E extends Writable> implements
80      WorkerClientRequestProcessor<I, V, E> {
81    
82    private static final Logger LOG =
83        Logger.getLogger(NettyWorkerClientRequestProcessor.class);
84    
85    private final SendPartitionCache<I, V, E> sendPartitionCache;
86    
87    private final SendMessageCache<I, Writable> sendMessageCache;
88    
89    private final SendEdgeCache<I, E> sendEdgeCache;
90    
91    private final SendMutationsCache<I, V, E> sendMutationsCache =
92        new SendMutationsCache<I, V, E>();
93    
94    private final WorkerClient<I, V, E> workerClient;
95    
96    private final int maxMessagesSizePerWorker;
97    
98    private final int maxVerticesSizePerWorker;
99    
100   private final int maxEdgesSizePerWorker;
101   
102   private final int maxMutationsPerPartition;
103   
104   private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
105   
106   private final CentralizedServiceWorker<I, V, E> serviceWorker;
107   
108   private final ServerData<I, V, E> serverData;
109 
110   
111   
112   private final Counter localRequests;
113   
114   private final Counter remoteRequests;
115   
116   private final MessageValueFactory messageValueFactory;
117 
118   
119 
120 
121 
122 
123 
124 
125 
126   public NettyWorkerClientRequestProcessor(
127       Mapper<?, ?, ?, ?>.Context context,
128       ImmutableClassesGiraphConfiguration<I, V, E> conf,
129       CentralizedServiceWorker<I, V, E> serviceWorker,
130       boolean useOneMessageToManyIdsEncoding) {
131     this.workerClient = serviceWorker.getWorkerClient();
132     this.configuration = conf;
133 
134 
135     sendPartitionCache =
136         new SendPartitionCache<I, V, E>(conf, serviceWorker);
137     sendEdgeCache = new SendEdgeCache<I, E>(conf, serviceWorker);
138     maxMessagesSizePerWorker =
139         GiraphConfiguration.MAX_MSG_REQUEST_SIZE.get(conf);
140     maxVerticesSizePerWorker =
141         GiraphConfiguration.MAX_VERTEX_REQUEST_SIZE.get(conf);
142     if (useOneMessageToManyIdsEncoding) {
143       sendMessageCache =
144         new SendOneMessageToManyCache<I, Writable>(conf, serviceWorker,
145           this, maxMessagesSizePerWorker);
146     } else {
147       sendMessageCache =
148         new SendMessageCache<I, Writable>(conf, serviceWorker,
149           this, maxMessagesSizePerWorker);
150     }
151     maxEdgesSizePerWorker =
152         GiraphConfiguration.MAX_EDGE_REQUEST_SIZE.get(conf);
153     maxMutationsPerPartition =
154         GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST.get(conf);
155     this.serviceWorker = serviceWorker;
156     this.serverData = serviceWorker.getServerData();
157 
158     
159     
160     SuperstepMetricsRegistry smr = GiraphMetrics.get().perSuperstep();
161     localRequests = smr.getCounter(MetricNames.LOCAL_REQUESTS);
162     remoteRequests = smr.getCounter(MetricNames.REMOTE_REQUESTS);
163     setupGauges(smr, localRequests, remoteRequests);
164     messageValueFactory = configuration.createOutgoingMessageValueFactory();
165   }
166 
167   @Override
168   public void sendMessageRequest(I destVertexId, Writable message) {
169     this.sendMessageCache.sendMessageRequest(destVertexId, message);
170   }
171 
172   @Override
173   public void sendMessageToAllRequest(
174     Vertex<I, V, E> vertex, Writable message) {
175     this.sendMessageCache.sendMessageToAllRequest(vertex, message);
176   }
177 
178   @Override
179   public void sendMessageToAllRequest(
180     Iterator<I> vertexIdIterator, Writable message) {
181     this.sendMessageCache.sendMessageToAllRequest(vertexIdIterator, message);
182   }
183 
184   @Override
185   public void sendPartitionRequest(WorkerInfo workerInfo,
186                                    Partition<I, V, E> partition) {
187     if (LOG.isTraceEnabled()) {
188       LOG.trace("sendVertexRequest: Sending to " + workerInfo +
189           ", with partition " + partition);
190     }
191 
192     WritableRequest vertexRequest = new SendVertexRequest<I, V, E>(partition);
193     doRequest(workerInfo, vertexRequest);
194 
195     
196     if (serviceWorker.getSuperstep() != BspService.INPUT_SUPERSTEP) {
197       sendPartitionMessages(workerInfo, partition);
198       ConcurrentMap<I, VertexMutations<I, V, E>> vertexMutationMap =
199           serverData.getPartitionMutations().remove(partition.getId());
200       WritableRequest partitionMutationsRequest =
201           new SendPartitionMutationsRequest<I, V, E>(partition.getId(),
202               vertexMutationMap);
203       doRequest(workerInfo, partitionMutationsRequest);
204     }
205   }
206 
207   
208 
209 
210 
211 
212 
213   private void sendPartitionMessages(WorkerInfo workerInfo,
214                                      Partition<I, V, E> partition) {
215     final int partitionId = partition.getId();
216     MessageStore<I, Writable> messageStore =
217         serverData.getCurrentMessageStore();
218     ByteArrayVertexIdMessages<I, Writable> vertexIdMessages =
219         new ByteArrayVertexIdMessages<I, Writable>(
220             messageValueFactory);
221     vertexIdMessages.setConf(configuration);
222     vertexIdMessages.initialize();
223     for (I vertexId :
224         messageStore.getPartitionDestinationVertices(partitionId)) {
225       
226       
227       Iterable<Writable> messages = messageStore.getVertexMessages(vertexId);
228       for (Writable message : messages) {
229         vertexIdMessages.add(vertexId, message);
230       }
231       if (vertexIdMessages.getSize() > maxMessagesSizePerWorker) {
232         WritableRequest messagesRequest =
233             new SendPartitionCurrentMessagesRequest<I, V, E, Writable>(
234             partitionId, vertexIdMessages);
235         doRequest(workerInfo, messagesRequest);
236         vertexIdMessages =
237             new ByteArrayVertexIdMessages<I, Writable>(
238                 messageValueFactory);
239         vertexIdMessages.setConf(configuration);
240         vertexIdMessages.initialize();
241       }
242     }
243     if (!vertexIdMessages.isEmpty()) {
244       WritableRequest messagesRequest = new
245           SendPartitionCurrentMessagesRequest<I, V, E, Writable>(
246           partitionId, vertexIdMessages);
247       doRequest(workerInfo, messagesRequest);
248     }
249     messageStore.clearPartition(partitionId);
250   }
251 
252   @Override
253   public boolean sendVertexRequest(PartitionOwner partitionOwner,
254       Vertex<I, V, E> vertex) {
255     
256     int workerMessageSize = sendPartitionCache.addVertex(
257         partitionOwner, vertex);
258 
259     
260     
261     if (workerMessageSize >= maxVerticesSizePerWorker) {
262       PairList<Integer, ExtendedDataOutput>
263           workerPartitionVertices =
264           sendPartitionCache.removeWorkerData(partitionOwner.getWorkerInfo());
265       WritableRequest writableRequest =
266           new SendWorkerVerticesRequest<I, V, E>(
267               configuration, workerPartitionVertices);
268       doRequest(partitionOwner.getWorkerInfo(), writableRequest);
269       return true;
270     }
271 
272     return false;
273   }
274 
275   @Override
276   public void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws
277       IOException {
278     PartitionOwner partitionOwner =
279         serviceWorker.getVertexPartitionOwner(vertexIndex);
280     int partitionId = partitionOwner.getPartitionId();
281     if (LOG.isTraceEnabled()) {
282       LOG.trace("addEdgeRequest: Sending edge " + edge + " for index " +
283           vertexIndex + " with partition " + partitionId);
284     }
285 
286     
287     int partitionMutationCount =
288         sendMutationsCache.addEdgeMutation(partitionId, vertexIndex, edge);
289 
290     sendMutationsRequestIfFull(
291         partitionId, partitionOwner, partitionMutationCount);
292   }
293 
294   @Override
295   public boolean sendEdgeRequest(I sourceVertexId, Edge<I, E> edge)
296     throws IOException {
297     PartitionOwner owner =
298         serviceWorker.getVertexPartitionOwner(sourceVertexId);
299     WorkerInfo workerInfo = owner.getWorkerInfo();
300     final int partitionId = owner.getPartitionId();
301     if (LOG.isTraceEnabled()) {
302       LOG.trace("sendEdgeRequest: Send bytes (" + edge.toString() +
303           ") to " + sourceVertexId + " on worker " + workerInfo);
304     }
305 
306     
307     int workerEdgesSize = sendEdgeCache.addEdge(
308         workerInfo, partitionId, sourceVertexId, edge);
309 
310     
311     
312     if (workerEdgesSize >= maxEdgesSizePerWorker) {
313       PairList<Integer, VertexIdEdges<I, E>> workerEdges =
314           sendEdgeCache.removeWorkerEdges(workerInfo);
315       WritableRequest writableRequest =
316           new SendWorkerEdgesRequest<I, E>(workerEdges);
317       doRequest(workerInfo, writableRequest);
318       return true;
319     }
320 
321     return false;
322   }
323 
324   
325 
326 
327 
328 
329 
330 
331 
332   private void sendMutationsRequestIfFull(
333       int partitionId, PartitionOwner partitionOwner,
334       int partitionMutationCount) {
335     
336     if (partitionMutationCount >= maxMutationsPerPartition) {
337       Map<I, VertexMutations<I, V, E>> partitionMutations =
338           sendMutationsCache.removePartitionMutations(partitionId);
339       WritableRequest writableRequest =
340           new SendPartitionMutationsRequest<I, V, E>(
341               partitionId, partitionMutations);
342       doRequest(partitionOwner.getWorkerInfo(), writableRequest);
343     }
344   }
345 
346   @Override
347   public void removeEdgesRequest(I vertexIndex,
348                                  I destinationVertexIndex) throws IOException {
349     PartitionOwner partitionOwner =
350         serviceWorker.getVertexPartitionOwner(vertexIndex);
351     int partitionId = partitionOwner.getPartitionId();
352     if (LOG.isTraceEnabled()) {
353       LOG.trace("removeEdgesRequest: Removing edge " +
354           destinationVertexIndex +
355           " for index " + vertexIndex + " with partition " + partitionId);
356     }
357 
358     
359     int partitionMutationCount =
360         sendMutationsCache.removeEdgeMutation(
361             partitionId, vertexIndex, destinationVertexIndex);
362 
363     sendMutationsRequestIfFull(
364         partitionId, partitionOwner, partitionMutationCount);
365   }
366 
367   @Override
368   public void addVertexRequest(Vertex<I, V, E> vertex) throws IOException {
369     PartitionOwner partitionOwner =
370         serviceWorker.getVertexPartitionOwner(vertex.getId());
371     int partitionId = partitionOwner.getPartitionId();
372     if (LOG.isTraceEnabled()) {
373       LOG.trace("addVertexRequest: Sending vertex " + vertex +
374           " to partition " + partitionId);
375     }
376 
377     
378     int partitionMutationCount =
379         sendMutationsCache.addVertexMutation(partitionId, vertex);
380 
381     sendMutationsRequestIfFull(
382         partitionId, partitionOwner, partitionMutationCount);
383   }
384 
385   @Override
386   public void removeVertexRequest(I vertexIndex) throws IOException {
387     PartitionOwner partitionOwner =
388         serviceWorker.getVertexPartitionOwner(vertexIndex);
389     int partitionId = partitionOwner.getPartitionId();
390     if (LOG.isTraceEnabled()) {
391       LOG.trace("removeVertexRequest: Removing vertex index " +
392           vertexIndex + " from partition " + partitionId);
393     }
394 
395     
396     int partitionMutationCount =
397         sendMutationsCache.removeVertexMutation(partitionId, vertexIndex);
398 
399     sendMutationsRequestIfFull(
400         partitionId, partitionOwner, partitionMutationCount);
401   }
402 
403   @Override
404   public void flush() throws IOException {
405     
406     
407     sendMessageCache.flush();
408 
409     
410     PairList<WorkerInfo, PairList<Integer, ExtendedDataOutput>>
411         remainingVertexCache = sendPartitionCache.removeAllData();
412     PairList<WorkerInfo,
413         PairList<Integer, ExtendedDataOutput>>.Iterator
414         vertexIterator = remainingVertexCache.getIterator();
415     while (vertexIterator.hasNext()) {
416       vertexIterator.next();
417       WritableRequest writableRequest =
418           new SendWorkerVerticesRequest(
419               configuration, vertexIterator.getCurrentSecond());
420       doRequest(vertexIterator.getCurrentFirst(), writableRequest);
421     }
422 
423     
424     PairList<WorkerInfo, PairList<Integer,
425         VertexIdEdges<I, E>>>
426         remainingEdgeCache = sendEdgeCache.removeAllEdges();
427     PairList<WorkerInfo,
428         PairList<Integer, VertexIdEdges<I, E>>>.Iterator
429         edgeIterator = remainingEdgeCache.getIterator();
430     while (edgeIterator.hasNext()) {
431       edgeIterator.next();
432       WritableRequest writableRequest =
433           new SendWorkerEdgesRequest<I, E>(
434               edgeIterator.getCurrentSecond());
435       doRequest(edgeIterator.getCurrentFirst(), writableRequest);
436     }
437 
438     
439     Map<Integer, Map<I, VertexMutations<I, V, E>>> remainingMutationsCache =
440         sendMutationsCache.removeAllPartitionMutations();
441     for (Map.Entry<Integer, Map<I, VertexMutations<I, V, E>>> entry :
442         remainingMutationsCache.entrySet()) {
443       WritableRequest writableRequest =
444           new SendPartitionMutationsRequest<I, V, E>(
445               entry.getKey(), entry.getValue());
446       PartitionOwner partitionOwner =
447           serviceWorker.getVertexPartitionOwner(
448               entry.getValue().keySet().iterator().next());
449       doRequest(partitionOwner.getWorkerInfo(), writableRequest);
450     }
451   }
452 
453   @Override
454   public long resetMessageCount() {
455     return this.sendMessageCache.resetMessageCount();
456   }
457 
458   @Override
459   public long resetMessageBytesCount() {
460     return this.sendMessageCache.resetMessageBytesCount();
461   }
462 
463   
464 
465 
466 
467 
468 
469   public void doRequest(WorkerInfo workerInfo,
470                          WritableRequest writableRequest) {
471     
472     if (serviceWorker.getWorkerInfo().getTaskId() ==
473         workerInfo.getTaskId()) {
474       ((WorkerRequest) writableRequest).doRequest(serverData);
475       localRequests.inc();
476     } else {
477       workerClient.sendWritableRequest(
478           workerInfo.getTaskId(), writableRequest);
479       remoteRequests.inc();
480     }
481   }
482 
483   
484 
485 
486 
487 
488 
489 
490 
491 
492 
493   private static void setupGauges(SuperstepMetricsRegistry smr,
494                                   final Counter localRequests,
495                                   final Counter remoteRequests) {
496     final Gauge<Long> totalRequests = smr.getGauge(MetricNames.TOTAL_REQUESTS,
497         new Gauge<Long>() {
498           @Override
499           public Long value() {
500             return localRequests.count() + remoteRequests.count();
501           }
502         }
503     );
504     smr.getGauge(MetricNames.PERCENT_LOCAL_REQUESTS, new PercentGauge() {
505       @Override protected double getNumerator() {
506         return localRequests.count();
507       }
508 
509       @Override protected double getDenominator() {
510         return totalRequests.value();
511       }
512     });
513   }
514 }