This project has retired. For details please refer to its Attic page.
DelegatePiece xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.piece.delegate;
19  
20  import java.util.ArrayList;
21  import java.util.Arrays;
22  import java.util.List;
23  
24  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
25  import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
26  import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
27  import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
28  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
29  import org.apache.giraph.block_app.framework.block.PieceCount;
30  import org.apache.giraph.block_app.framework.piece.AbstractPiece;
31  import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
32  import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
33  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
34  import org.apache.giraph.conf.MessageClasses;
35  import org.apache.giraph.function.Consumer;
36  import org.apache.giraph.graph.Vertex;
37  import org.apache.giraph.types.NoMessage;
38  import org.apache.hadoop.io.Writable;
39  import org.apache.hadoop.io.WritableComparable;
40  
41  import com.google.common.base.Preconditions;
42  
43  /**
44   * Delegate Piece which allows combining multiple pieces in same iteration:
45   * new DelegatePiece(new LogicPiece(), new StatsPiece())
46   * You should be careful when doing so, since those pieces must not interact,
47   * and only one can send messages.
48   * Execution of any of the Piece methods by the framework is going to trigger
49   * sequential execution of that method on all of the pieces this DelegatePiece
50   * wraps. That means for example, getVertexSender is going to be called on all
51   * pieces before masterCompute is called on all pieces, which is called before
52   * getVertexReceiver on all pieces.
53   *
54   * Also, via overriding, it provides an abstract class for filtering. I.e. if
55   * you want piece that filters out calls to masterCompute, you can have:
56   * new FilterMasterPiece(new LogicPiece()),
57   * with FilterMasterPiece extends DelegatePiece, and only overrides getMaster
58   * function and DelegateMasterPiece class.
59   *
60   * @param <I> Vertex id type
61   * @param <V> Vertex value type
62   * @param <E> Edge value type
63   * @param <M> Message type
64   * @param <WV> Worker value type
65   * @param <WM> Worker message type
66   * @param <S> Execution stage type
67   */
68  @SuppressWarnings("rawtypes")
69  public class DelegatePiece<I extends WritableComparable, V extends Writable,
70      E extends Writable, M extends Writable, WV, WM extends Writable, S>
71      extends AbstractPiece<I, V, E, M, WV, WM, S> {
72  
73    private final List<AbstractPiece<I, V, E, M, WV, WM, S>> innerPieces;
74  
75    @SafeVarargs
76    @SuppressWarnings("unchecked")
77    public DelegatePiece(AbstractPiece<? super I, ? super V, ? super E,
78        ? super M, ? super WV, ? super WM, ? super S>... innerPieces) {
79      // Pieces are contravariant, but Java generics cannot express that,
80      // so use unchecked cast inside to allow callers to be typesafe
81      this.innerPieces = new ArrayList(Arrays.asList(innerPieces));
82    }
83  
84    @SuppressWarnings("unchecked")
85    public DelegatePiece(AbstractPiece<? super I, ? super V, ? super E,
86        ? super M, ? super WV, ? super WM, ? super S> innerPiece) {
87      // Pieces are contravariant, but Java generics cannot express that,
88      // so use unchecked cast inside to allow callers to be typesafe
89      this.innerPieces = new ArrayList(Arrays.asList(innerPiece));
90    }
91  
92    protected DelegateWorkerSendFunctions delegateWorkerSendFunctions(
93        ArrayList<InnerVertexSender> workerSendFunctions,
94        BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
95      return new DelegateWorkerSendFunctions(workerSendFunctions);
96    }
97  
98    protected DelegateWorkerReceiveFunctions delegateWorkerReceiveFunctions(
99        ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions,
100       BlockWorkerReceiveApi<I> workerApi, S executionStage) {
101     return new DelegateWorkerReceiveFunctions(workerReceiveFunctions);
102   }
103 
104   @Override
105   public InnerVertexSender getWrappedVertexSender(
106       BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
107     ArrayList<InnerVertexSender> workerSendFunctions =
108         new ArrayList<>(innerPieces.size());
109     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
110       workerSendFunctions.add(
111           innerPiece.getWrappedVertexSender(workerApi, executionStage));
112     }
113     return delegateWorkerSendFunctions(
114         workerSendFunctions, workerApi, executionStage);
115   }
116 
117   @Override
118   public InnerVertexReceiver getVertexReceiver(
119       BlockWorkerReceiveApi<I> workerApi, S executionStage) {
120     ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions =
121         new ArrayList<>(innerPieces.size());
122     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
123       workerReceiveFunctions.add(
124           innerPiece.getVertexReceiver(workerApi, executionStage));
125     }
126     return delegateWorkerReceiveFunctions(
127         workerReceiveFunctions, workerApi, executionStage);
128   }
129 
130   /** Delegating WorkerSendPiece */
131   protected class DelegateWorkerSendFunctions extends InnerVertexSender {
132     private final ArrayList<InnerVertexSender> workerSendFunctions;
133 
134     public DelegateWorkerSendFunctions(
135         ArrayList<InnerVertexSender> workerSendFunctions) {
136       this.workerSendFunctions = workerSendFunctions;
137     }
138 
139     @Override
140     public void vertexSend(Vertex<I, V, E> vertex) {
141       for (InnerVertexSender functions : workerSendFunctions) {
142         if (functions != null) {
143           functions.vertexSend(vertex);
144         }
145       }
146     }
147 
148     @Override
149     public void postprocess() {
150       for (InnerVertexSender functions : workerSendFunctions) {
151         if (functions != null) {
152           functions.postprocess();
153         }
154       }
155     }
156   }
157 
158   /** Delegating WorkerReceivePiece */
159   protected class DelegateWorkerReceiveFunctions extends InnerVertexReceiver {
160     private final ArrayList<VertexReceiver<I, V, E, M>>
161     workerReceiveFunctions;
162 
163     public DelegateWorkerReceiveFunctions(
164         ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions) {
165       this.workerReceiveFunctions = workerReceiveFunctions;
166     }
167 
168     @Override
169     public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
170       for (VertexReceiver<I, V, E, M> functions :
171             workerReceiveFunctions) {
172         if (functions != null) {
173           functions.vertexReceive(vertex, messages);
174         }
175       }
176     }
177 
178     @Override
179     public void postprocess() {
180       for (VertexReceiver<I, V, E, M> functions :
181             workerReceiveFunctions) {
182         if (functions instanceof VertexPostprocessor) {
183           ((VertexPostprocessor) functions).postprocess();
184         }
185       }
186     }
187   }
188 
189   @Override
190   public void masterCompute(BlockMasterApi api, S executionStage) {
191     for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
192       piece.masterCompute(api, executionStage);
193     }
194   }
195 
196   @Override
197   public void workerContextSend(
198       BlockWorkerContextSendApi<I, WM> workerContextApi, S executionStage,
199       WV workerValue) {
200     for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
201       piece.workerContextSend(workerContextApi, executionStage, workerValue);
202     }
203   }
204 
205   @Override
206   public void workerContextReceive(
207       BlockWorkerContextReceiveApi workerContextApi, S executionStage,
208       WV workerValue, List<WM> workerMessages) {
209     for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
210       piece.workerContextReceive(
211           workerContextApi, executionStage, workerValue, workerMessages);
212     }
213   }
214 
215   @Override
216   public S nextExecutionStage(S executionStage) {
217     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
218       executionStage = innerPiece.nextExecutionStage(executionStage);
219     }
220     return executionStage;
221   }
222 
223   @Override
224   public MessageClasses<I, M> getMessageClasses(
225       ImmutableClassesGiraphConfiguration conf) {
226     MessageClasses<I, M> messageClasses = null;
227     MessageClasses<I, M> firstMessageClasses = null;
228     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
229       MessageClasses<I, M> cur = innerPiece.getMessageClasses(conf);
230       Preconditions.checkState(cur != null);
231       if (!cur.getMessageClass().equals(NoMessage.class)) {
232         if (messageClasses != null) {
233           throw new RuntimeException(
234               "Only one piece combined through delegate (" +
235               toString() + ") can send messages");
236         }
237         messageClasses = cur;
238       }
239       if (firstMessageClasses == null) {
240         firstMessageClasses = cur;
241       }
242     }
243     return messageClasses != null ? messageClasses : firstMessageClasses;
244   }
245 
246   @Override
247   public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
248     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
249       innerPiece.forAllPossiblePieces(consumer);
250     }
251   }
252 
253   @Override
254   public PieceCount getPieceCount() {
255     return new PieceCount(1);
256   }
257 
258   @SuppressWarnings("deprecation")
259   @Override
260   public void registerAggregators(BlockMasterApi master)
261       throws InstantiationException, IllegalAccessException {
262     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
263       innerPiece.registerAggregators(master);
264     }
265   }
266 
267   @Override
268   public void wrappedRegisterReducers(
269       BlockMasterApi masterApi, S executionStage) {
270     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
271       innerPiece.wrappedRegisterReducers(masterApi, executionStage);
272     }
273   }
274 
275   protected String delegationName() {
276     return "Delegate";
277   }
278 
279   @Override
280   public String toString() {
281     return delegationName() + innerPieces.toString();
282   }
283 }