This project has retired. For details please refer to its Attic page.
AggregatorBroadcast xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.master;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  
24  import org.apache.giraph.aggregators.Aggregator;
25  import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
26  import org.apache.giraph.utils.ReflectionUtils;
27  import org.apache.giraph.utils.WritableUtils;
28  import org.apache.hadoop.io.Writable;
29  
30  /**
31   * Writable representation of aggregated value
32   *
33   * @param <A> Aggregation object type
34   */
35  public class AggregatorBroadcast<A extends Writable>
36    extends DefaultImmutableClassesGiraphConfigurable
37    implements Writable {
38    /** Aggregator class */
39    private Class<? extends Aggregator<A>> aggregatorClass;
40    /** Aggregated value */
41    private A value;
42  
43    /** Constructor */
44    public AggregatorBroadcast() {
45    }
46  
47    /**
48     * Constructor
49     * @param aggregatorClass Aggregator class
50     * @param value Aggregated value
51     */
52    public AggregatorBroadcast(
53        Class<? extends Aggregator<A>> aggregatorClass, A value) {
54      this.aggregatorClass = aggregatorClass;
55      this.value = value;
56    }
57  
58    public A getValue() {
59      return value;
60    }
61  
62    @Override
63    public void write(DataOutput out) throws IOException {
64      WritableUtils.writeClass(aggregatorClass, out);
65      value.write(out);
66    }
67  
68    @Override
69    public void readFields(DataInput in) throws IOException {
70      aggregatorClass = WritableUtils.readClass(in);
71      value = ReflectionUtils.newInstance(aggregatorClass, getConf())
72          .createInitialValue();
73      value.readFields(in);
74    }
75  }