This project has retired. For details please refer to its Attic page.
CollectPrimitiveReduceOperation xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.reducers.collect;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  
24  import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
25  import org.apache.giraph.types.ops.PrimitiveTypeOps;
26  import org.apache.giraph.types.ops.TypeOpsUtils;
27  import org.apache.giraph.types.ops.collections.ResettableIterator;
28  import org.apache.giraph.types.ops.collections.array.WArrayList;
29  import org.apache.giraph.utils.WritableUtils;
30  
31  /**
32   * Collect primitive values reduce operation
33   *
34   * @param <S> Primitive Writable type, which has its type ops
35   */
36  public class CollectPrimitiveReduceOperation<S>
37      extends KryoWrappedReduceOperation<S, WArrayList<S>> {
38    /**
39     * Type ops if available, or null
40     */
41    private PrimitiveTypeOps<S> typeOps;
42  
43    /** For reflection only */
44    public CollectPrimitiveReduceOperation() {
45    }
46  
47    public CollectPrimitiveReduceOperation(PrimitiveTypeOps<S> typeOps) {
48      this.typeOps = typeOps;
49    }
50  
51    @Override
52    public WArrayList<S> createValue() {
53      return createList();
54    }
55  
56    @Override
57    public void reduce(WArrayList<S> reduceInto, S value) {
58      reduceInto.addW(value);
59    }
60  
61    @Override
62    public void reduceMerge(final WArrayList<S> reduceInto,
63        WArrayList<S> toReduce) {
64      ResettableIterator<S> iterator = toReduce.fastIteratorW();
65      while (iterator.hasNext()) {
66        reduceInto.addW(iterator.next());
67      }
68    }
69  
70    public WArrayList<S> createList() {
71      return typeOps.createArrayList();
72    }
73  
74    @Override
75    public void write(DataOutput out) throws IOException {
76      WritableUtils.writeClass(typeOps.getTypeClass(), out);
77    }
78  
79    @Override
80    public void readFields(DataInput in) throws IOException {
81      typeOps = TypeOpsUtils.getPrimitiveTypeOps(
82          WritableUtils.<S>readClass(in));
83    }
84  }