1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.block_app.framework.block;
1920import java.util.Iterator;
2122import org.apache.giraph.block_app.framework.piece.AbstractPiece;
23import org.apache.giraph.block_app.framework.piece.delegate.FilteringPiece;
24import org.apache.giraph.function.Consumer;
25import org.apache.giraph.function.vertex.SupplierFromVertex;
26import org.apache.hadoop.io.Writable;
27import org.apache.hadoop.io.WritableComparable;
2829import com.google.common.base.Function;
30import com.google.common.collect.Iterators;
3132/**33 * Block which filters out calls to vertexSend/vertexReceive functions34 * of all pieces in a given block.35 * Filtering happens based on toCallSend and toCallReceive suppliers36 * that are passed in, as every piece is just wrapped with FilteringPiece.37 *38 * @param <I> Vertex id type39 * @param <V> Vertex value type40 * @param <E> Edge value type41 */42 @SuppressWarnings({ "rawtypes", "unchecked" })
43publicfinalclass FilteringBlock<I extends WritableComparable,
44 V extends Writable, E extends Writable>
45implementsBlock {
46privatefinal SupplierFromVertex<I, V, E, Boolean> toCallSend;
47privatefinal SupplierFromVertex<I, V, E, Boolean> toCallReceive;
48privatefinalBlock block;
4950/**51 * Creates filtering block which uses passed {@code toCallSend} to filter52 * calls to {@code vertexSend}, and passed {@code toCallReceive} to filter53 * calls to {@code vertexReceive}, on all pieces within passed {@code block}.54 */55publicFilteringBlock(
56 SupplierFromVertex<I, V, E, Boolean> toCallSend,
57 SupplierFromVertex<I, V, E, Boolean> toCallReceive,
58Block block) {
59this.toCallSend = toCallSend;
60this.toCallReceive = toCallReceive;
61this.block = block;
62 }
6364/**65 * Creates filtering block, where both vertexSend and vertexReceive is66 * filtered based on same supplier.67 */68publicFilteringBlock(
69 SupplierFromVertex<I, V, E, Boolean> toCallSendAndReceive, Block block) {
70this(toCallSendAndReceive, toCallSendAndReceive, block);
71 }
7273/**74 * Creates filtering block, that filters only vertexReceive function,75 * and always calls vertexSend function.76 */77publicstatic78 <I extends WritableComparable, V extends Writable, E extends Writable>
79Block createReceiveFiltering(
80 SupplierFromVertex<I, V, E, Boolean> toCallReceive,
81Block innerBlock) {
82returnnew FilteringBlock<>(null, toCallReceive, innerBlock);
83 }
8485/**86 * Creates filtering block, that filters only vertexSend function,87 * and always calls vertexReceive function.88 */89publicstatic90 <I extends WritableComparable, V extends Writable, E extends Writable>
91Block createSendFiltering(
92 SupplierFromVertex<I, V, E, Boolean> toCallSend,
93Block innerBlock) {
94returnnew FilteringBlock<>(toCallSend, null, innerBlock);
95 }
9697 @Override
98public Iterator<AbstractPiece> iterator() {
99return Iterators.transform(
100 block.iterator(),
101new Function<AbstractPiece, AbstractPiece>() {
102 @Override
103publicAbstractPiece apply(AbstractPiece input) {
104returnnew FilteringPiece<>(toCallSend, toCallReceive, input);
105 }
106 });
107 }
108109 @Override
110publicvoid forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
111 block.forAllPossiblePieces(consumer);
112 }
113114 @Override
115publicPieceCount getPieceCount() {
116return block.getPieceCount();
117 }
118 }