View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.comm.netty;
19  
20  import java.io.IOException;
21  import java.util.Iterator;
22  import java.util.Map;
23  import java.util.concurrent.ConcurrentMap;
24  
25  import org.apache.giraph.bsp.BspService;
26  import org.apache.giraph.bsp.CentralizedServiceWorker;
27  import org.apache.giraph.comm.SendEdgeCache;
28  import org.apache.giraph.comm.SendMessageCache;
29  import org.apache.giraph.comm.SendMutationsCache;
30  import org.apache.giraph.comm.SendOneMessageToManyCache;
31  import org.apache.giraph.comm.SendPartitionCache;
32  import org.apache.giraph.comm.ServerData;
33  import org.apache.giraph.comm.WorkerClient;
34  import org.apache.giraph.comm.WorkerClientRequestProcessor;
35  import org.apache.giraph.comm.messages.MessageStore;
36  import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
37  import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
38  import org.apache.giraph.comm.requests.SendVertexRequest;
39  import org.apache.giraph.comm.requests.SendWorkerEdgesRequest;
40  import org.apache.giraph.comm.requests.SendWorkerVerticesRequest;
41  import org.apache.giraph.comm.requests.WorkerRequest;
42  import org.apache.giraph.comm.requests.WritableRequest;
43  import org.apache.giraph.conf.GiraphConfiguration;
44  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
45  import org.apache.giraph.edge.Edge;
46  import org.apache.giraph.graph.Vertex;
47  import org.apache.giraph.graph.VertexMutations;
48  import org.apache.giraph.metrics.GiraphMetrics;
49  import org.apache.giraph.metrics.MetricNames;
50  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
51  import org.apache.giraph.partition.Partition;
52  import org.apache.giraph.partition.PartitionOwner;
53  import org.apache.giraph.utils.ByteArrayVertexIdMessages;
54  import org.apache.giraph.utils.ExtendedDataOutput;
55  import org.apache.giraph.utils.PairList;
56  import org.apache.giraph.utils.VertexIdEdges;
57  import org.apache.giraph.worker.WorkerInfo;
58  import org.apache.hadoop.io.Writable;
59  import org.apache.hadoop.io.WritableComparable;
60  import org.apache.hadoop.mapreduce.Mapper;
61  import org.apache.log4j.Logger;
62  
63  import com.yammer.metrics.core.Counter;
64  import com.yammer.metrics.core.Gauge;
65  import com.yammer.metrics.util.PercentGauge;
66  
67  /**
68   * Aggregates requests and sends them to the thread-safe NettyClient.  This
69   * class is not thread-safe and is expected to be used and then thrown away
70   * after a phase of communication has completed.
71   *
72   * @param <I> Vertex id
73   * @param <V> Vertex data
74   * @param <E> Edge data
75   */
76  @SuppressWarnings("unchecked")
77  public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
78      V extends Writable, E extends Writable> implements
79      WorkerClientRequestProcessor<I, V, E> {
80    /** Class logger */
81    private static final Logger LOG =
82        Logger.getLogger(NettyWorkerClientRequestProcessor.class);
83    /** Cached partitions of vertices to send */
84    private final SendPartitionCache<I, V, E> sendPartitionCache;
85    /** Cached map of partitions to vertex indices to messages */
86    private final SendMessageCache<I, Writable> sendMessageCache;
87    /** Cache of edges to be sent. */
88    private final SendEdgeCache<I, E> sendEdgeCache;
89    /** Cached map of partitions to vertex indices to mutations */
90    private final SendMutationsCache<I, V, E> sendMutationsCache =
91        new SendMutationsCache<I, V, E>();
92    /** NettyClient that could be shared among one or more instances */
93    private final WorkerClient<I, V, E> workerClient;
94    /** Maximum size of messages per remote worker to cache before sending */
95    private final int maxMessagesSizePerWorker;
96    /** Maximum size of vertices per remote worker to cache before sending. */
97    private final int maxVerticesSizePerWorker;
98    /** Maximum size of edges per remote worker to cache before sending. */
99    private final int maxEdgesSizePerWorker;
100   /** Maximum number of mutations per partition before sending */
101   private final int maxMutationsPerPartition;
102   /** Giraph configuration */
103   private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
104   /** Service worker */
105   private final CentralizedServiceWorker<I, V, E> serviceWorker;
106   /** Server data from the server (used for local requests) */
107   private final ServerData<I, V, E> serverData;
108 
109   // Per-Superstep Metrics
110   /** Number of requests that were handled locally */
111   private final Counter localRequests;
112   /** Number of requests that went on the wire */
113   private final Counter remoteRequests;
114 
115   /**
116    * Constructor.
117    *
118    * @param context Context
119    * @param conf Configuration
120    * @param serviceWorker Service worker
121    * @param useOneMessageToManyIdsEncoding should use one message to many
122    */
123   public NettyWorkerClientRequestProcessor(
124       Mapper<?, ?, ?, ?>.Context context,
125       ImmutableClassesGiraphConfiguration<I, V, E> conf,
126       CentralizedServiceWorker<I, V, E> serviceWorker,
127       boolean useOneMessageToManyIdsEncoding) {
128     this.workerClient = serviceWorker.getWorkerClient();
129     this.configuration = conf;
130 
131 
132     sendPartitionCache =
133         new SendPartitionCache<I, V, E>(conf, serviceWorker);
134     sendEdgeCache = new SendEdgeCache<I, E>(conf, serviceWorker);
135     maxMessagesSizePerWorker =
136         GiraphConfiguration.MAX_MSG_REQUEST_SIZE.get(conf);
137     maxVerticesSizePerWorker =
138         GiraphConfiguration.MAX_VERTEX_REQUEST_SIZE.get(conf);
139     if (useOneMessageToManyIdsEncoding) {
140       sendMessageCache =
141         new SendOneMessageToManyCache<I, Writable>(conf, serviceWorker,
142           this, maxMessagesSizePerWorker);
143     } else {
144       sendMessageCache =
145         new SendMessageCache<I, Writable>(conf, serviceWorker,
146           this, maxMessagesSizePerWorker);
147     }
148     maxEdgesSizePerWorker =
149         GiraphConfiguration.MAX_EDGE_REQUEST_SIZE.get(conf);
150     maxMutationsPerPartition =
151         GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST.get(conf);
152     this.serviceWorker = serviceWorker;
153     this.serverData = serviceWorker.getServerData();
154 
155     // Per-Superstep Metrics.
156     // Since this object is not long lived we just initialize the metrics here.
157     SuperstepMetricsRegistry smr = GiraphMetrics.get().perSuperstep();
158     localRequests = smr.getCounter(MetricNames.LOCAL_REQUESTS);
159     remoteRequests = smr.getCounter(MetricNames.REMOTE_REQUESTS);
160     setupGauges(smr, localRequests, remoteRequests);
161   }
162 
163   @Override
164   public void sendMessageRequest(I destVertexId, Writable message) {
165     this.sendMessageCache.sendMessageRequest(destVertexId, message);
166   }
167 
168   @Override
169   public void sendMessageToAllRequest(
170     Vertex<I, V, E> vertex, Writable message) {
171     this.sendMessageCache.sendMessageToAllRequest(vertex, message);
172   }
173 
174   @Override
175   public void sendMessageToAllRequest(
176     Iterator<I> vertexIdIterator, Writable message) {
177     this.sendMessageCache.sendMessageToAllRequest(vertexIdIterator, message);
178   }
179 
180   @Override
181   public void sendPartitionRequest(WorkerInfo workerInfo,
182                                    Partition<I, V, E> partition) {
183     if (LOG.isTraceEnabled()) {
184       LOG.trace("sendVertexRequest: Sending to " + workerInfo +
185           ", with partition " + partition);
186     }
187 
188     WritableRequest vertexRequest = new SendVertexRequest<I, V, E>(partition);
189     doRequest(workerInfo, vertexRequest);
190 
191     // Messages are stored separately
192     if (serviceWorker.getSuperstep() != BspService.INPUT_SUPERSTEP) {
193       sendPartitionMessages(workerInfo, partition);
194       ConcurrentMap<I, VertexMutations<I, V, E>> vertexMutationMap =
195           serverData.getPartitionMutations().remove(partition.getId());
196       WritableRequest partitionMutationsRequest =
197           new SendPartitionMutationsRequest<I, V, E>(partition.getId(),
198               vertexMutationMap);
199       doRequest(workerInfo, partitionMutationsRequest);
200     }
201   }
202 
203   /**
204    * Send all messages for a partition to another worker.
205    *
206    * @param workerInfo Worker to send the partition messages to
207    * @param partition Partition whose messages to send
208    */
209   private void sendPartitionMessages(WorkerInfo workerInfo,
210                                      Partition<I, V, E> partition) {
211     final int partitionId = partition.getId();
212     MessageStore<I, Writable> messageStore =
213         serverData.getCurrentMessageStore();
214     ByteArrayVertexIdMessages<I, Writable> vertexIdMessages =
215         new ByteArrayVertexIdMessages<I, Writable>(
216             configuration.createOutgoingMessageValueFactory());
217     vertexIdMessages.setConf(configuration);
218     vertexIdMessages.initialize();
219     for (I vertexId :
220         messageStore.getPartitionDestinationVertices(partitionId)) {
221       // Messages cannot be re-used from this iterable, but add()
222       // serializes the message, making this safe
223       Iterable<Writable> messages = messageStore.getVertexMessages(vertexId);
224       for (Writable message : messages) {
225         vertexIdMessages.add(vertexId, message);
226       }
227       if (vertexIdMessages.getSize() > maxMessagesSizePerWorker) {
228         WritableRequest messagesRequest =
229             new SendPartitionCurrentMessagesRequest<I, V, E, Writable>(
230             partitionId, vertexIdMessages);
231         doRequest(workerInfo, messagesRequest);
232         vertexIdMessages =
233             new ByteArrayVertexIdMessages<I, Writable>(
234                 configuration.createOutgoingMessageValueFactory());
235         vertexIdMessages.setConf(configuration);
236         vertexIdMessages.initialize();
237       }
238     }
239     if (!vertexIdMessages.isEmpty()) {
240       WritableRequest messagesRequest = new
241           SendPartitionCurrentMessagesRequest<I, V, E, Writable>(
242           partitionId, vertexIdMessages);
243       doRequest(workerInfo, messagesRequest);
244     }
245     messageStore.clearPartition(partitionId);
246   }
247 
248   @Override
249   public boolean sendVertexRequest(PartitionOwner partitionOwner,
250       Vertex<I, V, E> vertex) {
251     // Add the vertex to the cache
252     int workerMessageSize = sendPartitionCache.addVertex(
253         partitionOwner, vertex);
254 
255     // Send a request if the cache of outgoing message to
256     // the remote worker 'workerInfo' is full enough to be flushed
257     if (workerMessageSize >= maxVerticesSizePerWorker) {
258       PairList<Integer, ExtendedDataOutput>
259           workerPartitionVertices =
260           sendPartitionCache.removeWorkerData(partitionOwner.getWorkerInfo());
261       WritableRequest writableRequest =
262           new SendWorkerVerticesRequest<I, V, E>(
263               configuration, workerPartitionVertices);
264       doRequest(partitionOwner.getWorkerInfo(), writableRequest);
265       return true;
266     }
267 
268     return false;
269   }
270 
271   @Override
272   public void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws
273       IOException {
274     PartitionOwner partitionOwner =
275         serviceWorker.getVertexPartitionOwner(vertexIndex);
276     int partitionId = partitionOwner.getPartitionId();
277     if (LOG.isTraceEnabled()) {
278       LOG.trace("addEdgeRequest: Sending edge " + edge + " for index " +
279           vertexIndex + " with partition " + partitionId);
280     }
281 
282     // Add the message to the cache
283     int partitionMutationCount =
284         sendMutationsCache.addEdgeMutation(partitionId, vertexIndex, edge);
285 
286     sendMutationsRequestIfFull(
287         partitionId, partitionOwner, partitionMutationCount);
288   }
289 
290   @Override
291   public boolean sendEdgeRequest(I sourceVertexId, Edge<I, E> edge)
292     throws IOException {
293     PartitionOwner owner =
294         serviceWorker.getVertexPartitionOwner(sourceVertexId);
295     WorkerInfo workerInfo = owner.getWorkerInfo();
296     final int partitionId = owner.getPartitionId();
297     if (LOG.isTraceEnabled()) {
298       LOG.trace("sendEdgeRequest: Send bytes (" + edge.toString() +
299           ") to " + sourceVertexId + " on worker " + workerInfo);
300     }
301 
302     // Add the message to the cache
303     int workerEdgesSize = sendEdgeCache.addEdge(
304         workerInfo, partitionId, sourceVertexId, edge);
305 
306     // Send a request if the cache of outgoing edges to the remote worker is
307     // full
308     if (workerEdgesSize >= maxEdgesSizePerWorker) {
309       PairList<Integer, VertexIdEdges<I, E>> workerEdges =
310           sendEdgeCache.removeWorkerEdges(workerInfo);
311       WritableRequest writableRequest =
312           new SendWorkerEdgesRequest<I, E>(workerEdges);
313       doRequest(workerInfo, writableRequest);
314       return true;
315     }
316 
317     return false;
318   }
319 
320   /**
321    * Send a mutations request if the maximum number of mutations per partition
322    * was met.
323    *
324    * @param partitionId Partition id
325    * @param partitionOwner Owner of the partition
326    * @param partitionMutationCount Number of mutations for this partition
327    */
328   private void sendMutationsRequestIfFull(
329       int partitionId, PartitionOwner partitionOwner,
330       int partitionMutationCount) {
331     // Send a request if enough mutations are there for a partition
332     if (partitionMutationCount >= maxMutationsPerPartition) {
333       Map<I, VertexMutations<I, V, E>> partitionMutations =
334           sendMutationsCache.removePartitionMutations(partitionId);
335       WritableRequest writableRequest =
336           new SendPartitionMutationsRequest<I, V, E>(
337               partitionId, partitionMutations);
338       doRequest(partitionOwner.getWorkerInfo(), writableRequest);
339     }
340   }
341 
342   @Override
343   public void removeEdgesRequest(I vertexIndex,
344                                  I destinationVertexIndex) throws IOException {
345     PartitionOwner partitionOwner =
346         serviceWorker.getVertexPartitionOwner(vertexIndex);
347     int partitionId = partitionOwner.getPartitionId();
348     if (LOG.isTraceEnabled()) {
349       LOG.trace("removeEdgesRequest: Removing edge " +
350           destinationVertexIndex +
351           " for index " + vertexIndex + " with partition " + partitionId);
352     }
353 
354     // Add the message to the cache
355     int partitionMutationCount =
356         sendMutationsCache.removeEdgeMutation(
357             partitionId, vertexIndex, destinationVertexIndex);
358 
359     sendMutationsRequestIfFull(
360         partitionId, partitionOwner, partitionMutationCount);
361   }
362 
363   @Override
364   public void addVertexRequest(Vertex<I, V, E> vertex) throws IOException {
365     PartitionOwner partitionOwner =
366         serviceWorker.getVertexPartitionOwner(vertex.getId());
367     int partitionId = partitionOwner.getPartitionId();
368     if (LOG.isTraceEnabled()) {
369       LOG.trace("addVertexRequest: Sending vertex " + vertex +
370           " to partition " + partitionId);
371     }
372 
373     // Add the message to the cache
374     int partitionMutationCount =
375         sendMutationsCache.addVertexMutation(partitionId, vertex);
376 
377     sendMutationsRequestIfFull(
378         partitionId, partitionOwner, partitionMutationCount);
379   }
380 
381   @Override
382   public void removeVertexRequest(I vertexIndex) throws IOException {
383     PartitionOwner partitionOwner =
384         serviceWorker.getVertexPartitionOwner(vertexIndex);
385     int partitionId = partitionOwner.getPartitionId();
386     if (LOG.isTraceEnabled()) {
387       LOG.trace("removeVertexRequest: Removing vertex index " +
388           vertexIndex + " from partition " + partitionId);
389     }
390 
391     // Add the message to the cache
392     int partitionMutationCount =
393         sendMutationsCache.removeVertexMutation(partitionId, vertexIndex);
394 
395     sendMutationsRequestIfFull(
396         partitionId, partitionOwner, partitionMutationCount);
397   }
398 
399   @Override
400   public void flush() throws IOException {
401     // Execute the remaining sends messages (if any)
402     // including individual and compact messages.
403     sendMessageCache.flush();
404 
405     // Execute the remaining sends vertices (if any)
406     PairList<WorkerInfo, PairList<Integer, ExtendedDataOutput>>
407         remainingVertexCache = sendPartitionCache.removeAllData();
408     PairList<WorkerInfo,
409         PairList<Integer, ExtendedDataOutput>>.Iterator
410         vertexIterator = remainingVertexCache.getIterator();
411     while (vertexIterator.hasNext()) {
412       vertexIterator.next();
413       WritableRequest writableRequest =
414           new SendWorkerVerticesRequest(
415               configuration, vertexIterator.getCurrentSecond());
416       doRequest(vertexIterator.getCurrentFirst(), writableRequest);
417     }
418 
419     // Execute the remaining sends edges (if any)
420     PairList<WorkerInfo, PairList<Integer,
421         VertexIdEdges<I, E>>>
422         remainingEdgeCache = sendEdgeCache.removeAllEdges();
423     PairList<WorkerInfo,
424         PairList<Integer, VertexIdEdges<I, E>>>.Iterator
425         edgeIterator = remainingEdgeCache.getIterator();
426     while (edgeIterator.hasNext()) {
427       edgeIterator.next();
428       WritableRequest writableRequest =
429           new SendWorkerEdgesRequest<I, E>(
430               edgeIterator.getCurrentSecond());
431       doRequest(edgeIterator.getCurrentFirst(), writableRequest);
432     }
433 
434     // Execute the remaining sends mutations (if any)
435     Map<Integer, Map<I, VertexMutations<I, V, E>>> remainingMutationsCache =
436         sendMutationsCache.removeAllPartitionMutations();
437     for (Map.Entry<Integer, Map<I, VertexMutations<I, V, E>>> entry :
438         remainingMutationsCache.entrySet()) {
439       WritableRequest writableRequest =
440           new SendPartitionMutationsRequest<I, V, E>(
441               entry.getKey(), entry.getValue());
442       PartitionOwner partitionOwner =
443           serviceWorker.getVertexPartitionOwner(
444               entry.getValue().keySet().iterator().next());
445       doRequest(partitionOwner.getWorkerInfo(), writableRequest);
446     }
447   }
448 
449   @Override
450   public long resetMessageCount() {
451     return this.sendMessageCache.resetMessageCount();
452   }
453 
454   @Override
455   public long resetMessageBytesCount() {
456     return this.sendMessageCache.resetMessageBytesCount();
457   }
458 
459   /**
460    * When doing the request, short circuit if it is local
461    *
462    * @param workerInfo Worker info
463    * @param writableRequest Request to either submit or run locally
464    */
465   public void doRequest(WorkerInfo workerInfo,
466                          WritableRequest writableRequest) {
467     // If this is local, execute locally
468     if (serviceWorker.getWorkerInfo().getTaskId() ==
469         workerInfo.getTaskId()) {
470       ((WorkerRequest) writableRequest).doRequest(serverData);
471       localRequests.inc();
472     } else {
473       workerClient.sendWritableRequest(
474           workerInfo.getTaskId(), writableRequest);
475       remoteRequests.inc();
476     }
477   }
478 
479   /**
480    * Sets up gauges for superstep metrics.
481    * This has to be static so that internal objects created here don't
482    * hold references to this$0. Otherwise we memory leaking
483    * NettyWorkerClientRequestProcessor objects.
484    *
485    * @param smr metric registry for current superstep
486    * @param localRequests counter for local requests
487    * @param remoteRequests counter for remote requests
488    */
489   private static void setupGauges(SuperstepMetricsRegistry smr,
490                                   final Counter localRequests,
491                                   final Counter remoteRequests) {
492     final Gauge<Long> totalRequests = smr.getGauge(MetricNames.TOTAL_REQUESTS,
493         new Gauge<Long>() {
494           @Override
495           public Long value() {
496             return localRequests.count() + remoteRequests.count();
497           }
498         }
499     );
500     smr.getGauge(MetricNames.PERCENT_LOCAL_REQUESTS, new PercentGauge() {
501       @Override protected double getNumerator() {
502         return localRequests.count();
503       }
504 
505       @Override protected double getDenominator() {
506         return totalRequests.value();
507       }
508     });
509   }
510 }