This project has retired. For details, please refer to its Attic page.
      
 
AbstractBlockFactory xref
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.giraph.block_app.framework;
19  
20  import java.util.List;
21  
22  import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
23  import org.apache.giraph.conf.BulkConfigurator;
24  import org.apache.giraph.conf.GiraphConfiguration;
25  import org.apache.giraph.conf.GiraphConstants;
26  import org.apache.giraph.conf.StrConfOption;
27  import org.apache.giraph.edge.IdAndNullArrayEdges;
28  import org.apache.giraph.edge.IdAndValueArrayEdges;
29  import org.apache.giraph.edge.LongDiffNullArrayEdges;
30  import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
31  import org.apache.giraph.types.ops.TypeOps;
32  import org.apache.giraph.types.ops.TypeOpsUtils;
33  import org.apache.giraph.utils.ReflectionUtils;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.io.LongWritable;
36  import org.apache.hadoop.io.NullWritable;
37  import org.apache.hadoop.io.Writable;
38  import org.apache.hadoop.io.WritableComparable;
39  
40  
41  
42  
43  
44  
45  
46  
47  public abstract class AbstractBlockFactory<S> implements BlockFactory<S> {
48    
49  
50  
51  
52    public static final StrConfOption CONFIGURATORS = new StrConfOption(
53        "digraph.block_factory_configurators", null, "");
54  
55    @Override
56    public List<String> getGcJavaOpts(Configuration conf) {
57      return null;
58    }
59  
60    @Override
61    public final void initConfig(GiraphConfiguration conf) {
62      initConfigurators(conf);
63      GiraphConstants.VERTEX_ID_CLASS.setIfUnset(conf, getVertexIDClass(conf));
64      GiraphConstants.VERTEX_VALUE_CLASS.setIfUnset(
65          conf, getVertexValueClass(conf));
66      GiraphConstants.EDGE_VALUE_CLASS.setIfUnset(conf, getEdgeValueClass(conf));
67      GiraphConstants.RESOLVER_CREATE_VERTEX_ON_MSGS.setIfUnset(
68          conf, shouldCreateVertexOnMsgs(conf));
69      if (shouldSendOneMessageToAll(conf)) {
70        GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.setIfUnset(
71            conf, MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION);
72      }
73  
74      BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS.setIfUnset(
75          conf, getWorkerContextValueClass(conf));
76  
77      
78      if (!GiraphConstants.VERTEX_EDGES_CLASS.contains(conf)) {
79        @SuppressWarnings("rawtypes")
80        Class<? extends WritableComparable> vertexIDClass =
81            GiraphConstants.VERTEX_ID_CLASS.get(conf);
82        Class<? extends Writable> edgeValueClass =
83            GiraphConstants.EDGE_VALUE_CLASS.get(conf);
84  
85  
86        @SuppressWarnings("rawtypes")
87        PrimitiveIdTypeOps<? extends WritableComparable> idTypeOps =
88            TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIDClass);
89        if (edgeValueClass.equals(NullWritable.class)) {
90          if (vertexIDClass.equals(LongWritable.class)) {
91            GiraphConstants.VERTEX_EDGES_CLASS.set(
92                conf, LongDiffNullArrayEdges.class);
93          } else if (idTypeOps != null) {
94            GiraphConstants.VERTEX_EDGES_CLASS.set(
95                conf, IdAndNullArrayEdges.class);
96          }
97        } else {
98          TypeOps<?> edgeValueTypeOps =
99              TypeOpsUtils.getTypeOpsOrNull(edgeValueClass);
100         if (edgeValueTypeOps != null && idTypeOps != null) {
101           GiraphConstants.VERTEX_EDGES_CLASS.set(
102               conf, IdAndValueArrayEdges.class);
103         }
104       }
105     }
106 
107     additionalInitConfig(conf);
108   }
109 
110   @Override
111   public void registerOutputs(GiraphConfiguration conf) {
112   }
113 
114   private void initConfigurators(GiraphConfiguration conf) {
115     String configurators = CONFIGURATORS.get(conf);
116     if (configurators != null) {
117       String[] split = configurators.split(",");
118       for (String configurator : split) {
119         runConfigurator(conf, configurator);
120       }
121     }
122   }
123 
124   private void runConfigurator(GiraphConfiguration conf, String configurator) {
125     String[] packages = getConvenienceConfiguratorPackages();
126     String[] prefixes = new String[packages.length + 1];
127     prefixes[0] = "";
128     for (int i = 0; i < packages.length; i++) {
129       prefixes[i + 1] = packages[i] + ".";
130     }
131 
132     for (String prefix : prefixes) {
133       try {
134         @SuppressWarnings({ "unchecked", "rawtypes" })
135         Class<BulkConfigurator> confClass =
136             (Class) Class.forName(prefix + configurator);
137         BulkConfigurator c = ReflectionUtils.newInstance(confClass);
138         c.configure(conf);
139         return;
140       
141       
142       } catch (ClassNotFoundException e) {
143       }
144       
145     }
146     throw new IllegalStateException(
147         "Configurator " + configurator + " not found");
148   }
149 
150   
151 
152 
153 
154   protected void additionalInitConfig(GiraphConfiguration conf) {
155   }
156 
157   
158 
159 
160   @SuppressWarnings("rawtypes")
161   protected abstract Class<? extends WritableComparable> getVertexIDClass(
162       GiraphConfiguration conf);
163 
164   
165 
166 
167   protected abstract Class<? extends Writable> getVertexValueClass(
168       GiraphConfiguration conf);
169 
170   
171 
172 
173   protected abstract Class<? extends Writable> getEdgeValueClass(
174       GiraphConfiguration conf);
175 
176   
177 
178 
179   protected Class<?> getWorkerContextValueClass(GiraphConfiguration conf) {
180     return Object.class;
181   }
182 
183   
184 
185 
186 
187   protected boolean shouldCreateVertexOnMsgs(GiraphConfiguration conf) {
188     return true;
189   }
190 
191   
192   protected boolean shouldSendOneMessageToAll(GiraphConfiguration conf) {
193     return false;
194   }
195 
196   
197 
198 
199 
200 
201   protected String[] getConvenienceConfiguratorPackages() {
202     return new String[] { };
203   }
204 }