This project has retired. For details please refer to its Attic page.
AbstractBlockFactory xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework;
19  
20  import java.util.List;
21  
22  import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
23  import org.apache.giraph.conf.BulkConfigurator;
24  import org.apache.giraph.conf.GiraphConfiguration;
25  import org.apache.giraph.conf.GiraphConstants;
26  import org.apache.giraph.conf.StrConfOption;
27  import org.apache.giraph.edge.IdAndNullArrayEdges;
28  import org.apache.giraph.edge.IdAndValueArrayEdges;
29  import org.apache.giraph.edge.LongDiffNullArrayEdges;
30  import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
31  import org.apache.giraph.types.ops.TypeOps;
32  import org.apache.giraph.types.ops.TypeOpsUtils;
33  import org.apache.giraph.utils.ReflectionUtils;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.io.LongWritable;
36  import org.apache.hadoop.io.NullWritable;
37  import org.apache.hadoop.io.Writable;
38  import org.apache.hadoop.io.WritableComparable;
39  
40  /**
41   * Default block factory abstract class, providing default methods that need
42   * to be/can be overridden for specifying required/most common parameters,
43   * to simplify setting properties.
44   *
45   * @param <S> Execution stage type
46   */
47  public abstract class AbstractBlockFactory<S> implements BlockFactory<S> {
48    /**
49     * Comma separated list of BulkConfigurators, that are going to be called
50     * to simplify specifying of large number of properties.
51     */
52    public static final StrConfOption CONFIGURATORS = new StrConfOption(
53        "digraph.block_factory_configurators", null, "");
54  
55    @Override
56    public List<String> getGcJavaOpts(Configuration conf) {
57      return null;
58    }
59  
60    @Override
61    public final void initConfig(GiraphConfiguration conf) {
62      initConfigurators(conf);
63      GiraphConstants.VERTEX_ID_CLASS.setIfUnset(conf, getVertexIDClass(conf));
64      GiraphConstants.VERTEX_VALUE_CLASS.setIfUnset(
65          conf, getVertexValueClass(conf));
66      GiraphConstants.EDGE_VALUE_CLASS.setIfUnset(conf, getEdgeValueClass(conf));
67      GiraphConstants.RESOLVER_CREATE_VERTEX_ON_MSGS.setIfUnset(
68          conf, shouldCreateVertexOnMsgs(conf));
69      if (shouldSendOneMessageToAll(conf)) {
70        GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.setIfUnset(
71            conf, MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION);
72      }
73  
74      BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS.setIfUnset(
75          conf, getWorkerContextValueClass(conf));
76  
77      // optimize edge structure, if available and not set already
78      if (!GiraphConstants.VERTEX_EDGES_CLASS.contains(conf)) {
79        @SuppressWarnings("rawtypes")
80        Class<? extends WritableComparable> vertexIDClass =
81            GiraphConstants.VERTEX_ID_CLASS.get(conf);
82        Class<? extends Writable> edgeValueClass =
83            GiraphConstants.EDGE_VALUE_CLASS.get(conf);
84  
85  
86        @SuppressWarnings("rawtypes")
87        PrimitiveIdTypeOps<? extends WritableComparable> idTypeOps =
88            TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIDClass);
89        if (edgeValueClass.equals(NullWritable.class)) {
90          if (vertexIDClass.equals(LongWritable.class)) {
91            GiraphConstants.VERTEX_EDGES_CLASS.set(
92                conf, LongDiffNullArrayEdges.class);
93          } else if (idTypeOps != null) {
94            GiraphConstants.VERTEX_EDGES_CLASS.set(
95                conf, IdAndNullArrayEdges.class);
96          }
97        } else {
98          TypeOps<?> edgeValueTypeOps =
99              TypeOpsUtils.getTypeOpsOrNull(edgeValueClass);
100         if (edgeValueTypeOps != null && idTypeOps != null) {
101           GiraphConstants.VERTEX_EDGES_CLASS.set(
102               conf, IdAndValueArrayEdges.class);
103         }
104       }
105     }
106 
107     additionalInitConfig(conf);
108   }
109 
110   @Override
111   public void registerOutputs(GiraphConfiguration conf) {
112   }
113 
114   private void initConfigurators(GiraphConfiguration conf) {
115     String configurators = CONFIGURATORS.get(conf);
116     if (configurators != null) {
117       String[] split = configurators.split(",");
118       for (String configurator : split) {
119         runConfigurator(conf, configurator);
120       }
121     }
122   }
123 
124   private void runConfigurator(GiraphConfiguration conf, String configurator) {
125     String[] packages = getConvenienceConfiguratorPackages();
126     String[] prefixes = new String[packages.length + 1];
127     prefixes[0] = "";
128     for (int i = 0; i < packages.length; i++) {
129       prefixes[i + 1] = packages[i] + ".";
130     }
131 
132     for (String prefix : prefixes) {
133       try {
134         @SuppressWarnings({ "unchecked", "rawtypes" })
135         Class<BulkConfigurator> confClass =
136             (Class) Class.forName(prefix + configurator);
137         BulkConfigurator c = ReflectionUtils.newInstance(confClass);
138         c.configure(conf);
139         return;
140       // CHECKSTYLE: stop EmptyBlock
141       // ignore ClassNotFoundException, and continue the loop
142       } catch (ClassNotFoundException e) {
143       }
144       // CHECKSTYLE: resume EmptyBlock
145     }
146     throw new IllegalStateException(
147         "Configurator " + configurator + " not found");
148   }
149 
150   /**
151    * Additional configuration initialization, other then overriding
152    * class specification.
153    */
154   protected void additionalInitConfig(GiraphConfiguration conf) {
155   }
156 
157   /**
158    * Concrete vertex id class application will use.
159    */
160   @SuppressWarnings("rawtypes")
161   protected abstract Class<? extends WritableComparable> getVertexIDClass(
162       GiraphConfiguration conf);
163 
164   /**
165    * Concrete vertex value class application will use.
166    */
167   protected abstract Class<? extends Writable> getVertexValueClass(
168       GiraphConfiguration conf);
169 
170   /**
171    * Concrete edge value class application will use.
172    */
173   protected abstract Class<? extends Writable> getEdgeValueClass(
174       GiraphConfiguration conf);
175 
176   /**
177    * Concrete worker context value class application will use, if overridden.
178    */
179   protected Class<?> getWorkerContextValueClass(GiraphConfiguration conf) {
180     return Object.class;
181   }
182 
183   /**
184    * Override if vertices shouldn't be created by default, if message is sent
185    * to a vertex that doesn't exist.
186    */
187   protected boolean shouldCreateVertexOnMsgs(GiraphConfiguration conf) {
188     return true;
189   }
190 
191   // TODO - see if it should be deprecated
192   protected boolean shouldSendOneMessageToAll(GiraphConfiguration conf) {
193     return false;
194   }
195 
196   /**
197    * Provide list of strings representing packages where configurators will
198    * be searched for, allowing that full path is not required for
199    * CONFIGURATORS option.
200    */
201   protected String[] getConvenienceConfiguratorPackages() {
202     return new String[] { };
203   }
204 }