This project has retired. For details please refer to its Attic page.
SccComputation xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.examples.scc;
20  
21  import static org.apache.giraph.examples.scc.SccPhaseMasterCompute.PHASE;
22  import static org.apache.giraph.examples.scc.SccPhaseMasterCompute.NEW_MAXIMUM;
23  import static org.apache.giraph.examples.scc.SccPhaseMasterCompute.CONVERGED;
24  
25  import java.io.IOException;
26  
27  import org.apache.giraph.Algorithm;
28  import org.apache.giraph.examples.scc.SccPhaseMasterCompute.Phases;
29  import org.apache.giraph.graph.BasicComputation;
30  import org.apache.giraph.graph.Vertex;
31  import org.apache.hadoop.io.BooleanWritable;
32  import org.apache.hadoop.io.IntWritable;
33  import org.apache.hadoop.io.LongWritable;
34  import org.apache.hadoop.io.NullWritable;
35  
36  /**
37   * Finds strongly connected components of the graph.
38   */
39  @Algorithm(name = "Strongly Connected Components",
40             description = "Finds strongly connected components of the graph")
41  public class SccComputation extends
42      BasicComputation<LongWritable, SccVertexValue, NullWritable, LongWritable> {
43  
44    /**
45     * Current phase of the computation as defined in SccPhaseMasterCompute
46     */
47    private Phases currPhase;
48  
49    /**
50     * Reusable object to encapsulate message value, in order to avoid
51     * creating a new instance every time a message is sent.
52     */
53    private LongWritable messageValue = new LongWritable();
54  
55    /**
56     * Reusable object to encapsulate a parent vertex id.
57     */
58    private LongWritable parentId = new LongWritable();
59  
60    @Override
61    public void preSuperstep() {
62      IntWritable phaseInt = getAggregatedValue(PHASE);
63      currPhase = SccPhaseMasterCompute.getPhase(phaseInt);
64    }
65  
66    @Override
67    public void compute(
68        Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
69        Iterable<LongWritable> messages) throws IOException {
70  
71      SccVertexValue vertexValue = vertex.getValue();
72  
73      if (!vertexValue.isActive()) {
74        vertex.voteToHalt();
75        return;
76      }
77  
78      switch (currPhase) {
79      case TRANSPOSE :
80        vertexValue.clearParents();
81        sendMessageToAllEdges(vertex, vertex.getId());
82        break;
83      case TRIMMING :
84        trim(vertex, messages);
85        break;
86      case FORWARD_TRAVERSAL :
87        forwardTraversal(vertex, messages);
88        break;
89      case BACKWARD_TRAVERSAL_START :
90        backwardTraversalStart(vertex);
91        break;
92      case BACKWARD_TRAVERSAL_REST :
93        backwardTraversalRest(vertex, messages);
94        break;
95      default :
96        break;
97      }
98  
99    }
100 
101   /**
102    * Creates list of parents based on the received ids and halts the vertices
103    * that don't have any parent or outgoing edge, hence, they can't be
104    * part of an SCC.
105    * @param vertex Current vertex.
106    * @param messages Received ids from the Transpose phase.
107    */
108   private void trim(Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
109                     Iterable<LongWritable> messages) {
110     SccVertexValue vertexValue = vertex.getValue();
111     // Keep the ids of the parent nodes to allow for backwards traversal
112     for (LongWritable parent : messages) {
113       vertexValue.addParent(parent.get());
114     }
115     // If this node doesn't have any parents or outgoing edges,
116     // it can't be part of an SCC
117     vertexValue.set(vertex.getId().get());
118     if (vertex.getNumEdges() == 0 || vertexValue.getParents() == null) {
119       vertexValue.deactivate();
120     } else {
121       messageValue.set(vertexValue.get());
122       sendMessageToAllEdges(vertex, messageValue);
123     }
124   }
125 
126   /**
127    * Traverse the graph through outgoing edges and keep the maximum vertex
128    * value.
129    * If a new maximum value is found, propagate it until convergence.
130    * @param vertex Current vertex.
131    * @param messages Received values from neighbor vertices.
132    */
133   private void forwardTraversal(
134       Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
135       Iterable<LongWritable> messages) {
136     SccVertexValue vertexValue = vertex.getValue();
137     boolean changed = setMaxValue(vertexValue, messages);
138     if (changed) {
139       messageValue.set(vertexValue.get());
140       sendMessageToAllEdges(vertex, messageValue);
141       aggregate(NEW_MAXIMUM, new BooleanWritable(true));
142     }
143   }
144 
145   /**
146    * Traverse the transposed graph and keep the maximum vertex value.
147    * @param vertex Current vertex.
148    */
149   private void backwardTraversalStart(
150       Vertex<LongWritable, SccVertexValue, NullWritable> vertex) {
151     SccVertexValue vertexValue = vertex.getValue();
152     if (vertexValue.get() == vertex.getId().get()) {
153       messageValue.set(vertexValue.get());
154       sendMessageToAllParents(vertex, messageValue);
155     }
156   }
157 
158   /**
159    * Traverse the transposed graph and keep the maximum vertex value.
160    * @param vertex Current vertex.
161    * @param messages Received values from children vertices.
162    */
163   private void backwardTraversalRest(
164       Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
165       Iterable<LongWritable> messages) {
166     SccVertexValue vertexValue = vertex.getValue();
167     for (LongWritable m : messages) {
168       if (vertexValue.get() == m.get()) {
169         sendMessageToAllParents(vertex, m);
170         aggregate(CONVERGED, new BooleanWritable(true));
171         vertexValue.deactivate();
172         vertex.voteToHalt();
173         break;
174       }
175     }
176   }
177 
178   /**
179    * Compares the messages values with the current vertex value and finds
180    * the maximum.
181    * If the maximum value is different from the vertex value, makes it the
182    * new vertex value and returns true, otherwise, returns false.
183    * @param vertexValue Current vertex value.
184    * @param messages Messages containing neighbors' vertex values.
185    * @return True if a new maximum was found, otherwise, returns false.
186    */
187   private boolean setMaxValue(SccVertexValue vertexValue,
188                               Iterable<LongWritable> messages) {
189     boolean changed = false;
190     for (LongWritable m : messages) {
191       if (vertexValue.get() < m.get()) {
192         vertexValue.set(m.get());
193         changed = true;
194       }
195     }
196     return changed;
197   }
198 
199 
200   /**
201    * Send message to all parents.
202    * @param vertex Current vertex.
203    * @param message Message to be sent.
204    */
205   private void sendMessageToAllParents(
206       Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
207       LongWritable message) {
208     for (Long id : vertex.getValue().getParents()) {
209       parentId.set(id);
210       sendMessage(parentId, message);
211     }
212   }
213 }