View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.reducers.impl;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  
24  import org.apache.commons.lang3.tuple.Pair;
25  import org.apache.giraph.reducers.ReduceOperation;
26  import org.apache.giraph.utils.WritableUtils;
27  import org.apache.giraph.writable.tuple.PairWritable;
28  import org.apache.hadoop.io.Writable;
29  import org.python.google.common.base.Preconditions;
30  
31  /**
32   * Combines two individual reducers, to create a single reducer of pairs that
33   * reduces each of them individually.
34   *
35   * @param <S1> First single value type
36   * @param <R1> First reduced value type
37   * @param <S2> Second single value type
38   * @param <R2> Second reduced value type
39   */
40  public class PairReduce<S1, R1 extends Writable, S2, R2 extends Writable>
41      implements ReduceOperation<Pair<S1, S2>, PairWritable<R1, R2>> {
42    /** First reduceOp */
43    private ReduceOperation<S1, R1> reduce1;
44    /** Second reduceOp */
45    private ReduceOperation<S2, R2> reduce2;
46  
47    /** Constructor */
48    public PairReduce() {
49    }
50  
51    /**
52     * Constructor
53     * @param reduce1 First reduceOp
54     * @param reduce2 Second reduceOp
55     */
56    public PairReduce(
57        ReduceOperation<S1, R1> reduce1, ReduceOperation<S2, R2> reduce2) {
58      this.reduce1 = reduce1;
59      this.reduce2 = reduce2;
60    }
61  
62  
63    @Override
64    public PairWritable<R1, R2> createInitialValue() {
65      return new PairWritable<>(
66          reduce1.createInitialValue(), reduce2.createInitialValue());
67    }
68  
69    @Override
70    public PairWritable<R1, R2> reduce(
71        PairWritable<R1, R2> curValue, Pair<S1, S2> valueToReduce) {
72      Preconditions.checkState(
73          curValue.getLeft() ==
74          reduce1.reduce(curValue.getLeft(), valueToReduce.getLeft()));
75      Preconditions.checkState(
76          curValue.getRight() ==
77          reduce2.reduce(curValue.getRight(), valueToReduce.getRight()));
78      return curValue;
79    }
80  
81    @Override
82    public PairWritable<R1, R2> reduceMerge(
83        PairWritable<R1, R2> curValue, PairWritable<R1, R2> valueToReduce) {
84      Preconditions.checkState(
85          curValue.getLeft() ==
86          reduce1.reduceMerge(curValue.getLeft(), valueToReduce.getLeft()));
87      Preconditions.checkState(
88          curValue.getRight() ==
89          reduce2.reduceMerge(curValue.getRight(), valueToReduce.getRight()));
90      return curValue;
91    }
92  
93    @Override
94    public void write(DataOutput out) throws IOException {
95      WritableUtils.writeWritableObject(reduce1, out);
96      WritableUtils.writeWritableObject(reduce2, out);
97    }
98  
99    @Override
100   public void readFields(DataInput in) throws IOException {
101     reduce1 = WritableUtils.readWritableObject(in, null);
102     reduce2 = WritableUtils.readWritableObject(in, null);
103   }
104 }