View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.block;
19  
20  import java.util.Iterator;
21  
22  import org.apache.giraph.block_app.framework.piece.AbstractPiece;
23  import org.apache.giraph.block_app.framework.piece.delegate.FilteringPiece;
24  import org.apache.giraph.function.Consumer;
25  import org.apache.giraph.function.vertex.SupplierFromVertex;
26  import org.apache.hadoop.io.Writable;
27  import org.apache.hadoop.io.WritableComparable;
28  
29  import com.google.common.base.Function;
30  import com.google.common.collect.Iterators;
31  
32  /**
33   * Block which filters out calls to vertexSend/vertexReceive functions
34   * of all pieces in a given block.
35   * Filtering happens based on toCallSend and toCallReceive suppliers
36   * that are passed in, as every piece is just wrapped with FilteringPiece.
37   *
38   * @param <I> Vertex id type
39   * @param <V> Vertex value type
40   * @param <E> Edge value type
41   */
42  @SuppressWarnings({ "rawtypes", "unchecked" })
43  public final class FilteringBlock<I extends WritableComparable,
44          V extends Writable, E extends Writable>
45      implements Block {
46    private final SupplierFromVertex<I, V, E, Boolean> toCallSend;
47    private final SupplierFromVertex<I, V, E, Boolean> toCallReceive;
48    private final Block block;
49  
50    /**
51     * Creates filtering block which uses passed {@code toCallSend} to filter
52     * calls to {@code vertexSend}, and passed {@code toCallReceive} to filter
53     * calls to {@code vertexReceive}, on all pieces within passed {@code block}.
54     */
55    public FilteringBlock(
56        SupplierFromVertex<I, V, E, Boolean> toCallSend,
57        SupplierFromVertex<I, V, E, Boolean> toCallReceive,
58        Block block) {
59      this.toCallSend = toCallSend;
60      this.toCallReceive = toCallReceive;
61      this.block = block;
62    }
63  
64    /**
65     * Creates filtering block, where both vertexSend and vertexReceive is
66     * filtered based on same supplier.
67     */
68    public FilteringBlock(
69        SupplierFromVertex<I, V, E, Boolean> toCallSendAndReceive, Block block) {
70      this(toCallSendAndReceive, toCallSendAndReceive, block);
71    }
72  
73    /**
74     * Creates filtering block, that filters only vertexReceive function,
75     * and always calls vertexSend function.
76     */
77    public static
78    <I extends WritableComparable, V extends Writable, E extends Writable>
79    Block createReceiveFiltering(
80        SupplierFromVertex<I, V, E, Boolean> toCallReceive,
81        Block innerBlock) {
82      return new FilteringBlock<>(null, toCallReceive, innerBlock);
83    }
84  
85    /**
86     * Creates filtering block, that filters only vertexSend function,
87     * and always calls vertexReceive function.
88     */
89    public static
90    <I extends WritableComparable, V extends Writable, E extends Writable>
91    Block createSendFiltering(
92        SupplierFromVertex<I, V, E, Boolean> toCallSend,
93        Block innerBlock) {
94      return new FilteringBlock<>(toCallSend, null, innerBlock);
95    }
96  
97    @Override
98    public Iterator<AbstractPiece> iterator() {
99      return Iterators.transform(
100         block.iterator(),
101         new Function<AbstractPiece, AbstractPiece>() {
102           @Override
103           public AbstractPiece apply(AbstractPiece input) {
104             return new FilteringPiece<>(toCallSend, toCallReceive, input);
105           }
106         });
107   }
108 
109   @Override
110   public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
111     block.forAllPossiblePieces(consumer);
112   }
113 }