This project has retired. For details please refer to its Attic page.
MigrationFullBlockFactory xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.migration;
19  
20  import java.util.Iterator;
21  
22  import org.apache.giraph.block_app.framework.AbstractBlockFactory;
23  import org.apache.giraph.block_app.framework.block.Block;
24  import org.apache.giraph.block_app.framework.block.PieceCount;
25  import org.apache.giraph.block_app.framework.block.SequenceBlock;
26  import org.apache.giraph.block_app.framework.piece.AbstractPiece;
27  import org.apache.giraph.block_app.framework.piece.Piece;
28  import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullAbstractComputation;
29  import org.apache.giraph.block_app.migration.MigrationMasterCompute.MigrationFullMasterCompute;
30  import org.apache.giraph.combiner.MessageCombiner;
31  import org.apache.giraph.conf.GiraphConfiguration;
32  import org.apache.giraph.function.Consumer;
33  import org.apache.hadoop.io.Writable;
34  import org.apache.hadoop.io.WritableComparable;
35  
36  import com.google.common.collect.AbstractIterator;
37  import com.google.common.collect.Iterators;
38  
39  /**
40   * BlockFactory to extend when using drop-in migration
41   */
42  public abstract class MigrationFullBlockFactory
43      extends AbstractBlockFactory<MigrationSuperstepStage> {
44  
45    @Override
46    public MigrationSuperstepStage createExecutionStage(
47        GiraphConfiguration conf) {
48      return new MigrationSuperstepStageImpl();
49    }
50  
51    @Override
52    protected Class<? extends MigrationWorkerContext> getWorkerContextValueClass(
53        GiraphConfiguration conf) {
54      return MigrationWorkerContext.class;
55    }
56  
57    @SuppressWarnings("rawtypes")
58    public <I extends WritableComparable, V extends Writable, E extends Writable,
59    MR extends Writable, MS extends Writable>
60    Block createMigrationAppBlock(
61        Class<? extends MigrationFullAbstractComputation<I, V, E, MR, MS>>
62          computationClass,
63        MigrationFullMasterCompute masterCompute,
64        Class<MS> messageClass,
65        Class<? extends MessageCombiner<? super I, MS>> messageCombinerClass,
66        GiraphConfiguration conf) {
67      final MigrationPiece<I, V, E, MR, MS> piece =
68          MigrationPiece.createFirstFullMigrationPiece(
69              computationClass, masterCompute, messageClass,
70              messageCombinerClass);
71      piece.sanityTypeChecks(conf, null);
72  
73      return new SequenceBlock(
74          new Piece<WritableComparable, Writable, Writable,
75              Writable, MigrationSuperstepStage>() {
76            @Override
77            public MigrationSuperstepStage nextExecutionStage(
78                MigrationSuperstepStage executionStage) {
79              return executionStage.changedMigrationSuperstep(0);
80            }
81          },
82          new Block() {
83            private MigrationPiece curPiece = piece;
84  
85            @Override
86            public Iterator<AbstractPiece> iterator() {
87              return Iterators.concat(
88                  Iterators.singletonIterator(curPiece),
89                  new AbstractIterator<AbstractPiece>() {
90                    @Override
91                    protected AbstractPiece computeNext() {
92                      curPiece = curPiece.getNextPiece();
93                      if (curPiece == null) {
94                        endOfData();
95                      }
96                      return curPiece;
97                    }
98                  });
99            }
100 
101           @Override
102           public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
103             consumer.apply(curPiece);
104           }
105 
106           @Override
107           public PieceCount getPieceCount() {
108             return curPiece.getPieceCount();
109           }
110         }
111     );
112   }
113 }