1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.giraph.reducers;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23
24 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
25 import org.apache.giraph.utils.WritableUtils;
26 import org.apache.hadoop.io.Writable;
27
28 /**
29 * Object responsible for performing reducing operation.
30 * Simple wrapper of ReduceOperation object and current value holding
31 * partially reduced result.
32 *
33 * @param <S> Single value type, objects passed on workers
34 * @param <R> Reduced value type
35 */
36 public class Reducer<S, R extends Writable> {
37 /** Reduce operations */
38 private ReduceOperation<S, R> reduceOp;
39 /** Current (partially) reduced value*/
40 private R currentValue;
41
42 /**
43 * Constructor
44 */
45 public Reducer() {
46 }
47 /**
48 * Constructor
49 * @param reduceOp Reduce operations
50 */
51 public Reducer(ReduceOperation<S, R> reduceOp) {
52 this.reduceOp = reduceOp;
53 this.currentValue = createInitialValue();
54 }
55 /**
56 * Constructor
57 * @param reduceOp Reduce operations
58 * @param currentValue current reduced value
59 */
60 public Reducer(ReduceOperation<S, R> reduceOp, R currentValue) {
61 this.reduceOp = reduceOp;
62 this.currentValue = currentValue;
63 }
64
65 /**
66 * Reduce given value into current reduced value.
67 * @param valueToReduce Single value to reduce
68 */
69 public void reduce(S valueToReduce) {
70 currentValue = reduceOp.reduce(currentValue, valueToReduce);
71 }
72 /**
73 * Reduce given partially reduced value into current reduced value.
74 * @param valueToReduce Partial value to reduce
75 */
76 public void reduceMerge(R valueToReduce) {
77 currentValue = reduceOp.reduceMerge(currentValue, valueToReduce);
78 }
79 /**
80 * Return new initial reduced value.
81 * @return New initial reduced value
82 */
83 public R createInitialValue() {
84 R value = reduceOp.createInitialValue();
85 if (value == null) {
86 throw new IllegalStateException(
87 "Initial value for reducer cannot be null, but is for " + reduceOp);
88 }
89 return value;
90 }
91
92 public ReduceOperation<S, R> getReduceOp() {
93 return reduceOp;
94 }
95
96 public R getCurrentValue() {
97 return currentValue;
98 }
99
100 public void setCurrentValue(R currentValue) {
101 this.currentValue = currentValue;
102 }
103
104 /**
105 * Serialize the fields of this object to <code>out</code>.
106 *
107 * @param out <code>DataOuput</code> to serialize this object into.
108 * @throws IOException
109 */
110 public void write(DataOutput out) throws IOException {
111 WritableUtils.writeWritableObject(reduceOp, out);
112 currentValue.write(out);
113 }
114
115 /**
116 * Deserialize the fields of this object from <code>in</code>.
117 *
118 * <p>For efficiency, implementations should attempt to re-use storage in the
119 * existing object where possible.</p>
120 *
121 * @param in <code>DataInput</code> to deseriablize this object from.
122 * @param conf Configuration
123 * @throws IOException
124 */
125 public void readFields(DataInput in,
126 ImmutableClassesGiraphConfiguration conf) throws IOException {
127 reduceOp = WritableUtils.readWritableObject(in, conf);
128 currentValue = createInitialValue();
129 currentValue.readFields(in);
130 }
131 }