View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.zk;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertTrue;
24  import static org.mockito.Mockito.mock;
25  import static org.mockito.Mockito.when;
26  
27  import java.util.concurrent.atomic.AtomicInteger;
28  
29  import org.apache.giraph.time.Time;
30  import org.apache.giraph.zk.BspEvent;
31  import org.apache.giraph.zk.PredicateLock;
32  import org.apache.hadoop.util.Progressable;
33  import org.junit.Before;
34  import org.junit.Test;
35  
36  /**
37   * Ensure that PredicateLock objects work correctly.
38   */
39  public class TestPredicateLock {
40    /** How many times was progress called? */
41    private AtomicInteger progressCalled = new AtomicInteger(0);
42  
43    private static class SignalThread extends Thread {
44      private final BspEvent event;
45      public SignalThread(BspEvent event) {
46        this.event = event;
47      }
48      public void run() {
49        try {
50          Thread.sleep(500);
51        } catch (InterruptedException e) {
52        }
53        event.signal();
54      }
55    }
56  
57    private Progressable stubContext;
58  
59    private Progressable getStubProgressable() {
60      if (stubContext == null)
61        stubContext = new Progressable() {
62          @Override
63          public void progress() {
64            progressCalled.incrementAndGet();
65          }
66        };
67      return stubContext;
68    }
69  
70    @Before
71    public void setUp() {
72      progressCalled.set(0);
73    }
74  
75    /**
76     * SMake sure the the event is not signaled.
77     */
78    @Test
79    public void testWaitMsecsNoEvent() {
80      Time mockTime = mock(Time.class);
81      when(mockTime.getMilliseconds()).
82          thenReturn(0L).thenReturn(2L);
83      BspEvent event = new PredicateLock(getStubProgressable(), 1, mockTime);
84      boolean gotPredicate = event.waitMsecs(1);
85      assertFalse(gotPredicate);
86      assertEquals(0, progressCalled.get());
87      when(mockTime.getMilliseconds()).
88          thenReturn(0L).thenReturn(0L).thenReturn(2L);
89      gotPredicate = event.waitMsecs(1);
90      assertFalse(gotPredicate);
91      assertEquals(1, progressCalled.get());
92    }
93  
94    /**
95     * Single threaded case where the event is signaled.
96     */
97    @Test
98    public void testEvent() {
99      Time mockTime = mock(Time.class);
100     when(mockTime.getMilliseconds()).
101         thenReturn(0L).thenReturn(2L);
102     BspEvent event = new PredicateLock(getStubProgressable(), 1, mockTime);
103     event.signal();
104     boolean gotPredicate = event.waitMsecs(2);
105     assertTrue(gotPredicate);
106     event.reset();
107     when(mockTime.getMilliseconds()).
108         thenReturn(0L).thenReturn(2L);
109     gotPredicate = event.waitMsecs(0);
110     assertFalse(gotPredicate);
111   }
112 
113   /**
114    * Thread signaled test for {@link PredicateLock#waitForever()}
115    */
116   @Test
117   public void testWaitForever() {
118     BspEvent event = new PredicateLock(getStubProgressable());
119     Thread signalThread = new SignalThread(event);
120     signalThread.start();
121     event.waitForever();
122     try {
123       signalThread.join();
124     } catch (InterruptedException e) {
125     }
126     assertTrue(event.waitMsecs(0));
127   }
128 
129   /**
130    * Thread signaled test to make sure the the event is signaled correctly
131    */
132   @Test
133   public void testWaitMsecs() {
134     BspEvent event = new PredicateLock(getStubProgressable());
135     Thread signalThread = new SignalThread(event);
136     signalThread.start();
137     boolean gotPredicate = event.waitMsecs(2000);
138     assertTrue(gotPredicate);
139     try {
140       signalThread.join();
141     } catch (InterruptedException e) {
142     }
143     gotPredicate = event.waitMsecs(0);
144     assertTrue(gotPredicate);
145   }
146 }