View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.migration;
19  
20  import java.util.Iterator;
21  
22  import org.apache.giraph.block_app.framework.AbstractBlockFactory;
23  import org.apache.giraph.block_app.framework.block.Block;
24  import org.apache.giraph.block_app.framework.block.SequenceBlock;
25  import org.apache.giraph.block_app.framework.piece.AbstractPiece;
26  import org.apache.giraph.block_app.framework.piece.Piece;
27  import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullAbstractComputation;
28  import org.apache.giraph.block_app.migration.MigrationMasterCompute.MigrationFullMasterCompute;
29  import org.apache.giraph.combiner.MessageCombiner;
30  import org.apache.giraph.conf.GiraphConfiguration;
31  import org.apache.giraph.function.Consumer;
32  import org.apache.hadoop.io.Writable;
33  import org.apache.hadoop.io.WritableComparable;
34  
35  import com.google.common.collect.AbstractIterator;
36  import com.google.common.collect.Iterators;
37  
38  /**
39   * BlockFactory to extend when using drop-in migration
40   */
41  public abstract class MigrationFullBlockFactory
42      extends AbstractBlockFactory<MigrationSuperstepStage> {
43  
44    @Override
45    public MigrationSuperstepStage createExecutionStage(
46        GiraphConfiguration conf) {
47      return new MigrationSuperstepStageImpl();
48    }
49  
50    @Override
51    protected Class<? extends MigrationWorkerContext> getWorkerContextValueClass(
52        GiraphConfiguration conf) {
53      return MigrationWorkerContext.class;
54    }
55  
56    @SuppressWarnings("rawtypes")
57    public <I extends WritableComparable, V extends Writable, E extends Writable,
58    MR extends Writable, MS extends Writable>
59    Block createMigrationAppBlock(
60        Class<? extends MigrationFullAbstractComputation<I, V, E, MR, MS>>
61          computationClass,
62        MigrationFullMasterCompute masterCompute,
63        Class<MS> messageClass,
64        Class<? extends MessageCombiner<? super I, MS>> messageCombinerClass,
65        GiraphConfiguration conf) {
66      final MigrationPiece<I, V, E, MR, MS> piece =
67          MigrationPiece.createFirstFullMigrationPiece(
68              computationClass, masterCompute, messageClass,
69              messageCombinerClass);
70      piece.sanityTypeChecks(conf, null);
71  
72      return new SequenceBlock(
73          new Piece<WritableComparable, Writable, Writable,
74              Writable, MigrationSuperstepStage>() {
75            @Override
76            public MigrationSuperstepStage nextExecutionStage(
77                MigrationSuperstepStage executionStage) {
78              return executionStage.changedMigrationSuperstep(0);
79            }
80          },
81          new Block() {
82            private MigrationPiece curPiece = piece;
83  
84            @Override
85            public Iterator<AbstractPiece> iterator() {
86              return Iterators.concat(
87                  Iterators.singletonIterator(curPiece),
88                  new AbstractIterator<AbstractPiece>() {
89                    @Override
90                    protected AbstractPiece computeNext() {
91                      curPiece = curPiece.getNextPiece();
92                      if (curPiece == null) {
93                        endOfData();
94                      }
95                      return curPiece;
96                    }
97                  });
98            }
99  
100           @Override
101           public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
102             consumer.apply(curPiece);
103           }
104         }
105     );
106   }
107 }