This project has retired. For details please refer to its Attic page.
GiraphConfigurationValidator xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.job;
20  
21  import static org.apache.giraph.conf.GiraphConstants.VERTEX_EDGES_CLASS;
22  import static org.apache.giraph.conf.GiraphConstants.VERTEX_RESOLVER_CLASS;
23  import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
24  
25  import org.apache.giraph.combiner.MessageCombiner;
26  import org.apache.giraph.conf.GiraphConstants;
27  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
28  import org.apache.giraph.edge.OutEdges;
29  import org.apache.giraph.factories.DefaultVertexValueFactory;
30  import org.apache.giraph.factories.VertexValueFactory;
31  import org.apache.giraph.graph.DefaultVertexResolver;
32  import org.apache.giraph.graph.VertexResolver;
33  import org.apache.giraph.graph.VertexValueCombiner;
34  import org.apache.giraph.io.EdgeInputFormat;
35  import org.apache.giraph.io.EdgeOutputFormat;
36  import org.apache.giraph.io.VertexInputFormat;
37  import org.apache.giraph.io.VertexOutputFormat;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.io.Writable;
40  import org.apache.hadoop.io.WritableComparable;
41  import org.apache.log4j.Logger;
42  
43  /**
44   * GiraphConfigurationValidator attempts to verify the consistency of
45   * user-chosen InputFormat, OutputFormat, and Vertex generic type
46   * parameters as well as the general Configuration settings
47   * before the job run actually begins.
48   *
49   * @param <I> the Vertex ID type
50   * @param <V> the Vertex Value type
51   * @param <E> the Edge Value type
52   * @param <M1> the incoming Message type
53   * @param <M2> the outgoing Message type
54   */
55  public class GiraphConfigurationValidator<I extends WritableComparable,
56      V extends Writable, E extends Writable, M1 extends Writable,
57      M2 extends Writable> {
58    /**
59     * Class logger object.
60     */
61    private static Logger LOG =
62      Logger.getLogger(GiraphConfigurationValidator.class);
63  
64    /** I param vertex index in classList */
65    private static final int ID_PARAM_INDEX = 0;
66    /** V param vertex index in classList */
67    private static final int VALUE_PARAM_INDEX = 1;
68    /** E param vertex index in classList */
69    private static final int EDGE_PARAM_INDEX = 2;
70    /** M param vertex combiner index in classList */
71    private static final int MSG_COMBINER_PARAM_INDEX = 1;
72    /** E param edge input format index in classList */
73    private static final int EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX = 1;
74    /** E param vertex edges index in classList */
75    private static final int EDGE_PARAM_OUT_EDGES_INDEX = 1;
76    /** V param vertex value factory index in classList */
77    private static final int VALUE_PARAM_VERTEX_VALUE_FACTORY_INDEX = 0;
78    /** V param vertex value combiner index in classList */
79    private static final int VALUE_PARAM_VERTEX_VALUE_COMBINER_INDEX = 0;
80  
81    /**
82     * The Configuration object for use in the validation test.
83     */
84    private final ImmutableClassesGiraphConfiguration conf;
85  
86    /**
87     * Constructor to execute the validation test, throws
88     * unchecked exception to end job run on failure.
89     *
90     * @param conf the Configuration for this run.
91     */
92    public GiraphConfigurationValidator(Configuration conf) {
93      this.conf = new ImmutableClassesGiraphConfiguration(conf);
94    }
95  
96    /**
97     * Get vertex id type
98     *
99     * @return vertex id type
100    */
101   private Class<? extends WritableComparable> vertexIndexType() {
102     return conf.getGiraphTypes().getVertexIdClass();
103   }
104 
105   /**
106    * Get vertex value type
107    *
108    * @return vertex value type
109    */
110   private Class<? extends Writable> vertexValueType() {
111     return conf.getGiraphTypes().getVertexValueClass();
112   }
113 
114   /**
115    * Get edge value type
116    *
117    * @return edge value type
118    */
119   private Class<? extends Writable> edgeValueType() {
120     return conf.getGiraphTypes().getEdgeValueClass();
121   }
122 
123   /**
124    * Get outgoing message value type
125    *
126    * @return outgoing message value type
127    */
128   private Class<? extends Writable> outgoingMessageValueType() {
129     return conf.getOutgoingMessageValueClass();
130   }
131 
132   /**
133    * Make sure that all registered classes have matching types.  This
134    * is a little tricky due to type erasure, cannot simply get them from
135    * the class type arguments.  Also, set the vertex index, vertex value,
136    * edge value and message value classes.
137    */
138   public void validateConfiguration() {
139     checkConfiguration();
140     verifyOutEdgesGenericTypes();
141     verifyVertexInputFormatGenericTypes();
142     verifyEdgeInputFormatGenericTypes();
143     verifyVertexOutputFormatGenericTypes();
144     verifyEdgeOutputFormatGenericTypes();
145     verifyVertexResolverGenericTypes();
146     verifyVertexValueCombinerGenericTypes();
147     verifyMessageCombinerGenericTypes();
148     verifyVertexValueFactoryGenericTypes();
149   }
150 
151   /**
152    * Make sure the configuration is set properly by the user prior to
153    * submitting the job.
154    */
155   private void checkConfiguration() {
156     if (conf.getMaxWorkers() < 0) {
157       throw new RuntimeException("checkConfiguration: No valid " +
158           GiraphConstants.MAX_WORKERS);
159     }
160     if (conf.getMinPercentResponded() <= 0.0f ||
161         conf.getMinPercentResponded() > 100.0f) {
162       throw new IllegalArgumentException(
163           "checkConfiguration: Invalid " + conf.getMinPercentResponded() +
164               " for " + GiraphConstants.MIN_PERCENT_RESPONDED.getKey());
165     }
166     if (conf.getMinWorkers() < 0) {
167       throw new IllegalArgumentException("checkConfiguration: No valid " +
168           GiraphConstants.MIN_WORKERS);
169     }
170     conf.createComputationFactory().checkConfiguration(conf);
171     if (conf.getVertexInputFormatClass() == null &&
172         conf.getEdgeInputFormatClass() == null) {
173       throw new IllegalArgumentException("checkConfiguration: One of " +
174           GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.getKey() + " and " +
175           GiraphConstants.EDGE_INPUT_FORMAT_CLASS.getKey() +
176           " must be non-null");
177     }
178     if (conf.getVertexResolverClass() == null) {
179       if (LOG.isInfoEnabled()) {
180         LOG.info("checkConfiguration: No class found for " +
181             VERTEX_RESOLVER_CLASS.getKey() +
182             ", defaulting to " +
183             VERTEX_RESOLVER_CLASS.getDefaultClass().getCanonicalName());
184       }
185     }
186     if (conf.getOutEdgesClass() == null) {
187       if (LOG.isInfoEnabled()) {
188         LOG.info("checkConfiguration: No class found for " +
189             VERTEX_EDGES_CLASS.getKey() + ", defaulting to " +
190             VERTEX_EDGES_CLASS.getDefaultClass().getCanonicalName());
191       }
192     }
193   }
194 
195   /**
196    * Verify matching generic types for a specific OutEdges class.
197    *
198    * @param outEdgesClass {@link org.apache.giraph.edge.OutEdges} class to check
199    */
200   private void verifyOutEdgesGenericTypesClass(
201       Class<? extends OutEdges<I, E>> outEdgesClass) {
202     Class<?>[] classList = getTypeArguments(OutEdges.class, outEdgesClass);
203     checkAssignable(classList, ID_PARAM_INDEX, vertexIndexType(),
204         OutEdges.class, "vertex index");
205     checkAssignable(classList, EDGE_PARAM_OUT_EDGES_INDEX, edgeValueType(),
206         OutEdges.class, "edge value");
207   }
208 
209   /** Verify matching generic types in OutEdges. */
210   private void verifyOutEdgesGenericTypes() {
211     Class<? extends OutEdges<I, E>> outEdgesClass =
212         conf.getOutEdgesClass();
213     Class<? extends OutEdges<I, E>> inputOutEdgesClass =
214         conf.getInputOutEdgesClass();
215     verifyOutEdgesGenericTypesClass(outEdgesClass);
216     verifyOutEdgesGenericTypesClass(inputOutEdgesClass);
217   }
218 
219   /** Verify matching generic types in VertexInputFormat. */
220   private void verifyVertexInputFormatGenericTypes() {
221     Class<? extends VertexInputFormat<I, V, E>> vertexInputFormatClass =
222       conf.getVertexInputFormatClass();
223     if (vertexInputFormatClass != null) {
224       Class<?>[] classList =
225           getTypeArguments(VertexInputFormat.class, vertexInputFormatClass);
226       checkAssignable(classList, ID_PARAM_INDEX, vertexIndexType(),
227           VertexInputFormat.class, "vertex index");
228       checkAssignable(classList, VALUE_PARAM_INDEX, vertexValueType(),
229           VertexInputFormat.class, "vertex value");
230       checkAssignable(classList, EDGE_PARAM_INDEX, edgeValueType(),
231           VertexInputFormat.class, "edge value");
232     }
233   }
234 
235   /** Verify matching generic types in EdgeInputFormat. */
236   private void verifyEdgeInputFormatGenericTypes() {
237     Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass =
238         conf.getEdgeInputFormatClass();
239     if (edgeInputFormatClass != null) {
240       Class<?>[] classList =
241           getTypeArguments(EdgeInputFormat.class, edgeInputFormatClass);
242       checkAssignable(classList, ID_PARAM_INDEX, vertexIndexType(),
243           EdgeInputFormat.class, "vertex index");
244       checkAssignable(classList, EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX,
245           edgeValueType(), EdgeInputFormat.class, "edge value");
246     }
247   }
248 
249   /**
250    * If there is a vertex value combiner type, verify its
251    * generic params match the job.
252    */
253   private void verifyVertexValueCombinerGenericTypes() {
254     Class<? extends VertexValueCombiner<V>> vertexValueCombiner =
255         conf.getVertexValueCombinerClass();
256     if (vertexValueCombiner != null) {
257       Class<?>[] classList =
258           getTypeArguments(VertexValueCombiner.class, vertexValueCombiner);
259       checkAssignable(classList, VALUE_PARAM_VERTEX_VALUE_COMBINER_INDEX,
260           vertexValueType(), VertexValueCombiner.class, "vertex value");
261     }
262   }
263 
264   /**
265    * If there is a message combiner type, verify its
266    * generic params match the job.
267    */
268   private void verifyMessageCombinerGenericTypes() {
269     MessageCombiner<I, M2> messageCombiner =
270       conf.createOutgoingMessageCombiner();
271     if (messageCombiner != null) {
272       Class<?>[] classList =
273           getTypeArguments(MessageCombiner.class, messageCombiner.getClass());
274       checkEquals(classList, ID_PARAM_INDEX, vertexIndexType(),
275           MessageCombiner.class, "vertex index");
276       checkEquals(classList, MSG_COMBINER_PARAM_INDEX,
277           outgoingMessageValueType(), MessageCombiner.class, "message value");
278     }
279   }
280 
281   /** Verify that the vertex output format's generic params match the job. */
282   private void verifyVertexOutputFormatGenericTypes() {
283     Class<? extends EdgeOutputFormat<I, V, E>>
284       edgeOutputFormatClass = conf.getEdgeOutputFormatClass();
285     if (conf.hasEdgeOutputFormat()) {
286       Class<?>[] classList =
287         getTypeArguments(EdgeOutputFormat.class, edgeOutputFormatClass);
288       checkAssignable(classList, ID_PARAM_INDEX, vertexIndexType(),
289           VertexOutputFormat.class, "vertex index");
290       checkAssignable(classList, VALUE_PARAM_INDEX, vertexValueType(),
291           VertexOutputFormat.class, "vertex value");
292       checkAssignable(classList, EDGE_PARAM_INDEX, edgeValueType(),
293           VertexOutputFormat.class, "edge value");
294     }
295   }
296 
297   /** Verify that the edge output format's generic params match the job. */
298   private void verifyEdgeOutputFormatGenericTypes() {
299     Class<? extends VertexOutputFormat<I, V, E>>
300       vertexOutputFormatClass = conf.getVertexOutputFormatClass();
301     if (conf.hasVertexOutputFormat()) {
302       Class<?>[] classList =
303         getTypeArguments(VertexOutputFormat.class, vertexOutputFormatClass);
304       checkAssignable(classList, ID_PARAM_INDEX, vertexIndexType(),
305           VertexOutputFormat.class, "vertex index");
306       checkAssignable(classList, VALUE_PARAM_INDEX, vertexValueType(),
307           VertexOutputFormat.class, "vertex value");
308       checkAssignable(classList, EDGE_PARAM_INDEX, edgeValueType(),
309           VertexOutputFormat.class, "edge value");
310     }
311   }
312 
313   /** Verify that the vertex value factory's type matches the job */
314   private void verifyVertexValueFactoryGenericTypes() {
315     Class<? extends VertexValueFactory<V>>
316         vvfClass = conf.getVertexValueFactoryClass();
317     if (DefaultVertexValueFactory.class.equals(vvfClass)) {
318       return;
319     }
320     Class<?>[] classList = getTypeArguments(VertexValueFactory.class, vvfClass);
321     checkEquals(classList, VALUE_PARAM_VERTEX_VALUE_FACTORY_INDEX,
322         vertexValueType(), VertexValueFactory.class, "vertex value");
323   }
324 
325   /**
326    * If there is a vertex resolver,
327    * validate the generic parameter types.
328    * */
329   private void verifyVertexResolverGenericTypes() {
330     Class<? extends VertexResolver<I, V, E>>
331         vrClass = conf.getVertexResolverClass();
332     if (DefaultVertexResolver.class.equals(vrClass)) {
333       return;
334     }
335     Class<?>[] classList =
336         getTypeArguments(VertexResolver.class, vrClass);
337     checkEquals(classList, ID_PARAM_INDEX, vertexIndexType(),
338         VertexResolver.class, "vertex index");
339     checkEquals(classList, VALUE_PARAM_INDEX, vertexValueType(),
340         VertexResolver.class, "vertex value");
341     checkEquals(classList, EDGE_PARAM_INDEX, edgeValueType(),
342         VertexResolver.class, "edge value");
343   }
344 
345   /**
346    * Check that the type from computation equals the type from the class.
347    *
348    * @param classList classes from type
349    * @param index array index of class to check
350    * @param classFromComputation class from computation
351    * @param klass Class type we're checking, only used for printing name
352    * @param typeName Name of type we're checking
353    */
354   private static void checkEquals(Class<?>[] classList, int index,
355       Class<?> classFromComputation, Class klass, String typeName) {
356     if (classList[index] == null) {
357       LOG.warn(klass.getSimpleName() + " " + typeName + " type is not known");
358     } else if (!classList[index].equals(classFromComputation)) {
359       throw new IllegalStateException(
360           "checkClassTypes: " + typeName + " types not equal, " +
361               "computation - " + classFromComputation +
362               ", " + klass.getSimpleName() + " - " +
363               classList[index]);
364     }
365   }
366 
367   /**
368    * Check that the type from computation is assignable to type from the class.
369    *
370    * @param classList classes from type
371    * @param index array index of class to check
372    * @param classFromComputation class from computation
373    * @param klass Class type we're checking, only used for printing name
374    * @param typeName Name of type we're checking
375    */
376   private static void checkAssignable(Class<?>[] classList, int index,
377       Class<?> classFromComputation, Class klass, String typeName) {
378     if (classList[index] == null) {
379       LOG.warn(klass.getSimpleName() + " " + typeName + " type is not known");
380     } else if (!classList[index].isAssignableFrom(classFromComputation)) {
381       throw new IllegalStateException(
382           "checkClassTypes: " + typeName + " types not assignable, " +
383               "computation - " + classFromComputation +
384               ", " + klass.getSimpleName() + " - " +
385               classList[EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX]);
386     }
387   }
388 }
389