1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.utils;
20
21 import org.apache.hadoop.util.Progressable;
22
23 import com.google.common.base.Preconditions;
24
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.concurrent.Semaphore;
29
30 /**
31 * Keeps a set of elements, and allows for waiting on certain number of
32 * elements to become available. Assumes that at any point no more elements
33 * than we'll be asking for will be added to the set. Reusable.
34 *
35 * @param <T> Element type
36 */
37 public class BlockingElementsSet<T> {
38 /** Semaphore to keep track of element count */
39 private final Semaphore semaphore = new Semaphore(0);
40 /** Elements */
41 private final List<T> elements =
42 Collections.synchronizedList(new ArrayList<T>());
43
44 /**
45 * Put an element in the set
46 *
47 * @param element Element to put
48 */
49 public void offer(T element) {
50 elements.add(element);
51 semaphore.release();
52 }
53
54 /**
55 * Get one element when it becomes available,
56 * reporting progress while waiting
57 *
58 * @param progressable Progressable to report progress
59 * @return Element acquired
60 */
61 public T getElement(Progressable progressable) {
62 return getElements(1, progressable).get(0);
63 }
64
65 /**
66 * Get desired number of elements when they become available,
67 * reporting progress while waiting
68 *
69 * @param elementCount How many elements to wait for
70 * @param progressable Progressable to report progress
71 * @return List of elements acquired
72 */
73 public List<T> getElements(int elementCount, Progressable progressable) {
74 ProgressableUtils.awaitSemaphorePermits(
75 semaphore, elementCount, progressable);
76 Preconditions.checkState(elements.size() == elementCount);
77 List<T> ret = new ArrayList<>(elements);
78 elements.clear();
79 return ret;
80 }
81 }