View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.zk;
20  
21  import java.util.concurrent.TimeUnit;
22  import java.util.concurrent.locks.Condition;
23  import java.util.concurrent.locks.Lock;
24  import java.util.concurrent.locks.ReentrantLock;
25  
26  import org.apache.giraph.time.SystemTime;
27  import org.apache.giraph.time.Time;
28  import org.apache.hadoop.util.Progressable;
29  import org.apache.log4j.Logger;
30  
31  /**
32   * A lock with a predicate that was be used to synchronize events and keep the
33   * job context updated while waiting.
34   */
35  public class PredicateLock implements BspEvent {
36    /** Class logger */
37    private static final Logger LOG = Logger.getLogger(PredicateLock.class);
38    /** Default msecs to refresh the progress meter */
39    private static final int DEFAULT_MSEC_PERIOD = 10000;
40    /** Progressable for reporting progress (Job context) */
41    protected final Progressable progressable;
42    /** Actual mses to refresh the progress meter */
43    private final int msecPeriod;
44    /** Lock */
45    private Lock lock = new ReentrantLock();
46    /** Condition associated with lock */
47    private Condition cond = lock.newCondition();
48    /** Predicate */
49    private boolean eventOccurred = false;
50    /** Keeps track of the time */
51    private final Time time;
52  
53    /**
54     * Constructor with default values.
55     *
56     * @param progressable used to report progress() (usually a Mapper.Context)
57     */
58    public PredicateLock(Progressable progressable) {
59      this(progressable, DEFAULT_MSEC_PERIOD, SystemTime.get());
60    }
61  
62    /**
63     * Constructor.
64     *
65     * @param progressable used to report progress() (usually a Mapper.Context)
66     * @param msecPeriod Msecs between progress reports
67     * @param time Time implementation
68     */
69    public PredicateLock(Progressable progressable, int msecPeriod, Time time) {
70      this.progressable = progressable;
71      this.msecPeriod = msecPeriod;
72      this.time = time;
73    }
74  
75    @Override
76    public void reset() {
77      lock.lock();
78      try {
79        eventOccurred = false;
80      } finally {
81        lock.unlock();
82      }
83    }
84  
85    @Override
86    public void signal() {
87      lock.lock();
88      try {
89        eventOccurred = true;
90        cond.signalAll();
91      } finally {
92        lock.unlock();
93      }
94    }
95  
96    @Override
97    public boolean waitMsecs(int msecs) {
98      if (msecs < 0) {
99        throw new RuntimeException("waitMsecs: msecs cannot be negative!");
100     }
101     long maxMsecs = time.getMilliseconds() + msecs;
102     int curMsecTimeout = 0;
103     lock.lock();
104     try {
105       while (!eventOccurred) {
106         curMsecTimeout =
107             Math.min(msecs, msecPeriod);
108         if (LOG.isDebugEnabled()) {
109           LOG.debug("waitMsecs: Wait for " + curMsecTimeout);
110         }
111         try {
112           boolean signaled =
113               cond.await(curMsecTimeout, TimeUnit.MILLISECONDS);
114           if (LOG.isDebugEnabled()) {
115             LOG.debug("waitMsecs: Got timed signaled of " +
116               signaled);
117           }
118         } catch (InterruptedException e) {
119           throw new IllegalStateException(
120             "waitMsecs: Caught interrupted " +
121             "exception on cond.await() " +
122             curMsecTimeout, e);
123         }
124         if (time.getMilliseconds() > maxMsecs) {
125           return false;
126         }
127         msecs = Math.max(0, msecs - curMsecTimeout);
128         progressable.progress(); // go around again
129       }
130     } finally {
131       lock.unlock();
132     }
133     return true;
134   }
135 
136   @Override
137   public void waitForever() {
138     while (!waitMsecs(msecPeriod)) {
139       progressable.progress();
140     }
141   }
142 }