1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.reducers.impl;
1920import java.io.DataInput;
21import java.io.DataOutput;
22import java.io.IOException;
2324import org.apache.commons.lang3.tuple.Pair;
25import org.apache.giraph.reducers.ReduceOperation;
26import org.apache.giraph.utils.WritableUtils;
27import org.apache.giraph.writable.tuple.PairWritable;
28import org.apache.hadoop.io.Writable;
29import com.google.common.base.Preconditions;
3031/**32 * Combines two individual reducers, to create a single reducer of pairs that33 * reduces each of them individually.34 *35 * @param <S1> First single value type36 * @param <R1> First reduced value type37 * @param <S2> Second single value type38 * @param <R2> Second reduced value type39 */40publicclass PairReduce<S1, R1 extends Writable, S2, R2 extends Writable>
41implements ReduceOperation<Pair<S1, S2>, PairWritable<R1, R2>> {
42/** First reduceOp */43private ReduceOperation<S1, R1> reduce1;
44/** Second reduceOp */45private ReduceOperation<S2, R2> reduce2;
4647/** Constructor */48publicPairReduce() {
49 }
5051/**52 * Constructor53 * @param reduce1 First reduceOp54 * @param reduce2 Second reduceOp55 */56publicPairReduce(
57 ReduceOperation<S1, R1> reduce1, ReduceOperation<S2, R2> reduce2) {
58this.reduce1 = reduce1;
59this.reduce2 = reduce2;
60 }
616263 @Override
64public PairWritable<R1, R2> createInitialValue() {
65returnnew PairWritable<>(
66 reduce1.createInitialValue(), reduce2.createInitialValue());
67 }
6869 @Override
70public PairWritable<R1, R2> reduce(
71 PairWritable<R1, R2> curValue, Pair<S1, S2> valueToReduce) {
72 Preconditions.checkState(
73 curValue.getLeft() ==
74 reduce1.reduce(curValue.getLeft(), valueToReduce.getLeft()));
75 Preconditions.checkState(
76 curValue.getRight() ==
77 reduce2.reduce(curValue.getRight(), valueToReduce.getRight()));
78return curValue;
79 }
8081 @Override
82public PairWritable<R1, R2> reduceMerge(
83 PairWritable<R1, R2> curValue, PairWritable<R1, R2> valueToReduce) {
84 Preconditions.checkState(
85 curValue.getLeft() ==
86 reduce1.reduceMerge(curValue.getLeft(), valueToReduce.getLeft()));
87 Preconditions.checkState(
88 curValue.getRight() ==
89 reduce2.reduceMerge(curValue.getRight(), valueToReduce.getRight()));
90return curValue;
91 }
9293 @Override
94publicvoid write(DataOutput out) throws IOException {
95 WritableUtils.writeWritableObject(reduce1, out);
96 WritableUtils.writeWritableObject(reduce2, out);
97 }
9899 @Override
100publicvoid readFields(DataInput in) throws IOException {
101 reduce1 = WritableUtils.readWritableObject(in, null);
102 reduce2 = WritableUtils.readWritableObject(in, null);
103 }
104 }