1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.examples.utils;
2021import java.io.DataInput;
22import java.io.DataOutput;
23import java.io.IOException;
24import java.util.ArrayList;
25import java.util.HashMap;
26import java.util.Map;
2728import org.apache.hadoop.io.LongWritable;
29import org.apache.hadoop.io.Writable;
3031/**32 * Vertex value used for the Bracha-Toueg Dealock algorithm33 */34publicclassBrachaTouegDeadlockVertexValueimplements Writable {
35/** Invalid ID */36publicstaticfinal Long INVALID_ID = Long.valueOf(-1);
3738/** Vertex is free from deadlock */39privateboolean isFree;
40/** Vertex was notified */41privateboolean isNotified;
42/**43 * Active requests which need to be satisfied to free the node.44 * Tha hash map is needed to handle the N-out-of-M semantics. The first45 * parameter identifies the TAG of the request, the second identifies the46 * vertex id to which an edge with points. All the eequests (edges) with the47 * same TAG need to be satisfied to consider the vertex free.48 */49private HashMap<Long, ArrayList<Long>> requests;
50/**51 * Structure containing the messages awaited from the other vertices.52 * The algorithm guarantees that the vertex will not wait for two different53 * messages from the same vertex.54 */55private HashMap<Long, Long> waitingList;
56/** IDs of the parents of this vertex */57private ArrayList<Long> parents;
58/** id to which the ACK message needs to be sent for the first GRANT message59 that releases the node; -1 identifies an empty ackId */60private Long idWithInHoldAck;
61/**62 * id to which the DONE message needs to be sent for the first NOTICE63 * message received; -1 identifies an empty ackId64 */65private Long idWithInHoldDone;
6667/**68 * Default constructor69 */70publicBrachaTouegDeadlockVertexValue() {
71this(new HashMap<Long, ArrayList<Long>>());
72 }
7374/**75 * Parametrized constructor76 *77 * @param requests number of requests needed to consider the node free78 */79publicBrachaTouegDeadlockVertexValue(
80 HashMap<Long, ArrayList<Long>> requests) {
8182this.isFree = false;
83this.isNotified = false;
84this.requests = requests;
85this.waitingList = new HashMap<Long, Long>();
86this.parents = new ArrayList<Long>();
87this.idWithInHoldAck = INVALID_ID;
88this.idWithInHoldDone = INVALID_ID;
89 }
9091// Serialization functions -----------------------------------------------9293 @Override
94publicvoid readFields(DataInput input) throws IOException {
95int sz;
9697this.isFree = input.readBoolean();
98this.isNotified = input.readBoolean();
99100 sz = input.readInt();
101for (int i = 0; i < sz; ++i) {
102 ArrayList<Long> targets = new ArrayList<Long>();
103 Long tag = input.readLong();
104int sw = input.readInt();
105106for (int j = 0; j < sw; ++j) {
107 Long target = input.readLong();
108109 targets.add(target);
110 }
111112this.requests.put(tag, targets);
113 }
114115 sz = input.readInt();
116for (int i = 0; i < sz; ++i) {
117 Long key = input.readLong();
118 Long value = input.readLong();
119120this.waitingList.put(key, value);
121 }
122123 sz = input.readInt();
124for (int i = 0; i < sz; ++i) {
125this.parents.add(Long.valueOf(input.readLong()));
126 }
127128this.idWithInHoldAck = input.readLong();
129this.idWithInHoldDone = input.readLong();
130 }
131132 @Override
133publicvoid write(DataOutput output) throws IOException {
134int sz;
135136 output.writeBoolean(this.isFree);
137 output.writeBoolean(this.isNotified);
138139 sz = this.requests.size();
140 output.writeInt(sz);
141for (Map.Entry<Long, ArrayList<Long>> entry : this.requests.entrySet()) {
142 ArrayList<Long> targets;
143144 output.writeLong(entry.getKey());
145 targets = entry.getValue();
146 sz = targets.size();
147 output.writeInt(sz);
148for (Long target : targets) {
149 output.writeLong(target);
150 }
151 }
152153 sz = this.waitingList.size();
154 output.writeInt(sz);
155for (Map.Entry<Long, Long> entry : this.waitingList.entrySet()) {
156 output.writeLong(entry.getKey());
157 output.writeLong(entry.getValue());
158 }
159160 sz = this.parents.size();
161 output.writeInt(sz);
162for (int i = 0; i < sz; ++i) {
163 output.writeLong(this.parents.get(i));
164 }
165166 output.writeLong(this.idWithInHoldAck);
167 output.writeLong(this.idWithInHoldDone);
168 }
169170// Accessors -------------------------------------------------------------171172/**173 * @return true if free, false otherwise174 */175publicboolean isFree() {
176returnthis.isFree;
177 }
178179/**180 * the vertex is free from deadlocks181 */182publicvoid setFree() {
183this.isFree = true;
184 }
185186/**187 * @return true if the vertex was notified, false otherwise188 */189publicboolean isNotified() {
190returnthis.isNotified;
191 }
192193/**194 * the vertex got a notification195 */196publicvoid setNotified() {
197this.isNotified = true;
198 }
199200/**201 * @return false if no pending requests have to be still processed to202 * continue the computation203 */204publicboolean hasPendingRequests() {
205boolean withPendingRequests = true;
206207if (this.requests.isEmpty()) {
208 withPendingRequests = false;
209 }
210211for (Map.Entry<Long, ArrayList<Long>> request : this.requests.entrySet()) {
212 ArrayList<Long> targets = request.getValue();
213214if (targets.size() == 0) {
215 withPendingRequests = false;
216 }
217 }
218return withPendingRequests;
219 }
220221/**222 * remove the expected request from the edge on which the message arrived223 *224 * @param tag tag of the edge225 * @param targetId target Id to which the edge points226 */227publicvoid removeRequest(LongWritable tag, LongWritable targetId) {
228 Long l = Long.valueOf(tag.get());
229 ArrayList<Long> targets = this.requests.get(l);
230231if (targets.contains(targetId.get())) {
232 targets.remove(Long.valueOf(targetId.get()));
233 }
234 }
235236/**237 * This function retrieves the number of pending requests for the specified238 * tag. Because of the N-out-of-M semantic, each time a GRANT is received239 * on an edge, the number of requests is reduced for the tag which the edge240 * is part of.241 *242 * @param tag tag related to the requests to be verified243 * @return number of requests pending for the tag provided244 */245publicint getNumOfRequests(LongWritable tag) {
246 Long l = Long.valueOf(tag.get());
247 ArrayList<Long> targets = this.requests.get(l);
248249return targets.size();
250 }
251252/**253 * Add a new message that must be expected by the node254 *255 * @param id ID of the node from which the messages is expected256 * @param type type of message that is awaited257 */258publicvoid waitForMessage(Long id, Long type) {
259// waiting list should not contain two messages for the same node260 assert waitingList.get(id) == null;
261 waitingList.put(id, type);
262 }
263264/**265 * Each time a message is received, it has to be removed from the queue266 * that keeps track of the waited messages.267 *268 * @param id ID of the node from which the messages is expected269 * @param type type of message that is awaited270 */271publicvoid receivedMessage(Long id, Long type) {
272long typel;
273274 assert waitingList.get(id) != null;
275 typel = waitingList.get(id).longValue();
276 assert typel > 0;
277 waitingList.remove(id);
278 }
279280/**281 * @param type type of message to check282 * @return boolean true if waiting the message type, false otherwise283 */284publicboolean isWaitingForMessage(Long type) {
285for (Map.Entry<Long, Long> entry : waitingList.entrySet()) {
286long typel = entry.getValue().longValue();
287if ((typel & type) > 0) {
288returntrue;
289 }
290 }
291292return false;
293 }
294295/**296 * add a parent id into the list of parents kept at the vertex side297 * @param parentId vertex id of the parent298 */299publicvoid addParent(Long parentId) {
300this.parents.add(parentId);
301 }
302303/**304 * @return list of parent IDs collected305 */306public ArrayList<Long> getParents() {
307returnthis.parents;
308 }
309310/**311 * @return the id waiting for an ACK message312 */313public Long getIdWithInHoldAck() {
314returnthis.idWithInHoldAck;
315 }
316317/**318 * @param id the id to set319 */320publicvoid setIdWithInHoldAck(Long id) {
321this.idWithInHoldAck = id;
322 }
323324/**325 * @return the id waiting for an DONE message326 */327public Long getIdWithInHoldDone() {
328return idWithInHoldDone;
329 }
330331/**332 * @param doneId the id to set333 */334publicvoid setIdWithInHoldDone(Long doneId) {
335this.idWithInHoldDone = doneId;
336 }
337338 @Override
339public String toString() {
340return"isFree=" + Boolean.toString(isFree);
341 }
342 }