This project has retired. For details please refer to its Attic page.
MigrationPiece xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.migration;
19  
20  import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
21  
22  import java.io.IOException;
23  import java.util.Collections;
24  import java.util.List;
25  
26  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
27  import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
28  import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
29  import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
30  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
31  import org.apache.giraph.block_app.framework.piece.PieceWithWorkerContext;
32  import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
33  import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
34  import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullAbstractComputation;
35  import org.apache.giraph.block_app.migration.MigrationMasterCompute.MigrationFullMasterCompute;
36  import org.apache.giraph.block_app.migration.MigrationWorkerContext.MigrationFullWorkerContext;
37  import org.apache.giraph.combiner.MessageCombiner;
38  import org.apache.giraph.conf.DefaultMessageClasses;
39  import org.apache.giraph.conf.GiraphConfiguration;
40  import org.apache.giraph.conf.GiraphConstants;
41  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
42  import org.apache.giraph.conf.MessageClasses;
43  import org.apache.giraph.conf.TypesHolder;
44  import org.apache.giraph.factories.DefaultMessageValueFactory;
45  import org.apache.giraph.function.Consumer;
46  import org.apache.giraph.function.ObjectTransfer;
47  import org.apache.giraph.function.Supplier;
48  import org.apache.giraph.graph.Vertex;
49  import org.apache.giraph.utils.ReflectionUtils;
50  import org.apache.hadoop.io.Writable;
51  import org.apache.hadoop.io.WritableComparable;
52  
53  import com.google.common.base.Preconditions;
54  
55  
56  /**
57   * Piece used when migrating applications to Blocks Framework.
58   *
59   * There are two migration levels:
60   * <ul>
61   * <li>
62   * drop-in replacement migration is completely compatible with previous code.
63   * You only need to change parent classes from (AbstractComputation,
64   * MasterCompute, WorkerContext) to (MigrationFullAbstractComputation,
65   * MigrationFullMasterCompute and MigrationFullWorkerContext).
66   * After that, all you need to do is extend MigrationBlockFactory, and pass
67   * appropriate types and call createMigrationAppBlock with initial computations.
68   * <br>
69   * You can now combine multiple applications, or use any library written in the
70   * framework, but your application is left as one whole indivisible block.
71   * </li>
72   * <li>
73   * Piece-wise migration - which gives a set of independent pieces, which can
74   * then be combined with appropriate ordering logic within a BlockFactory.
75   * You need to modify parent classes in your code to
76   * (MigrationAbstractComputation, MigrationMasterCompute and
77   * MigrationWorkerContext), which don't have any methods that affect computation
78   * ordering - so calls to those methods should be
79   * moved to logic within BlockFactory.
80   * Calling MigrationPiece.createMigrationPiece and passing appropriate
81   * computations, gives you an independent piece, that you can then use in the
82   * same way as before, but also combine it in any other way with other pieces
83   * you have or that are written within a library.
84   * </li>
85   * </ul>
86   *
87   * Generally, migration path can be to first move to drop-in replacement without
88   * any effort, and then see which parts need to be modified to be able to use
89   * piece-wise migration. At the end, it should be trivial to move from
90   * piece-wise migration to directly using pieces, by just moving code around,
91   * if you want to.
92   *
93   * @param <I> Vertex id type
94   * @param <V> Vertex value type
95   * @param <E> Edge value type
96   * @param <MPrev> Previous piece message type
97   * @param <M> Message type
98   */
99  @SuppressWarnings("rawtypes")
100 public final class MigrationPiece<I extends WritableComparable,
101     V extends Writable, E extends Writable, MPrev extends Writable,
102     M extends Writable> extends PieceWithWorkerContext<I, V, E, M,
103     MigrationWorkerContext, Writable, MigrationSuperstepStage> {
104 
105   private final Class<? extends MigrationAbstractComputation<I, V, E, MPrev, M>>
106   computationClass;
107 
108   private final transient MigrationMasterCompute masterCompute;
109   private final Supplier<Iterable<MPrev>> previousMessagesSupplier;
110   private final Consumer<Iterable<M>> currentMessagesConsumer;
111   private final transient Class<M> messageClass;
112   private final transient Class<? extends MessageCombiner<? super I, M>>
113   messageCombinerClass;
114 
115   private final boolean isFullMigration;
116   private final boolean isFirstStep;
117 
118   private transient MigrationPiece nextPiece;
119   private boolean isHalted;
120 
121   private MigrationPiece(
122       Class<? extends MigrationAbstractComputation<I, V, E, MPrev, M>>
123         computationClass,
124       MigrationMasterCompute masterCompute, Supplier<Iterable<MPrev>>
125         previousMessagesSupplier,
126       Consumer<Iterable<M>> currentMessagesConsumer, Class<M> messageClass,
127       Class<? extends MessageCombiner<? super I, M>> messageCombinerClass,
128       boolean isFullMigration, boolean isFirstStep) {
129     this.computationClass = computationClass;
130     this.masterCompute = masterCompute;
131     this.previousMessagesSupplier = previousMessagesSupplier;
132     this.currentMessagesConsumer = currentMessagesConsumer;
133     this.messageClass = messageClass;
134     this.messageCombinerClass = messageCombinerClass;
135     this.isFullMigration = isFullMigration;
136     this.isFirstStep = isFirstStep;
137     isHalted = false;
138     nextPiece = null;
139     sanityChecks();
140   }
141 
142 
143   @SuppressWarnings("unchecked")
144   static <I extends WritableComparable, V extends Writable, E extends Writable,
145   MR extends Writable, MS extends Writable>
146   MigrationPiece<I, V, E, MR, MS> createFirstFullMigrationPiece(
147       Class<? extends MigrationAbstractComputation<I, V, E, MR, MS>>
148         computationClass,
149       MigrationFullMasterCompute masterCompute,
150       Class<MS> messageClass,
151       Class<? extends MessageCombiner<? super I, MS>> messageCombinerClass) {
152     ObjectTransfer transfer = new ObjectTransfer();
153     return new MigrationPiece<>(
154         computationClass, masterCompute, transfer, transfer, messageClass,
155         messageCombinerClass,
156         true, true);
157   }
158 
159   public static <I extends WritableComparable, V extends Writable,
160   E extends Writable, MR extends Writable, MS extends Writable>
161   MigrationPiece<I, V, E, MR, MS> createMigrationPiece(
162       Class<? extends MigrationAbstractComputation<I, V, E, MR, MS>>
163         computationClass,
164       MigrationMasterCompute masterCompute,
165       Supplier<Iterable<MR>> previousMessagesSupplier,
166       Consumer<Iterable<MS>> currentMessagesConsumer,
167       Class<MS> messageClass,
168       Class<? extends MessageCombiner<? super I, MS>> messageCombinerClass) {
169     return new MigrationPiece<>(
170         computationClass, masterCompute, previousMessagesSupplier,
171         currentMessagesConsumer, messageClass, messageCombinerClass,
172         false, false);
173   }
174 
175 
176   private void sanityChecks() {
177     Preconditions.checkState(isFullMigration ==
178         MigrationFullAbstractComputation.class
179           .isAssignableFrom(computationClass));
180   }
181 
182   void sanityTypeChecks(
183       GiraphConfiguration conf, Class<?> previousMessageClass) {
184     if (computationClass != null) {
185       final Class<?> vertexIdClass = GiraphConstants.VERTEX_ID_CLASS.get(conf);
186       final Class<?> vertexValueClass =
187           GiraphConstants.VERTEX_VALUE_CLASS.get(conf);
188       final Class<?> edgeValueClass =
189           GiraphConstants.EDGE_VALUE_CLASS.get(conf);
190 
191       Class<?>[] classList = getTypeArguments(
192           TypesHolder.class, computationClass);
193       Preconditions.checkArgument(classList.length == 5);
194 
195       ReflectionUtils.verifyTypes(
196           vertexIdClass, classList[0], "vertexId", computationClass);
197       ReflectionUtils.verifyTypes(
198           vertexValueClass, classList[1], "vertexValue", computationClass);
199       ReflectionUtils.verifyTypes(
200           edgeValueClass, classList[2], "edgeValue", computationClass);
201       if (previousMessageClass != null) {
202         ReflectionUtils.verifyTypes(
203             previousMessageClass, classList[3], "recvMessage",
204             computationClass);
205       }
206       ReflectionUtils.verifyTypes(
207           messageClass, classList[4], "sendMessage", computationClass);
208     }
209   }
210 
211   @Override
212   public void registerAggregators(BlockMasterApi masterApi)
213       throws InstantiationException, IllegalAccessException {
214     if (masterCompute != null) {
215       masterCompute.init(masterApi);
216       masterCompute.initialize();
217     }
218   }
219 
220   @Override
221   public VertexSender<I, V, E> getVertexSender(
222       BlockWorkerSendApi<I, V, E, M> workerApi,
223       MigrationSuperstepStage executionStage) {
224     if (computationClass == null || isFirstStep) {
225       return null;
226     }
227 
228     final MigrationAbstractComputation<I, V, E, MPrev, M> computation =
229         ReflectionUtils.newInstance(computationClass);
230     computation.init(
231         workerApi, getWorkerValue(workerApi),
232         executionStage.getMigrationSuperstep() - 1);
233     computation.preSuperstep();
234 
235     return new InnerVertexSender() {
236       @Override
237       public void vertexSend(Vertex<I, V, E> vertex) {
238         try {
239           Iterable<MPrev> messages = null;
240           if (previousMessagesSupplier != null) {
241             messages = previousMessagesSupplier.get();
242           }
243           if (messages == null) {
244             messages = Collections.<MPrev>emptyList();
245           }
246           computation.compute(vertex, messages);
247         } catch (IOException e) {
248           throw new RuntimeException(e);
249         }
250       }
251 
252       @Override
253       public void postprocess() {
254         computation.postSuperstep();
255       }
256     };
257   }
258 
259   @Override
260   public void workerContextSend(
261       BlockWorkerContextSendApi<I, Writable> workerContextApi,
262       MigrationSuperstepStage executionStage,
263       MigrationWorkerContext workerValue) {
264     if (workerValue != null && !isFirstStep) {
265       workerValue.setApi(workerContextApi);
266       workerValue.postSuperstep();
267     }
268   }
269 
270   @SuppressWarnings("unchecked")
271   @Override
272   public void masterCompute(BlockMasterApi masterApi,
273       MigrationSuperstepStage executionStage) {
274     MigrationFullMasterCompute masterComputeF =
275         isFullMigration ? (MigrationFullMasterCompute) masterCompute : null;
276 
277     if (masterCompute != null) {
278       masterCompute.init(masterApi);
279 
280       if (masterComputeF != null) {
281         masterComputeF.init(
282             executionStage.getMigrationSuperstep(),
283             computationClass, messageClass, messageCombinerClass);
284       }
285 
286       masterCompute.compute();
287     }
288 
289     if (isFullMigration) {
290       if (masterComputeF != null) {
291         isHalted = masterComputeF.isHalted();
292         if (masterComputeF.isHalted()) {
293           nextPiece = null;
294         } else {
295           if (masterComputeF.getNewComputationClass() != null ||
296               masterComputeF.getNewMessage() != null ||
297                   masterComputeF.getNewMessageCombiner() != null) {
298             nextPiece = new MigrationPiece(
299                 masterComputeF.getComputationClass(),
300                 masterComputeF,
301                 previousMessagesSupplier,
302                 currentMessagesConsumer,
303                 masterComputeF.getOutgoingMessage(),
304                 masterComputeF.getMessageCombiner(),
305                 true, false);
306           } else {
307             nextPiece = this;
308           }
309         }
310       } else {
311         nextPiece = this;
312       }
313       if (nextPiece != null) {
314         if (nextPiece.isFirstStep) {
315           nextPiece = new MigrationPiece<>(
316               computationClass,
317               masterComputeF,
318               previousMessagesSupplier,
319               currentMessagesConsumer,
320               messageClass,
321               messageCombinerClass,
322               true, false);
323         }
324         nextPiece.sanityTypeChecks(masterApi.getConf(), messageClass);
325       }
326     } else {
327       Preconditions.checkState(!isHalted);
328       Preconditions.checkState(nextPiece == null);
329     }
330   }
331 
332   @Override
333   public void workerContextReceive(
334       BlockWorkerContextReceiveApi workerContextApi,
335       MigrationSuperstepStage executionStage,
336       MigrationWorkerContext workerValue, List<Writable> workerMessages) {
337     if (workerValue != null) {
338       workerValue.setApi(workerContextApi);
339       workerValue.setReceivedMessages(workerMessages);
340 
341       if (isFirstStep && workerValue instanceof MigrationFullWorkerContext) {
342         try {
343           ((MigrationFullWorkerContext) workerValue).preApplication();
344         } catch (InstantiationException | IllegalAccessException e) {
345           throw new RuntimeException(e);
346         }
347       }
348 
349       if (!isHalted) {
350         workerValue.preSuperstep();
351       }
352 
353       if (isHalted && workerValue instanceof MigrationFullWorkerContext) {
354         ((MigrationFullWorkerContext) workerValue).postApplication();
355       }
356     }
357   }
358 
359   @Override
360   public VertexReceiver<I, V, E, M> getVertexReceiver(
361       BlockWorkerReceiveApi<I> workerApi,
362       MigrationSuperstepStage executionStage) {
363     if (currentMessagesConsumer == null || isHalted) {
364       return null;
365     }
366 
367     return new InnerVertexReceiver() {
368       @Override
369       public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
370         currentMessagesConsumer.apply(messages);
371       }
372     };
373   }
374 
375   @Override
376   public MessageClasses<I, M> getMessageClasses(
377       ImmutableClassesGiraphConfiguration conf) {
378     return new DefaultMessageClasses(
379         messageClass,
380         DefaultMessageValueFactory.class,
381         messageCombinerClass,
382         GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.get(conf));
383   }
384 
385   @Override
386   public MigrationSuperstepStage nextExecutionStage(
387       MigrationSuperstepStage executionStage) {
388     return executionStage.changedMigrationSuperstep(
389         executionStage.getMigrationSuperstep() + 1);
390   }
391 
392   public MigrationPiece getNextPiece() {
393     Preconditions.checkState(isFullMigration);
394     MigrationPiece res = nextPiece;
395     nextPiece = null;
396     return res;
397   }
398 
399 }