View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework;
19  
20  import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
21  
22  import java.lang.reflect.Field;
23  import java.lang.reflect.Modifier;
24  
25  import org.apache.giraph.block_app.framework.api.giraph.BlockComputation;
26  import org.apache.giraph.block_app.framework.api.giraph.BlockMasterCompute;
27  import org.apache.giraph.block_app.framework.api.giraph.BlockWorkerContext;
28  import org.apache.giraph.block_app.framework.block.Block;
29  import org.apache.giraph.block_app.framework.piece.AbstractPiece;
30  import org.apache.giraph.block_app.framework.piece.Piece;
31  import org.apache.giraph.conf.BooleanConfOption;
32  import org.apache.giraph.conf.ClassConfOption;
33  import org.apache.giraph.conf.GiraphConfiguration;
34  import org.apache.giraph.conf.GiraphConstants;
35  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
36  import org.apache.giraph.conf.MessageClasses;
37  import org.apache.giraph.function.Consumer;
38  import org.apache.giraph.types.NoMessage;
39  import org.apache.giraph.utils.ReflectionUtils;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.log4j.Logger;
42  
43  import com.google.common.base.Preconditions;
44  
45  /**
46   * Utility functions for block applications
47   */
48  @SuppressWarnings({ "rawtypes", "unchecked" })
49  public class BlockUtils {
50    /** Property describing BlockFactory to use for current application run */
51    public static final ClassConfOption<BlockFactory> BLOCK_FACTORY_CLASS =
52        ClassConfOption.create("digraph.block_factory", null, BlockFactory.class,
53            "block factory describing giraph job");
54  
55    /** Property describing block worker context value class to use */
56    public static final ClassConfOption<Object> BLOCK_WORKER_CONTEXT_VALUE_CLASS =
57        ClassConfOption.create(
58            "digraph.block_worker_context_value_class",
59            Object.class, Object.class,
60            "block worker context value class");
61  
62    /** Property describing whether to log execution status as application runs */
63    public static final
64    BooleanConfOption LOG_EXECUTION_STATUS = new BooleanConfOption(
65        "giraph.block_utils.log_execution_status", true,
66        "Log execution status (of which pieces are being executed, etc)");
67  
68    private static final Logger LOG = Logger.getLogger(BlockUtils.class);
69  
70    /** Dissallow constructor */
71    private BlockUtils() { }
72  
73    /**
74     * Create new BlockFactory that is specified in the configuration.
75     */
76    public static <S> BlockFactory<S> createBlockFactory(Configuration conf) {
77      return ReflectionUtils.newInstance(BLOCK_FACTORY_CLASS.get(conf));
78    }
79  
80    /**
81     * Set which BlockFactory class to be used for the application.
82     * (generally useful within tests only)
83     */
84    public static void setBlockFactoryClass(Configuration conf,
85        Class<? extends BlockFactory<?>> clazz) {
86      BLOCK_FACTORY_CLASS.set(conf, clazz);
87    }
88  
89    /**
90     * Set block factory, and initialize configs with it.
91     * Should be used only if there are no configuration options set after
92     * this method call.
93     */
94    public static void setAndInitBlockFactoryClass(GiraphConfiguration conf,
95        Class<? extends BlockFactory<?>> clazz) {
96      BLOCK_FACTORY_CLASS.set(conf, clazz);
97      initAndCheckConfig(conf);
98    }
99  
100   /**
101    * Initializes configuration, such that running it executes block application.
102    *
103    * Additionally, checks types of all pieces with a block application.
104    */
105   public static void initAndCheckConfig(GiraphConfiguration conf) {
106     conf.setMasterComputeClass(BlockMasterCompute.class);
107     conf.setComputationClass(BlockComputation.class);
108     conf.setWorkerContextClass(BlockWorkerContext.class);
109 
110     Preconditions.checkState(
111         GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.get(conf) == null,
112         "Message types should only be specified in Pieces, " +
113         "but outgoing was specified globally");
114     Preconditions.checkState(
115         GiraphConstants.OUTGOING_MESSAGE_VALUE_FACTORY_CLASS
116           .isDefaultValue(conf),
117         "Message types should only be specified in Pieces, " +
118         "but factory was specified globally");
119     Preconditions.checkState(
120         GiraphConstants.MESSAGE_COMBINER_CLASS.get(conf) == null,
121         "Message combiner should only be specified in Pieces, " +
122         "but was specified globally");
123 
124     BlockFactory<?> blockFactory = createBlockFactory(conf);
125     blockFactory.initConfig(conf);
126 
127     Preconditions.checkState(
128         GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.get(conf) == null,
129         "Outgoing message type was specified in blockFactory.initConfig");
130     Preconditions.checkState(
131         GiraphConstants.OUTGOING_MESSAGE_VALUE_FACTORY_CLASS
132           .isDefaultValue(conf),
133         "Outgoing message factory type was specified in " +
134         "blockFactory.initConfig");
135     Preconditions.checkState(
136         GiraphConstants.MESSAGE_COMBINER_CLASS.get(conf) == null,
137         "Message combiner type was specified in blockFactory.initConfig");
138 
139     GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.set(conf, NoMessage.class);
140 
141     final ImmutableClassesGiraphConfiguration immConf =
142         new ImmutableClassesGiraphConfiguration<>(conf);
143 
144     // Create blocks to detect issues before creating a Giraph job
145     // They will not be used here
146     Block executionBlock = blockFactory.createBlock(immConf);
147     checkBlockTypes(
148         executionBlock, blockFactory.createExecutionStage(immConf), immConf);
149 
150     // check for non 'static final' fields in BlockFactories
151     Class<?> bfClass = blockFactory.getClass();
152     while (!bfClass.equals(Object.class)) {
153       for (Field field : bfClass.getDeclaredFields()) {
154         if (!Modifier.isStatic(field.getModifiers()) ||
155             !Modifier.isFinal(field.getModifiers())) {
156           throw new IllegalStateException("BlockFactory (" + bfClass +
157               ") cannot have any mutable (non 'static final') fields as a " +
158               "safety measure, as createBlock function is called from a " +
159               "different context then all other functions, use conf argument " +
160               "instead, or make it 'static final'. Field present: " + field);
161         }
162       }
163       bfClass = bfClass.getSuperclass();
164     }
165 
166     // Register outputs
167     blockFactory.registerOutputs(conf);
168   }
169 
170   public static void checkBlockTypes(
171       Block executionBlock, Object executionStage,
172       final ImmutableClassesGiraphConfiguration conf) {
173     LOG.info("Executing application - " + executionBlock);
174 
175     final Class<?> vertexIdClass = conf.getVertexIdClass();
176     final Class<?> vertexValueClass = conf.getVertexValueClass();
177     final Class<?> edgeValueClass = conf.getEdgeValueClass();
178     final Class<?> workerContextValueClass =
179         BLOCK_WORKER_CONTEXT_VALUE_CLASS.get(conf);
180     final Class<?> executionStageClass = executionStage.getClass();
181 
182     // Check for type inconsistencies
183     executionBlock.forAllPossiblePieces(new Consumer<AbstractPiece>() {
184       @Override
185       public void apply(AbstractPiece piece) {
186         if (!piece.getClass().equals(Piece.class)) {
187           Class<?>[] classList = getTypeArguments(
188               AbstractPiece.class, piece.getClass());
189           Preconditions.checkArgument(classList.length == 7);
190 
191           ReflectionUtils.verifyTypes(
192               vertexIdClass, classList[0], "vertexId", piece.getClass());
193           ReflectionUtils.verifyTypes(
194               vertexValueClass, classList[1], "vertexValue", piece.getClass());
195           ReflectionUtils.verifyTypes(
196               edgeValueClass, classList[2], "edgeValue", piece.getClass());
197 
198           MessageClasses classes = piece.getMessageClasses(conf);
199           Class<?> messageType = classes.getMessageClass();
200           if (messageType == null) {
201             messageType = NoMessage.class;
202           }
203           ReflectionUtils.verifyTypes(
204               messageType, classList[3], "message", piece.getClass());
205 
206           ReflectionUtils.verifyTypes(
207               workerContextValueClass, classList[4],
208               "workerContextValue", piece.getClass());
209           // No need to check worker context message class at all
210 
211           ReflectionUtils.verifyTypes(
212               executionStageClass, classList[6],
213               "executionStage", piece.getClass());
214         }
215       }
216     });
217   }
218 }