View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.library;
19  
20  import java.util.Iterator;
21  
22  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
23  import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
24  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
25  import org.apache.giraph.block_app.framework.api.CreateReducersApi;
26  import org.apache.giraph.block_app.framework.piece.Piece;
27  import org.apache.giraph.block_app.framework.piece.global_comm.ReducerAndBroadcastWrapperHandle;
28  import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
29  import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
30  import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
31  import org.apache.giraph.block_app.library.internal.SendMessagePiece;
32  import org.apache.giraph.block_app.library.internal.SendMessageWithCombinerPiece;
33  import org.apache.giraph.combiner.MessageCombiner;
34  import org.apache.giraph.function.Consumer;
35  import org.apache.giraph.function.PairConsumer;
36  import org.apache.giraph.function.vertex.ConsumerWithVertex;
37  import org.apache.giraph.function.vertex.SupplierFromVertex;
38  import org.apache.giraph.graph.Vertex;
39  import org.apache.giraph.reducers.ReduceOperation;
40  import org.apache.giraph.reducers.impl.SumReduce;
41  import org.apache.giraph.types.NoMessage;
42  import org.apache.hadoop.io.LongWritable;
43  import org.apache.hadoop.io.Writable;
44  import org.apache.hadoop.io.WritableComparable;
45  import org.apache.log4j.Logger;
46  
47  /**
48   * Utility class for creating common Pieces and computations for processing
49   * graphs.
50   */
51  public class Pieces {
52    private static final Logger LOG = Logger.getLogger(Pieces.class);
53  
54    private Pieces() { }
55  
56    /**
57     * For each vertex execute given process function.
58     * Computation is happening in send phase of the returned Piece.
59     */
60    public static
61    <I extends WritableComparable, V extends Writable, E extends Writable>
62    Piece<I, V, E, NoMessage, Object> forAllVertices(
63        final String pieceName, final Consumer<Vertex<I, V, E>> process) {
64      return new Piece<I, V, E, NoMessage, Object>() {
65        @Override
66        public VertexSender<I, V, E> getVertexSender(
67            BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
68            Object executionStage) {
69          return new InnerVertexSender() {
70            @Override
71            public void vertexSend(Vertex<I, V, E> vertex) {
72              process.apply(vertex);
73            }
74          };
75        }
76  
77        @Override
78        public String toString() {
79          return pieceName;
80        }
81      };
82    }
83  
84    /**
85     * For each vertex execute given process function.
86     * Computation is happening in the receive phase of the returned Piece.
87     * This function should be used if you need returned Piece to interact with
88     * subsequent Piece, as that requires passed function to be executed
89     * during receive phase,
90     */
91    public static
92    <I extends WritableComparable, V extends Writable, E extends Writable>
93    Piece<I, V, E, NoMessage, Object> forAllVerticesOnReceive(
94        final String pieceName, final Consumer<Vertex<I, V, E>> process) {
95      return new Piece<I, V, E, NoMessage, Object>() {
96        @Override
97        public VertexReceiver<I, V, E, NoMessage> getVertexReceiver(
98            BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
99          return new InnerVertexReceiver() {
100           @Override
101           public void vertexReceive(
102               Vertex<I, V, E> vertex, Iterable<NoMessage> messages) {
103             process.apply(vertex);
104           }
105         };
106       }
107 
108       @Override
109       public String toString() {
110         return pieceName;
111       }
112     };
113   }
114 
115   /**
116    * Creates Piece which removes vertices for which supplier returns true.
117    */
118   public static
119   <I extends WritableComparable, V extends Writable, E extends Writable>
120   Piece<I, V, E, NoMessage, Object> removeVertices(
121       final String pieceName,
122       final SupplierFromVertex<I, V, E, Boolean> shouldRemoveVertex) {
123     return new Piece<I, V, E, NoMessage, Object>() {
124       private ReducerHandle<LongWritable, LongWritable> countRemovedAgg;
125 
126       @Override
127       public void registerReducers(
128           CreateReducersApi reduceApi, Object executionStage) {
129         countRemovedAgg = reduceApi.createLocalReducer(SumReduce.LONG);
130       }
131 
132       @Override
133       public VertexSender<I, V, E> getVertexSender(
134           final BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
135           Object executionStage) {
136         return new InnerVertexSender() {
137           @Override
138           public void vertexSend(Vertex<I, V, E> vertex) {
139             if (shouldRemoveVertex.get(vertex)) {
140               workerApi.removeVertexRequest(vertex.getId());
141               reduceLong(countRemovedAgg, 1);
142             }
143           }
144         };
145       }
146 
147       @Override
148       public void masterCompute(BlockMasterApi master, Object executionStage) {
149         LOG.info("Removed " + countRemovedAgg.getReducedValue(master) +
150             " vertices from the graph, during stage " + executionStage);
151       }
152 
153       @Override
154       public String toString() {
155         return pieceName;
156       }
157     };
158   }
159 
160   /**
161    * Creates single reducer piece - given reduce class, supplier of values on
162    * worker, reduces and passes the result to given consumer on master.
163    *
164    * @param <S> Single value type, objects passed on workers
165    * @param <R> Reduced value type
166    * @param <I> Vertex id type
167    * @param <V> Vertex value type
168    * @param <E> Edge value type
169    */
170   public static
171   <S, R extends Writable, I extends WritableComparable, V extends Writable,
172   E extends Writable>
173   Piece<I, V, E, NoMessage, Object> reduce(
174       String name,
175       ReduceOperation<S, R> reduceOp,
176       SupplierFromVertex<I, V, E, S> valueSupplier,
177       final Consumer<R> reducedValueConsumer) {
178     return reduceWithMaster(
179         name, reduceOp, valueSupplier,
180         new PairConsumer<R, BlockMasterApi>() {
181           @Override
182           public void apply(R input, BlockMasterApi master) {
183             reducedValueConsumer.apply(input);
184           }
185         });
186   }
187 
188   /**
189    * Creates single reducer piece - given reduce class, supplier of values on
190    * worker, reduces and passes the result to given consumer on master.
191    *
192    * @param <S> Single value type, objects passed on workers
193    * @param <R> Reduced value type
194    * @param <I> Vertex id type
195    * @param <V> Vertex value type
196    * @param <E> Edge value type
197    */
198   public static
199   <S, R extends Writable, I extends WritableComparable, V extends Writable,
200   E extends Writable>
201   Piece<I, V, E, NoMessage, Object> reduceWithMaster(
202       final String name,
203       final ReduceOperation<S, R> reduceOp,
204       final SupplierFromVertex<I, V, E, S> valueSupplier,
205       final PairConsumer<R, BlockMasterApi> reducedValueConsumer) {
206     return new Piece<I, V, E, NoMessage, Object>() {
207       private ReducerHandle<S, R> handle;
208 
209       @Override
210       public void registerReducers(
211           CreateReducersApi reduceApi, Object executionStage) {
212         handle = reduceApi.createLocalReducer(reduceOp);
213       }
214 
215       @Override
216       public VertexSender<I, V, E> getVertexSender(
217           BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
218           Object executionStage) {
219         return new InnerVertexSender() {
220           @Override
221           public void vertexSend(Vertex<I, V, E> vertex) {
222             handle.reduce(valueSupplier.get(vertex));
223           }
224         };
225       }
226 
227       @Override
228       public void masterCompute(BlockMasterApi master, Object executionStage) {
229         reducedValueConsumer.apply(handle.getReducedValue(master), master);
230       }
231 
232       @Override
233       public String toString() {
234         return name;
235       }
236     };
237   }
238 
239   /**
240    * Creates single reducer and broadcast piece - given reduce class, supplier
241    * of values on worker, reduces and broadcasts the value, passing it to the
242    * consumer on worker for each vertex.
243    *
244    * @param <S> Single value type, objects passed on workers
245    * @param <R> Reduced value type
246    * @param <I> Vertex id type
247    * @param <V> Vertex value type
248    * @param <E> Edge value type
249    */
250   public static
251   <S, R extends Writable, I extends WritableComparable, V extends Writable,
252   E extends Writable>
253   Piece<I, V, E, NoMessage, Object> reduceAndBroadcast(
254       final String name,
255       final ReduceOperation<S, R> reduceOp,
256       final SupplierFromVertex<I, V, E, S> valueSupplier,
257       final ConsumerWithVertex<I, V, E, R> reducedValueConsumer) {
258     return new Piece<I, V, E, NoMessage, Object>() {
259       private final ReducerAndBroadcastWrapperHandle<S, R> handle =
260           new ReducerAndBroadcastWrapperHandle<>();
261 
262       @Override
263       public void registerReducers(
264           CreateReducersApi reduceApi, Object executionStage) {
265         handle.registeredReducer(reduceApi.createLocalReducer(reduceOp));
266       }
267 
268       @Override
269       public VertexSender<I, V, E> getVertexSender(
270           BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
271           Object executionStage) {
272         return new InnerVertexSender() {
273           @Override
274           public void vertexSend(Vertex<I, V, E> vertex) {
275             handle.reduce(valueSupplier.get(vertex));
276           }
277         };
278       }
279 
280       @Override
281       public void masterCompute(BlockMasterApi master, Object executionStage) {
282         handle.broadcastValue(master);
283       }
284 
285       @Override
286       public VertexReceiver<I, V, E, NoMessage> getVertexReceiver(
287           BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
288         final R value = handle.getBroadcast(workerApi);
289         return new InnerVertexReceiver() {
290           @Override
291           public void vertexReceive(
292               Vertex<I, V, E> vertex, Iterable<NoMessage> messages) {
293             reducedValueConsumer.apply(vertex, value);
294           }
295         };
296       }
297 
298       @Override
299       public String toString() {
300         return name;
301       }
302     };
303   }
304 
305   /**
306    * Creates Piece that for each vertex, sends message provided by
307    * messageSupplier to all targets provided by targetsSupplier.
308    * Received messages are then passed to and processed by provided
309    * messagesConsumer.
310    *
311    * If messageSupplier or targetsSupplier returns null, current vertex
312    * is not going to send any messages.
313    */
314   public static
315   <I extends WritableComparable, V extends Writable, E extends Writable,
316   M extends Writable>
317   SendMessagePiece<I, V, E, M> sendMessage(
318       String name,
319       Class<M> messageClass,
320       SupplierFromVertex<I, V, E, M> messageSupplier,
321       SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier,
322       ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
323     return new SendMessagePiece<>(
324         name, messageClass, messageSupplier, targetsSupplier, messagesConsumer);
325   }
326 
327   /**
328    * Creates Piece that for each vertex, sends message provided by
329    * messageSupplier to all neighbors of current vertex.
330    * Received messages are then passed to and processed by provided
331    * messagesConsumer.
332    *
333    * If messageSupplier returns null, current vertex
334    * is not going to send any messages.
335    */
336   public static
337   <I extends WritableComparable, V extends Writable, E extends Writable,
338   M extends Writable>
339   SendMessagePiece<I, V, E, M> sendMessageToNeighbors(
340       String name,
341       Class<M> messageClass,
342       SupplierFromVertex<I, V, E, M> messageSupplier,
343       ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
344     return sendMessage(
345         name, messageClass, messageSupplier,
346         VertexSuppliers.<I, V, E>vertexNeighborsSupplier(),
347         messagesConsumer);
348   }
349 
350   /**
351    * Creates Piece that for each vertex, sends message provided by
352    * messageSupplier to all targets provided by targetsSupplier,
353    * and uses given messageCombiner to combine messages together.
354    * Received combined message is then passed to and processed by provided
355    * messageConsumer. (null is passed to it, if vertex received no messages)
356    *
357    * If messageSupplier or targetsSupplier returns null, current vertex
358    * is not going to send any messages.
359    */
360   public static
361   <I extends WritableComparable, V extends Writable, E extends Writable,
362   M extends Writable>
363   SendMessageWithCombinerPiece<I, V, E, M> sendMessage(
364       String name,
365       MessageCombiner<? super I, M> messageCombiner,
366       SupplierFromVertex<I, V, E, M> messageSupplier,
367       SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier,
368       ConsumerWithVertex<I, V, E, M> messagesConsumer) {
369     return new SendMessageWithCombinerPiece<>(
370         name, messageCombiner,
371         messageSupplier, targetsSupplier, messagesConsumer);
372   }
373 
374   /**
375    * Creates Piece that for each vertex, sends message provided by
376    * messageSupplier to all neighbors of current vertex,
377    * and uses given messageCombiner to combine messages together.
378    * Received combined message is then passed to and processed by provided
379    * messageConsumer. (null is passed to it, if vertex received no messages)
380    *
381    * If messageSupplier returns null, current vertex
382    * is not going to send any messages.
383    */
384   public static
385   <I extends WritableComparable, V extends Writable, E extends Writable,
386   M extends Writable>
387   SendMessageWithCombinerPiece<I, V, E, M> sendMessageToNeighbors(
388       String name,
389       MessageCombiner<? super I, M> messageCombiner,
390       SupplierFromVertex<I, V, E, M> messageSupplier,
391       ConsumerWithVertex<I, V, E, M> messagesConsumer) {
392     return sendMessage(
393         name, messageCombiner, messageSupplier,
394         VertexSuppliers.<I, V, E>vertexNeighborsSupplier(),
395         messagesConsumer);
396   }
397 }