/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.block_app.framework.piece.delegate;
1920import java.util.ArrayList;
2122import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
23import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
24import org.apache.giraph.block_app.framework.piece.AbstractPiece;
25import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
26import org.apache.giraph.function.vertex.SupplierFromVertex;
27import org.apache.giraph.graph.Vertex;
28import org.apache.hadoop.io.Writable;
29import org.apache.hadoop.io.WritableComparable;
3031import com.google.common.base.Preconditions;
3233/**34 * Piece which uses a provided suppliers to decide whether or not to run35 * receive/send piece part on a certain vertex.36 *37 * @param <I> Vertex id type38 * @param <V> Vertex value type39 * @param <E> Edge value type40 * @param <M> Message type41 * @param <WV> Worker value type42 * @param <WM> Worker message type43 * @param <S> Execution stage type44 */45 @SuppressWarnings({ "rawtypes" })
46publicclass FilteringPiece<I extends WritableComparable, V extends Writable,
47 E extends Writable, M extends Writable, WV, WM extends Writable, S>
48extends DelegatePiece<I, V, E, M, WV, WM, S> {
49privatefinal SupplierFromVertex<I, V, E, Boolean> toCallSend;
50privatefinal SupplierFromVertex<I, V, E, Boolean> toCallReceive;
5152/**53 * Creates filtering piece which uses passed {@code toCallSend} to filter54 * calls to {@code vertexSend}, and passed {@code toCallReceive} to filter55 * calls to {@code vertexReceive}, on passed {@code innerPiece}.56 */57 @SuppressWarnings("unchecked")
58publicFilteringPiece(
59 SupplierFromVertex<? super I, ? super V, ? super E, Boolean> toCallSend,
60 SupplierFromVertex<? super I, ? super V, ? super E, Boolean>
61 toCallReceive,
62 AbstractPiece<? super I, ? super V, ? super E, ? super M,
63 ? super WV, ? super WM, ? super S> innerPiece) {
64super(innerPiece);
65// Suppliers are contravariant on vertex types,66// but Java generics cannot express that,67// so use unchecked cast inside to allow callers to be typesafe68this.toCallSend = (SupplierFromVertex) toCallSend;
69this.toCallReceive = (SupplierFromVertex) toCallReceive;
70 Preconditions.checkArgument(
71 toCallSend != null || toCallReceive != null,
72"Both send and receive filter cannot be null");
73 }
7475/**76 * Creates filtering piece, where both vertexSend and vertexReceive is77 * filtered based on same supplier.78 */79publicFilteringPiece(
80 SupplierFromVertex<? super I, ? super V, ? super E, Boolean>
81 toCallSendAndReceive,
82 AbstractPiece<? super I, ? super V, ? super E, ? super M,
83 ? super WV, ? super WM, ? super S> innerPiece) {
84this(toCallSendAndReceive, toCallSendAndReceive, innerPiece);
85 }
8687/**88 * Creates filtering piece, that filters only vertexReceive function,89 * and always calls vertexSend function.90 */91publicstatic <I extends WritableComparable, V extends Writable,
92 E extends Writable, M extends Writable, WV, WM extends Writable, S>
93 FilteringPiece<I, V, E, M, WV, WM, S> createReceiveFiltering(
94 SupplierFromVertex<? super I, ? super V, ? super E, Boolean>
95 toCallReceive,
96 AbstractPiece<? super I, ? super V, ? super E, ? super M,
97 ? super WV, ? super WM, ? super S> innerPiece) {
98returnnew FilteringPiece<>(null, toCallReceive, innerPiece);
99 }
100101/**102 * Creates filtering block, that filters only vertexSend function,103 * and always calls vertexReceive function.104 */105publicstatic <I extends WritableComparable, V extends Writable,
106 E extends Writable, M extends Writable, WV, WM extends Writable, S>
107 FilteringPiece<I, V, E, M, WV, WM, S> createSendFiltering(
108 SupplierFromVertex<? super I, ? super V, ? super E, Boolean> toCallSend,
109 AbstractPiece<? super I, ? super V, ? super E, ? super M, ? super WV,
110 ? super WM, ? super S> innerPiece) {
111returnnew FilteringPiece<>(toCallSend, null, innerPiece);
112 }
113114 @Override
115protectedDelegateWorkerSendFunctions delegateWorkerSendFunctions(
116 ArrayList<InnerVertexSender> workerSendFunctions,
117 BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
118returnnewDelegateWorkerSendFunctions(workerSendFunctions) {
119 @Override
120publicvoid vertexSend(Vertex<I, V, E> vertex) {
121if (toCallSend == null || toCallSend.get(vertex)) {
122super.vertexSend(vertex);
123 }
124 }
125 };
126 }
127128 @Override
129protectedDelegateWorkerReceiveFunctions delegateWorkerReceiveFunctions(
130 ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions,
131 BlockWorkerReceiveApi<I> workerApi, S executionStage) {
132returnnewDelegateWorkerReceiveFunctions(workerReceiveFunctions) {
133 @Override
134publicvoid vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
135if (toCallReceive == null || toCallReceive.get(vertex)) {
136super.vertexReceive(vertex, messages);
137 }
138 }
139 };
140 }
141142 @Override
143protected String delegationName() {
144if (toCallSend != null && toCallReceive != null) {
145if (toCallSend != toCallReceive) {
146return"AsymFilter";
147 }
148return"Filter";
149 } elseif (toCallSend != null) {
150return"SendFilter";
151 } elseif (toCallReceive != null) {
152return"ReceiveFilter";
153 } else {
154thrownew IllegalStateException("Both Send and Receive filters are null");
155 }
156 }
157 }