/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.comm.netty;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.SendEdgeCache;
import org.apache.giraph.comm.SendMessageCache;
import org.apache.giraph.comm.SendMutationsCache;
import org.apache.giraph.comm.SendOneMessageToManyCache;
import org.apache.giraph.comm.SendPartitionCache;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
import org.apache.giraph.comm.requests.SendVertexRequest;
import org.apache.giraph.comm.requests.SendWorkerEdgesRequest;
import org.apache.giraph.comm.requests.SendWorkerVerticesRequest;
import org.apache.giraph.comm.requests.WorkerRequest;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricNames;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.PairList;
import org.apache.giraph.utils.VertexIdEdges;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.util.PercentGauge;

/**
 * Aggregates requests and sends them to the thread-safe NettyClient.  This
 * class is not thread-safe and is expected to be used and then thrown away
 * after a phase of communication has completed.
 *
 * @param <I> Vertex id
 * @param <V> Vertex data
 * @param <E> Edge data
 */
@SuppressWarnings("unchecked")
public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
    V extends Writable, E extends Writable> implements
    WorkerClientRequestProcessor<I, V, E> {
  /** Class logger */
  private static final Logger LOG =
      Logger.getLogger(NettyWorkerClientRequestProcessor.class);
  /** Cached partitions of vertices to send */
  private final SendPartitionCache<I, V, E> sendPartitionCache;
  /** Cached map of partitions to vertex indices to messages */
  private final SendMessageCache<I, Writable> sendMessageCache;
  /** Cache of edges to be sent. */
  private final SendEdgeCache<I, E> sendEdgeCache;
  /** Cached map of partitions to vertex indices to mutations */
  private final SendMutationsCache<I, V, E> sendMutationsCache =
      new SendMutationsCache<I, V, E>();
  /** NettyClient that could be shared among one or more instances */
  private final WorkerClient<I, V, E> workerClient;
  /** Maximum size of messages per remote worker to cache before sending */
  private final int maxMessagesSizePerWorker;
  /** Maximum size of vertices per remote worker to cache before sending. */
  private final int maxVerticesSizePerWorker;
  /** Maximum size of edges per remote worker to cache before sending. */
  private final int maxEdgesSizePerWorker;
  /** Maximum number of mutations per partition before sending */
  private final int maxMutationsPerPartition;
  /** Giraph configuration */
  private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
  /** Service worker */
  private final CentralizedServiceWorker<I, V, E> serviceWorker;
  /** Server data (used for local requests) */
  private final ServerData<I, V, E> serverData;

  // Per-Superstep Metrics
  /** Number of requests that were handled locally */
  private final Counter localRequests;
  /** Number of requests that went on the wire */
  private final Counter remoteRequests;
  /** Cached message value factory */
  private final MessageValueFactory messageValueFactory;

  /**
   * Constructor.
   *
   * @param context Context
   * @param conf Configuration
   * @param serviceWorker Service worker
   * @param useOneMessageToManyIdsEncoding whether to use the
   *        one-message-to-many-ids encoding
   */
  public NettyWorkerClientRequestProcessor(
      Mapper<?, ?, ?, ?>.Context context,
      ImmutableClassesGiraphConfiguration<I, V, E> conf,
      CentralizedServiceWorker<I, V, E> serviceWorker,
      boolean useOneMessageToManyIdsEncoding) {
    this.workerClient = serviceWorker.getWorkerClient();
    this.configuration = conf;

    sendPartitionCache =
        new SendPartitionCache<I, V, E>(conf, serviceWorker);
    sendEdgeCache = new SendEdgeCache<I, E>(conf, serviceWorker);
    maxMessagesSizePerWorker =
        GiraphConfiguration.MAX_MSG_REQUEST_SIZE.get(conf);
    maxVerticesSizePerWorker =
        GiraphConfiguration.MAX_VERTEX_REQUEST_SIZE.get(conf);
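    // The one-to-many encoding serializes a broadcast message once per
    // request, together with the list of destination ids, rather than once
    // per destination vertex, which shrinks requests for high fan-out sends.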
    if (useOneMessageToManyIdsEncoding) {
      sendMessageCache =
        new SendOneMessageToManyCache<I, Writable>(conf, serviceWorker,
          this, maxMessagesSizePerWorker);
    } else {
      sendMessageCache =
        new SendMessageCache<I, Writable>(conf, serviceWorker,
          this, maxMessagesSizePerWorker);
    }
    maxEdgesSizePerWorker =
        GiraphConfiguration.MAX_EDGE_REQUEST_SIZE.get(conf);
    maxMutationsPerPartition =
        GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST.get(conf);
    this.serviceWorker = serviceWorker;
    this.serverData = serviceWorker.getServerData();

    // Per-Superstep Metrics.
    // Since this object is not long lived we just initialize the metrics here.
    SuperstepMetricsRegistry smr = GiraphMetrics.get().perSuperstep();
    localRequests = smr.getCounter(MetricNames.LOCAL_REQUESTS);
    remoteRequests = smr.getCounter(MetricNames.REMOTE_REQUESTS);
    setupGauges(smr, localRequests, remoteRequests);
    messageValueFactory = configuration.createOutgoingMessageValueFactory();
  }

  @Override
  public void sendMessageRequest(I destVertexId, Writable message) {
    this.sendMessageCache.sendMessageRequest(destVertexId, message);
  }

  @Override
  public void sendMessageToAllRequest(
    Vertex<I, V, E> vertex, Writable message) {
    this.sendMessageCache.sendMessageToAllRequest(vertex, message);
  }

  @Override
  public void sendMessageToAllRequest(
    Iterator<I> vertexIdIterator, Writable message) {
    this.sendMessageCache.sendMessageToAllRequest(vertexIdIterator, message);
  }

  @Override
  public void sendPartitionRequest(WorkerInfo workerInfo,
                                   Partition<I, V, E> partition) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("sendPartitionRequest: Sending to " + workerInfo +
          ", with partition " + partition);
    }

    WritableRequest vertexRequest = new SendVertexRequest<I, V, E>(partition);
    doRequest(workerInfo, vertexRequest);

    // Messages are stored separately
    if (serviceWorker.getSuperstep() != BspService.INPUT_SUPERSTEP) {
      sendPartitionMessages(workerInfo, partition);
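      // Pending mutations for this partition must migrate with it, so pull
      // them out of the local store and forward them to the new owner too.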
      ConcurrentMap<I, VertexMutations<I, V, E>> vertexMutationMap =
          serverData.getPartitionMutations().remove(partition.getId());
      WritableRequest partitionMutationsRequest =
          new SendPartitionMutationsRequest<I, V, E>(partition.getId(),
              vertexMutationMap);
      doRequest(workerInfo, partitionMutationsRequest);
    }
  }

  /**
   * Send all messages for a partition to another worker.
   *
   * @param workerInfo Worker to send the partition messages to
   * @param partition Partition whose messages to send
   */
  private void sendPartitionMessages(WorkerInfo workerInfo,
                                     Partition<I, V, E> partition) {
    final int partitionId = partition.getId();
    MessageStore<I, Writable> messageStore =
        serverData.getCurrentMessageStore();
    ByteArrayVertexIdMessages<I, Writable> vertexIdMessages =
        new ByteArrayVertexIdMessages<I, Writable>(
            messageValueFactory);
    vertexIdMessages.setConf(configuration);
    vertexIdMessages.initialize();
    for (I vertexId :
        messageStore.getPartitionDestinationVertices(partitionId)) {
      // Messages cannot be re-used from this iterable, but add()
      // serializes the message, making this safe
      Iterable<Writable> messages = messageStore.getVertexMessages(vertexId);
      for (Writable message : messages) {
        vertexIdMessages.add(vertexId, message);
      }
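      // Flush the current batch once it grows past the per-worker request
      // size, then start a fresh buffer for the remaining vertices.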
      if (vertexIdMessages.getSize() > maxMessagesSizePerWorker) {
        WritableRequest messagesRequest =
            new SendPartitionCurrentMessagesRequest<I, V, E, Writable>(
            partitionId, vertexIdMessages);
        doRequest(workerInfo, messagesRequest);
        vertexIdMessages =
            new ByteArrayVertexIdMessages<I, Writable>(
                messageValueFactory);
        vertexIdMessages.setConf(configuration);
        vertexIdMessages.initialize();
      }
    }
    if (!vertexIdMessages.isEmpty()) {
      WritableRequest messagesRequest = new
          SendPartitionCurrentMessagesRequest<I, V, E, Writable>(
          partitionId, vertexIdMessages);
      doRequest(workerInfo, messagesRequest);
    }
    messageStore.clearPartition(partitionId);
  }

  @Override
  public boolean sendVertexRequest(PartitionOwner partitionOwner,
      Vertex<I, V, E> vertex) {
    // Add the vertex to the cache
    int workerMessageSize = sendPartitionCache.addVertex(
        partitionOwner, vertex);

    // Send a request if the cache of outgoing vertices to the remote
    // worker is large enough to be flushed
    if (workerMessageSize >= maxVerticesSizePerWorker) {
      PairList<Integer, ExtendedDataOutput>
          workerPartitionVertices =
          sendPartitionCache.removeWorkerData(partitionOwner.getWorkerInfo());
      WritableRequest writableRequest =
          new SendWorkerVerticesRequest<I, V, E>(
              configuration, workerPartitionVertices);
      doRequest(partitionOwner.getWorkerInfo(), writableRequest);
      return true;
    }

    return false;
  }

  @Override
  public void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws
      IOException {
    PartitionOwner partitionOwner =
        serviceWorker.getVertexPartitionOwner(vertexIndex);
    int partitionId = partitionOwner.getPartitionId();
    if (LOG.isTraceEnabled()) {
      LOG.trace("addEdgeRequest: Sending edge " + edge + " for index " +
          vertexIndex + " with partition " + partitionId);
    }

    // Add the edge mutation to the cache
    int partitionMutationCount =
        sendMutationsCache.addEdgeMutation(partitionId, vertexIndex, edge);

    sendMutationsRequestIfFull(
        partitionId, partitionOwner, partitionMutationCount);
  }

  @Override
  public boolean sendEdgeRequest(I sourceVertexId, Edge<I, E> edge)
    throws IOException {
    PartitionOwner owner =
        serviceWorker.getVertexPartitionOwner(sourceVertexId);
    WorkerInfo workerInfo = owner.getWorkerInfo();
    final int partitionId = owner.getPartitionId();
    if (LOG.isTraceEnabled()) {
      LOG.trace("sendEdgeRequest: Send edge (" + edge.toString() +
          ") for vertex " + sourceVertexId + " on worker " + workerInfo);
    }

    // Add the edge to the cache
    int workerEdgesSize = sendEdgeCache.addEdge(
        workerInfo, partitionId, sourceVertexId, edge);

    // Send a request if the cache of outgoing edges to the remote worker is
    // full
    if (workerEdgesSize >= maxEdgesSizePerWorker) {
      PairList<Integer, VertexIdEdges<I, E>> workerEdges =
          sendEdgeCache.removeWorkerEdges(workerInfo);
      WritableRequest writableRequest =
          new SendWorkerEdgesRequest<I, E>(workerEdges);
      doRequest(workerInfo, writableRequest);
      return true;
    }

    return false;
  }

  /**
   * Send a mutations request if the maximum number of mutations per
   * partition has been reached.
   *
   * @param partitionId Partition id
   * @param partitionOwner Owner of the partition
   * @param partitionMutationCount Number of mutations for this partition
   */
  private void sendMutationsRequestIfFull(
      int partitionId, PartitionOwner partitionOwner,
      int partitionMutationCount) {
    // Send a request if enough mutations have accumulated for the partition
    if (partitionMutationCount >= maxMutationsPerPartition) {
      Map<I, VertexMutations<I, V, E>> partitionMutations =
          sendMutationsCache.removePartitionMutations(partitionId);
      WritableRequest writableRequest =
          new SendPartitionMutationsRequest<I, V, E>(
              partitionId, partitionMutations);
      doRequest(partitionOwner.getWorkerInfo(), writableRequest);
    }
  }

  @Override
  public void removeEdgesRequest(I vertexIndex,
                                 I destinationVertexIndex) throws IOException {
    PartitionOwner partitionOwner =
        serviceWorker.getVertexPartitionOwner(vertexIndex);
    int partitionId = partitionOwner.getPartitionId();
    if (LOG.isTraceEnabled()) {
      LOG.trace("removeEdgesRequest: Removing edge " +
          destinationVertexIndex +
          " for index " + vertexIndex + " with partition " + partitionId);
    }

    // Add the remove-edges mutation to the cache
    int partitionMutationCount =
        sendMutationsCache.removeEdgeMutation(
            partitionId, vertexIndex, destinationVertexIndex);

    sendMutationsRequestIfFull(
        partitionId, partitionOwner, partitionMutationCount);
  }

  @Override
  public void addVertexRequest(Vertex<I, V, E> vertex) throws IOException {
    PartitionOwner partitionOwner =
        serviceWorker.getVertexPartitionOwner(vertex.getId());
    int partitionId = partitionOwner.getPartitionId();
    if (LOG.isTraceEnabled()) {
      LOG.trace("addVertexRequest: Sending vertex " + vertex +
          " to partition " + partitionId);
    }

    // Add the vertex mutation to the cache
    int partitionMutationCount =
        sendMutationsCache.addVertexMutation(partitionId, vertex);

    sendMutationsRequestIfFull(
        partitionId, partitionOwner, partitionMutationCount);
  }

  @Override
  public void removeVertexRequest(I vertexIndex) throws IOException {
    PartitionOwner partitionOwner =
        serviceWorker.getVertexPartitionOwner(vertexIndex);
    int partitionId = partitionOwner.getPartitionId();
    if (LOG.isTraceEnabled()) {
      LOG.trace("removeVertexRequest: Removing vertex index " +
          vertexIndex + " from partition " + partitionId);
    }

    // Add the remove-vertex mutation to the cache
    int partitionMutationCount =
        sendMutationsCache.removeVertexMutation(partitionId, vertexIndex);

    sendMutationsRequestIfFull(
        partitionId, partitionOwner, partitionMutationCount);
  }

  @Override
  public void flush() throws IOException {
    // Send the remaining cached messages (if any),
    // including individual and compact (one-to-many) messages.
    sendMessageCache.flush();

    // Send the remaining cached vertices (if any)
    PairList<WorkerInfo, PairList<Integer, ExtendedDataOutput>>
        remainingVertexCache = sendPartitionCache.removeAllData();
    PairList<WorkerInfo,
        PairList<Integer, ExtendedDataOutput>>.Iterator
        vertexIterator = remainingVertexCache.getIterator();
    while (vertexIterator.hasNext()) {
      vertexIterator.next();
      WritableRequest writableRequest =
          new SendWorkerVerticesRequest(
              configuration, vertexIterator.getCurrentSecond());
      doRequest(vertexIterator.getCurrentFirst(), writableRequest);
    }

    // Send the remaining cached edges (if any)
    PairList<WorkerInfo, PairList<Integer,
        VertexIdEdges<I, E>>>
        remainingEdgeCache = sendEdgeCache.removeAllEdges();
    PairList<WorkerInfo,
        PairList<Integer, VertexIdEdges<I, E>>>.Iterator
        edgeIterator = remainingEdgeCache.getIterator();
    while (edgeIterator.hasNext()) {
      edgeIterator.next();
      WritableRequest writableRequest =
          new SendWorkerEdgesRequest<I, E>(
              edgeIterator.getCurrentSecond());
      doRequest(edgeIterator.getCurrentFirst(), writableRequest);
    }

    // Send the remaining cached mutations (if any)
    Map<Integer, Map<I, VertexMutations<I, V, E>>> remainingMutationsCache =
        sendMutationsCache.removeAllPartitionMutations();
    for (Map.Entry<Integer, Map<I, VertexMutations<I, V, E>>> entry :
        remainingMutationsCache.entrySet()) {
      WritableRequest writableRequest =
          new SendPartitionMutationsRequest<I, V, E>(
              entry.getKey(), entry.getValue());
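      // Every vertex in a partition has the same owner, so any vertex id
      // from the map can be used to look up the destination worker.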
      PartitionOwner partitionOwner =
          serviceWorker.getVertexPartitionOwner(
              entry.getValue().keySet().iterator().next());
      doRequest(partitionOwner.getWorkerInfo(), writableRequest);
    }
  }

  @Override
  public long resetMessageCount() {
    return this.sendMessageCache.resetMessageCount();
  }

  @Override
  public long resetMessageBytesCount() {
    return this.sendMessageCache.resetMessageBytesCount();
  }

  /**
   * Submit the request, short-circuiting to local execution when the
   * destination is this worker.
   *
   * @param workerInfo Worker info
   * @param writableRequest Request to either submit or run locally
   */
  public void doRequest(WorkerInfo workerInfo,
                        WritableRequest writableRequest) {
    // If this is local, execute locally
    if (serviceWorker.getWorkerInfo().getTaskId() ==
        workerInfo.getTaskId()) {
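      // The destination is this very worker: apply the request directly to
      // the local ServerData and skip the network round trip.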
      ((WorkerRequest) writableRequest).doRequest(serverData);
      localRequests.inc();
    } else {
      workerClient.sendWritableRequest(
          workerInfo.getTaskId(), writableRequest);
      remoteRequests.inc();
    }
  }

  /**
   * Sets up gauges for superstep metrics.
   * This has to be static so that internal objects created here don't
   * hold references to this$0. Otherwise we would leak
   * NettyWorkerClientRequestProcessor objects.
   *
   * @param smr metric registry for current superstep
   * @param localRequests counter for local requests
   * @param remoteRequests counter for remote requests
   */
  private static void setupGauges(SuperstepMetricsRegistry smr,
                                  final Counter localRequests,
                                  final Counter remoteRequests) {
    final Gauge<Long> totalRequests = smr.getGauge(MetricNames.TOTAL_REQUESTS,
        new Gauge<Long>() {
          @Override
          public Long value() {
            return localRequests.count() + remoteRequests.count();
          }
        }
    );
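    // PercentGauge reports getNumerator() / getDenominator() as a
    // percentage, here the fraction of all requests that were local.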
    smr.getGauge(MetricNames.PERCENT_LOCAL_REQUESTS, new PercentGauge() {
      @Override protected double getNumerator() {
        return localRequests.count();
      }

      @Override protected double getDenominator() {
        return totalRequests.value();
      }
    });
  }
}