This project has retired. For details please refer to its Attic page.
BrachaTouegDeadlockComputation xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.examples;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.HashMap;
24  
25  import org.apache.giraph.examples.utils.BrachaTouegDeadlockVertexValue;
26  import org.apache.giraph.examples.utils.BrachaTouegDeadlockMessage;
27  import org.apache.giraph.conf.LongConfOption;
28  import org.apache.giraph.edge.Edge;
29  import org.apache.giraph.graph.BasicComputation;
30  import org.apache.giraph.graph.Vertex;
31  import org.apache.hadoop.io.LongWritable;
32  import org.apache.log4j.Logger;
33  
34  /**
35   * This code demonstrates the Bracha Toueg deadlock detection algorithm.
36   * The Bracha Toueg algorithm is a distributed, asynchronous, centralized
37   * algorithm for deadlock detection. The algorithm is executed on a snapshot of
38   * a undirected graph which depicts the corresponding wait-for-graph.
39   * Consequently the algorithm works on <b>directed graphs</b> but assumes
40   * the possibility to communicate in both ways on all the edges.
41   * This is an adaptation of the standard algorithm for Giraph/Pregel system.
42   * Since the value of the vertex is dumped during checkpointing, the algorithm
43   * keeps all the state of the vertex in the value.
44   */
45  @Algorithm(
46      name = "Bracha Toueg deadlock detection"
47  )
48  public class BrachaTouegDeadlockComputation
49    extends BasicComputation<LongWritable, BrachaTouegDeadlockVertexValue,
50      LongWritable, BrachaTouegDeadlockMessage> {
51  
52    /** The deadlock detection initiator id */
53    public static final LongConfOption BRACHA_TOUEG_DL_INITIATOR_ID =
54      new LongConfOption("BrachaTouegDeadlockVertex.initiatorId", 1,
55          "The deadlock detection initiator id");
56  
57    /** Class logger */
58    private static final Logger LOG =
59      Logger.getLogger(BrachaTouegDeadlockComputation.class);
60  
61    @Override
62    public void compute(
63        Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex,
64        Iterable<BrachaTouegDeadlockMessage> messages)
65      throws IOException {
66  
67      BrachaTouegDeadlockVertexValue value;
68      long superstep = getSuperstep();
69  
70      if (superstep == 0) {
71        /* Phase to exchange the sender vertex IDs on the incoming edges.
72           It also prepares the internal state of the vertex */
73        initAlgorithm(vertex);
74  
75      /* After each vertex collects the messages sent by the parents, the
76         initiator node starts the algorithm by means of a NOTIFY message */
77      } else if (superstep == 1) {
78        /* get the value/state of the vertex */
79        value = vertex.getValue();
80  
81        if (LOG.isDebugEnabled()) {
82          LOG.debug("Vertex ID " + vertex.getId() + " status is:");
83          LOG.debug("\tpending requests? " + value.hasPendingRequests());
84          LOG.debug("\tis free? " + value.isFree());
85          LOG.debug("\tis notified? " + value.isNotified());
86        }
87  
88        /* collect all the incoming senders IDs */
89        for (BrachaTouegDeadlockMessage message : messages) {
90          value.addParent(Long.valueOf(message.getSenderId()));
91        }
92  
93        /* debugging purpose: print all the parents of the vertex */
94        if (LOG.isDebugEnabled()) {
95          logParents(vertex);
96          if (isInitiator(vertex)) {
97            LOG.debug("Vertex ID " + vertex.getId() + " start the algorithm.");
98          }
99        }
100 
101       if (isInitiator(vertex)) {
102         /* the initiator starts the algorithm */
103         notifyVertices(vertex);
104       } else {
105         /* The Pregel model prescribes that each node starts in the "active"
106            state. In some cases the Bracha-Toueg Algorithm leaves some nodes
107            untouched causing the algorithm never to end. To avoid this
108            situation at algorithm initialization all the nodes except the
109            initiator (which by default is active) will vote to halt so that
110            the unused vertices will not produce an infinite computation. Later,
111            only when required the vote will be triggered. */
112         vertex.voteToHalt();
113         return;
114       }
115 
116     /* At this point the actual deadlock detection algorithm is started. */
117     } else {
118       Long ackSenderId;
119 
120       value = vertex.getValue();
121 
122       /* process all the incoming messages and act based on the type of
123          message received */
124       for (BrachaTouegDeadlockMessage message : messages) {
125         long type = message.getType();
126 
127         if (LOG.isDebugEnabled()) {
128           LOG.debug("Vertex ID " + vertex.getId() + " received: " + message);
129         }
130 
131         if (type == BrachaTouegDeadlockMessage.NOTIFY) {
132           handleNotifyMessage(vertex, message);
133         } else if (type == BrachaTouegDeadlockMessage.GRANT) {
134           handleGrantMessage(vertex, message);
135         } else if (type == BrachaTouegDeadlockMessage.DONE ||
136                    type == BrachaTouegDeadlockMessage.ACK) {
137           /* Both ACK and DONE Messages are handled in the same way. The
138              action take afterwards is independent on these types of
139              messages.  */
140           value.receivedMessage(message.getSenderId(), message.getType());
141         }
142       }
143 
144       ackSenderId = value.getIdWithInHoldAck();
145       if (value.isFree() &&
146           !value.isWaitingForMessage(BrachaTouegDeadlockMessage.ACK) &&
147           !ackSenderId.equals(BrachaTouegDeadlockVertexValue.INVALID_ID)) {
148 
149         sendAckMessage(ackSenderId, vertex);
150         value.setIdWithInHoldAck(BrachaTouegDeadlockVertexValue.INVALID_ID);
151       }
152 
153       /* if all the ACK and DONE messages have been received, the vertex can
154          send the pending DONE message to the parent and vote to halt */
155       if (value.isNotified() &&
156           !value.isWaitingForMessage(BrachaTouegDeadlockMessage.ACK) &&
157           !value.isWaitingForMessage(BrachaTouegDeadlockMessage.DONE)) {
158 
159         Long senderId = value.getIdWithInHoldDone();
160 
161         if (LOG.isDebugEnabled()) {
162           LOG.debug("Vertex ID " + vertex.getId() +
163                     " sent the last DONE message.");
164           LOG.debug("Vertex ID " + vertex.getId() + " voted to halt.");
165         }
166 
167         /* the initiator vertex does not need to send the DONE message since
168            it is the starting point of the algorithm */
169         if (!isInitiator(vertex) &&
170             !senderId.equals(BrachaTouegDeadlockVertexValue.INVALID_ID)) {
171           sendMessage(vertex.getId().get(), senderId,
172                       BrachaTouegDeadlockMessage.DONE);
173           value.setIdWithInHoldDone(BrachaTouegDeadlockVertexValue.INVALID_ID);
174         }
175 
176         vertex.voteToHalt();
177       }
178     }
179   }
180 
181   /**
182    * check whether the vertex is the initiator of the algorithm
183    *
184    * @param vertex Vertex
185    * @return True if the vertex is the initiator
186    */
187   private boolean isInitiator(Vertex<LongWritable, ?, ?> vertex) {
188     return vertex.getId().get() == BRACHA_TOUEG_DL_INITIATOR_ID.get(getConf());
189   }
190 
191   /**
192    * Initializes the algorithm by sending the control message for ID exchange
193    * and preparing the value of the vertex.
194    *
195    * @param  vertex  vertex from which the control message is sent
196    */
197   private void initAlgorithm(Vertex<LongWritable,
198     BrachaTouegDeadlockVertexValue, LongWritable> vertex) {
199 
200     BrachaTouegDeadlockVertexValue value;
201     HashMap<Long, ArrayList<Long>> requests =
202       new HashMap<Long, ArrayList<Long>>();
203     long vertexId = vertex.getId().get();
204 
205     /* prepare the pending requests tracking data structure */
206     for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {
207       ArrayList<Long> targets;
208       Long tag = Long.valueOf(edge.getValue().get());
209       Long target = Long.valueOf(edge.getTargetVertexId().get());
210 
211       if (requests.containsKey(tag)) {
212         targets = requests.get(tag);
213       } else {
214         targets = new ArrayList<Long>();
215       }
216 
217       targets.add(target);
218       requests.put(tag, targets);
219     }
220 
221     /* save in the value the number of requests that the node needs to get
222        satisfied to consider itself free */
223     value = new BrachaTouegDeadlockVertexValue(requests);
224     vertex.setValue(value);
225 
226     /* send to all the outgoint edges the id of the current vertex */
227     for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {
228       sendMessage(vertexId, edge.getTargetVertexId().get(),
229                   BrachaTouegDeadlockMessage.CTRL_IN_EDGE);
230     }
231   }
232 
233   /**
234    * Send message wrapper for the Bracha Toueg algorithm specific for ACK
235    * messages.
236    *
237    * @param receiver      recipient of the message
238    * @param vertex        vertex sending the message
239    */
240   private void sendAckMessage(long receiver, Vertex<LongWritable,
241       BrachaTouegDeadlockVertexValue, LongWritable> vertex) {
242 
243     this.sendMessage(Long.valueOf(vertex.getId().get()),
244                      receiver, BrachaTouegDeadlockMessage.ACK);
245 
246     if (!vertex.getValue().isNotified()) {
247       vertex.voteToHalt();
248     }
249   }
250 
251   /**
252    * Send message wrapper for the Bracha Toueg algorithm
253    *
254    * @param sender        sender of the message
255    * @param receiver      recipient of the message
256    * @param messageType   type of message to be sent
257    */
258   private void sendMessage(long sender, long receiver, long messageType) {
259     BrachaTouegDeadlockMessage  message;
260 
261     message = new BrachaTouegDeadlockMessage(sender, messageType);
262     sendMessage(new LongWritable(receiver), message);
263     if (LOG.isDebugEnabled()) {
264       LOG.debug("sent message " + message + " from " + sender +
265                 " to " + receiver);
266     }
267   }
268 
269   /**
270    * this is a debugging function to verify that all parents have been
271    * detected.
272    *
273    * @param vertex    vertex which collected its parents
274    */
275   private void logParents(Vertex<LongWritable,
276                                  BrachaTouegDeadlockVertexValue,
277                                  LongWritable> vertex) {
278     ArrayList<Long> parents = vertex.getValue().getParents();
279     int sz = parents.size();
280     StringBuffer buffer = new StringBuffer();
281 
282     buffer.append("Vertex " + vertex.getId() + " parents:");
283     for (int i = 0; i < sz; ++i) {
284       buffer.append(" - " + parents.get(i));
285     }
286     LOG.debug(buffer.toString());
287   }
288 
289   /**
290    * This method resembles the notify_u procedure of the Bracha-Toueg algorithm.
291    * It proceeds by sending a NOTIFY message via its outgoing edges and waits
292    * for a DONE message from each destination node. If no pending requests need
293    * to be awaited, the grant_u procedure is called. The latter case is
294    * encounterd when the "wave" of NOTIFY messages reaches the edge of the
295    * graph.
296    *
297    * @param vertex  the vertex on which the notify method i called
298    */
299   private void notifyVertices(
300     Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex) {
301 
302     BrachaTouegDeadlockVertexValue value = vertex.getValue();
303     long vertexId = vertex.getId().get();
304     boolean hasOutEdges = false;
305 
306     value.setNotified();
307 
308     for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {
309       hasOutEdges = true;
310       sendMessage(vertexId,
311                   edge.getTargetVertexId().get(),
312                   BrachaTouegDeadlockMessage.NOTIFY);
313 
314       /* the node will wait for a DONE message from each notified vertex */
315       value.waitForMessage(Long.valueOf(edge.getTargetVertexId().get()),
316                            Long.valueOf(BrachaTouegDeadlockMessage.DONE));
317     }
318 
319     /* if no requests are pending, the node has to start GRANTing to all
320        incoming edges */
321     if (!hasOutEdges && isInitiator(vertex)) {
322       value.setFree();
323     } else if (!value.hasPendingRequests() && !value.isFree()) {
324       grantVertices(vertex);
325     }
326   }
327 
328   /**
329    * @param vertex      vertex on which the grant method is called
330    */
331   private void grantVertices(
332     Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex) {
333 
334     BrachaTouegDeadlockVertexValue value = vertex.getValue();
335     ArrayList<Long> parents = value.getParents();
336     long vertexId = vertex.getId().get();
337 
338     value.setFree();
339 
340     /* grant all the parents with resource access */
341     for (Long parent : parents) {
342       sendMessage(vertexId, parent,
343                   BrachaTouegDeadlockMessage.GRANT);
344 
345       /* the node will wait for a ACK message for each GRANTed vertex */
346       value.waitForMessage(parent,
347                            Long.valueOf(BrachaTouegDeadlockMessage.ACK));
348     }
349   }
350 
351   /**
352    * Function to handle the cases when a NOTIFY message is received.
353    * If the message received is of type NOTIFY we distinguish two cases:
354    * 1. The node was not yet notified: in this case  it forwards the
355    *    NOTIFY message to its outgoing messages. In this phase the
356    *    {@link BrachaTougeDeadlockComputation#notifyVertices} function is
357    *    called.
358    *    NB: in this case there is the need to keep track of the sender
359    *        of the message since later a DONE must be sent back.
360    * 2. The node was notified: in this case the node will immediately
361    *    reply with a DONE message.
362    *
363    * @param vertex    vertex that received the DONE message
364    * @param message   message received by the vertex
365    */
366   private void handleNotifyMessage(
367     Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex,
368     BrachaTouegDeadlockMessage message) {
369 
370     BrachaTouegDeadlockVertexValue value = vertex.getValue();
371 
372     if (!value.isNotified()) {
373       notifyVertices(vertex);
374       value.setIdWithInHoldDone(message.getSenderId());
375     } else {
376       sendMessage(vertex.getId().get(), message.getSenderId(),
377                   BrachaTouegDeadlockMessage.DONE);
378     }
379   }
380 
381   /**
382    * Function to handle the cases when a GRANT message is received.
383    * When a GRANT message is received the number of requests is
384    * decremented. In this case we must distinguish three cases:
385    * 1. The number of requests needed reaches zero: at this stage a
386    *    round of {@link BrachaTougeDeadlockComputation#grantVertices} is
387    *    started to forward the resource granting mechanism.
388    *    NB: the sender id of the node must be kept to handle the delivery of
389    *    the ACK to the sender at the end of the granting procedure.
390    * 2. The node already started go grant since it is free: in this case an ACK
391    *    message is immediately sent back to the sender.
392    * 3. The number of requests is bigger than zero: in this case an ACK
393    *    is sent back to the sender.
394    *
395    * @param vertex    vertex that received the ACK message
396    * @param message   message received by the vertex
397    */
398   private void handleGrantMessage(
399     Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex,
400     BrachaTouegDeadlockMessage message) {
401 
402     BrachaTouegDeadlockVertexValue value = vertex.getValue();
403     Long senderId = Long.valueOf(message.getSenderId());
404     LongWritable wId = new LongWritable(senderId);
405     LongWritable tag = vertex.getEdgeValue(wId);
406 
407     value.removeRequest(tag, wId);
408 
409     if (value.isFree() || value.getNumOfRequests(tag) > 0) {
410       sendAckMessage(senderId, vertex);
411       return;
412     } else {
413       grantVertices(vertex);
414       value.setIdWithInHoldAck(senderId);
415     }
416   }
417 }