This project has retired. For details please refer to its
Attic page.
NettyWorkerClientRequestProcessor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.comm.netty;
19
20 import java.io.IOException;
21 import java.util.Iterator;
22 import java.util.Map;
23 import java.util.concurrent.ConcurrentMap;
24
25 import org.apache.giraph.bsp.BspService;
26 import org.apache.giraph.bsp.CentralizedServiceWorker;
27 import org.apache.giraph.comm.SendEdgeCache;
28 import org.apache.giraph.comm.SendMessageCache;
29 import org.apache.giraph.comm.SendMutationsCache;
30 import org.apache.giraph.comm.SendOneMessageToManyCache;
31 import org.apache.giraph.comm.SendPartitionCache;
32 import org.apache.giraph.comm.ServerData;
33 import org.apache.giraph.comm.WorkerClient;
34 import org.apache.giraph.comm.WorkerClientRequestProcessor;
35 import org.apache.giraph.comm.messages.MessageStore;
36 import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
37 import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
38 import org.apache.giraph.comm.requests.SendVertexRequest;
39 import org.apache.giraph.comm.requests.SendWorkerEdgesRequest;
40 import org.apache.giraph.comm.requests.SendWorkerVerticesRequest;
41 import org.apache.giraph.comm.requests.WorkerRequest;
42 import org.apache.giraph.comm.requests.WritableRequest;
43 import org.apache.giraph.conf.GiraphConfiguration;
44 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
45 import org.apache.giraph.edge.Edge;
46 import org.apache.giraph.factories.MessageValueFactory;
47 import org.apache.giraph.graph.Vertex;
48 import org.apache.giraph.graph.VertexMutations;
49 import org.apache.giraph.metrics.GiraphMetrics;
50 import org.apache.giraph.metrics.MetricNames;
51 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
52 import org.apache.giraph.partition.Partition;
53 import org.apache.giraph.partition.PartitionOwner;
54 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
55 import org.apache.giraph.utils.ExtendedDataOutput;
56 import org.apache.giraph.utils.PairList;
57 import org.apache.giraph.utils.VertexIdEdges;
58 import org.apache.giraph.worker.WorkerInfo;
59 import org.apache.hadoop.io.Writable;
60 import org.apache.hadoop.io.WritableComparable;
61 import org.apache.hadoop.mapreduce.Mapper;
62 import org.apache.log4j.Logger;
63
64 import com.yammer.metrics.core.Counter;
65 import com.yammer.metrics.core.Gauge;
66 import com.yammer.metrics.util.PercentGauge;
67
68
69
70
71
72
73
74
75
76
77 @SuppressWarnings("unchecked")
78 public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
79 V extends Writable, E extends Writable> implements
80 WorkerClientRequestProcessor<I, V, E> {
81
82 private static final Logger LOG =
83 Logger.getLogger(NettyWorkerClientRequestProcessor.class);
84
85 private final SendPartitionCache<I, V, E> sendPartitionCache;
86
87 private final SendMessageCache<I, Writable> sendMessageCache;
88
89 private final SendEdgeCache<I, E> sendEdgeCache;
90
91 private final SendMutationsCache<I, V, E> sendMutationsCache =
92 new SendMutationsCache<I, V, E>();
93
94 private final WorkerClient<I, V, E> workerClient;
95
96 private final int maxMessagesSizePerWorker;
97
98 private final int maxVerticesSizePerWorker;
99
100 private final int maxEdgesSizePerWorker;
101
102 private final int maxMutationsPerPartition;
103
104 private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
105
106 private final CentralizedServiceWorker<I, V, E> serviceWorker;
107
108 private final ServerData<I, V, E> serverData;
109
110
111
112 private final Counter localRequests;
113
114 private final Counter remoteRequests;
115
116 private final MessageValueFactory messageValueFactory;
117
118
119
120
121
122
123
124
125
126 public NettyWorkerClientRequestProcessor(
127 Mapper<?, ?, ?, ?>.Context context,
128 ImmutableClassesGiraphConfiguration<I, V, E> conf,
129 CentralizedServiceWorker<I, V, E> serviceWorker,
130 boolean useOneMessageToManyIdsEncoding) {
131 this.workerClient = serviceWorker.getWorkerClient();
132 this.configuration = conf;
133
134
135 sendPartitionCache =
136 new SendPartitionCache<I, V, E>(conf, serviceWorker);
137 sendEdgeCache = new SendEdgeCache<I, E>(conf, serviceWorker);
138 maxMessagesSizePerWorker =
139 GiraphConfiguration.MAX_MSG_REQUEST_SIZE.get(conf);
140 maxVerticesSizePerWorker =
141 GiraphConfiguration.MAX_VERTEX_REQUEST_SIZE.get(conf);
142 if (useOneMessageToManyIdsEncoding) {
143 sendMessageCache =
144 new SendOneMessageToManyCache<I, Writable>(conf, serviceWorker,
145 this, maxMessagesSizePerWorker);
146 } else {
147 sendMessageCache =
148 new SendMessageCache<I, Writable>(conf, serviceWorker,
149 this, maxMessagesSizePerWorker);
150 }
151 maxEdgesSizePerWorker =
152 GiraphConfiguration.MAX_EDGE_REQUEST_SIZE.get(conf);
153 maxMutationsPerPartition =
154 GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST.get(conf);
155 this.serviceWorker = serviceWorker;
156 this.serverData = serviceWorker.getServerData();
157
158
159
160 SuperstepMetricsRegistry smr = GiraphMetrics.get().perSuperstep();
161 localRequests = smr.getCounter(MetricNames.LOCAL_REQUESTS);
162 remoteRequests = smr.getCounter(MetricNames.REMOTE_REQUESTS);
163 setupGauges(smr, localRequests, remoteRequests);
164 messageValueFactory = configuration.createOutgoingMessageValueFactory();
165 }
166
167 @Override
168 public void sendMessageRequest(I destVertexId, Writable message) {
169 this.sendMessageCache.sendMessageRequest(destVertexId, message);
170 }
171
172 @Override
173 public void sendMessageToAllRequest(
174 Vertex<I, V, E> vertex, Writable message) {
175 this.sendMessageCache.sendMessageToAllRequest(vertex, message);
176 }
177
178 @Override
179 public void sendMessageToAllRequest(
180 Iterator<I> vertexIdIterator, Writable message) {
181 this.sendMessageCache.sendMessageToAllRequest(vertexIdIterator, message);
182 }
183
184 @Override
185 public void sendPartitionRequest(WorkerInfo workerInfo,
186 Partition<I, V, E> partition) {
187 if (LOG.isTraceEnabled()) {
188 LOG.trace("sendVertexRequest: Sending to " + workerInfo +
189 ", with partition " + partition);
190 }
191
192 WritableRequest vertexRequest = new SendVertexRequest<I, V, E>(partition);
193 doRequest(workerInfo, vertexRequest);
194
195
196 if (serviceWorker.getSuperstep() != BspService.INPUT_SUPERSTEP) {
197 sendPartitionMessages(workerInfo, partition);
198 ConcurrentMap<I, VertexMutations<I, V, E>> vertexMutationMap =
199 serverData.getPartitionMutations().remove(partition.getId());
200 WritableRequest partitionMutationsRequest =
201 new SendPartitionMutationsRequest<I, V, E>(partition.getId(),
202 vertexMutationMap);
203 doRequest(workerInfo, partitionMutationsRequest);
204 }
205 }
206
207
208
209
210
211
212
213 private void sendPartitionMessages(WorkerInfo workerInfo,
214 Partition<I, V, E> partition) {
215 final int partitionId = partition.getId();
216 MessageStore<I, Writable> messageStore =
217 serverData.getCurrentMessageStore();
218 ByteArrayVertexIdMessages<I, Writable> vertexIdMessages =
219 new ByteArrayVertexIdMessages<I, Writable>(
220 messageValueFactory);
221 vertexIdMessages.setConf(configuration);
222 vertexIdMessages.initialize();
223 for (I vertexId :
224 messageStore.getPartitionDestinationVertices(partitionId)) {
225
226
227 Iterable<Writable> messages = messageStore.getVertexMessages(vertexId);
228 for (Writable message : messages) {
229 vertexIdMessages.add(vertexId, message);
230 }
231 if (vertexIdMessages.getSize() > maxMessagesSizePerWorker) {
232 WritableRequest messagesRequest =
233 new SendPartitionCurrentMessagesRequest<I, V, E, Writable>(
234 partitionId, vertexIdMessages);
235 doRequest(workerInfo, messagesRequest);
236 vertexIdMessages =
237 new ByteArrayVertexIdMessages<I, Writable>(
238 messageValueFactory);
239 vertexIdMessages.setConf(configuration);
240 vertexIdMessages.initialize();
241 }
242 }
243 if (!vertexIdMessages.isEmpty()) {
244 WritableRequest messagesRequest = new
245 SendPartitionCurrentMessagesRequest<I, V, E, Writable>(
246 partitionId, vertexIdMessages);
247 doRequest(workerInfo, messagesRequest);
248 }
249 messageStore.clearPartition(partitionId);
250 }
251
252 @Override
253 public boolean sendVertexRequest(PartitionOwner partitionOwner,
254 Vertex<I, V, E> vertex) {
255
256 int workerMessageSize = sendPartitionCache.addVertex(
257 partitionOwner, vertex);
258
259
260
261 if (workerMessageSize >= maxVerticesSizePerWorker) {
262 PairList<Integer, ExtendedDataOutput>
263 workerPartitionVertices =
264 sendPartitionCache.removeWorkerData(partitionOwner.getWorkerInfo());
265 WritableRequest writableRequest =
266 new SendWorkerVerticesRequest<I, V, E>(
267 configuration, workerPartitionVertices);
268 doRequest(partitionOwner.getWorkerInfo(), writableRequest);
269 return true;
270 }
271
272 return false;
273 }
274
275 @Override
276 public void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws
277 IOException {
278 PartitionOwner partitionOwner =
279 serviceWorker.getVertexPartitionOwner(vertexIndex);
280 int partitionId = partitionOwner.getPartitionId();
281 if (LOG.isTraceEnabled()) {
282 LOG.trace("addEdgeRequest: Sending edge " + edge + " for index " +
283 vertexIndex + " with partition " + partitionId);
284 }
285
286
287 int partitionMutationCount =
288 sendMutationsCache.addEdgeMutation(partitionId, vertexIndex, edge);
289
290 sendMutationsRequestIfFull(
291 partitionId, partitionOwner, partitionMutationCount);
292 }
293
294 @Override
295 public boolean sendEdgeRequest(I sourceVertexId, Edge<I, E> edge)
296 throws IOException {
297 PartitionOwner owner =
298 serviceWorker.getVertexPartitionOwner(sourceVertexId);
299 WorkerInfo workerInfo = owner.getWorkerInfo();
300 final int partitionId = owner.getPartitionId();
301 if (LOG.isTraceEnabled()) {
302 LOG.trace("sendEdgeRequest: Send bytes (" + edge.toString() +
303 ") to " + sourceVertexId + " on worker " + workerInfo);
304 }
305
306
307 int workerEdgesSize = sendEdgeCache.addEdge(
308 workerInfo, partitionId, sourceVertexId, edge);
309
310
311
312 if (workerEdgesSize >= maxEdgesSizePerWorker) {
313 PairList<Integer, VertexIdEdges<I, E>> workerEdges =
314 sendEdgeCache.removeWorkerEdges(workerInfo);
315 WritableRequest writableRequest =
316 new SendWorkerEdgesRequest<I, E>(workerEdges);
317 doRequest(workerInfo, writableRequest);
318 return true;
319 }
320
321 return false;
322 }
323
324
325
326
327
328
329
330
331
332 private void sendMutationsRequestIfFull(
333 int partitionId, PartitionOwner partitionOwner,
334 int partitionMutationCount) {
335
336 if (partitionMutationCount >= maxMutationsPerPartition) {
337 Map<I, VertexMutations<I, V, E>> partitionMutations =
338 sendMutationsCache.removePartitionMutations(partitionId);
339 WritableRequest writableRequest =
340 new SendPartitionMutationsRequest<I, V, E>(
341 partitionId, partitionMutations);
342 doRequest(partitionOwner.getWorkerInfo(), writableRequest);
343 }
344 }
345
346 @Override
347 public void removeEdgesRequest(I vertexIndex,
348 I destinationVertexIndex) throws IOException {
349 PartitionOwner partitionOwner =
350 serviceWorker.getVertexPartitionOwner(vertexIndex);
351 int partitionId = partitionOwner.getPartitionId();
352 if (LOG.isTraceEnabled()) {
353 LOG.trace("removeEdgesRequest: Removing edge " +
354 destinationVertexIndex +
355 " for index " + vertexIndex + " with partition " + partitionId);
356 }
357
358
359 int partitionMutationCount =
360 sendMutationsCache.removeEdgeMutation(
361 partitionId, vertexIndex, destinationVertexIndex);
362
363 sendMutationsRequestIfFull(
364 partitionId, partitionOwner, partitionMutationCount);
365 }
366
367 @Override
368 public void addVertexRequest(Vertex<I, V, E> vertex) throws IOException {
369 PartitionOwner partitionOwner =
370 serviceWorker.getVertexPartitionOwner(vertex.getId());
371 int partitionId = partitionOwner.getPartitionId();
372 if (LOG.isTraceEnabled()) {
373 LOG.trace("addVertexRequest: Sending vertex " + vertex +
374 " to partition " + partitionId);
375 }
376
377
378 int partitionMutationCount =
379 sendMutationsCache.addVertexMutation(partitionId, vertex);
380
381 sendMutationsRequestIfFull(
382 partitionId, partitionOwner, partitionMutationCount);
383 }
384
385 @Override
386 public void removeVertexRequest(I vertexIndex) throws IOException {
387 PartitionOwner partitionOwner =
388 serviceWorker.getVertexPartitionOwner(vertexIndex);
389 int partitionId = partitionOwner.getPartitionId();
390 if (LOG.isTraceEnabled()) {
391 LOG.trace("removeVertexRequest: Removing vertex index " +
392 vertexIndex + " from partition " + partitionId);
393 }
394
395
396 int partitionMutationCount =
397 sendMutationsCache.removeVertexMutation(partitionId, vertexIndex);
398
399 sendMutationsRequestIfFull(
400 partitionId, partitionOwner, partitionMutationCount);
401 }
402
403 @Override
404 public void flush() throws IOException {
405
406
407 sendMessageCache.flush();
408
409
410 PairList<WorkerInfo, PairList<Integer, ExtendedDataOutput>>
411 remainingVertexCache = sendPartitionCache.removeAllData();
412 PairList<WorkerInfo,
413 PairList<Integer, ExtendedDataOutput>>.Iterator
414 vertexIterator = remainingVertexCache.getIterator();
415 while (vertexIterator.hasNext()) {
416 vertexIterator.next();
417 WritableRequest writableRequest =
418 new SendWorkerVerticesRequest(
419 configuration, vertexIterator.getCurrentSecond());
420 doRequest(vertexIterator.getCurrentFirst(), writableRequest);
421 }
422
423
424 PairList<WorkerInfo, PairList<Integer,
425 VertexIdEdges<I, E>>>
426 remainingEdgeCache = sendEdgeCache.removeAllEdges();
427 PairList<WorkerInfo,
428 PairList<Integer, VertexIdEdges<I, E>>>.Iterator
429 edgeIterator = remainingEdgeCache.getIterator();
430 while (edgeIterator.hasNext()) {
431 edgeIterator.next();
432 WritableRequest writableRequest =
433 new SendWorkerEdgesRequest<I, E>(
434 edgeIterator.getCurrentSecond());
435 doRequest(edgeIterator.getCurrentFirst(), writableRequest);
436 }
437
438
439 Map<Integer, Map<I, VertexMutations<I, V, E>>> remainingMutationsCache =
440 sendMutationsCache.removeAllPartitionMutations();
441 for (Map.Entry<Integer, Map<I, VertexMutations<I, V, E>>> entry :
442 remainingMutationsCache.entrySet()) {
443 WritableRequest writableRequest =
444 new SendPartitionMutationsRequest<I, V, E>(
445 entry.getKey(), entry.getValue());
446 PartitionOwner partitionOwner =
447 serviceWorker.getVertexPartitionOwner(
448 entry.getValue().keySet().iterator().next());
449 doRequest(partitionOwner.getWorkerInfo(), writableRequest);
450 }
451 }
452
453 @Override
454 public long resetMessageCount() {
455 return this.sendMessageCache.resetMessageCount();
456 }
457
458 @Override
459 public long resetMessageBytesCount() {
460 return this.sendMessageCache.resetMessageBytesCount();
461 }
462
463
464
465
466
467
468
469 public void doRequest(WorkerInfo workerInfo,
470 WritableRequest writableRequest) {
471
472 if (serviceWorker.getWorkerInfo().getTaskId() ==
473 workerInfo.getTaskId()) {
474 ((WorkerRequest) writableRequest).doRequest(serverData);
475 localRequests.inc();
476 } else {
477 workerClient.sendWritableRequest(
478 workerInfo.getTaskId(), writableRequest);
479 remoteRequests.inc();
480 }
481 }
482
483
484
485
486
487
488
489
490
491
492
493 private static void setupGauges(SuperstepMetricsRegistry smr,
494 final Counter localRequests,
495 final Counter remoteRequests) {
496 final Gauge<Long> totalRequests = smr.getGauge(MetricNames.TOTAL_REQUESTS,
497 new Gauge<Long>() {
498 @Override
499 public Long value() {
500 return localRequests.count() + remoteRequests.count();
501 }
502 }
503 );
504 smr.getGauge(MetricNames.PERCENT_LOCAL_REQUESTS, new PercentGauge() {
505 @Override protected double getNumerator() {
506 return localRequests.count();
507 }
508
509 @Override protected double getDenominator() {
510 return totalRequests.value();
511 }
512 });
513 }
514 }