This project has retired. For details please refer to its Attic page.
ObjectMessageClasses xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.piece.messages;
19  
20  import org.apache.giraph.combiner.MessageCombiner;
21  import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
22  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23  import org.apache.giraph.conf.MessageClasses;
24  import org.apache.giraph.factories.MessageValueFactory;
25  import org.apache.giraph.utils.ReflectionUtils;
26  import org.apache.giraph.writable.kryo.KryoWritable;
27  import org.apache.hadoop.io.Writable;
28  import org.apache.hadoop.io.WritableComparable;
29  
30  import com.google.common.base.Preconditions;
31  
32  /**
33   * MessageClasses implementation that provides factory and combiner instances
34   * through a provided supplier.
35   *
36   * @param <I> Vertex id type
37   * @param <M> Message type
38   */
39  public class ObjectMessageClasses<I extends WritableComparable,
40      M extends Writable> extends KryoWritable implements MessageClasses<I, M> {
41    private final Class<M> messageClass;
42    private final SupplierFromConf<MessageValueFactory<M>>
43    messageValueFactorySupplier;
44    private final SupplierFromConf<? extends MessageCombiner<? super I, M>>
45    messageCombinerSupplier;
46    private final MessageEncodeAndStoreType messageEncodeAndStoreType;
47    private final boolean ignoreExistingVertices;
48  
49    public ObjectMessageClasses() {
50      this(null, null, null, null, false);
51    }
52  
53    public ObjectMessageClasses(Class<M> messageClass,
54        SupplierFromConf<MessageValueFactory<M>> messageValueFactorySupplier,
55        SupplierFromConf<? extends MessageCombiner<? super I, M>>
56          messageCombinerSupplier,
57        MessageEncodeAndStoreType messageEncodeAndStoreType,
58        boolean ignoreExistingVertices) {
59      this.messageClass = messageClass;
60      this.messageValueFactorySupplier = messageValueFactorySupplier;
61      this.messageCombinerSupplier = messageCombinerSupplier;
62      this.messageEncodeAndStoreType = messageEncodeAndStoreType;
63      this.ignoreExistingVertices = ignoreExistingVertices;
64    }
65  
66    @Override
67    public Class<M> getMessageClass() {
68      return messageClass;
69    }
70  
71    @Override
72    public MessageValueFactory<M> createMessageValueFactory(
73        ImmutableClassesGiraphConfiguration conf) {
74      return Preconditions.checkNotNull(messageValueFactorySupplier.apply(conf));
75    }
76  
77    @Override
78    public MessageCombiner<? super I, M> createMessageCombiner(
79        ImmutableClassesGiraphConfiguration<I, ? extends Writable,
80          ? extends Writable> conf) {
81      return messageCombinerSupplier != null ?
82        Preconditions.checkNotNull(messageCombinerSupplier.apply(conf)) : null;
83    }
84  
85    @Override
86    public boolean useMessageCombiner() {
87      return messageCombinerSupplier != null;
88    }
89  
90    @Override
91    public boolean ignoreExistingVertices() {
92      return ignoreExistingVertices;
93    }
94  
95    @Override
96    public MessageEncodeAndStoreType getMessageEncodeAndStoreType() {
97      return messageEncodeAndStoreType;
98    }
99  
100   @Override
101   public MessageClasses<I, M> createCopyForNewSuperstep() {
102     return new ObjectMessageClasses<>(
103         messageClass, messageValueFactorySupplier,
104         messageCombinerSupplier, messageEncodeAndStoreType,
105         ignoreExistingVertices);
106   }
107 
108   @Override
109   public void verifyConsistent(ImmutableClassesGiraphConfiguration conf) {
110     MessageValueFactory<M> messageValueFactory =
111         messageValueFactorySupplier.apply(conf);
112     Preconditions.checkState(
113         messageValueFactory.newInstance().getClass().equals(messageClass));
114 
115     if (messageCombinerSupplier != null) {
116       MessageCombiner<? super I, M> messageCombiner =
117           messageCombinerSupplier.apply(conf);
118       Preconditions.checkState(messageCombiner.createInitialMessage()
119           .getClass().equals(messageClass));
120       Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
121           MessageCombiner.class, messageCombiner.getClass());
122       ReflectionUtils.verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
123           "Vertex id", messageCombiner.getClass());
124       ReflectionUtils.verifyTypes(messageClass, combinerTypes[1],
125           "Outgoing message", messageCombiner.getClass());
126     }
127   }
128 }