This project has retired. For details please refer to its
Attic page.
SendMessageWithCombinerPiece xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.library.internal;
19
20 import java.util.Iterator;
21 import java.util.Spliterator;
22 import java.util.Spliterators;
23 import java.util.stream.StreamSupport;
24
25 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
26 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
27 import org.apache.giraph.block_app.framework.block.Block;
28 import org.apache.giraph.block_app.framework.piece.Piece;
29 import org.apache.giraph.block_app.framework.piece.delegate.FilteringPiece;
30 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
31 import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
32 import org.apache.giraph.block_app.library.striping.StripingUtils;
33 import org.apache.giraph.combiner.MessageCombiner;
34 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
35 import org.apache.giraph.function.Function;
36 import org.apache.giraph.function.Predicate;
37 import org.apache.giraph.function.primitive.Int2ObjFunction;
38 import org.apache.giraph.function.vertex.ConsumerWithVertex;
39 import org.apache.giraph.function.vertex.SupplierFromVertex;
40 import org.apache.giraph.graph.Vertex;
41 import org.apache.hadoop.io.Writable;
42 import org.apache.hadoop.io.WritableComparable;
43
44 import com.google.common.base.Preconditions;
45
46
47
48
49
50
51
52
53
54
55 public class SendMessageWithCombinerPiece<I extends WritableComparable,
56 V extends Writable, E extends Writable, M extends Writable>
57 extends Piece<I, V, E, M, Object> {
58 private final String name;
59 private final MessageCombiner<? super I, M> messageCombiner;
60 private final SupplierFromVertex<I, V, E, M> messageSupplier;
61 private final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier;
62 private final ConsumerWithVertex<I, V, E, M> messagesConsumer;
63
64 public SendMessageWithCombinerPiece(String name,
65 MessageCombiner<? super I, M> messageCombiner,
66 SupplierFromVertex<I, V, E, M> messageSupplier,
67 SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier,
68 ConsumerWithVertex<I, V, E, M> messagesConsumer) {
69 Preconditions.checkNotNull(messageCombiner);
70 this.name = name;
71 this.messageCombiner = messageCombiner;
72 this.messageSupplier = messageSupplier;
73 this.targetsSupplier = targetsSupplier;
74 this.messagesConsumer = messagesConsumer;
75 }
76
77
78
79
80
81
82
83
84
85
86 public Block stripeByReceiver(
87 int stripes,
88 Int2ObjFunction<Int2ObjFunction<Predicate<I>>> stripeSupplier) {
89 return StripingUtils.generateStripedBlock(
90 stripes,
91 new Function<Predicate<I>, Block>() {
92 @Override
93 public Block apply(final Predicate<I> stripePredicate) {
94 return FilteringPiece.createReceiveFiltering(
95 new SupplierFromVertex<I, V, E, Boolean>() {
96 @Override
97 public Boolean get(Vertex<I, V, E> vertex) {
98 return stripePredicate.apply(vertex.getId());
99 }
100 },
101 new SendMessageWithCombinerPiece<>(
102 name,
103 messageCombiner,
104 messageSupplier,
105 new SupplierFromVertex<I, V, E, Iterator<I>>() {
106 @Override
107 public Iterator<I> get(Vertex<I, V, E> vertex) {
108 return StreamSupport.stream(
109 Spliterators.spliteratorUnknownSize(
110 targetsSupplier.get(vertex), Spliterator.ORDERED),
111 false).filter(stripePredicate::apply).iterator();
112 }
113 },
114 messagesConsumer));
115 }
116 },
117 stripeSupplier);
118 }
119
120 @Override
121 public VertexSender<I, V, E> getVertexSender(
122 final BlockWorkerSendApi<I, V, E, M> workerApi,
123 Object executionStage) {
124 return new InnerVertexSender() {
125 @Override
126 public void vertexSend(Vertex<I, V, E> vertex) {
127 Iterator<I> targets = targetsSupplier.get(vertex);
128 M message = messageSupplier.get(vertex);
129 if (message != null && targets != null && targets.hasNext()) {
130 workerApi.sendMessageToMultipleEdges(targets, message);
131 }
132 }
133 };
134 }
135
136 @Override
137 public VertexReceiver<I, V, E, M> getVertexReceiver(
138 BlockWorkerReceiveApi<I> workerApi,
139 Object executionStage) {
140 return new InnerVertexReceiver() {
141 @Override
142 public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
143 Iterator<M> iter = messages.iterator();
144 M combinedMessage = null;
145 if (iter.hasNext()) {
146 combinedMessage = iter.next();
147
148 Preconditions.checkArgument(!iter.hasNext());
149 }
150 messagesConsumer.apply(vertex, combinedMessage);
151 }
152 };
153 }
154
155 @Override
156 public MessageCombiner<? super I, M> getMessageCombiner(
157 ImmutableClassesGiraphConfiguration conf) {
158 return messageCombiner;
159 }
160
161 @Override
162 public String toString() {
163 return name;
164 }
165 }