This project has retired. For details please refer to its Attic page.
TopNReduce xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.reducers;
19  
20  import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
21  
22  import java.io.DataInput;
23  import java.io.DataOutput;
24  import java.io.IOException;
25  import java.util.PriorityQueue;
26  
27  /**
28   * Extracts top N largest elements
29   *
30   * @param <S> Single value type, objects passed on workers
31   */
32  public class TopNReduce<S extends Comparable<S>>
33    extends KryoWrappedReduceOperation<S, PriorityQueue<S>> {
34    private int capacity;
35  
36    public TopNReduce(int capacity) {
37      this.capacity = capacity;
38    }
39  
40    public TopNReduce() { }
41  
42    @Override
43    public PriorityQueue<S> createValue() {
44      return new PriorityQueue<S>();
45    }
46  
47    @Override
48    public void reduce(PriorityQueue<S> heap, S value) {
49      if (capacity == 0) {
50        return;
51      }
52  
53      if (heap.size() < capacity) {
54        heap.add(value);
55      } else {
56        S head = heap.peek();
57        if (head.compareTo(value) < 0) {
58          heap.poll();
59          heap.add(value);
60        }
61      }
62    }
63  
64    @Override
65    public void reduceMerge(
66      PriorityQueue<S> reduceInto,
67      PriorityQueue<S> toReduce
68    ) {
69      for (S element : toReduce) {
70        reduce(reduceInto, element);
71      }
72    }
73  
74    @Override
75    public void write(DataOutput out) throws IOException {
76      super.write(out);
77      out.writeInt(capacity);
78    }
79  
80    @Override
81    public void readFields(DataInput in) throws IOException {
82      super.readFields(in);
83      capacity = in.readInt();
84    }
85  }