View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.piece.delegate;
19  
20  import java.util.ArrayList;
21  import java.util.Arrays;
22  import java.util.List;
23  
24  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
25  import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
26  import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
27  import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
28  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
29  import org.apache.giraph.block_app.framework.piece.AbstractPiece;
30  import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
31  import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
32  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
33  import org.apache.giraph.conf.MessageClasses;
34  import org.apache.giraph.function.Consumer;
35  import org.apache.giraph.graph.Vertex;
36  import org.apache.giraph.types.NoMessage;
37  import org.apache.hadoop.io.Writable;
38  import org.apache.hadoop.io.WritableComparable;
39  
40  import com.google.common.base.Preconditions;
41  
42  /**
43   * Delegate Piece which allows combining multiple pieces in same iteration:
44   * new DelegatePiece(new LogicPiece(), new StatsPiece())
45   * You should be careful when doing so, since those pieces must not interact,
46   * and only one can send messages.
47   * Execution of any of the Piece methods by the framework is going to trigger
48   * sequential execution of that method on all of the pieces this DelegatePiece
49   * wraps. That means for example, getVertexSender is going to be called on all
50   * pieces before masterCompute is called on all pieces, which is called before
51   * getVertexReceiver on all pieces.
52   *
53   * Also, via overriding, it provides an abstract class for filtering. I.e. if
54   * you want piece that filters out calls to masterCompute, you can have:
55   * new FilterMasterPiece(new LogicPiece()),
56   * with FilterMasterPiece extends DelegatePiece, and only overrides getMaster
57   * function and DelegateMasterPiece class.
58   *
59   * @param <I> Vertex id type
60   * @param <V> Vertex value type
61   * @param <E> Edge value type
62   * @param <M> Message type
63   * @param <WV> Worker value type
64   * @param <WM> Worker message type
65   * @param <S> Execution stage type
66   */
67  @SuppressWarnings("rawtypes")
68  public class DelegatePiece<I extends WritableComparable, V extends Writable,
69      E extends Writable, M extends Writable, WV, WM extends Writable, S>
70      extends AbstractPiece<I, V, E, M, WV, WM, S> {
71  
72    private final List<AbstractPiece<I, V, E, M, WV, WM, S>> innerPieces;
73  
74    @SafeVarargs
75    @SuppressWarnings("unchecked")
76    public DelegatePiece(AbstractPiece<? super I, ? super V, ? super E,
77        ? super M, ? super WV, ? super WM, ? super S>... innerPieces) {
78      // Pieces are contravariant, but Java generics cannot express that,
79      // so use unchecked cast inside to allow callers to be typesafe
80      this.innerPieces = new ArrayList(Arrays.asList(innerPieces));
81    }
82  
83    @SuppressWarnings("unchecked")
84    public DelegatePiece(AbstractPiece<? super I, ? super V, ? super E,
85        ? super M, ? super WV, ? super WM, ? super S> innerPiece) {
86      // Pieces are contravariant, but Java generics cannot express that,
87      // so use unchecked cast inside to allow callers to be typesafe
88      this.innerPieces = new ArrayList(Arrays.asList(innerPiece));
89    }
90  
91    protected DelegateWorkerSendFunctions delegateWorkerSendFunctions(
92        ArrayList<InnerVertexSender> workerSendFunctions,
93        BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
94      return new DelegateWorkerSendFunctions(workerSendFunctions);
95    }
96  
97    protected DelegateWorkerReceiveFunctions delegateWorkerReceiveFunctions(
98        ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions,
99        BlockWorkerReceiveApi<I> workerApi, S executionStage) {
100     return new DelegateWorkerReceiveFunctions(workerReceiveFunctions);
101   }
102 
103   @Override
104   public InnerVertexSender getWrappedVertexSender(
105       BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
106     ArrayList<InnerVertexSender> workerSendFunctions =
107         new ArrayList<>(innerPieces.size());
108     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
109       workerSendFunctions.add(
110           innerPiece.getWrappedVertexSender(workerApi, executionStage));
111     }
112     return delegateWorkerSendFunctions(
113         workerSendFunctions, workerApi, executionStage);
114   }
115 
116   @Override
117   public InnerVertexReceiver getVertexReceiver(
118       BlockWorkerReceiveApi<I> workerApi, S executionStage) {
119     ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions =
120         new ArrayList<>(innerPieces.size());
121     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
122       workerReceiveFunctions.add(
123           innerPiece.getVertexReceiver(workerApi, executionStage));
124     }
125     return delegateWorkerReceiveFunctions(
126         workerReceiveFunctions, workerApi, executionStage);
127   }
128 
129   /** Delegating WorkerSendPiece */
130   protected class DelegateWorkerSendFunctions extends InnerVertexSender {
131     private final ArrayList<InnerVertexSender> workerSendFunctions;
132 
133     public DelegateWorkerSendFunctions(
134         ArrayList<InnerVertexSender> workerSendFunctions) {
135       this.workerSendFunctions = workerSendFunctions;
136     }
137 
138     @Override
139     public void vertexSend(Vertex<I, V, E> vertex) {
140       for (InnerVertexSender functions : workerSendFunctions) {
141         if (functions != null) {
142           functions.vertexSend(vertex);
143         }
144       }
145     }
146 
147     @Override
148     public void postprocess() {
149       for (InnerVertexSender functions : workerSendFunctions) {
150         if (functions != null) {
151           functions.postprocess();
152         }
153       }
154     }
155   }
156 
157   /** Delegating WorkerReceivePiece */
158   protected class DelegateWorkerReceiveFunctions extends InnerVertexReceiver {
159     private final ArrayList<VertexReceiver<I, V, E, M>>
160     workerReceiveFunctions;
161 
162     public DelegateWorkerReceiveFunctions(
163         ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions) {
164       this.workerReceiveFunctions = workerReceiveFunctions;
165     }
166 
167     @Override
168     public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
169       for (VertexReceiver<I, V, E, M> functions :
170             workerReceiveFunctions) {
171         if (functions != null) {
172           functions.vertexReceive(vertex, messages);
173         }
174       }
175     }
176 
177     @Override
178     public void postprocess() {
179       for (VertexReceiver<I, V, E, M> functions :
180             workerReceiveFunctions) {
181         if (functions instanceof VertexPostprocessor) {
182           ((VertexPostprocessor) functions).postprocess();
183         }
184       }
185     }
186   }
187 
188   @Override
189   public void masterCompute(BlockMasterApi api, S executionStage) {
190     for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
191       piece.masterCompute(api, executionStage);
192     }
193   }
194 
195   @Override
196   public void workerContextSend(
197       BlockWorkerContextSendApi<I, WM> workerContextApi, S executionStage,
198       WV workerValue) {
199     for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
200       piece.workerContextSend(workerContextApi, executionStage, workerValue);
201     }
202   }
203 
204   @Override
205   public void workerContextReceive(
206       BlockWorkerContextReceiveApi workerContextApi, S executionStage,
207       WV workerValue, List<WM> workerMessages) {
208     for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
209       piece.workerContextReceive(
210           workerContextApi, executionStage, workerValue, workerMessages);
211     }
212   }
213 
214   @Override
215   public S nextExecutionStage(S executionStage) {
216     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
217       executionStage = innerPiece.nextExecutionStage(executionStage);
218     }
219     return executionStage;
220   }
221 
222   @Override
223   public MessageClasses<I, M> getMessageClasses(
224       ImmutableClassesGiraphConfiguration conf) {
225     MessageClasses<I, M> messageClasses = null;
226     MessageClasses<I, M> firstMessageClasses = null;
227     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
228       MessageClasses<I, M> cur = innerPiece.getMessageClasses(conf);
229       Preconditions.checkState(cur != null);
230       if (!cur.getMessageClass().equals(NoMessage.class)) {
231         if (messageClasses != null) {
232           throw new RuntimeException(
233               "Only one piece combined through delegate (" +
234               toString() + ") can send messages");
235         }
236         messageClasses = cur;
237       }
238       if (firstMessageClasses == null) {
239         firstMessageClasses = cur;
240       }
241     }
242     return messageClasses != null ? messageClasses : firstMessageClasses;
243   }
244 
245   @Override
246   public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
247     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
248       innerPiece.forAllPossiblePieces(consumer);
249     }
250   }
251 
252   @SuppressWarnings("deprecation")
253   @Override
254   public void registerAggregators(BlockMasterApi master)
255       throws InstantiationException, IllegalAccessException {
256     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
257       innerPiece.registerAggregators(master);
258     }
259   }
260 
261   @Override
262   public void wrappedRegisterReducers(
263       BlockMasterApi masterApi, S executionStage) {
264     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
265       innerPiece.wrappedRegisterReducers(masterApi, executionStage);
266     }
267   }
268 
269   protected String delegationName() {
270     return "Delegate";
271   }
272 
273   @Override
274   public String toString() {
275     return delegationName() + innerPieces.toString();
276   }
277 }