This project has retired. For details please refer to its Attic page.
CollectTuplesOfPrimitivesReduceOperation xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.reducers.collect;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  
26  import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
27  import org.apache.giraph.types.ops.PrimitiveTypeOps;
28  import org.apache.giraph.types.ops.TypeOpsUtils;
29  import org.apache.giraph.types.ops.collections.ResettableIterator;
30  import org.apache.giraph.types.ops.collections.array.WArrayList;
31  import org.apache.giraph.utils.WritableUtils;
32  
33  /**
34   * Collect tuples of primitive values reduce operation
35   */
36  public class CollectTuplesOfPrimitivesReduceOperation
37      extends KryoWrappedReduceOperation<List<Object>, List<WArrayList>> {
38    /**
39     * Type ops if available, or null
40     */
41    private List<PrimitiveTypeOps> typeOpsList;
42  
43    /** For reflection only */
44    public CollectTuplesOfPrimitivesReduceOperation() {
45    }
46  
47    public CollectTuplesOfPrimitivesReduceOperation(
48        List<PrimitiveTypeOps> typeOpsList) {
49      this.typeOpsList = typeOpsList;
50    }
51  
52    @Override
53    public List<WArrayList> createValue() {
54      List<WArrayList> ret = new ArrayList<>(typeOpsList.size());
55      for (PrimitiveTypeOps typeOps : typeOpsList) {
56        ret.add(typeOps.createArrayList());
57      }
58      return ret;
59    }
60  
61    @Override
62    public void reduce(List<WArrayList> reduceInto, List<Object> value) {
63      for (int i = 0; i < reduceInto.size(); i++) {
64        reduceInto.get(i).addW(value.get(i));
65      }
66    }
67  
68    @Override
69    public void reduceMerge(final List<WArrayList> reduceInto,
70        List<WArrayList> toReduce) {
71      for (int i = 0; i < reduceInto.size(); i++) {
72        ResettableIterator iterator = toReduce.get(i).fastIteratorW();
73        while (iterator.hasNext()) {
74          reduceInto.get(i).addW(iterator.next());
75        }
76      }
77    }
78  
79    @Override
80    public void write(DataOutput out) throws IOException {
81      out.writeInt(typeOpsList.size());
82      for (PrimitiveTypeOps typeOps : typeOpsList) {
83        WritableUtils.writeClass(typeOps.getTypeClass(), out);
84      }
85    }
86  
87    @Override
88    public void readFields(DataInput in) throws IOException {
89      int size = in.readInt();
90      typeOpsList = new ArrayList<>(size);
91      for (int i = 0; i < size; i++) {
92        typeOpsList.add(TypeOpsUtils.getPrimitiveTypeOps(
93            WritableUtils.readClass(in)));
94      }
95    }
96  }