This project has retired. For details, please refer to its Attic page.
      
 
DelegatePiece xref
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.giraph.block_app.framework.piece.delegate;
19  
20  import java.util.ArrayList;
21  import java.util.Arrays;
22  import java.util.List;
23  
24  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
25  import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
26  import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
27  import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
28  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
29  import org.apache.giraph.block_app.framework.block.PieceCount;
30  import org.apache.giraph.block_app.framework.piece.AbstractPiece;
31  import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
32  import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
33  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
34  import org.apache.giraph.conf.MessageClasses;
35  import org.apache.giraph.function.Consumer;
36  import org.apache.giraph.graph.Vertex;
37  import org.apache.giraph.types.NoMessage;
38  import org.apache.hadoop.io.Writable;
39  import org.apache.hadoop.io.WritableComparable;
40  
41  import com.google.common.base.Preconditions;
42  
43  
44  
45  
46  
47  
48  
49  
50  
51  
52  
53  
54  
55  
56  
57  
58  
59  
60  
61  
62  
63  
64  
65  
66  
67  
68  @SuppressWarnings("rawtypes")
69  public class DelegatePiece<I extends WritableComparable, V extends Writable,
70      E extends Writable, M extends Writable, WV, WM extends Writable, S>
71      extends AbstractPiece<I, V, E, M, WV, WM, S> {
72  
73    private final List<AbstractPiece<I, V, E, M, WV, WM, S>> innerPieces;
74  
75    @SafeVarargs
76    @SuppressWarnings("unchecked")
77    public DelegatePiece(AbstractPiece<? super I, ? super V, ? super E,
78        ? super M, ? super WV, ? super WM, ? super S>... innerPieces) {
79      
80      
81      this.innerPieces = new ArrayList(Arrays.asList(innerPieces));
82    }
83  
84    @SuppressWarnings("unchecked")
85    public DelegatePiece(AbstractPiece<? super I, ? super V, ? super E,
86        ? super M, ? super WV, ? super WM, ? super S> innerPiece) {
87      
88      
89      this.innerPieces = new ArrayList(Arrays.asList(innerPiece));
90    }
91  
92    protected DelegateWorkerSendFunctions delegateWorkerSendFunctions(
93        ArrayList<InnerVertexSender> workerSendFunctions,
94        BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
95      return new DelegateWorkerSendFunctions(workerSendFunctions);
96    }
97  
98    protected DelegateWorkerReceiveFunctions delegateWorkerReceiveFunctions(
99        ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions,
100       BlockWorkerReceiveApi<I> workerApi, S executionStage) {
101     return new DelegateWorkerReceiveFunctions(workerReceiveFunctions);
102   }
103 
104   @Override
105   public InnerVertexSender getWrappedVertexSender(
106       BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
107     ArrayList<InnerVertexSender> workerSendFunctions =
108         new ArrayList<>(innerPieces.size());
109     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
110       workerSendFunctions.add(
111           innerPiece.getWrappedVertexSender(workerApi, executionStage));
112     }
113     return delegateWorkerSendFunctions(
114         workerSendFunctions, workerApi, executionStage);
115   }
116 
117   @Override
118   public InnerVertexReceiver getVertexReceiver(
119       BlockWorkerReceiveApi<I> workerApi, S executionStage) {
120     ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions =
121         new ArrayList<>(innerPieces.size());
122     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
123       workerReceiveFunctions.add(
124           innerPiece.getVertexReceiver(workerApi, executionStage));
125     }
126     return delegateWorkerReceiveFunctions(
127         workerReceiveFunctions, workerApi, executionStage);
128   }
129 
130   
131   protected class DelegateWorkerSendFunctions extends InnerVertexSender {
132     private final ArrayList<InnerVertexSender> workerSendFunctions;
133 
134     public DelegateWorkerSendFunctions(
135         ArrayList<InnerVertexSender> workerSendFunctions) {
136       this.workerSendFunctions = workerSendFunctions;
137     }
138 
139     @Override
140     public void vertexSend(Vertex<I, V, E> vertex) {
141       for (InnerVertexSender functions : workerSendFunctions) {
142         if (functions != null) {
143           functions.vertexSend(vertex);
144         }
145       }
146     }
147 
148     @Override
149     public void postprocess() {
150       for (InnerVertexSender functions : workerSendFunctions) {
151         if (functions != null) {
152           functions.postprocess();
153         }
154       }
155     }
156   }
157 
158   
159   protected class DelegateWorkerReceiveFunctions extends InnerVertexReceiver {
160     private final ArrayList<VertexReceiver<I, V, E, M>>
161     workerReceiveFunctions;
162 
163     public DelegateWorkerReceiveFunctions(
164         ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions) {
165       this.workerReceiveFunctions = workerReceiveFunctions;
166     }
167 
168     @Override
169     public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
170       for (VertexReceiver<I, V, E, M> functions :
171             workerReceiveFunctions) {
172         if (functions != null) {
173           functions.vertexReceive(vertex, messages);
174         }
175       }
176     }
177 
178     @Override
179     public void postprocess() {
180       for (VertexReceiver<I, V, E, M> functions :
181             workerReceiveFunctions) {
182         if (functions instanceof VertexPostprocessor) {
183           ((VertexPostprocessor) functions).postprocess();
184         }
185       }
186     }
187   }
188 
189   @Override
190   public void masterCompute(BlockMasterApi api, S executionStage) {
191     for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
192       piece.masterCompute(api, executionStage);
193     }
194   }
195 
196   @Override
197   public void workerContextSend(
198       BlockWorkerContextSendApi<I, WM> workerContextApi, S executionStage,
199       WV workerValue) {
200     for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
201       piece.workerContextSend(workerContextApi, executionStage, workerValue);
202     }
203   }
204 
205   @Override
206   public void workerContextReceive(
207       BlockWorkerContextReceiveApi workerContextApi, S executionStage,
208       WV workerValue, List<WM> workerMessages) {
209     for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
210       piece.workerContextReceive(
211           workerContextApi, executionStage, workerValue, workerMessages);
212     }
213   }
214 
215   @Override
216   public S nextExecutionStage(S executionStage) {
217     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
218       executionStage = innerPiece.nextExecutionStage(executionStage);
219     }
220     return executionStage;
221   }
222 
223   @Override
224   public MessageClasses<I, M> getMessageClasses(
225       ImmutableClassesGiraphConfiguration conf) {
226     MessageClasses<I, M> messageClasses = null;
227     MessageClasses<I, M> firstMessageClasses = null;
228     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
229       MessageClasses<I, M> cur = innerPiece.getMessageClasses(conf);
230       Preconditions.checkState(cur != null);
231       if (!cur.getMessageClass().equals(NoMessage.class)) {
232         if (messageClasses != null) {
233           throw new RuntimeException(
234               "Only one piece combined through delegate (" +
235               toString() + ") can send messages");
236         }
237         messageClasses = cur;
238       }
239       if (firstMessageClasses == null) {
240         firstMessageClasses = cur;
241       }
242     }
243     return messageClasses != null ? messageClasses : firstMessageClasses;
244   }
245 
246   @Override
247   public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
248     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
249       innerPiece.forAllPossiblePieces(consumer);
250     }
251   }
252 
253   @Override
254   public PieceCount getPieceCount() {
255     return new PieceCount(1);
256   }
257 
258   @SuppressWarnings("deprecation")
259   @Override
260   public void registerAggregators(BlockMasterApi master)
261       throws InstantiationException, IllegalAccessException {
262     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
263       innerPiece.registerAggregators(master);
264     }
265   }
266 
267   @Override
268   public void wrappedRegisterReducers(
269       BlockMasterApi masterApi, S executionStage) {
270     for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
271       innerPiece.wrappedRegisterReducers(masterApi, executionStage);
272     }
273   }
274 
275   protected String delegationName() {
276     return "Delegate";
277   }
278 
279   @Override
280   public String toString() {
281     return delegationName() + innerPieces.toString();
282   }
283 }