View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.conf;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  
24  import org.apache.giraph.combiner.MessageCombiner;
25  import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
26  import org.apache.giraph.factories.DefaultMessageValueFactory;
27  import org.apache.giraph.factories.MessageValueFactory;
28  import org.apache.giraph.utils.ReflectionUtils;
29  import org.apache.giraph.utils.WritableUtils;
30  import org.apache.hadoop.io.Writable;
31  import org.apache.hadoop.io.WritableComparable;
32  import org.python.google.common.base.Preconditions;
33  
34  /**
35   * Default implementation of MessageClasses
36   *
37   * @param <I> Vertex id type
38   * @param <M> Message type
39   */
40  public class DefaultMessageClasses
41      <I extends WritableComparable, M extends Writable>
42      implements MessageClasses<I, M> {
43    /** message class */
44    private Class<M> messageClass;
45    /** message value factory class */
46    private Class<? extends MessageValueFactory<M>>
47    messageValueFactoryClass;
48    /** message combiner class */
49    private Class<? extends MessageCombiner<? super I, M>> messageCombinerClass;
50    /** whether message class was manually modified in this superstep */
51    private boolean messageClassModified;
52    /** message encode and store type */
53    private MessageEncodeAndStoreType messageEncodeAndStoreType;
54  
55    /** Constructor */
56    public DefaultMessageClasses() {
57    }
58  
59    /**
60     * Constructor
61     * @param messageClass message class
62     * @param messageValueFactoryClass message value factory class
63     * @param messageCombinerClass message combiner class
64     * @param messageEncodeAndStoreType message encode and store type
65     */
66    public DefaultMessageClasses(
67        Class<M> messageClass,
68        Class<? extends MessageValueFactory<M>> messageValueFactoryClass,
69        Class<? extends MessageCombiner<? super I, M>> messageCombinerClass,
70          MessageEncodeAndStoreType messageEncodeAndStoreType) {
71      this.messageClass = messageClass;
72      this.messageValueFactoryClass = messageValueFactoryClass;
73      this.messageCombinerClass = messageCombinerClass;
74      this.messageClassModified = false;
75      this.messageEncodeAndStoreType = messageEncodeAndStoreType;
76    }
77  
78    @Override
79    public Class<M> getMessageClass() {
80      return messageClass;
81    }
82  
83    @Override
84    public MessageValueFactory<M> createMessageValueFactory(
85        ImmutableClassesGiraphConfiguration conf) {
86      if (messageValueFactoryClass.equals(DefaultMessageValueFactory.class)) {
87        return new DefaultMessageValueFactory<>(messageClass, conf);
88      }
89  
90      MessageValueFactory factory = ReflectionUtils.newInstance(
91          messageValueFactoryClass, conf);
92      if (!factory.newInstance().getClass().equals(messageClass)) {
93        throw new IllegalStateException("Message factory " +
94          messageValueFactoryClass + " creates " +
95          factory.newInstance().getClass().getName() + ", but message type is " +
96          messageClass.getName());
97      }
98      return factory;
99    }
100 
101   @Override
102   public MessageCombiner<? super I, M> createMessageCombiner(
103       ImmutableClassesGiraphConfiguration conf) {
104     if (messageCombinerClass == null) {
105       return null;
106     }
107 
108     MessageCombiner combiner =
109         ReflectionUtils.newInstance(messageCombinerClass, conf);
110     if (combiner != null) {
111       Preconditions.checkState(
112           combiner.createInitialMessage().getClass().equals(messageClass));
113     }
114     return combiner;
115   }
116 
117   @Override
118   public boolean useMessageCombiner() {
119     return messageCombinerClass != null;
120   }
121 
122   @Override
123   public MessageEncodeAndStoreType getMessageEncodeAndStoreType() {
124     return messageEncodeAndStoreType;
125   }
126 
127   @Override
128   public MessageClasses<I, M> createCopyForNewSuperstep() {
129     return new DefaultMessageClasses<>(messageClass, messageValueFactoryClass,
130         messageCombinerClass, messageEncodeAndStoreType);
131   }
132 
133   @Override
134   public void verifyConsistent(
135       ImmutableClassesGiraphConfiguration conf) {
136     Class<?>[] factoryTypes = ReflectionUtils.getTypeArguments(
137         MessageValueFactory.class, messageValueFactoryClass);
138     ReflectionUtils.verifyTypes(messageClass, factoryTypes[0],
139         "Message factory", messageValueFactoryClass);
140 
141     if (messageCombinerClass != null) {
142       Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
143           MessageCombiner.class, messageCombinerClass);
144       ReflectionUtils.verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
145           "Vertex id", messageCombinerClass);
146       ReflectionUtils.verifyTypes(messageClass, combinerTypes[1],
147           "Outgoing message", messageCombinerClass);
148     }
149   }
150 
151   /**
152    * Set message class
153    * @param messageClass message classs
154    */
155   public void setMessageClass(Class<M> messageClass) {
156     this.messageClassModified = true;
157     this.messageClass = messageClass;
158   }
159 
160   /**
161    * Set message class if not set already in this superstep
162    * @param messageClass message class
163    */
164   public void setIfNotModifiedMessageClass(Class<M> messageClass) {
165     if (!messageClassModified) {
166       this.messageClass = messageClass;
167     }
168   }
169 
170   public void setMessageCombinerClass(
171       Class<? extends MessageCombiner<? super I, M>> messageCombinerClass) {
172     this.messageCombinerClass = messageCombinerClass;
173   }
174 
175   public void setMessageValueFactoryClass(
176       Class<? extends MessageValueFactory<M>> messageValueFactoryClass) {
177     this.messageValueFactoryClass = messageValueFactoryClass;
178   }
179 
180   public void setMessageEncodeAndStoreType(
181       MessageEncodeAndStoreType messageEncodeAndStoreType) {
182     this.messageEncodeAndStoreType = messageEncodeAndStoreType;
183   }
184 
185   @Override
186   public void write(DataOutput out) throws IOException {
187     WritableUtils.writeClass(messageClass, out);
188     WritableUtils.writeClass(messageValueFactoryClass, out);
189     WritableUtils.writeClass(messageCombinerClass, out);
190     out.writeBoolean(messageClassModified);
191     out.writeByte(messageEncodeAndStoreType.ordinal());
192   }
193 
194   @Override
195   public void readFields(DataInput in) throws IOException {
196     messageClass = WritableUtils.readClass(in);
197     messageValueFactoryClass = WritableUtils.readClass(in);
198     messageCombinerClass = WritableUtils.readClass(in);
199     messageClassModified = in.readBoolean();
200     messageEncodeAndStoreType =
201         messageEncodeAndStoreType.values()[in.readByte()];
202   }
203 }