The flow of a program within the framework is defined as an Iterable of steps, represented by the Block class. Each Block is a unit of execution, representing some computation to be performed on the graph. Simple Blocks can be combined into more complex execution through nesting:
    new SequenceBlock(b1, b2, ...)
    new RepeatBlock(n, b)
    new IfBlock(condition, thenBlock)
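To make the nesting concrete, here is a minimal, self-contained sketch of the idea in plain Java. It does not use the framework's classes: `MiniBlock`, `step`, `sequence` and `repeat` are hypothetical stand-ins invented for illustration, and steps are plain strings rather than real units of computation. It only shows how nested blocks flatten into one ordered list of steps.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a Block: something that yields an ordered list of steps.
// Steps are plain strings here, purely for illustration.
interface MiniBlock {
  List<String> steps();
}

class MiniBlocks {
  // A block consisting of a single named step.
  static MiniBlock step(String name) {
    return () -> Arrays.asList(name);
  }

  // Runs the given blocks one after another.
  static MiniBlock sequence(MiniBlock... blocks) {
    return () -> {
      List<String> all = new ArrayList<>();
      for (MiniBlock b : blocks) {
        all.addAll(b.steps());
      }
      return all;
    };
  }

  // Runs the given block n times in a row.
  static MiniBlock repeat(int n, MiniBlock block) {
    return () -> {
      List<String> all = new ArrayList<>();
      for (int i = 0; i < n; i++) {
        all.addAll(block.steps());
      }
      return all;
    };
  }

  public static void main(String[] args) {
    MiniBlock app = sequence(
        step("init"),
        repeat(2, sequence(step("send"), step("update"))),
        step("output"));
    // prints [init, send, update, send, update, output]
    System.out.println(app.steps());
  }
}
```

The point of the design is that the composite knows nothing about what its children do: it only decides the order in which their steps run.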
Each step is represented by the Piece class. Implementation logic is organized and hidden within Pieces, which keeps the flow of the application clear. A Piece captures these three consecutive operations:
An example is a good way to understand how things work, so we will explain how to build PageRank on top of the framework. Note that the framework works with Java 7 as well, but it is preferable to develop in Java 8, since the code will look simpler.
We start every application by creating a new class that extends AbstractBlockFactory, where we define the types of vertex id, vertex value, edge value, etc. For our example, the vertex id is going to be long, the vertex value is going to be double (the page rank of a vertex), and the graph is unweighted, so it will look like this.
The main function that we need to override is createBlock, which defines what should be executed. PageRank is an iterative algorithm where, in each step, the value of each vertex is updated to a weighted sum of its neighbors' values. We can do so by sending our value to all of our neighbors and, after receiving the messages, summing them up and updating the vertex value accordingly. We will use Pieces.sendMessageToNeighbors to do so:
    // Types here represent vertex id, vertex value, edge value and message respectively.
    // For types we do not use - we just leave it unspecified - as we do with vertex id and edge value here
    Block iter = Pieces.<WritableComparable, DoubleWritable, Writable, DoubleWritable>sendMessageToNeighbors(
        "PageRankIteration",
        DoubleWritable.class,
        // messageSupplier - which message to send to neighbors
        (vertex) -> new DoubleWritable(vertex.getValue().get() / vertex.getNumEdges()),
        // messagesConsumer - what to do with received messages
        (vertex, messages) -> {
          double sum = 0;
          for (DoubleWritable value : messages) {
            sum += value.get();
          }
          vertex.getValue().set(0.15f + 0.85f * sum);
        });
Now we have a piece that does one PageRank iteration, and we only need to execute it for the number of iterations we want:
new RepeatBlock(NUM_ITERATIONS.get(conf), iter);
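To see what the repeated iteration computes, independent of the framework, the following sketch runs the same update rule on a small in-memory graph in plain Java. The class `PageRankSketch`, the adjacency-array representation and the example graph are assumptions made for illustration; they are not part of the framework. Every vertex in the example has at least one outgoing edge, so the division by the number of edges is always defined.

```java
import java.util.Arrays;

class PageRankSketch {
  // One iteration: every vertex sends value / outDegree to each out-neighbor,
  // then every vertex sets its value to 0.15 + 0.85 * (sum of received messages).
  static double[] iterate(int[][] outEdges, double[] values) {
    double[] received = new double[values.length];
    for (int v = 0; v < values.length; v++) {
      for (int target : outEdges[v]) {
        received[target] += values[v] / outEdges[v].length;
      }
    }
    double[] updated = new double[values.length];
    for (int v = 0; v < values.length; v++) {
      updated[v] = 0.15 + 0.85 * received[v];
    }
    return updated;
  }

  // Plain-loop counterpart of repeating the iteration a fixed number of times.
  static double[] run(int[][] outEdges, double[] initial, int numIterations) {
    double[] values = initial.clone();
    for (int i = 0; i < numIterations; i++) {
      values = iterate(outEdges, values);
    }
    return values;
  }

  public static void main(String[] args) {
    // Hypothetical example graph: 0 -> 1, 0 -> 2, 1 -> 2, 2 -> 0
    int[][] outEdges = { {1, 2}, {2}, {0} };
    double[] ranks = run(outEdges, new double[] {1.0, 1.0, 1.0}, 10);
    System.out.println(Arrays.toString(ranks));
  }
}
```

In the framework version the two inner loops correspond to the message supplier and the messages consumer, and the outer loop corresponds to RepeatBlock.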
And that is all: we now have a fully functional PageRank application! We can even use DoubleSumMessageCombiner so that the framework does the sum for us, as shown in PageRank.
Most likely, though, we will want to run as many iterations as needed, until the application converges. Let's say we want to run it until no vertex updates its value by more than EPS=1e-3. We will use RepeatUntilBlock, which lets pieces signal through its toQuit argument that the repetition should stop early:
    ObjectTransfer<Boolean> converged = new ObjectTransfer<>();
    Block iter = ...;
    return new RepeatUntilBlock(
        NUM_ITERATIONS.get(conf),
        iter,
        converged);
ObjectTransfer has two methods, void accept(T) and T get(), and allows different Blocks/Pieces to transfer values between them. In each iteration, RepeatUntilBlock calls get() on it and terminates once it returns true. We now only need to modify our iteration to update the converged value once the vertex updates are small enough. We can do that by checking, after each iteration, how much each vertex value changed, and deciding whether to continue iterating by aggregating those changes across all vertices (the code below counts the vertices whose change exceeds EPS). We are going to do so in two connected steps. For that there is a utility class, SendMessageChain, which allows a chain of connected logic to be executed that cannot be done within a single Piece:
    Block iter = SendMessageChain.<WritableComparable, DoubleWritable, Writable, DoubleWritable>
    startSendToNeighbors(
        "PageRankSend",
        DoubleSumMessageCombiner.class,
        // which message to send to neighbors
        (vertex) -> new DoubleWritable(vertex.getValue().get() / vertex.getNumEdges())
    ).endReduce(
        "PageRankUpdateAndCheckConvergence",
        // which reduce operation to perform
        SumReduce.LONG,
        // how to process combined message, and return value to be reduced
        (vertex, combinedMessage) -> {
          double sum = combinedMessage != null ? combinedMessage.get() : 0;
          double newValue = 0.15f + 0.85f * sum;
          double change = Math.abs(newValue - vertex.getValue().get());
          vertex.getValue().set(newValue);
          return (change > EPS) ? new LongWritable(1) : new LongWritable(0);
        },
        // what to do with reduced value - number of vertices that changed their value above threshold
        (changingCount) -> converged.accept(changingCount.get() == 0)
    );
Together, this gives us a full PageRank application that stops after converging! You can find the code for it in PageRankWithConvergence.
SendMessageChain is just a wrapper around two or more pieces connected together. We can instead extend our original example by adding a Pieces.reduce after the iteration piece and connecting the two pieces via ObjectTransfer, as can be seen in PageRankWithTransferAndConvergence. You can see more example usage of SendMessageChain in its test.
For common logic, there is a growing library of utilities that help you implement it easily. Looking at their implementations is also a good way to understand how the framework works.
A set of common pieces is provided, allowing you to write simple applications without writing a single Piece directly:
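The handoff between the convergence check and the repetition described above can be pictured with a small plain-Java sketch. `Transfer` is a hypothetical stand-in for ObjectTransfer, reduced to the two methods the text names, and `repeatUntil` is an illustrative loop, not the framework's RepeatUntilBlock. Two details are assumptions of this sketch: the flag is checked before each iteration, and a value that has not been set yet is treated as "not converged".

```java
// Hypothetical stand-in for ObjectTransfer: one step stores a value, another reads it.
class Transfer<T> {
  private T value;

  void accept(T newValue) {
    value = newValue;
  }

  T get() {
    return value;
  }
}

class ConvergenceSketch {
  static final double EPS = 1e-3;

  // One PageRank iteration, updating 'values' in place. Afterwards it tells
  // 'converged' whether no vertex changed its value by more than EPS.
  static void iterate(int[][] outEdges, double[] values, Transfer<Boolean> converged) {
    double[] received = new double[values.length];
    for (int v = 0; v < values.length; v++) {
      for (int target : outEdges[v]) {
        received[target] += values[v] / outEdges[v].length;
      }
    }
    long changingCount = 0; // the "reduced" value: number of vertices still changing
    for (int v = 0; v < values.length; v++) {
      double newValue = 0.15 + 0.85 * received[v];
      if (Math.abs(newValue - values[v]) > EPS) {
        changingCount++;
      }
      values[v] = newValue;
    }
    converged.accept(changingCount == 0);
  }

  // Repeats the iteration at most maxIterations times, stopping early once
  // 'converged' holds true. Returns the number of iterations actually run.
  static int repeatUntil(int maxIterations, int[][] outEdges, double[] values) {
    Transfer<Boolean> converged = new Transfer<>();
    int done = 0;
    // Assumption of this sketch: an unset (null) flag means "keep going".
    while (done < maxIterations && !Boolean.TRUE.equals(converged.get())) {
      iterate(outEdges, values, converged);
      done++;
    }
    return done;
  }

  public static void main(String[] args) {
    // Hypothetical example graph: 0 -> 1, 0 -> 2, 1 -> 2, 2 -> 0
    int[][] outEdges = { {1, 2}, {2}, {0} };
    double[] values = {1.0, 1.0, 1.0};
    int iterations = repeatUntil(100, outEdges, values);
    System.out.println("stopped after " + iterations + " iterations");
  }
}
```

The loop and the iteration never reference each other directly; they only share the transfer object, which is what lets the same iteration logic be reused under a different stopping rule.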
    Pieces.forAllVertices((vertex) -> { code });
    Pieces.sendMessageToNeighbors(messageClass, messageSupplier, messagesConsumer)
    Pieces.sendMessage(messageClass, messageSupplier, targetsSupplier, messagesConsumer)
    Pieces.reduce(reducer, whatToReduce, whatToDoWithResultOnMaster);
    Pieces.reduceAndBroadcast(reducer, whatToReduce, whatToDoWithResultOnEachVertex);
    Pieces.removeVertices((vertex) -> boolean);
    // and chaining above actions:
    SendMessageChain.startSend(___).thenSendToNeighbors(___).endReduce(___)
Common reusable pieces:
    // cleaning up input graphs:
    PrepareGraphPieces.removeAsymEdges()
    PrepareGraphPieces.removeStandAloneVertices()
    // adjusting graph:
    PrepareGraphPieces.normalizeEdges()
    PrepareGraphPieces.makeSymmetricUnweighted()
    PrepareGraphPieces.makeSymmetricWeighted()
    // calculating size of the graph:
    PrepareGraphPieces.countTotalEdgesPiece
    PrepareGraphPieces.calcSumEdgesPiece
Working with pieces:
    // executing multiple pieces consecutively, in the same stage/superstep:
    new DelegatePiece<>(piece1, piece2, ...)
    // executing piece only on subset of vertices:
    new FilteringPiece<>(filterFunction, piece)
    new FilteringPiece<>(filterSendFunction, filterReceiveFunction)
    new FilteringBlock(filterFunction, block)
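The idea of running logic on only a subset of vertices can be sketched in plain Java as a wrapper around a per-vertex action. The names `FilteringSketch`, `filtering`, `forAll` and `visitEven` are hypothetical and exist only for this illustration; the sketch says nothing about how FilteringPiece is actually implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class FilteringSketch {
  // Wraps a per-vertex action so that it only runs on vertices passing the filter.
  static <V> Consumer<V> filtering(Predicate<V> filter, Consumer<V> action) {
    return vertex -> {
      if (filter.test(vertex)) {
        action.accept(vertex);
      }
    };
  }

  // Applies the action to every vertex in the list.
  static <V> void forAll(List<V> vertices, Consumer<V> action) {
    for (V vertex : vertices) {
      action.accept(vertex);
    }
  }

  // Example use: collect the ids of the vertices the filtered action visited.
  static List<Integer> visitEven(List<Integer> vertexIds) {
    List<Integer> visited = new ArrayList<>();
    forAll(vertexIds, FilteringSketch.<Integer>filtering(id -> id % 2 == 0, visited::add));
    return visited;
  }

  public static void main(String[] args) {
    // prints [2, 4]
    System.out.println(visitEven(Arrays.asList(1, 2, 3, 4)));
  }
}
```

Because the wrapper has the same shape as the action it wraps, the filtered action can be passed anywhere an unfiltered one is accepted.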
Even if you have code written without this framework, you can still get many of the framework's benefits with minor changes. Additionally, there is a simple step-by-step migration path to the framework if you want more.
The first step requires minimal code change and turns your whole application into a single Block, which you can then easily combine with other blocks. To do so, you only need to change the parent classes from the Giraph ones (AbstractComputation, MasterCompute, WorkerContext) to their drop-in replacements (MigrationFullAbstractComputation, MigrationFullMasterCompute and MigrationFullWorkerContext). Those classes are fully compatible, so you will not need to modify any code within them. Then you only need to write your block factory, which extends MigrationFullBlockFactory, to specify vertex and edge types, and call createMigrationAppBlock with the initial computations. You can then combine multiple applications, or use any library written in the framework, but your application is left as one indivisible block.
If you want to benefit from the framework within your application as well, execution needs to be changed to use a composition of Blocks. To do so easily, change the parent classes to MigrationAbstractComputation, MigrationMasterCompute and MigrationWorkerContext. They don't have any methods that affect computation ordering, so all you need to do is fix the compile errors by moving that logic into a composition of Blocks, within the BlockFactory or utility functions. Calling MigrationPiece.createMigrationPiece and passing the appropriate computations gives you an independent piece, which you can use in the same way as before, but also combine in any other way with your other pieces or with pieces from the library.
This allows you to benefit fully from the framework. You might still want to clean up your code by moving it from the migration classes into Pieces, but that should be simple at this point, since it only involves moving code around.