1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.examples;
2021import java.io.IOException;
22import java.util.ArrayList;
23import java.util.HashMap;
2425import org.apache.giraph.examples.utils.BrachaTouegDeadlockVertexValue;
26import org.apache.giraph.examples.utils.BrachaTouegDeadlockMessage;
27import org.apache.giraph.conf.LongConfOption;
28import org.apache.giraph.edge.Edge;
29import org.apache.giraph.graph.BasicComputation;
30import org.apache.giraph.graph.Vertex;
31import org.apache.hadoop.io.LongWritable;
32import org.apache.log4j.Logger;
/**
 * This code demonstrates the Bracha Toueg deadlock detection algorithm.
 * The Bracha Toueg algorithm is a distributed, asynchronous, centralized
 * algorithm for deadlock detection. The algorithm is executed on a snapshot of
 * an undirected graph which depicts the corresponding wait-for-graph.
 * Consequently the algorithm works on <b>directed graphs</b> but assumes
 * the possibility to communicate in both ways on all the edges.
 * This is an adaptation of the standard algorithm for the Giraph/Pregel system.
 * Since the value of the vertex is dumped during checkpointing, the algorithm
 * keeps all the state of the vertex in the value.
 */
@Algorithm(
46 name = "Bracha Toueg deadlock detection"47 )
48publicclassBrachaTouegDeadlockComputation49extends BasicComputation<LongWritable, BrachaTouegDeadlockVertexValue,
50 LongWritable, BrachaTouegDeadlockMessage> {
5152/** The deadlock detection initiator id */53publicstaticfinalLongConfOption BRACHA_TOUEG_DL_INITIATOR_ID =
54newLongConfOption("BrachaTouegDeadlockVertex.initiatorId", 1,
55"The deadlock detection initiator id");
5657/** Class logger */58privatestaticfinal Logger LOG =
59 Logger.getLogger(BrachaTouegDeadlockComputation.class);
6061 @Override
62publicvoid compute(
63 Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex,
64 Iterable<BrachaTouegDeadlockMessage> messages)
65throws IOException {
6667BrachaTouegDeadlockVertexValue value;
68long superstep = getSuperstep();
6970if (superstep == 0) {
71/* Phase to exchange the sender vertex IDs on the incoming edges.72 It also prepares the internal state of the vertex */73 initAlgorithm(vertex);
7475/* After each vertex collects the messages sent by the parents, the76 initiator node starts the algorithm by means of a NOTIFY message */77 } elseif (superstep == 1) {
78/* get the value/state of the vertex */79 value = vertex.getValue();
8081if (LOG.isDebugEnabled()) {
82 LOG.debug("Vertex ID " + vertex.getId() + " status is:");
83 LOG.debug("\tpending requests? " + value.hasPendingRequests());
84 LOG.debug("\tis free? " + value.isFree());
85 LOG.debug("\tis notified? " + value.isNotified());
86 }
8788/* collect all the incoming senders IDs */89for (BrachaTouegDeadlockMessage message : messages) {
90 value.addParent(Long.valueOf(message.getSenderId()));
91 }
9293/* debugging purpose: print all the parents of the vertex */94if (LOG.isDebugEnabled()) {
95 logParents(vertex);
96if (isInitiator(vertex)) {
97 LOG.debug("Vertex ID " + vertex.getId() + " start the algorithm.");
98 }
99 }
100101if (isInitiator(vertex)) {
102/* the initiator starts the algorithm */103 notifyVertices(vertex);
104 } else {
105/* The Pregel model prescribes that each node starts in the "active"106 state. In some cases the Bracha-Toueg Algorithm leaves some nodes107 untouched causing the algorithm never to end. To avoid this108 situation at algorithm initialization all the nodes except the109 initiator (which by default is active) will vote to halt so that110 the unused vertices will not produce an infinite computation. Later,111 only when required the vote will be triggered. */112 vertex.voteToHalt();
113return;
114 }
115116/* At this point the actual deadlock detection algorithm is started. */117 } else {
118 Long ackSenderId;
119120 value = vertex.getValue();
121122/* process all the incoming messages and act based on the type of123 message received */124for (BrachaTouegDeadlockMessage message : messages) {
125long type = message.getType();
126127if (LOG.isDebugEnabled()) {
128 LOG.debug("Vertex ID " + vertex.getId() + " received: " + message);
129 }
130131if (type == BrachaTouegDeadlockMessage.NOTIFY) {
132 handleNotifyMessage(vertex, message);
133 } elseif (type == BrachaTouegDeadlockMessage.GRANT) {
134 handleGrantMessage(vertex, message);
135 } elseif (type == BrachaTouegDeadlockMessage.DONE ||
136 type == BrachaTouegDeadlockMessage.ACK) {
137/* Both ACK and DONE Messages are handled in the same way. The138 action take afterwards is independent on these types of139 messages. */140 value.receivedMessage(message.getSenderId(), message.getType());
141 }
142 }
143144 ackSenderId = value.getIdWithInHoldAck();
145if (value.isFree() &&
146 !value.isWaitingForMessage(BrachaTouegDeadlockMessage.ACK) &&
147 !ackSenderId.equals(BrachaTouegDeadlockVertexValue.INVALID_ID)) {
148149 sendAckMessage(ackSenderId, vertex);
150 value.setIdWithInHoldAck(BrachaTouegDeadlockVertexValue.INVALID_ID);
151 }
152153/* if all the ACK and DONE messages have been received, the vertex can154 send the pending DONE message to the parent and vote to halt */155if (value.isNotified() &&
156 !value.isWaitingForMessage(BrachaTouegDeadlockMessage.ACK) &&
157 !value.isWaitingForMessage(BrachaTouegDeadlockMessage.DONE)) {
158159 Long senderId = value.getIdWithInHoldDone();
160161if (LOG.isDebugEnabled()) {
162 LOG.debug("Vertex ID " + vertex.getId() +
163" sent the last DONE message.");
164 LOG.debug("Vertex ID " + vertex.getId() + " voted to halt.");
165 }
166167/* the initiator vertex does not need to send the DONE message since168 it is the starting point of the algorithm */169if (!isInitiator(vertex) &&
170 !senderId.equals(BrachaTouegDeadlockVertexValue.INVALID_ID)) {
171 sendMessage(vertex.getId().get(), senderId,
172 BrachaTouegDeadlockMessage.DONE);
173 value.setIdWithInHoldDone(BrachaTouegDeadlockVertexValue.INVALID_ID);
174 }
175176 vertex.voteToHalt();
177 }
178 }
179 }
180181/**182 * check whether the vertex is the initiator of the algorithm183 *184 * @param vertex Vertex185 * @return True if the vertex is the initiator186 */187privateboolean isInitiator(Vertex<LongWritable, ?, ?> vertex) {
188return vertex.getId().get() == BRACHA_TOUEG_DL_INITIATOR_ID.get(getConf());
189 }
190191/**192 * Initializes the algorithm by sending the control message for ID exchange193 * and preparing the value of the vertex.194 *195 * @param vertex vertex from which the control message is sent196 */197privatevoid initAlgorithm(Vertex<LongWritable,
198 BrachaTouegDeadlockVertexValue, LongWritable> vertex) {
199200BrachaTouegDeadlockVertexValue value;
201 HashMap<Long, ArrayList<Long>> requests =
202new HashMap<Long, ArrayList<Long>>();
203long vertexId = vertex.getId().get();
204205/* prepare the pending requests tracking data structure */206for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {
207 ArrayList<Long> targets;
208 Long tag = Long.valueOf(edge.getValue().get());
209 Long target = Long.valueOf(edge.getTargetVertexId().get());
210211if (requests.containsKey(tag)) {
212 targets = requests.get(tag);
213 } else {
214 targets = new ArrayList<Long>();
215 }
216217 targets.add(target);
218 requests.put(tag, targets);
219 }
220221/* save in the value the number of requests that the node needs to get222 satisfied to consider itself free */223 value = newBrachaTouegDeadlockVertexValue(requests);
224 vertex.setValue(value);
225226/* send to all the outgoint edges the id of the current vertex */227for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {
228 sendMessage(vertexId, edge.getTargetVertexId().get(),
229 BrachaTouegDeadlockMessage.CTRL_IN_EDGE);
230 }
231 }
232233/**234 * Send message wrapper for the Bracha Toueg algorithm specific for ACK235 * messages.236 *237 * @param receiver recipient of the message238 * @param vertex vertex sending the message239 */240privatevoid sendAckMessage(long receiver, Vertex<LongWritable,
241 BrachaTouegDeadlockVertexValue, LongWritable> vertex) {
242243this.sendMessage(Long.valueOf(vertex.getId().get()),
244 receiver, BrachaTouegDeadlockMessage.ACK);
245246if (!vertex.getValue().isNotified()) {
247 vertex.voteToHalt();
248 }
249 }
250251/**252 * Send message wrapper for the Bracha Toueg algorithm253 *254 * @param sender sender of the message255 * @param receiver recipient of the message256 * @param messageType type of message to be sent257 */258privatevoid sendMessage(long sender, long receiver, long messageType) {
259BrachaTouegDeadlockMessage message;
260261 message = newBrachaTouegDeadlockMessage(sender, messageType);
262 sendMessage(new LongWritable(receiver), message);
263if (LOG.isDebugEnabled()) {
264 LOG.debug("sent message " + message + " from " + sender +
265" to " + receiver);
266 }
267 }
268269/**270 * this is a debugging function to verify that all parents have been271 * detected.272 *273 * @param vertex vertex which collected its parents274 */275privatevoid logParents(Vertex<LongWritable,
276 BrachaTouegDeadlockVertexValue,
277 LongWritable> vertex) {
278 ArrayList<Long> parents = vertex.getValue().getParents();
279int sz = parents.size();
280 StringBuffer buffer = new StringBuffer();
281282 buffer.append("Vertex " + vertex.getId() + " parents:");
283for (int i = 0; i < sz; ++i) {
284 buffer.append(" - " + parents.get(i));
285 }
286 LOG.debug(buffer.toString());
287 }
288289/**290 * This method resembles the notify_u procedure of the Bracha-Toueg algorithm.291 * It proceeds by sending a NOTIFY message via its outgoing edges and waits292 * for a DONE message from each destination node. If no pending requests need293 * to be awaited, the grant_u procedure is called. The latter case is294 * encounterd when the "wave" of NOTIFY messages reaches the edge of the295 * graph.296 *297 * @param vertex the vertex on which the notify method i called298 */299privatevoid notifyVertices(
300 Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex) {
301302BrachaTouegDeadlockVertexValue value = vertex.getValue();
303long vertexId = vertex.getId().get();
304boolean hasOutEdges = false;
305306 value.setNotified();
307308for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {
309 hasOutEdges = true;
310 sendMessage(vertexId,
311 edge.getTargetVertexId().get(),
312 BrachaTouegDeadlockMessage.NOTIFY);
313314/* the node will wait for a DONE message from each notified vertex */315 value.waitForMessage(Long.valueOf(edge.getTargetVertexId().get()),
316 Long.valueOf(BrachaTouegDeadlockMessage.DONE));
317 }
318319/* if no requests are pending, the node has to start GRANTing to all320 incoming edges */321if (!hasOutEdges && isInitiator(vertex)) {
322 value.setFree();
323 } elseif (!value.hasPendingRequests() && !value.isFree()) {
324 grantVertices(vertex);
325 }
326 }
327328/**329 * @param vertex vertex on which the grant method is called330 */331privatevoid grantVertices(
332 Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex) {
333334BrachaTouegDeadlockVertexValue value = vertex.getValue();
335 ArrayList<Long> parents = value.getParents();
336long vertexId = vertex.getId().get();
337338 value.setFree();
339340/* grant all the parents with resource access */341for (Long parent : parents) {
342 sendMessage(vertexId, parent,
343 BrachaTouegDeadlockMessage.GRANT);
344345/* the node will wait for a ACK message for each GRANTed vertex */346 value.waitForMessage(parent,
347 Long.valueOf(BrachaTouegDeadlockMessage.ACK));
348 }
349 }
350351/**352 * Function to handle the cases when a NOTIFY message is received.353 * If the message received is of type NOTIFY we distinguish two cases:354 * 1. The node was not yet notified: in this case it forwards the355 * NOTIFY message to its outgoing messages. In this phase the356 * {@link BrachaTougeDeadlockComputation#notifyVertices} function is357 * called.358 * NB: in this case there is the need to keep track of the sender359 * of the message since later a DONE must be sent back.360 * 2. The node was notified: in this case the node will immediately361 * reply with a DONE message.362 *363 * @param vertex vertex that received the DONE message364 * @param message message received by the vertex365 */366privatevoid handleNotifyMessage(
367 Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex,
368BrachaTouegDeadlockMessage message) {
369370BrachaTouegDeadlockVertexValue value = vertex.getValue();
371372if (!value.isNotified()) {
373 notifyVertices(vertex);
374 value.setIdWithInHoldDone(message.getSenderId());
375 } else {
376 sendMessage(vertex.getId().get(), message.getSenderId(),
377 BrachaTouegDeadlockMessage.DONE);
378 }
379 }
380381/**382 * Function to handle the cases when a GRANT message is received.383 * When a GRANT message is received the number of requests is384 * decremented. In this case we must distinguish three cases:385 * 1. The number of requests needed reaches zero: at this stage a386 * round of {@link BrachaTougeDeadlockComputation#grantVertices} is387 * started to forward the resource granting mechanism.388 * NB: the sender id of the node must be kept to handle the delivery of389 * the ACK to the sender at the end of the granting procedure.390 * 2. The node already started go grant since it is free: in this case an ACK391 * message is immediately sent back to the sender.392 * 3. The number of requests is bigger than zero: in this case an ACK393 * is sent back to the sender.394 *395 * @param vertex vertex that received the ACK message396 * @param message message received by the vertex397 */398privatevoid handleGrantMessage(
399 Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex,
400BrachaTouegDeadlockMessage message) {
401402BrachaTouegDeadlockVertexValue value = vertex.getValue();
403 Long senderId = Long.valueOf(message.getSenderId());
404 LongWritable wId = new LongWritable(senderId);
405 LongWritable tag = vertex.getEdgeValue(wId);
406407 value.removeRequest(tag, wId);
408409if (value.isFree() || value.getNumOfRequests(tag) > 0) {
410 sendAckMessage(senderId, vertex);
411return;
412 } else {
413 grantVertices(vertex);
414 value.setIdWithInHoldAck(senderId);
415 }
416 }
417 }