/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.examples.scc;
2021importstatic org.apache.giraph.examples.scc.SccPhaseMasterCompute.PHASE;
22importstatic org.apache.giraph.examples.scc.SccPhaseMasterCompute.NEW_MAXIMUM;
23importstatic org.apache.giraph.examples.scc.SccPhaseMasterCompute.CONVERGED;
2425import java.io.IOException;
2627import org.apache.giraph.Algorithm;
28import org.apache.giraph.examples.scc.SccPhaseMasterCompute.Phases;
29import org.apache.giraph.graph.BasicComputation;
30import org.apache.giraph.graph.Vertex;
31import org.apache.hadoop.io.BooleanWritable;
32import org.apache.hadoop.io.IntWritable;
33import org.apache.hadoop.io.LongWritable;
34import org.apache.hadoop.io.NullWritable;
3536/**37 * Finds strongly connected components of the graph.38 */39 @Algorithm(name = "Strongly Connected Components",
40 description = "Finds strongly connected components of the graph")
41publicclassSccComputationextends42 BasicComputation<LongWritable, SccVertexValue, NullWritable, LongWritable> {
4344/**45 * Current phase of the computation as defined in SccPhaseMasterCompute46 */47privatePhases currPhase;
4849/**50 * Reusable object to encapsulate message value, in order to avoid51 * creating a new instance every time a message is sent.52 */53private LongWritable messageValue = new LongWritable();
5455/**56 * Reusable object to encapsulate a parent vertex id.57 */58private LongWritable parentId = new LongWritable();
5960 @Override
61publicvoid preSuperstep() {
62 IntWritable phaseInt = getAggregatedValue(PHASE);
63 currPhase = SccPhaseMasterCompute.getPhase(phaseInt);
64 }
6566 @Override
67publicvoid compute(
68 Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
69 Iterable<LongWritable> messages) throws IOException {
7071SccVertexValue vertexValue = vertex.getValue();
7273if (!vertexValue.isActive()) {
74 vertex.voteToHalt();
75return;
76 }
7778switch (currPhase) {
79case TRANSPOSE :
80 vertexValue.clearParents();
81 sendMessageToAllEdges(vertex, vertex.getId());
82break;
83case TRIMMING :
84 trim(vertex, messages);
85break;
86case FORWARD_TRAVERSAL :
87 forwardTraversal(vertex, messages);
88break;
89case BACKWARD_TRAVERSAL_START :
90 backwardTraversalStart(vertex);
91break;
92case BACKWARD_TRAVERSAL_REST :
93 backwardTraversalRest(vertex, messages);
94break;
95default :
96break;
97 }
9899 }
100101/**102 * Creates list of parents based on the received ids and halts the vertices103 * that don't have any parent or outgoing edge, hence, they can't be104 * part of an SCC.105 * @param vertex Current vertex.106 * @param messages Received ids from the Transpose phase.107 */108privatevoid trim(Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
109 Iterable<LongWritable> messages) {
110SccVertexValue vertexValue = vertex.getValue();
111// Keep the ids of the parent nodes to allow for backwards traversal112for (LongWritable parent : messages) {
113 vertexValue.addParent(parent.get());
114 }
115// If this node doesn't have any parents or outgoing edges,116// it can't be part of an SCC117 vertexValue.set(vertex.getId().get());
118if (vertex.getNumEdges() == 0 || vertexValue.getParents() == null) {
119 vertexValue.deactivate();
120 } else {
121 messageValue.set(vertexValue.get());
122 sendMessageToAllEdges(vertex, messageValue);
123 }
124 }
125126/**127 * Traverse the graph through outgoing edges and keep the maximum vertex128 * value.129 * If a new maximum value is found, propagate it until convergence.130 * @param vertex Current vertex.131 * @param messages Received values from neighbor vertices.132 */133privatevoid forwardTraversal(
134 Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
135 Iterable<LongWritable> messages) {
136SccVertexValue vertexValue = vertex.getValue();
137boolean changed = setMaxValue(vertexValue, messages);
138if (changed) {
139 messageValue.set(vertexValue.get());
140 sendMessageToAllEdges(vertex, messageValue);
141 aggregate(NEW_MAXIMUM, new BooleanWritable(true));
142 }
143 }
144145/**146 * Traverse the transposed graph and keep the maximum vertex value.147 * @param vertex Current vertex.148 */149privatevoid backwardTraversalStart(
150 Vertex<LongWritable, SccVertexValue, NullWritable> vertex) {
151SccVertexValue vertexValue = vertex.getValue();
152if (vertexValue.get() == vertex.getId().get()) {
153 messageValue.set(vertexValue.get());
154 sendMessageToAllParents(vertex, messageValue);
155 }
156 }
157158/**159 * Traverse the transposed graph and keep the maximum vertex value.160 * @param vertex Current vertex.161 * @param messages Received values from children vertices.162 */163privatevoid backwardTraversalRest(
164 Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
165 Iterable<LongWritable> messages) {
166SccVertexValue vertexValue = vertex.getValue();
167for (LongWritable m : messages) {
168if (vertexValue.get() == m.get()) {
169 sendMessageToAllParents(vertex, m);
170 aggregate(CONVERGED, new BooleanWritable(true));
171 vertexValue.deactivate();
172 vertex.voteToHalt();
173break;
174 }
175 }
176 }
177178/**179 * Compares the messages values with the current vertex value and finds180 * the maximum.181 * If the maximum value is different from the vertex value, makes it the182 * new vertex value and returns true, otherwise, returns false.183 * @param vertexValue Current vertex value.184 * @param messages Messages containing neighbors' vertex values.185 * @return True if a new maximum was found, otherwise, returns false.186 */187privateboolean setMaxValue(SccVertexValue vertexValue,
188 Iterable<LongWritable> messages) {
189boolean changed = false;
190for (LongWritable m : messages) {
191if (vertexValue.get() < m.get()) {
192 vertexValue.set(m.get());
193 changed = true;
194 }
195 }
196return changed;
197 }
198199200/**201 * Send message to all parents.202 * @param vertex Current vertex.203 * @param message Message to be sent.204 */205privatevoid sendMessageToAllParents(
206 Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
207 LongWritable message) {
208for (Long id : vertex.getValue().getParents()) {
209 parentId.set(id);
210 sendMessage(parentId, message);
211 }
212 }
213 }