This project has retired. For details please refer to its Attic page.
TestPredicateLock xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.zk;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertTrue;
24  import static org.mockito.Mockito.mock;
25  import static org.mockito.Mockito.when;
26  
27  import java.util.concurrent.atomic.AtomicInteger;
28  
29  import org.apache.giraph.time.Time;
30  import org.apache.hadoop.util.Progressable;
31  import org.junit.Before;
32  import org.junit.Test;
33  
34  /**
35   * Ensure that PredicateLock objects work correctly.
36   */
37  public class TestPredicateLock {
38    /** How many times was progress called? */
39    private AtomicInteger progressCalled = new AtomicInteger(0);
40  
41    private static class SignalThread extends Thread {
42      private final BspEvent event;
43      public SignalThread(BspEvent event) {
44        this.event = event;
45      }
46      public void run() {
47        try {
48          Thread.sleep(500);
49        } catch (InterruptedException e) {
50        }
51        event.signal();
52      }
53    }
54  
55    private Progressable stubContext;
56  
57    private Progressable getStubProgressable() {
58      if (stubContext == null)
59        stubContext = new Progressable() {
60          @Override
61          public void progress() {
62            progressCalled.incrementAndGet();
63          }
64        };
65      return stubContext;
66    }
67  
68    @Before
69    public void setUp() {
70      progressCalled.set(0);
71    }
72  
73    /**
74     * SMake sure the the event is not signaled.
75     */
76    @Test
77    public void testWaitMsecsNoEvent() {
78      Time mockTime = mock(Time.class);
79      when(mockTime.getMilliseconds()).
80          thenReturn(0L).thenReturn(2L);
81      BspEvent event = new PredicateLock(getStubProgressable(), 1, mockTime);
82      boolean gotPredicate = event.waitMsecs(1);
83      assertFalse(gotPredicate);
84      assertEquals(0, progressCalled.get());
85      when(mockTime.getMilliseconds()).
86          thenReturn(0L).thenReturn(0L).thenReturn(2L);
87      gotPredicate = event.waitMsecs(1);
88      assertFalse(gotPredicate);
89      assertEquals(1, progressCalled.get());
90    }
91  
92    /**
93     * Single threaded case where the event is signaled.
94     */
95    @Test
96    public void testEvent() {
97      Time mockTime = mock(Time.class);
98      when(mockTime.getMilliseconds()).
99          thenReturn(0L).thenReturn(2L);
100     BspEvent event = new PredicateLock(getStubProgressable(), 1, mockTime);
101     event.signal();
102     boolean gotPredicate = event.waitMsecs(2);
103     assertTrue(gotPredicate);
104     event.reset();
105     when(mockTime.getMilliseconds()).
106         thenReturn(0L).thenReturn(2L);
107     gotPredicate = event.waitMsecs(0);
108     assertFalse(gotPredicate);
109   }
110 
111   /**
112    * Thread signaled test for {@link PredicateLock#waitForTimeoutOrFail(long)}
113    */
114   @Test
115   public void testWaitForever() {
116     BspEvent event = new PredicateLock(getStubProgressable());
117     Thread signalThread = new SignalThread(event);
118     signalThread.start();
119     event.waitForTimeoutOrFail(5 * 60_000);
120     try {
121       signalThread.join();
122     } catch (InterruptedException e) {
123     }
124     assertTrue(event.waitMsecs(0));
125   }
126 
127   /**
128    * Thread signaled test to make sure the the event is signaled correctly
129    */
130   @Test
131   public void testWaitMsecs() {
132     BspEvent event = new PredicateLock(getStubProgressable());
133     Thread signalThread = new SignalThread(event);
134     signalThread.start();
135     boolean gotPredicate = event.waitMsecs(2000);
136     assertTrue(gotPredicate);
137     try {
138       signalThread.join();
139     } catch (InterruptedException e) {
140     }
141     gotPredicate = event.waitMsecs(0);
142     assertTrue(gotPredicate);
143   }
144 }