This project has retired. For details please refer to its Attic page.
FilteringPiece xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.piece.delegate;
19  
20  import java.util.ArrayList;
21  
22  import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
23  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
24  import org.apache.giraph.block_app.framework.piece.AbstractPiece;
25  import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
26  import org.apache.giraph.function.vertex.SupplierFromVertex;
27  import org.apache.giraph.graph.Vertex;
28  import org.apache.hadoop.io.Writable;
29  import org.apache.hadoop.io.WritableComparable;
30  
31  import com.google.common.base.Preconditions;
32  
33  /**
34   * Piece which uses a provided suppliers to decide whether or not to run
35   * receive/send piece part on a certain vertex.
36   *
37   * @param <I> Vertex id type
38   * @param <V> Vertex value type
39   * @param <E> Edge value type
40   * @param <M> Message type
41   * @param <WV> Worker value type
42   * @param <WM> Worker message type
43   * @param <S> Execution stage type
44   */
45  @SuppressWarnings({ "rawtypes" })
46  public class FilteringPiece<I extends WritableComparable, V extends Writable,
47      E extends Writable, M extends Writable, WV, WM extends Writable, S>
48      extends DelegatePiece<I, V, E, M, WV, WM, S> {
49    private final SupplierFromVertex<I, V, E, Boolean> toCallSend;
50    private final SupplierFromVertex<I, V, E, Boolean> toCallReceive;
51  
52    /**
53     * Creates filtering piece which uses passed {@code toCallSend} to filter
54     * calls to {@code vertexSend}, and passed {@code toCallReceive} to filter
55     * calls to {@code vertexReceive}, on passed {@code innerPiece}.
56     */
57    @SuppressWarnings("unchecked")
58    public FilteringPiece(
59        SupplierFromVertex<? super I, ? super V, ? super E, Boolean> toCallSend,
60        SupplierFromVertex<? super I, ? super V, ? super E, Boolean>
61          toCallReceive,
62        AbstractPiece<? super I, ? super V, ? super E, ? super M,
63          ? super WV, ? super WM, ? super S> innerPiece) {
64      super(innerPiece);
65      // Suppliers are contravariant on vertex types,
66      // but Java generics cannot express that,
67      // so use unchecked cast inside to allow callers to be typesafe
68      this.toCallSend = (SupplierFromVertex) toCallSend;
69      this.toCallReceive = (SupplierFromVertex) toCallReceive;
70      Preconditions.checkArgument(
71          toCallSend != null || toCallReceive != null,
72          "Both send and receive filter cannot be null");
73    }
74  
75    /**
76     * Creates filtering piece, where both vertexSend and vertexReceive is
77     * filtered based on same supplier.
78     */
79    public FilteringPiece(
80        SupplierFromVertex<? super I, ? super V, ? super E, Boolean>
81          toCallSendAndReceive,
82        AbstractPiece<? super I, ? super V, ? super E, ? super M,
83          ? super WV, ? super WM, ? super S> innerPiece) {
84      this(toCallSendAndReceive, toCallSendAndReceive, innerPiece);
85    }
86  
87    /**
88     * Creates filtering piece, that filters only vertexReceive function,
89     * and always calls vertexSend function.
90     */
91    public static <I extends WritableComparable, V extends Writable,
92    E extends Writable, M extends Writable, WV, WM extends Writable, S>
93    FilteringPiece<I, V, E, M, WV, WM, S> createReceiveFiltering(
94        SupplierFromVertex<? super I, ? super V, ? super E, Boolean>
95          toCallReceive,
96        AbstractPiece<? super I, ? super V, ? super E, ? super M,
97          ? super WV, ? super WM, ? super S> innerPiece) {
98      return new FilteringPiece<>(null, toCallReceive, innerPiece);
99    }
100 
101   /**
102    * Creates filtering block, that filters only vertexSend function,
103    * and always calls vertexReceive function.
104    */
105   public static <I extends WritableComparable, V extends Writable,
106   E extends Writable, M extends Writable, WV, WM extends Writable, S>
107   FilteringPiece<I, V, E, M, WV, WM, S> createSendFiltering(
108       SupplierFromVertex<? super I, ? super V, ? super E, Boolean> toCallSend,
109       AbstractPiece<? super I, ? super V, ? super E, ? super M, ? super WV,
110         ? super WM, ? super S> innerPiece) {
111     return new FilteringPiece<>(toCallSend, null, innerPiece);
112   }
113 
114   @Override
115   protected DelegateWorkerSendFunctions delegateWorkerSendFunctions(
116       ArrayList<InnerVertexSender> workerSendFunctions,
117       BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
118     return new DelegateWorkerSendFunctions(workerSendFunctions) {
119       @Override
120       public void vertexSend(Vertex<I, V, E> vertex) {
121         if (toCallSend == null || toCallSend.get(vertex)) {
122           super.vertexSend(vertex);
123         }
124       }
125     };
126   }
127 
128   @Override
129   protected DelegateWorkerReceiveFunctions delegateWorkerReceiveFunctions(
130       ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions,
131       BlockWorkerReceiveApi<I> workerApi, S executionStage) {
132     return new DelegateWorkerReceiveFunctions(workerReceiveFunctions) {
133       @Override
134       public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
135         if (toCallReceive == null || toCallReceive.get(vertex)) {
136           super.vertexReceive(vertex, messages);
137         }
138       }
139     };
140   }
141 
142   @Override
143   protected String delegationName() {
144     if (toCallSend != null && toCallReceive != null) {
145       if (toCallSend != toCallReceive) {
146         return "AsymFilter";
147       }
148       return "Filter";
149     } else if (toCallSend != null) {
150       return "SendFilter";
151     } else if (toCallReceive != null) {
152       return "ReceiveFilter";
153     } else {
154       throw new IllegalStateException("Both Send and Receive filters are null");
155     }
156   }
157 }