1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.combiner;
1920import org.apache.giraph.types.ops.DoubleTypeOps;
21import org.apache.giraph.types.ops.FloatTypeOps;
22import org.apache.giraph.types.ops.IntTypeOps;
23import org.apache.giraph.types.ops.LongTypeOps;
24import org.apache.giraph.types.ops.NumericTypeOps;
25import org.apache.hadoop.io.DoubleWritable;
26import org.apache.hadoop.io.FloatWritable;
27import org.apache.hadoop.io.IntWritable;
28import org.apache.hadoop.io.LongWritable;
29import org.apache.hadoop.io.WritableComparable;
3031/**32 * Message combiner which calculates max of all messages.33 *34 * @param <M> Message type35 */36publicclass MaxMessageCombiner<M extends WritableComparable>
37implements MessageCombiner<WritableComparable, M> {
38/** DoubleWritable specialization */39publicstaticfinal MaxMessageCombiner<DoubleWritable> DOUBLE =
40new MaxMessageCombiner<>(DoubleTypeOps.INSTANCE);
41/** DoubleWritable specialization */42publicstaticfinal MaxMessageCombiner<FloatWritable> FLOAT =
43new MaxMessageCombiner<>(FloatTypeOps.INSTANCE);
44/** LongWritable specialization */45publicstaticfinal MaxMessageCombiner<LongWritable> LONG =
46new MaxMessageCombiner<>(LongTypeOps.INSTANCE);
47/** IntWritable specialization */48publicstaticfinal MaxMessageCombiner<IntWritable> INT =
49new MaxMessageCombiner<>(IntTypeOps.INSTANCE);
5051/** Value type operations */52privatefinal NumericTypeOps<M> typeOps;
5354/**55 * Constructor56 * @param typeOps Value type operations57 */58publicMaxMessageCombiner(NumericTypeOps<M> typeOps) {
59this.typeOps = typeOps;
60 }
6162 @Override
63publicvoid combine(
64 WritableComparable vertexIndex, M originalMessage, M messageToCombine) {
65if (originalMessage.compareTo(messageToCombine) < 0) {
66 typeOps.set(originalMessage, messageToCombine);
67 }
68 }
6970 @Override
71public M createInitialMessage() {
72return typeOps.createZero();
73 }
74 }