1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.utils;
20
21 import org.apache.hadoop.util.Progressable;
22 import org.apache.log4j.Logger;
23
24 import com.google.common.collect.Sets;
25
26 import java.util.HashSet;
27 import java.util.Set;
28
29 /**
30 * This barrier is used when we don't know how many events are we waiting on
31 * from the start. Instead we have a set of task ids, and each of those will,
32 * at some point of time, give the information about how many events from it
33 * should we expect. Barrier will be waiting for all the tasks to notify it
34 * about that number of events, and than it will also wait for all the events
35 * to happen.
36 *
37 * requirePermits() corresponds to task notifying us how many events from it
38 * to expect, and releasePermits() notifies us about events happening.
39 *
40 * This class is currently used during preparation of aggregators.
41 *
42 * User must follow this protocol for concurrent access:
43 *
44 * (1) an object instance is constructed
45 * (2) arbitrarily many times
46 * (2a) concurrent calls to requirePermits(), releasePermits() and
47 * waitForRequiredPermits() are issued
48 * (2b) waitForRequiredPermits() returns
49 *
50 * Note that the next cycle of calls to requirePermits() or releasePermits()
51 * cannot start until the previous call to waitForRequiredPermits()
52 * has returned.
53 *
54 * Methods of this class are thread-safe.
55 */
56 public class TaskIdsPermitsBarrier {
57 /** Class logger */
58 private static final Logger LOG =
59 Logger.getLogger(TaskIdsPermitsBarrier.class);
60 /** Msecs to refresh the progress meter */
61 private static final int MSEC_PERIOD = 10000;
62 /** Maximum number of task ids to list in the log */
63 private static final int MAX_TASK_IDS_TO_LOG = 10;
64 /** Progressable for reporting progress */
65 private final Progressable progressable;
66 /** Number of permits we are currently waiting for */
67 private long waitingOnPermits = 0;
68 /** Set of task ids which required permits already */
69 private final Set<Integer> arrivedTaskIds = new HashSet<Integer>();
70 /** Logger */
71 private final TimedLogger logger;
72
73 /**
74 * Constructor
75 *
76 * @param progressable Progressable for reporting progress
77 */
78 public TaskIdsPermitsBarrier(Progressable progressable) {
79 this.progressable = progressable;
80 logger = new TimedLogger(MSEC_PERIOD, LOG);
81 }
82
83 /**
84 * Wait until permits have been required desired number of times,
85 * and all required permits are available
86 *
87 * @param expectedTaskIds List of task ids which we are waiting permits from
88 */
89 public synchronized void waitForRequiredPermits(
90 Set<Integer> expectedTaskIds) {
91 while (arrivedTaskIds.size() < expectedTaskIds.size() ||
92 waitingOnPermits > 0) {
93 try {
94 wait(MSEC_PERIOD);
95 } catch (InterruptedException e) {
96 throw new IllegalStateException("waitForRequiredPermits: " +
97 "InterruptedException occurred");
98 }
99 progressable.progress();
100 if (LOG.isInfoEnabled()) {
101 if (arrivedTaskIds.size() < expectedTaskIds.size()) {
102 String logSuffix = "";
103 if (expectedTaskIds.size() - arrivedTaskIds.size() <=
104 MAX_TASK_IDS_TO_LOG) {
105 Sets.SetView<Integer> difference =
106 Sets.difference(expectedTaskIds, arrivedTaskIds);
107 logSuffix = ", task ids: " + difference;
108 }
109 logger.info("waitForRequiredPermits: " +
110 "Waiting for " +
111 (expectedTaskIds.size() - arrivedTaskIds.size()) +
112 " more tasks to send their aggregator data" +
113 logSuffix);
114 } else {
115 logger.info("waitForRequiredPermits: " +
116 "Waiting for " + waitingOnPermits + " more aggregator requests");
117 }
118 }
119 }
120
121 // Reset for the next time to use
122 arrivedTaskIds.clear();
123 waitingOnPermits = 0;
124 }
125
126 /**
127 * Require more permits. This will increase the number of times permits
128 * were required. Doesn't wait for permits to become available.
129 *
130 * @param permits Number of permits to require
131 * @param taskId Task id which required permits
132 */
133 public synchronized void requirePermits(long permits, int taskId) {
134 arrivedTaskIds.add(taskId);
135 waitingOnPermits += permits;
136 notifyAll();
137 }
138
139 /**
140 * Release one permit.
141 */
142 public synchronized void releaseOnePermit() {
143 releasePermits(1);
144 }
145
146 /**
147 * Release some permits.
148 *
149 * @param permits Number of permits to release
150 */
151 public synchronized void releasePermits(long permits) {
152 waitingOnPermits -= permits;
153 notifyAll();
154 }
155 }