1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.giraph.block_app.framework.piece;
19
20 import java.util.Iterator;
21 import java.util.List;
22
23 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
24 import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
25 import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
26 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
27 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
28 import org.apache.giraph.block_app.framework.block.Block;
29 import org.apache.giraph.block_app.framework.block.PieceCount;
30 import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
31 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
32 import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
33 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
34 import org.apache.giraph.conf.MessageClasses;
35 import org.apache.giraph.function.Consumer;
36 import org.apache.hadoop.io.Writable;
37 import org.apache.hadoop.io.WritableComparable;
38
39 import com.google.common.collect.Iterators;
40
41 /**
 * Parent of all Pieces, contains comprehensive list of methods a Piece
 * can support. To simplify usage, extend one of its specific subclasses
 * rather than this class directly - most frequently the Piece class.
45 *
46 * Single unit of execution, capturing:
47 * - sending and then receiving messages from vertices
48 * - sending data to be aggregated from workers to master
49 * - sending values from master, via aggregators, to workers
50 * - sending and receiving worker messages
51 *
52 *
53 * Order of execution is:
54 * - On master, once at the start of the application
55 * -- registerAggregators (deprecated, use registerReducers instead)
56 *
57 * - After masterCompute of previous piece, on master:
58 * -- registerReducers
59 *
60 * - Send logic on workers:
61 * -- getVertexSender per each worker thread, and on object returned by it:
62 * --- vertexSend on each vertex
63 * --- postprocess on each worker thread
64 * -- workerContextSend per worker
65 *
66 * - Logic on master:
67 * -- masterCompute
68 *
69 * - Receive logic on workers:
70 * -- workerContextReceive per worker
71 * -- getVertexReceiver per each worker thread, and on object returned by it:
72 * --- vertexReceive on each vertex
73 * --- postprocess on each worker thread
74 *
75 * And before everything, during initialization, registerAggregators.
76 *
77 * Only masterCompute and registerReducers/registerAggregators should modify
78 * the Piece, all of the worker methods should treat Piece as read-only.
79 *
80 * Each piece should be encapsulated unit of execution. Vertex value should be
81 * used as a single implicit "communication" channel between different pieces,
82 * all other dependencies should be explicitly defined and passed through
83 * constructor, via interfaces (as explained below).
84 * I.e. state of the vertex value is invariant that Pieces act upon.
85 * Best is not to depend on explicit vertex value class, but on interface that
86 * provides all needed functions, so that pieces can be freely combined,
87 * as long as vertex value implements appropriate ones.
88 * Similarly, use most abstract class you need - if Piece doesn't depend
89 * on edge value, don't use NullWritable, but Writable. Or if it doesn't
90 * depend on ExecutionStage, use Object for it.
91 *
92 * All other external dependencies should be explicitly passed through
93 * constructor, through interfaces.
94 *
95 * All Pieces will be created within one context - on the master.
96 * They are then going to be replicated across all workers, and across all
97 * threads within each worker, and will see everything that happens in global
98 * context (masterCompute) before them, including any state master has.
99 * Through ObjectHolder/ObjectTransfer, you can pass data between Pieces in
100 * global context, and from global context to worker functions of a Piece
101 * that happens in the future.
102 *
 * VertexReceiver of previous Piece and VertexSender of next Piece live in
 * the same context, and vertexSend of the next Piece is executed
 * immediately after vertexReceive of the previous Piece, before
 * vertexReceive is called on the next vertex.
 * This detail allows the two to depend on each other through
 * memory-only mediator objects - like ObjectTransfer.
109 *
 * All other logic is going to live in different contexts,
 * specifically VertexSender and VertexReceiver of the same Piece,
 * or workerContextSend and VertexSender of the same Piece, and cannot
 * interact with each other outside of changing the state of the graph or
 * using global communication api.
115 *
 * All methods on this class (or objects it returns) will be called serially,
 * so there is no need for any Thread synchronization.
 * To achieve that, each Thread will have a complete deep copy of the Piece.
 * Static fields are still shared between those copies, so all static fields
 * must be written to be Thread safe!
 * (i.e. either immutable, or have synchronized/locked access to them)
121 *
122 * @param <I> Vertex id type
123 * @param <V> Vertex value type
124 * @param <E> Edge value type
125 * @param <M> Message type
126 * @param <WV> Worker value type
127 * @param <WM> Worker message type
128 * @param <S> Execution stage type
129 */
130 @SuppressWarnings({ "rawtypes" })
131 public abstract class AbstractPiece<I extends WritableComparable,
132 V extends Writable, E extends Writable, M extends Writable, WV,
133 WM extends Writable, S> implements Block {
134
135 // Overridable functions
136
137 // registerReducers(CreateReducersApi reduceApi, S executionStage)
138
139 /**
140 * Add automatic handling of reducers to registerReducers.
141 * Only for internal use.
142 */
143 public abstract void wrappedRegisterReducers(
144 BlockMasterApi masterApi, S executionStage);
145
146 // getVertexSender(BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage)
147
148 /**
149 * Add automatic handling of reducers to getVertexSender.
150 *
151 * Only for Framework internal use.
152 */
153 public abstract InnerVertexSender getWrappedVertexSender(
154 final BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage);
155
156 /**
157 * Override to have worker context send computation.
158 *
159 * Called once per worker, after all vertices have been processed with
160 * getVertexSender.
161 */
162 public void workerContextSend(
163 BlockWorkerContextSendApi<I, WM> workerContextApi, S executionStage,
164 WV workerValue) {
165 }
166
167 /**
168 * Function that is called on master, after send phase, before receive phase.
169 *
170 * It can:
171 * - read aggregators sent from worker
172 * - do global processing
173 * - send data to workers through aggregators
174 */
175 public void masterCompute(BlockMasterApi masterApi, S executionStage) {
176 }
177
178 /**
179 * Override to have worker context receive computation.
180 *
181 * Called once per worker, before all vertices are going to be processed
182 * with getVertexReceiver.
183 */
184 public void workerContextReceive(
185 BlockWorkerContextReceiveApi workerContextApi, S executionStage,
186 WV workerValue, List<WM> workerMessages) {
187 }
188
189 /**
190 * Override to do vertex receive processing.
191 *
192 * Creates handler that defines what should be executed on worker
193 * for each vertex during receive phase.
194 *
195 * This logic executed last.
196 * This function is called once on each worker on each thread, in parallel,
197 * on their copy of Piece object to create functions handler.
198 *
199 * If returned object implements Postprocessor interface, then corresponding
200 * postprocess() function is going to be called once, after all vertices
201 * corresponding thread needed to process are done.
202 */
203 public VertexReceiver<I, V, E, M> getVertexReceiver(
204 BlockWorkerReceiveApi<I> workerApi, S executionStage) {
205 return null;
206 }
207
208 /**
209 * Returns MessageClasses definition for messages being sent by this Piece.
210 */
211 public abstract MessageClasses<I, M> getMessageClasses(
212 ImmutableClassesGiraphConfiguration conf);
213
214 /**
215 * Override to provide different next execution stage for
216 * Pieces that come after it.
217 *
218 * Execution stage should be immutable, and this function should be
219 * returning a new object, if it needs to return different value.
220 *
221 * It affects pieces that come after this piece,
222 * and isn't applied to execution stage this piece sees.
223 */
224 public S nextExecutionStage(S executionStage) {
225 return executionStage;
226 }
227
228 /**
229 * Override to register any potential aggregators used by this piece.
230 *
231 * @deprecated Use registerReducers instead.
232 */
233 @Deprecated
234 public void registerAggregators(BlockMasterApi masterApi)
235 throws InstantiationException, IllegalAccessException {
236 }
237
238 // Inner classes
239
240 /** Inner class to provide clean use without specifying types */
241 public abstract class InnerVertexSender
242 implements VertexSender<I, V, E>, VertexPostprocessor {
243 @Override
244 public void postprocess() { }
245 }
246
247 /** Inner class to provide clean use without specifying types */
248 public abstract class InnerVertexReceiver
249 implements VertexReceiver<I, V, E, M>, VertexPostprocessor {
250 @Override
251 public void postprocess() { }
252 }
253
254 // Internal implementation
255
256 @Override
257 public final Iterator<AbstractPiece> iterator() {
258 return Iterators.<AbstractPiece>singletonIterator(this);
259 }
260
261 @Override
262 public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
263 consumer.apply(this);
264 }
265
266 @Override
267 public PieceCount getPieceCount() {
268 return new PieceCount(1);
269 }
270
271 @Override
272 public String toString() {
273 String name = getClass().getSimpleName();
274 if (name.isEmpty()) {
275 name = getClass().getName();
276 }
277 return name;
278 }
279
280
281 // make hashCode and equals final, forcing them to be based on
282 // reference identity.
283 @Override
284 public final int hashCode() {
285 return super.hashCode();
286 }
287
288 @Override
289 public final boolean equals(Object obj) {
290 return super.equals(obj);
291 }
292
293 }