This project has retired. For details please refer to its Attic page.
MigrationMasterCompute xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.migration;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  
24  import org.apache.giraph.aggregators.Aggregator;
25  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
26  import org.apache.giraph.block_app.framework.api.StatusReporter;
27  import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullAbstractComputation;
28  import org.apache.giraph.combiner.MessageCombiner;
29  import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
30  import org.apache.giraph.conf.TypesHolder;
31  import org.apache.giraph.reducers.ReduceOperation;
32  import org.apache.giraph.utils.ReflectionUtils;
33  import org.apache.hadoop.io.Writable;
34  
35  /**
36   * Replacement for MasterCompute when migrating to Blocks Framework,
37   * disallowing functions that are tied to execution order.
38   */
39  @SuppressWarnings({"unchecked", "rawtypes"})
40  public abstract class MigrationMasterCompute
41      extends DefaultImmutableClassesGiraphConfigurable implements Writable {
42    private BlockMasterApi api;
43  
44    final void init(BlockMasterApi masterApi) {
45      this.api = masterApi;
46      setConf(masterApi.getConf());
47    }
48  
49    @Override
50    public void readFields(DataInput in) throws IOException {
51    }
52  
53    @Override
54    public void write(DataOutput out) throws IOException {
55    }
56  
57    public void compute() {
58    }
59  
60    public void initialize() throws InstantiationException,
61        IllegalAccessException {
62    }
63  
64    @SuppressWarnings("deprecation")
65    public long getTotalNumVertices() {
66      return api.getTotalNumVertices();
67    }
68  
69    @SuppressWarnings("deprecation")
70    public long getTotalNumEdges() {
71      return api.getTotalNumEdges();
72    }
73  
74    public final <S, R extends Writable> void registerReducer(
75        String name, ReduceOperation<S, R> reduceOp) {
76      api.registerReducer(name, reduceOp);
77    }
78  
79    public final <S, R extends Writable> void registerReducer(
80        String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) {
81      api.registerReducer(
82          name, reduceOp, globalInitialValue);
83    }
84  
85    public final <T extends Writable> T getReduced(String name) {
86      return api.getReduced(name);
87    }
88  
89    public final void broadcast(String name, Writable object) {
90      api.broadcast(name, object);
91    }
92  
93    public final <A extends Writable> boolean registerAggregator(
94      String name, Class<? extends Aggregator<A>> aggregatorClass)
95      throws InstantiationException, IllegalAccessException {
96      return api.registerAggregator(
97          name, aggregatorClass);
98    }
99  
100   @SuppressWarnings("deprecation")
101   public final <A extends Writable> boolean registerPersistentAggregator(
102       String name,
103       Class<? extends Aggregator<A>> aggregatorClass) throws
104       InstantiationException, IllegalAccessException {
105     return api.registerPersistentAggregator(name, aggregatorClass);
106   }
107 
108   public final <A extends Writable> A getAggregatedValue(String name) {
109     return api.<A>getAggregatedValue(name);
110   }
111 
112   public final <A extends Writable> void setAggregatedValue(
113       String name, A value) {
114     api.setAggregatedValue(name, value);
115   }
116 
117   public final void logToCommandLine(String line) {
118     api.logToCommandLine(line);
119   }
120 
121   public final StatusReporter getContext() {
122     return api;
123   }
124 
125   /**
126    * Drop-in replacement for MasterCompute when migrating
127    * to Blocks Framework.
128    */
129   public static class MigrationFullMasterCompute
130       extends MigrationMasterCompute {
131     private long superstep;
132     private boolean halt;
133     private Class<? extends MigrationAbstractComputation> computationClass;
134     private Class<? extends MigrationAbstractComputation> newComputationClass;
135     private Class<? extends Writable> originalMessage;
136     private Class<? extends Writable> newMessage;
137     private Class<? extends MessageCombiner> originalMessageCombiner;
138     private Class<? extends MessageCombiner> newMessageCombiner;
139 
140     final void init(
141         long superstep,
142         Class<? extends MigrationAbstractComputation> computationClass,
143         Class<? extends Writable> message,
144         Class<? extends MessageCombiner> messageCombiner) {
145       this.superstep = superstep;
146       this.halt = false;
147       this.computationClass = computationClass;
148       this.newComputationClass = null;
149       this.originalMessage = message;
150       this.newMessage = null;
151       this.originalMessageCombiner = messageCombiner;
152       this.newMessageCombiner = null;
153     }
154 
155     public final long getSuperstep() {
156       return superstep;
157     }
158 
159     @Override
160     public final long getTotalNumVertices() {
161       if (superstep == 0) {
162         throw new RuntimeException(
163             "getTotalNumVertices not available in superstep=0");
164       }
165       return super.getTotalNumVertices();
166     }
167 
168     @Override
169     public final long getTotalNumEdges() {
170       if (superstep == 0) {
171         throw new RuntimeException(
172             "getTotalNumEdges not available in superstep=0");
173       }
174       return super.getTotalNumEdges();
175     }
176 
177 
178     public final void haltComputation() {
179       halt = true;
180     }
181 
182     public final boolean isHalted() {
183       return halt;
184     }
185 
186     public final void setComputation(
187         Class<? extends MigrationFullAbstractComputation> computation) {
188       if (computation != null) {
189         newComputationClass = computation;
190       } else {
191         // TODO
192         this.computationClass = null;
193       }
194     }
195 
196     public final
197     Class<? extends MigrationAbstractComputation> getComputation() {
198       if (newComputationClass != null) {
199         return newComputationClass;
200       }
201       if (computationClass != null) {
202         return computationClass;
203       }
204       return null;
205     }
206 
207     public final void setMessageCombiner(
208         Class<? extends MessageCombiner> combinerClass) {
209       this.newMessageCombiner = combinerClass;
210     }
211 
212     public final Class<? extends MessageCombiner> getMessageCombiner() {
213       return newMessageCombiner != null ?
214         newMessageCombiner : originalMessageCombiner;
215     }
216 
217     public final void setIncomingMessage(
218         Class<? extends Writable> incomingMessageClass) {
219       if (!originalMessage.equals(incomingMessageClass)) {
220         throw new IllegalArgumentException(
221             originalMessage + " and " + incomingMessageClass + " must be same");
222       }
223     }
224 
225     public final void setOutgoingMessage(
226         Class<? extends Writable> outgoingMessageClass) {
227       newMessage = outgoingMessageClass;
228     }
229 
230     final Class<? extends Writable> getOutgoingMessage() {
231       if (newMessage != null) {
232         return newMessage;
233       }
234 
235       if (newComputationClass == null) {
236         return originalMessage;
237       }
238       Class[] computationTypes = ReflectionUtils.getTypeArguments(
239           TypesHolder.class, newComputationClass);
240       return computationTypes[4];
241     }
242 
243     final Class<? extends MigrationAbstractComputation> getComputationClass() {
244       return newComputationClass != null ?
245         newComputationClass : computationClass;
246     }
247 
248     final
249     Class<? extends MigrationAbstractComputation> getNewComputationClass() {
250       return newComputationClass;
251     }
252 
253     final Class<? extends Writable> getNewMessage() {
254       return newMessage;
255     }
256 
257     final Class<? extends MessageCombiner> getNewMessageCombiner() {
258       return newMessageCombiner;
259     }
260   }
261 }