View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.piece.messages;
19  
20  import org.apache.giraph.combiner.MessageCombiner;
21  import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
22  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23  import org.apache.giraph.conf.MessageClasses;
24  import org.apache.giraph.factories.MessageValueFactory;
25  import org.apache.giraph.utils.ReflectionUtils;
26  import org.apache.giraph.writable.kryo.KryoWritable;
27  import org.apache.hadoop.io.Writable;
28  import org.apache.hadoop.io.WritableComparable;
29  
30  import com.google.common.base.Preconditions;
31  
32  /**
33   * MessageClasses implementation that provides factory and combiner instances
34   * through a provided supplier.
35   *
36   * @param <I> Vertex id type
37   * @param <M> Message type
38   */
39  public class ObjectMessageClasses<I extends WritableComparable,
40      M extends Writable> extends KryoWritable implements MessageClasses<I, M> {
41    private final Class<M> messageClass;
42    private final SupplierFromConf<MessageValueFactory<M>>
43    messageValueFactorySupplier;
44    private final SupplierFromConf<? extends MessageCombiner<? super I, M>>
45    messageCombinerSupplier;
46    private final MessageEncodeAndStoreType messageEncodeAndStoreType;
47  
48    public ObjectMessageClasses() {
49      this(null, null, null, null);
50    }
51  
52    public ObjectMessageClasses(Class<M> messageClass,
53        SupplierFromConf<MessageValueFactory<M>> messageValueFactorySupplier,
54        SupplierFromConf<? extends MessageCombiner<? super I, M>>
55          messageCombinerSupplier,
56        MessageEncodeAndStoreType messageEncodeAndStoreType) {
57      this.messageClass = messageClass;
58      this.messageValueFactorySupplier = messageValueFactorySupplier;
59      this.messageCombinerSupplier = messageCombinerSupplier;
60      this.messageEncodeAndStoreType = messageEncodeAndStoreType;
61    }
62  
63    @Override
64    public Class<M> getMessageClass() {
65      return messageClass;
66    }
67  
68    @Override
69    public MessageValueFactory<M> createMessageValueFactory(
70        ImmutableClassesGiraphConfiguration conf) {
71      return Preconditions.checkNotNull(messageValueFactorySupplier.apply(conf));
72    }
73  
74    @Override
75    public MessageCombiner<? super I, M> createMessageCombiner(
76        ImmutableClassesGiraphConfiguration<I, ? extends Writable,
77          ? extends Writable> conf) {
78      return messageCombinerSupplier != null ?
79        Preconditions.checkNotNull(messageCombinerSupplier.apply(conf)) : null;
80    }
81  
82    @Override
83    public boolean useMessageCombiner() {
84      return messageCombinerSupplier != null;
85    }
86  
87    @Override
88    public MessageEncodeAndStoreType getMessageEncodeAndStoreType() {
89      return messageEncodeAndStoreType;
90    }
91  
92    @Override
93    public MessageClasses<I, M> createCopyForNewSuperstep() {
94      return new ObjectMessageClasses<>(
95          messageClass, messageValueFactorySupplier,
96          messageCombinerSupplier, messageEncodeAndStoreType);
97    }
98  
99    @Override
100   public void verifyConsistent(ImmutableClassesGiraphConfiguration conf) {
101     MessageValueFactory<M> messageValueFactory =
102         messageValueFactorySupplier.apply(conf);
103     Preconditions.checkState(
104         messageValueFactory.newInstance().getClass().equals(messageClass));
105 
106     if (messageCombinerSupplier != null) {
107       MessageCombiner<? super I, M> messageCombiner =
108           messageCombinerSupplier.apply(conf);
109       Preconditions.checkState(messageCombiner.createInitialMessage()
110           .getClass().equals(messageClass));
111       Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
112           MessageCombiner.class, messageCombiner.getClass());
113       ReflectionUtils.verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
114           "Vertex id", messageCombiner.getClass());
115       ReflectionUtils.verifyTypes(messageClass, combinerTypes[1],
116           "Outgoing message", messageCombiner.getClass());
117     }
118   }
119 }