This project has retired. For details please refer to its Attic page.
Reducer xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.reducers;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  
24  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
25  import org.apache.giraph.utils.WritableUtils;
26  import org.apache.hadoop.io.Writable;
27  
28  /**
29   * Object responsible for performing reducing operation.
30   * Simple wrapper of ReduceOperation object and current value holding
31   * partially reduced result.
32   *
33   * @param <S> Single value type, objects passed on workers
34   * @param <R> Reduced value type
35   */
36  public class Reducer<S, R extends Writable> {
37    /** Reduce operations */
38    private ReduceOperation<S, R> reduceOp;
39    /** Current (partially) reduced value*/
40    private R currentValue;
41  
42    /**
43     * Constructor
44     */
45    public Reducer() {
46    }
47    /**
48     * Constructor
49     * @param reduceOp Reduce operations
50     */
51    public Reducer(ReduceOperation<S, R> reduceOp) {
52      this.reduceOp = reduceOp;
53      this.currentValue = createInitialValue();
54    }
55    /**
56     * Constructor
57     * @param reduceOp Reduce operations
58     * @param currentValue current reduced value
59     */
60    public Reducer(ReduceOperation<S, R> reduceOp, R currentValue) {
61      this.reduceOp = reduceOp;
62      this.currentValue = currentValue;
63    }
64  
65    /**
66     * Reduce given value into current reduced value.
67     * @param valueToReduce Single value to reduce
68     */
69    public void reduce(S valueToReduce) {
70      currentValue = reduceOp.reduce(currentValue, valueToReduce);
71    }
72    /**
73     * Reduce given partially reduced value into current reduced value.
74     * @param valueToReduce Partial value to reduce
75     */
76    public void reduceMerge(R valueToReduce) {
77      currentValue = reduceOp.reduceMerge(currentValue, valueToReduce);
78    }
79    /**
80     * Return new initial reduced value.
81     * @return New initial reduced value
82     */
83    public R createInitialValue() {
84      R value = reduceOp.createInitialValue();
85      if (value == null) {
86        throw new IllegalStateException(
87            "Initial value for reducer cannot be null, but is for " + reduceOp);
88      }
89      return value;
90    }
91  
92    public ReduceOperation<S, R> getReduceOp() {
93      return reduceOp;
94    }
95  
96    public R getCurrentValue() {
97      return currentValue;
98    }
99  
100   public void setCurrentValue(R currentValue) {
101     this.currentValue = currentValue;
102   }
103 
104   /**
105    * Serialize the fields of this object to <code>out</code>.
106    *
107    * @param out <code>DataOuput</code> to serialize this object into.
108    * @throws IOException
109    */
110   public void write(DataOutput out) throws IOException {
111     WritableUtils.writeWritableObject(reduceOp, out);
112     currentValue.write(out);
113   }
114 
115   /**
116    * Deserialize the fields of this object from <code>in</code>.
117    *
118    * <p>For efficiency, implementations should attempt to re-use storage in the
119    * existing object where possible.</p>
120    *
121    * @param in <code>DataInput</code> to deseriablize this object from.
122    * @param conf Configuration
123    * @throws IOException
124    */
125   public void readFields(DataInput in,
126       ImmutableClassesGiraphConfiguration conf) throws IOException {
127     reduceOp = WritableUtils.readWritableObject(in, conf);
128     currentValue = createInitialValue();
129     currentValue.readFields(in);
130   }
131 }