This project has retired. For details please refer to its Attic page.
BlockUtils xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework;
19  
20  import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
21  
22  import java.lang.reflect.Field;
23  import java.lang.reflect.Modifier;
24  
25  import org.apache.giraph.block_app.framework.api.giraph.BlockComputation;
26  import org.apache.giraph.block_app.framework.api.giraph.BlockMasterCompute;
27  import org.apache.giraph.block_app.framework.api.giraph.BlockWorkerContext;
28  import org.apache.giraph.block_app.framework.block.Block;
29  import org.apache.giraph.block_app.framework.block.PieceCount;
30  import org.apache.giraph.block_app.framework.piece.AbstractPiece;
31  import org.apache.giraph.block_app.framework.piece.Piece;
32  import org.apache.giraph.conf.BooleanConfOption;
33  import org.apache.giraph.conf.ClassConfOption;
34  import org.apache.giraph.conf.GiraphConfiguration;
35  import org.apache.giraph.conf.GiraphConstants;
36  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
37  import org.apache.giraph.conf.MessageClasses;
38  import org.apache.giraph.function.Consumer;
39  import org.apache.giraph.types.NoMessage;
40  import org.apache.giraph.utils.ReflectionUtils;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.log4j.Logger;
43  
44  import com.google.common.base.Preconditions;
45  
46  /**
47   * Utility functions for block applications
48   */
49  @SuppressWarnings({ "rawtypes", "unchecked" })
50  public class BlockUtils {
51    /** Property describing BlockFactory to use for current application run */
52    public static final ClassConfOption<BlockFactory> BLOCK_FACTORY_CLASS =
53        ClassConfOption.create("digraph.block_factory", null, BlockFactory.class,
54            "block factory describing giraph job");
55  
56    /** Property describing block worker context value class to use */
57    public static final ClassConfOption<Object> BLOCK_WORKER_CONTEXT_VALUE_CLASS =
58        ClassConfOption.create(
59            "digraph.block_worker_context_value_class",
60            Object.class, Object.class,
61            "block worker context value class");
62  
63    /** Property describing whether to log execution status as application runs */
64    public static final
65    BooleanConfOption LOG_EXECUTION_STATUS = new BooleanConfOption(
66        "giraph.block_utils.log_execution_status", true,
67        "Log execution status (of which pieces are being executed, etc)");
68  
69    private static final Logger LOG = Logger.getLogger(BlockUtils.class);
70  
71    /** Dissallow constructor */
72    private BlockUtils() { }
73  
74    /**
75     * Create new BlockFactory that is specified in the configuration.
76     */
77    public static <S> BlockFactory<S> createBlockFactory(Configuration conf) {
78      return ReflectionUtils.newInstance(BLOCK_FACTORY_CLASS.get(conf));
79    }
80  
81    /**
82     * Set which BlockFactory class to be used for the application.
83     * (generally useful within tests only)
84     */
85    public static void setBlockFactoryClass(Configuration conf,
86        Class<? extends BlockFactory<?>> clazz) {
87      BLOCK_FACTORY_CLASS.set(conf, clazz);
88    }
89  
90    /**
91     * Set block factory, and initialize configs with it.
92     * Should be used only if there are no configuration options set after
93     * this method call.
94     */
95    public static void setAndInitBlockFactoryClass(GiraphConfiguration conf,
96        Class<? extends BlockFactory<?>> clazz) {
97      BLOCK_FACTORY_CLASS.set(conf, clazz);
98      initAndCheckConfig(conf);
99    }
100 
101   /**
102    * Initializes configuration, such that running it executes block application.
103    *
104    * Additionally, checks types of all pieces with a block application.
105    */
106   public static void initAndCheckConfig(GiraphConfiguration conf) {
107     conf.setMasterComputeClass(BlockMasterCompute.class);
108     conf.setComputationClass(BlockComputation.class);
109     conf.setWorkerContextClass(BlockWorkerContext.class);
110 
111     Preconditions.checkState(
112         GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.get(conf) == null,
113         "Message types should only be specified in Pieces, " +
114         "but outgoing was specified globally");
115     Preconditions.checkState(
116         GiraphConstants.OUTGOING_MESSAGE_VALUE_FACTORY_CLASS
117           .isDefaultValue(conf),
118         "Message types should only be specified in Pieces, " +
119         "but factory was specified globally");
120     Preconditions.checkState(
121         GiraphConstants.MESSAGE_COMBINER_CLASS.get(conf) == null,
122         "Message combiner should only be specified in Pieces, " +
123         "but was specified globally");
124 
125     BlockFactory<?> blockFactory = createBlockFactory(conf);
126     blockFactory.initConfig(conf);
127 
128     Preconditions.checkState(
129         GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.get(conf) == null,
130         "Outgoing message type was specified in blockFactory.initConfig");
131     Preconditions.checkState(
132         GiraphConstants.OUTGOING_MESSAGE_VALUE_FACTORY_CLASS
133           .isDefaultValue(conf),
134         "Outgoing message factory type was specified in " +
135         "blockFactory.initConfig");
136     Preconditions.checkState(
137         GiraphConstants.MESSAGE_COMBINER_CLASS.get(conf) == null,
138         "Message combiner type was specified in blockFactory.initConfig");
139 
140     GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.set(conf, NoMessage.class);
141 
142     final ImmutableClassesGiraphConfiguration immConf =
143         new ImmutableClassesGiraphConfiguration<>(conf);
144 
145     // Create blocks to detect issues before creating a Giraph job
146     // They will not be used here
147     Block executionBlock = blockFactory.createBlock(immConf);
148     checkBlockTypes(
149         executionBlock, blockFactory.createExecutionStage(immConf), immConf);
150 
151     PieceCount pieceCount = executionBlock.getPieceCount();
152     if (pieceCount.isKnown()) {
153       GiraphConstants.SUPERSTEP_COUNT.set(conf, pieceCount.getCount() + 1);
154     }
155 
156     // check for non 'static final' fields in BlockFactories
157     Class<?> bfClass = blockFactory.getClass();
158     while (!bfClass.equals(Object.class)) {
159       for (Field field : bfClass.getDeclaredFields()) {
160         if (!Modifier.isStatic(field.getModifiers()) ||
161             !Modifier.isFinal(field.getModifiers())) {
162           throw new IllegalStateException("BlockFactory (" + bfClass +
163               ") cannot have any mutable (non 'static final') fields as a " +
164               "safety measure, as createBlock function is called from a " +
165               "different context then all other functions, use conf argument " +
166               "instead, or make it 'static final'. Field present: " + field);
167         }
168       }
169       bfClass = bfClass.getSuperclass();
170     }
171 
172     // Register outputs
173     blockFactory.registerOutputs(conf);
174   }
175 
176   public static void checkBlockTypes(
177       Block executionBlock, Object executionStage,
178       final ImmutableClassesGiraphConfiguration conf) {
179     LOG.info("Executing application - " + executionBlock);
180 
181     final Class<?> vertexIdClass = conf.getVertexIdClass();
182     final Class<?> vertexValueClass = conf.getVertexValueClass();
183     final Class<?> edgeValueClass = conf.getEdgeValueClass();
184     final Class<?> workerContextValueClass =
185         BLOCK_WORKER_CONTEXT_VALUE_CLASS.get(conf);
186     final Class<?> executionStageClass = executionStage.getClass();
187 
188     // Check for type inconsistencies
189     executionBlock.forAllPossiblePieces(new Consumer<AbstractPiece>() {
190       @Override
191       public void apply(AbstractPiece piece) {
192         if (!piece.getClass().equals(Piece.class)) {
193           Class<?>[] classList = getTypeArguments(
194               AbstractPiece.class, piece.getClass());
195           Preconditions.checkArgument(classList.length == 7);
196 
197           ReflectionUtils.verifyTypes(
198               vertexIdClass, classList[0], "vertexId", piece.getClass());
199           ReflectionUtils.verifyTypes(
200               vertexValueClass, classList[1], "vertexValue", piece.getClass());
201           ReflectionUtils.verifyTypes(
202               edgeValueClass, classList[2], "edgeValue", piece.getClass());
203 
204           MessageClasses classes = piece.getMessageClasses(conf);
205           Class<?> messageType = classes.getMessageClass();
206           if (messageType == null) {
207             messageType = NoMessage.class;
208           }
209           ReflectionUtils.verifyTypes(
210               messageType, classList[3], "message", piece.getClass());
211 
212           ReflectionUtils.verifyTypes(
213               workerContextValueClass, classList[4],
214               "workerContextValue", piece.getClass());
215           // No need to check worker context message class at all
216 
217           ReflectionUtils.verifyTypes(
218               executionStageClass, classList[6],
219               "executionStage", piece.getClass());
220         }
221       }
222     });
223   }
224 }