This project has been retired. For details, please refer to its Attic page.
      
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.giraph.zk;
20  
21  import java.util.concurrent.TimeUnit;
22  import java.util.concurrent.locks.Condition;
23  import java.util.concurrent.locks.Lock;
24  import java.util.concurrent.locks.ReentrantLock;
25  
26  import org.apache.giraph.time.SystemTime;
27  import org.apache.giraph.time.Time;
28  import org.apache.hadoop.util.Progressable;
29  import org.apache.log4j.Logger;
30  
31  
32  
33  
34  
35  public class PredicateLock implements BspEvent {
36    
37    private static final Logger LOG = Logger.getLogger(PredicateLock.class);
38    
39    private static final int DEFAULT_MSEC_PERIOD = 10000;
40    
41    protected final Progressable progressable;
42    
43    private final int msecPeriod;
44    
45    private Lock lock = new ReentrantLock();
46    
47    private Condition cond = lock.newCondition();
48    
49    private boolean eventOccurred = false;
50    
51    private final Time time;
52  
53    
54  
55  
56  
57  
58    public PredicateLock(Progressable progressable) {
59      this(progressable, DEFAULT_MSEC_PERIOD, SystemTime.get());
60    }
61  
62    
63  
64  
65  
66  
67  
68  
69    public PredicateLock(Progressable progressable, int msecPeriod, Time time) {
70      this.progressable = progressable;
71      this.msecPeriod = msecPeriod;
72      this.time = time;
73    }
74  
75    @Override
76    public void reset() {
77      lock.lock();
78      try {
79        eventOccurred = false;
80      } finally {
81        lock.unlock();
82      }
83    }
84  
85    @Override
86    public void signal() {
87      lock.lock();
88      try {
89        eventOccurred = true;
90        cond.signalAll();
91      } finally {
92        lock.unlock();
93      }
94    }
95  
96    @Override
97    public boolean waitMsecs(int msecs) {
98      if (msecs < 0) {
99        throw new RuntimeException("waitMsecs: msecs cannot be negative!");
100     }
101     long maxMsecs = time.getMilliseconds() + msecs;
102     int curMsecTimeout = 0;
103     lock.lock();
104     try {
105       while (!eventOccurred) {
106         curMsecTimeout =
107             Math.min(msecs, msecPeriod);
108         if (LOG.isDebugEnabled()) {
109           LOG.debug("waitMsecs: Wait for " + curMsecTimeout);
110         }
111         try {
112           boolean signaled =
113               cond.await(curMsecTimeout, TimeUnit.MILLISECONDS);
114           if (LOG.isDebugEnabled()) {
115             LOG.debug("waitMsecs: Got timed signaled of " +
116               signaled);
117           }
118         } catch (InterruptedException e) {
119           throw new IllegalStateException(
120             "waitMsecs: Caught interrupted " +
121             "exception on cond.await() " +
122             curMsecTimeout, e);
123         }
124         if (time.getMilliseconds() > maxMsecs) {
125           return false;
126         }
127         msecs = Math.max(0, msecs - curMsecTimeout);
128         progressable.progress(); 
129       }
130     } finally {
131       lock.unlock();
132     }
133     return true;
134   }
135 
136   @Override
137   public void waitForTimeoutOrFail(long timeout) {
138     long t0 = System.currentTimeMillis();
139     while (!waitMsecs(msecPeriod)) {
140       if (System.currentTimeMillis() > t0 + timeout) {
141         throw new RuntimeException("Timeout waiting");
142       }
143       progressable.progress();
144     }
145   }
146 }