View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.piece;
19  
20  import java.util.Iterator;
21  import java.util.List;
22  
23  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
24  import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
25  import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
26  import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
27  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
28  import org.apache.giraph.block_app.framework.block.Block;
29  import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
30  import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
31  import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
32  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
33  import org.apache.giraph.conf.MessageClasses;
34  import org.apache.giraph.function.Consumer;
35  import org.apache.hadoop.io.Writable;
36  import org.apache.hadoop.io.WritableComparable;
37  
38  import com.google.common.collect.Iterators;
39  
40  /**
41   * Parent of all Pieces; contains the comprehensive list of methods a Piece
42   * can support. To simplify usage, extend one of its specific subclasses
43   * instead of this class - most frequently the Piece class.
44   *
45   * Single unit of execution, capturing:
46   * - sending and then receiving messages from vertices
47   * - sending data to be aggregated from workers to master
48   * - sending values from master, via aggregators, to workers
49   * - sending and receiving worker messages
50   *
51   *
52   * Order of execution is:
53   * - On master, once at the start of the application
54   * -- registerAggregators (deprecated, use registerReducers instead)
55   *
56   * - After masterCompute of previous piece, on master:
57   * -- registerReducers
58   *
59   * - Send logic on workers:
60   * -- getVertexSender per each worker thread, and on object returned by it:
61   * --- vertexSend on each vertex
62   * --- postprocess on each worker thread
63   * -- workerContextSend per worker
64   *
65   * - Logic on master:
66   * -- masterCompute
67   *
68   * - Receive logic on workers:
69   * -- workerContextReceive per worker
70   * -- getVertexReceiver per each worker thread, and on object returned by it:
71   * --- vertexReceive on each vertex
72   * --- postprocess on each worker thread
73   *
74   * And before everything, during initialization, registerAggregators.
75   *
76   * Only masterCompute and registerReducers/registerAggregators should modify
77   * the Piece, all of the worker methods should treat Piece as read-only.
78   *
79   * Each piece should be encapsulated unit of execution. Vertex value should be
80   * used as a single implicit "communication" channel between different pieces,
81   * all other dependencies should be explicitly defined and passed through
82   * constructor, via interfaces (as explained below).
83   * I.e. state of the vertex value is invariant that Pieces act upon.
84   * Best is not to depend on explicit vertex value class, but on interface that
85   * provides all needed functions, so that pieces can be freely combined,
86   * as long as vertex value implements appropriate ones.
87   * Similarly, use most abstract class you need - if Piece doesn't depend
88   * on edge value, don't use NullWritable, but Writable. Or if it doesn't
89   * depend on ExecutionStage, use Object for it.
90   *
91   * All other external dependencies should be explicitly passed through
92   * constructor, through interfaces.
93   *
94   * All Pieces will be created within one context - on the master.
95   * They are then going to be replicated across all workers, and across all
96   * threads within each worker, and will see everything that happens in global
97   * context (masterCompute) before them, including any state master has.
98   * Through ObjectHolder/ObjectTransfer, you can pass data between Pieces in
99   * global context, and from global context to worker functions of a Piece
100  * that happens in the future.
101  *
102  * VertexReceiver of previous Piece and VertexSender of next Piece live in
103  * the same context, and vertexSend of the next Piece is executed
104  * immediately after vertexReceive of the previous Piece, before
105  * vertexReceive is called on the next vertex.
106  * This detail allows them to depend on each other through
107  * in-memory-only mediator objects - like ObjectTransfer.
108  *
109  * All other logic is going to live in different contexts,
110  * specifically VertexSender and VertexReceiver of the same Piece,
111  * or workerContextSend and VertexSender of the same Piece, and cannot interact
112  * with each other except by changing the state of the graph or using
113  * the global communication API.
114  *
115  * All methods on this class (or objects it returns) will be called serially,
116  * so there is no need for any thread synchronization on instance state.
117  * To achieve that, each thread gets its own complete deep copy of the Piece.
118  * Static fields are still shared, so they must be written to be thread safe!
119  * (i.e. either immutable, or have synchronized/locked access to them)
120  *
121  * @param <I> Vertex id type
122  * @param <V> Vertex value type
123  * @param <E> Edge value type
124  * @param <M> Message type
125  * @param <WV> Worker value type
126  * @param <WM> Worker message type
127  * @param <S> Execution stage type
128  */
129 @SuppressWarnings({ "rawtypes" })
130 public abstract class AbstractPiece<I extends WritableComparable,
131     V extends Writable, E extends Writable, M extends Writable, WV,
132     WM extends Writable, S> implements Block {
133 
134   // Overridable functions
135 
136   // registerReducers(CreateReducersApi reduceApi, S executionStage)
137 
138   /**
139    * Add automatic handling of reducers to registerReducers.
140    * Only for internal use.
141    */
142   public abstract void wrappedRegisterReducers(
143       BlockMasterApi masterApi, S executionStage);
144 
145   // getVertexSender(BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage)
146 
147   /**
148    * Add automatic handling of reducers to getVertexSender.
149    *
150    * Only for Framework internal use.
151    */
152   public abstract InnerVertexSender getWrappedVertexSender(
153       final BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage);
154 
155   /**
156    * Override to have worker context send computation.
157    *
158    * Called once per worker, after all vertices have been processed with
159    * getVertexSender.
160    */
161   public void workerContextSend(
162       BlockWorkerContextSendApi<I, WM> workerContextApi, S executionStage,
163       WV workerValue) {
164   }
165 
166   /**
167    * Function that is called on master, after send phase, before receive phase.
168    *
169    * It can:
170    * - read aggregators sent from worker
171    * - do global processing
172    * - send data to workers through aggregators
173    */
174   public void masterCompute(BlockMasterApi masterApi, S executionStage) {
175   }
176 
177   /**
178    * Override to have worker context receive computation.
179    *
180    * Called once per worker, before all vertices are going to be processed
181    * with getVertexReceiver.
182    */
183   public void workerContextReceive(
184       BlockWorkerContextReceiveApi workerContextApi, S executionStage,
185       WV workerValue, List<WM> workerMessages) {
186   }
187 
188   /**
189    * Override to do vertex receive processing.
190    *
191    * Creates handler that defines what should be executed on worker
192    * for each vertex during receive phase.
193    *
194    * This logic executed last.
195    * This function is called once on each worker on each thread, in parallel,
196    * on their copy of Piece object to create functions handler.
197    *
198    * If returned object implements Postprocessor interface, then corresponding
199    * postprocess() function is going to be called once, after all vertices
200    * corresponding thread needed to process are done.
201    */
202   public VertexReceiver<I, V, E, M> getVertexReceiver(
203       BlockWorkerReceiveApi<I> workerApi, S executionStage) {
204     return null;
205   }
206 
207   /**
208    * Returns MessageClasses definition for messages being sent by this Piece.
209    */
210   public abstract MessageClasses<I, M> getMessageClasses(
211       ImmutableClassesGiraphConfiguration conf);
212 
213   /**
214    * Override to provide different next execution stage for
215    * Pieces that come after it.
216    *
217    * Execution stage should be immutable, and this function should be
218    * returning a new object, if it needs to return different value.
219    *
220    * It affects pieces that come after this piece,
221    * and isn't applied to execution stage this piece sees.
222    */
223   public S nextExecutionStage(S executionStage) {
224     return executionStage;
225   }
226 
227   /**
228    * Override to register any potential aggregators used by this piece.
229    *
230    * @deprecated Use registerReducers instead.
231    */
232   @Deprecated
233   public void registerAggregators(BlockMasterApi masterApi)
234       throws InstantiationException, IllegalAccessException {
235   }
236 
237   // Inner classes
238 
239   /** Inner class to provide clean use without specifying types */
240   public abstract class InnerVertexSender
241       implements VertexSender<I, V, E>, VertexPostprocessor {
242     @Override
243     public void postprocess() { }
244   }
245 
246   /** Inner class to provide clean use without specifying types */
247   public abstract class InnerVertexReceiver
248       implements VertexReceiver<I, V, E, M>, VertexPostprocessor {
249     @Override
250     public void postprocess() { }
251   }
252 
253   // Internal implementation
254 
255   @Override
256   public final Iterator<AbstractPiece> iterator() {
257     return Iterators.<AbstractPiece>singletonIterator(this);
258   }
259 
260   @Override
261   public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
262     consumer.apply(this);
263   }
264 
265   @Override
266   public String toString() {
267     String name = getClass().getSimpleName();
268     if (name.isEmpty()) {
269       name = getClass().getName();
270     }
271     return name;
272   }
273 
274 
275   // make hashCode and equals final, forcing them to be based on
276   // reference identity.
277   @Override
278   public final int hashCode() {
279     return super.hashCode();
280   }
281 
282   @Override
283   public final boolean equals(Object obj) {
284     return super.equals(obj);
285   }
286 
287 }