View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.master;
20  
21  import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_LANGUAGE;
22  
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  
27  import org.apache.giraph.combiner.MessageCombiner;
28  import org.apache.giraph.conf.DefaultMessageClasses;
29  import org.apache.giraph.conf.GiraphClasses;
30  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
31  import org.apache.giraph.conf.MessageClasses;
32  import org.apache.giraph.conf.TypesHolder;
33  import org.apache.giraph.graph.Computation;
34  import org.apache.giraph.graph.Language;
35  import org.apache.giraph.utils.ReflectionUtils;
36  import org.apache.giraph.utils.WritableUtils;
37  import org.apache.hadoop.io.Writable;
38  import org.apache.hadoop.io.WritableComparable;
39  import org.apache.log4j.Logger;
40  import org.python.google.common.base.Preconditions;
41  
42  /**
43   * Holds Computation and MessageCombiner class.
44   */
45  public class SuperstepClasses implements Writable {
46    /** Class logger */
47    private static final Logger LOG = Logger.getLogger(SuperstepClasses.class);
48    /** Configuration */
49    private final ImmutableClassesGiraphConfiguration conf;
50  
51    /** Computation class to be used in the following superstep */
52    private Class<? extends Computation> computationClass;
53    /** Incoming message classes, immutable, only here for cheecking */
54    private MessageClasses<? extends WritableComparable, ? extends Writable>
55    incomingMessageClasses;
56    /** Outgoing message classes */
57    private MessageClasses<? extends WritableComparable, ? extends Writable>
58    outgoingMessageClasses;
59  
60    /**
61     * Constructor
62     * @param conf Configuration
63     * @param computationClass computation class
64     * @param incomingMessageClasses incoming message classes
65     * @param outgoingMessageClasses outgoing message classes
66     */
67    public SuperstepClasses(
68        ImmutableClassesGiraphConfiguration conf,
69        Class<? extends Computation> computationClass,
70        MessageClasses<? extends WritableComparable, ? extends Writable>
71          incomingMessageClasses,
72        MessageClasses<? extends WritableComparable, ? extends Writable>
73          outgoingMessageClasses) {
74      this.conf = conf;
75      this.computationClass = computationClass;
76      this.incomingMessageClasses = incomingMessageClasses;
77      this.outgoingMessageClasses = outgoingMessageClasses;
78    }
79  
80    /**
81     * Create empty superstep classes, readFields needs to be called afterwards
82     * @param conf Configuration
83     * @return Superstep classes
84     */
85    public static SuperstepClasses createToRead(
86        ImmutableClassesGiraphConfiguration conf) {
87      return new SuperstepClasses(conf, null, null, null);
88    }
89  
90    /**
91     * Create superstep classes by initiazling from current state
92     * in configuration
93     * @param conf Configuration
94     * @return Superstep classes
95     */
96    public static SuperstepClasses createAndExtractTypes(
97        ImmutableClassesGiraphConfiguration conf) {
98      return new SuperstepClasses(
99          conf,
100         conf.getComputationClass(),
101         conf.getOutgoingMessageClasses(),
102         conf.getOutgoingMessageClasses().createCopyForNewSuperstep());
103   }
104 
105   public Class<? extends Computation> getComputationClass() {
106     return computationClass;
107   }
108 
109   public MessageClasses<? extends WritableComparable, ? extends Writable>
110   getOutgoingMessageClasses() {
111     return outgoingMessageClasses;
112   }
113 
114   /**
115    * Set's outgoing MessageClasses for next superstep.
116    * Should not be used together with
117    * setMessageCombinerClass/setOutgoingMessageClass methods.
118    *
119    * @param outgoingMessageClasses outgoing message classes
120    */
121   public void setOutgoingMessageClasses(
122       MessageClasses<? extends WritableComparable, ? extends Writable>
123         outgoingMessageClasses) {
124     this.outgoingMessageClasses = outgoingMessageClasses;
125   }
126 
127   /**
128    * Set computation class
129    * @param computationClass computation class
130    */
131   public void setComputationClass(
132       Class<? extends Computation> computationClass) {
133     this.computationClass = computationClass;
134 
135     if (computationClass != null) {
136       Class[] computationTypes = ReflectionUtils.getTypeArguments(
137           TypesHolder.class, computationClass);
138       if (computationTypes[4] != null &&
139           outgoingMessageClasses instanceof DefaultMessageClasses) {
140         ((DefaultMessageClasses) outgoingMessageClasses)
141           .setIfNotModifiedMessageClass(computationTypes[4]);
142       }
143     }
144   }
145 
146   /**
147    * Set message combiner class.
148    * Should not be used together setOutgoingMessageClasses
149    * (throws exception if called with it),
150    * as it is unnecessary to do so.
151    *
152    * @param messageCombinerClass message combiner class
153    */
154   public void setMessageCombinerClass(
155       Class<? extends MessageCombiner> messageCombinerClass) {
156     Preconditions.checkState(
157         outgoingMessageClasses instanceof DefaultMessageClasses);
158     ((DefaultMessageClasses) outgoingMessageClasses).
159         setMessageCombinerClass(messageCombinerClass);
160   }
161 
162   /**
163    * Set incoming message class
164    * @param incomingMessageClass incoming message class
165    */
166   @Deprecated
167   public void setIncomingMessageClass(
168       Class<? extends Writable> incomingMessageClass) {
169     if (!incomingMessageClasses.getMessageClass().
170         equals(incomingMessageClass)) {
171       throw new IllegalArgumentException(
172           "Cannot change incoming message class from " +
173           incomingMessageClasses.getMessageClass() +
174           " previously, to " + incomingMessageClass);
175     }
176   }
177 
178   /**
179    * Set outgoing message class.
180    * Should not be used together setOutgoingMessageClasses
181    * (throws exception if called with it),
182    * as it is unnecessary to do so.
183    *
184    * @param outgoingMessageClass outgoing message class
185    */
186   public void setOutgoingMessageClass(
187       Class<? extends Writable> outgoingMessageClass) {
188     Preconditions.checkState(
189         outgoingMessageClasses instanceof DefaultMessageClasses);
190     ((DefaultMessageClasses) outgoingMessageClasses).
191         setMessageClass(outgoingMessageClass);
192   }
193 
194   /**
195    * Get message combiner class
196    * @return message combiner class
197    */
198   public Class<? extends MessageCombiner> getMessageCombinerClass() {
199     MessageCombiner combiner =
200         outgoingMessageClasses.createMessageCombiner(conf);
201     return combiner != null ? combiner.getClass() : null;
202   }
203 
204   /**
205    * Verify that types of current Computation and MessageCombiner are valid.
206    * If types don't match an {@link IllegalStateException} will be thrown.
207    *
208    * @param checkMatchingMesssageTypes Check that the incoming/outgoing
209    *                                   message types match
210    */
211   public void verifyTypesMatch(boolean checkMatchingMesssageTypes) {
212     // In some cases, for example when using Jython, the Computation class may
213     // not be set. This is because it is created by a ComputationFactory
214     // dynamically and not known ahead of time. In this case there is nothing to
215     // verify here so we bail.
216     if (COMPUTATION_LANGUAGE.get(conf) == Language.JYTHON) {
217       return;
218     }
219 
220     Class<?>[] computationTypes = ReflectionUtils.getTypeArguments(
221         TypesHolder.class, computationClass);
222     ReflectionUtils.verifyTypes(conf.getVertexIdClass(), computationTypes[0],
223         "Vertex id", computationClass);
224     ReflectionUtils.verifyTypes(conf.getVertexValueClass(), computationTypes[1],
225         "Vertex value", computationClass);
226     ReflectionUtils.verifyTypes(conf.getEdgeValueClass(), computationTypes[2],
227         "Edge value", computationClass);
228 
229     if (checkMatchingMesssageTypes) {
230       ReflectionUtils.verifyTypes(incomingMessageClasses.getMessageClass(),
231           computationTypes[3], "Incoming message type", computationClass);
232     }
233 
234     ReflectionUtils.verifyTypes(outgoingMessageClasses.getMessageClass(),
235         computationTypes[4], "Outgoing message type", computationClass);
236 
237     outgoingMessageClasses.verifyConsistent(conf);
238   }
239 
240   /**
241    * Update GiraphClasses with updated types
242    * @param classes Giraph classes
243    */
244   public void updateGiraphClasses(GiraphClasses classes) {
245     classes.setComputationClass(computationClass);
246     classes.setIncomingMessageClasses(incomingMessageClasses);
247     classes.setOutgoingMessageClasses(outgoingMessageClasses);
248   }
249 
250   @Override
251   public void write(DataOutput output) throws IOException {
252     WritableUtils.writeClass(computationClass, output);
253     WritableUtils.writeWritableObject(incomingMessageClasses, output);
254     WritableUtils.writeWritableObject(outgoingMessageClasses, output);
255   }
256 
257   @Override
258   public void readFields(DataInput input) throws IOException {
259     computationClass = WritableUtils.readClass(input);
260     incomingMessageClasses = WritableUtils.readWritableObject(input, conf);
261     outgoingMessageClasses = WritableUtils.readWritableObject(input, conf);
262   }
263 
264   @Override
265   public String toString() {
266     String computationName = computationClass == null ? "_not_set_" :
267         computationClass.getName();
268     return "(computation=" + computationName +
269         ",incoming=" + incomingMessageClasses +
270         ",outgoing=" + outgoingMessageClasses + ")";
271   }
272 
273 }