1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.giraph.utils; 20 21 import org.apache.hadoop.util.Progressable; 22 23 import com.google.common.base.Preconditions; 24 25 import java.util.ArrayList; 26 import java.util.Collections; 27 import java.util.List; 28 import java.util.concurrent.Semaphore; 29 30 /** 31 * Keeps a set of elements, and allows for waiting on certain number of 32 * elements to become available. Assumes that at any point no more elements 33 * than we'll be asking for will be added to the set. Reusable. 34 * 35 * @param <T> Element type 36 */ 37 public class BlockingElementsSet<T> { 38 /** Semaphore to keep track of element count */ 39 private final Semaphore semaphore = new Semaphore(0); 40 /** Elements */ 41 private final List<T> elements = 42 Collections.synchronizedList(new ArrayList<T>()); 43 44 /** 45 * Put an element in the set 46 * 47 * @param element Element to put 48 */ 49 public void offer(T element) { 50 elements.add(element); 51 semaphore.release(); 52 } 53 54 /** 55 * Get one element when it becomes available, 56 * reporting progress while waiting 57 * 58 * @param progressable Progressable to report progress 59 * @return Element acquired 60 */ 61 public T getElement(Progressable progressable) { 62 return getElements(1, progressable).get(0); 63 } 64 65 /** 66 * Get desired number of elements when they become available, 67 * reporting progress while waiting 68 * 69 * @param elementCount How many elements to wait for 70 * @param progressable Progressable to report progress 71 * @return List of elements acquired 72 */ 73 public List<T> getElements(int elementCount, Progressable progressable) { 74 ProgressableUtils.awaitSemaphorePermits( 75 semaphore, elementCount, progressable); 76 Preconditions.checkState(elements.size() == elementCount); 77 List<T> ret = new ArrayList<>(elements); 78 elements.clear(); 79 return ret; 80 } 81 }