This project has retired. For details please refer to its Attic page.
AbstractPiece xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.piece;
19  
20  import java.util.Iterator;
21  import java.util.List;
22  
23  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
24  import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
25  import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
26  import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
27  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
28  import org.apache.giraph.block_app.framework.block.Block;
29  import org.apache.giraph.block_app.framework.block.PieceCount;
30  import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
31  import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
32  import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
33  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
34  import org.apache.giraph.conf.MessageClasses;
35  import org.apache.giraph.function.Consumer;
36  import org.apache.hadoop.io.Writable;
37  import org.apache.hadoop.io.WritableComparable;
38  
39  import com.google.common.collect.Iterators;
40  
41  /**
42   * Parent of all Pieces, containing the comprehensive list of methods a Piece
43   * can support. To simplify usage, extend one of its specific subclasses
44   * directly - most frequently the Piece class.
45   *
46   * Single unit of execution, capturing:
47   * - sending and then receiving messages from vertices
48   * - sending data to be aggregated from workers to master
49   * - sending values from master, via aggregators, to workers
50   * - sending and receiving worker messages
51   *
52   *
53   * Order of execution is:
54   * - On master, once at the start of the application
55   * -- registerAggregators (deprecated, use registerReducers instead)
56   *
57   * - After masterCompute of previous piece, on master:
58   * -- registerReducers
59   *
60   * - Send logic on workers:
61   * -- getVertexSender per each worker thread, and on object returned by it:
62   * --- vertexSend on each vertex
63   * --- postprocess on each worker thread
64   * -- workerContextSend per worker
65   *
66   * - Logic on master:
67   * -- masterCompute
68   *
69   * - Receive logic on workers:
70   * -- workerContextReceive per worker
71   * -- getVertexReceiver per each worker thread, and on object returned by it:
72   * --- vertexReceive on each vertex
73   * --- postprocess on each worker thread
74   *
75   * And before everything, during initialization, registerAggregators.
76   *
77   * Only masterCompute and registerReducers/registerAggregators should modify
78   * the Piece, all of the worker methods should treat Piece as read-only.
79   *
80   * Each piece should be encapsulated unit of execution. Vertex value should be
81   * used as a single implicit "communication" channel between different pieces,
82   * all other dependencies should be explicitly defined and passed through
83   * constructor, via interfaces (as explained below).
84   * I.e. state of the vertex value is invariant that Pieces act upon.
85   * Best is not to depend on explicit vertex value class, but on interface that
86   * provides all needed functions, so that pieces can be freely combined,
87   * as long as vertex value implements appropriate ones.
88   * Similarly, use most abstract class you need - if Piece doesn't depend
89   * on edge value, don't use NullWritable, but Writable. Or if it doesn't
90   * depend on ExecutionStage, use Object for it.
91   *
92   * All other external dependencies should be explicitly passed through
93   * constructor, through interfaces.
94   *
95   * All Pieces will be created within one context - on the master.
96   * They are then going to be replicated across all workers, and across all
97   * threads within each worker, and will see everything that happens in global
98   * context (masterCompute) before them, including any state master has.
99   * Through ObjectHolder/ObjectTransfer, you can pass data between Pieces in
100  * global context, and from global context to worker functions of a Piece
101  * that happens in the future.
102  *
103  * VertexReceiver of previous Piece and VertexSender of next Piece live in
104  * the same context, and vertexSend of the next Piece is executed
105  * immediately after vertexReceive of the previous piece, before vertexReceive
106  * is called on the next vertex.
107  * This detail allows you to have external dependency on each other through
108  * memory only mediator objects - like ObjectTransfer.
109  *
110  * All other logic is going to live in different contexts,
111  * specifically VertexSender and VertexReceiver of the same Piece,
112  * or workerContextSend and VertexSender of the same Piece, and cannot interact
113  * with each other outside of changing the state of the graph or using
114  * global communication api.
115  *
116  * All methods on this class (or objects it returns) will be called serially,
117  * so there is no need for any Thread synchronization.
118  * To achieve that, each Thread will have a complete deep copy of the Piece;
119  * static fields are not copied, so they must be written to be Thread safe
120  * (i.e. either immutable, or have synchronized/locked access to them).
121  *
122  * @param <I> Vertex id type
123  * @param <V> Vertex value type
124  * @param <E> Edge value type
125  * @param <M> Message type
126  * @param <WV> Worker value type
127  * @param <WM> Worker message type
128  * @param <S> Execution stage type
129  */
130 @SuppressWarnings({ "rawtypes" })
131 public abstract class AbstractPiece<I extends WritableComparable,
132     V extends Writable, E extends Writable, M extends Writable, WV,
133     WM extends Writable, S> implements Block {
134 
135   // Overridable functions
136 
137   // registerReducers(CreateReducersApi reduceApi, S executionStage)
138 
139   /**
140    * Add automatic handling of reducers to registerReducers.
141    * Only for internal use.
142    */
143   public abstract void wrappedRegisterReducers(
144       BlockMasterApi masterApi, S executionStage);
145 
146   // getVertexSender(BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage)
147 
148   /**
149    * Add automatic handling of reducers to getVertexSender.
150    *
151    * Only for Framework internal use.
152    */
153   public abstract InnerVertexSender getWrappedVertexSender(
154       final BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage);
155 
156   /**
157    * Override to have worker context send computation.
158    *
159    * Called once per worker, after all vertices have been processed with
160    * getVertexSender.
161    */
162   public void workerContextSend(
163       BlockWorkerContextSendApi<I, WM> workerContextApi, S executionStage,
164       WV workerValue) {
165   }
166 
167   /**
168    * Function that is called on master, after send phase, before receive phase.
169    *
170    * It can:
171    * - read aggregators sent from worker
172    * - do global processing
173    * - send data to workers through aggregators
174    */
175   public void masterCompute(BlockMasterApi masterApi, S executionStage) {
176   }
177 
178   /**
179    * Override to have worker context receive computation.
180    *
181    * Called once per worker, before all vertices are going to be processed
182    * with getVertexReceiver.
183    */
184   public void workerContextReceive(
185       BlockWorkerContextReceiveApi workerContextApi, S executionStage,
186       WV workerValue, List<WM> workerMessages) {
187   }
188 
189   /**
190    * Override to do vertex receive processing.
191    *
192    * Creates handler that defines what should be executed on worker
193    * for each vertex during receive phase.
194    *
195    * This logic executed last.
196    * This function is called once on each worker on each thread, in parallel,
197    * on their copy of Piece object to create functions handler.
198    *
199    * If returned object implements Postprocessor interface, then corresponding
200    * postprocess() function is going to be called once, after all vertices
201    * corresponding thread needed to process are done.
202    */
203   public VertexReceiver<I, V, E, M> getVertexReceiver(
204       BlockWorkerReceiveApi<I> workerApi, S executionStage) {
205     return null;
206   }
207 
208   /**
209    * Returns MessageClasses definition for messages being sent by this Piece.
210    */
211   public abstract MessageClasses<I, M> getMessageClasses(
212       ImmutableClassesGiraphConfiguration conf);
213 
214   /**
215    * Override to provide different next execution stage for
216    * Pieces that come after it.
217    *
218    * Execution stage should be immutable, and this function should be
219    * returning a new object, if it needs to return different value.
220    *
221    * It affects pieces that come after this piece,
222    * and isn't applied to execution stage this piece sees.
223    */
224   public S nextExecutionStage(S executionStage) {
225     return executionStage;
226   }
227 
228   /**
229    * Override to register any potential aggregators used by this piece.
230    *
231    * @deprecated Use registerReducers instead.
232    */
233   @Deprecated
234   public void registerAggregators(BlockMasterApi masterApi)
235       throws InstantiationException, IllegalAccessException {
236   }
237 
238   // Inner classes
239 
240   /** Inner class to provide clean use without specifying types */
241   public abstract class InnerVertexSender
242       implements VertexSender<I, V, E>, VertexPostprocessor {
243     @Override
244     public void postprocess() { }
245   }
246 
247   /** Inner class to provide clean use without specifying types */
248   public abstract class InnerVertexReceiver
249       implements VertexReceiver<I, V, E, M>, VertexPostprocessor {
250     @Override
251     public void postprocess() { }
252   }
253 
254   // Internal implementation
255 
256   @Override
257   public final Iterator<AbstractPiece> iterator() {
258     return Iterators.<AbstractPiece>singletonIterator(this);
259   }
260 
261   @Override
262   public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
263     consumer.apply(this);
264   }
265 
266   @Override
267   public PieceCount getPieceCount() {
268     return new PieceCount(1);
269   }
270 
271   @Override
272   public String toString() {
273     String name = getClass().getSimpleName();
274     if (name.isEmpty()) {
275       name = getClass().getName();
276     }
277     return name;
278   }
279 
280 
281   // make hashCode and equals final, forcing them to be based on
282   // reference identity.
283   @Override
284   public final int hashCode() {
285     return super.hashCode();
286   }
287 
288   @Override
289   public final boolean equals(Object obj) {
290     return super.equals(obj);
291   }
292 
293 }