This project has retired. For details please refer to its Attic page.
SimpleMigrationMasterBlockFactory xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.examples.block_app;
19  
20  import java.io.IOException;
21  
22  import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
23  import org.apache.giraph.block_app.framework.block.Block;
24  import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullBasicComputation;
25  import org.apache.giraph.block_app.migration.MigrationFullBlockFactory;
26  import org.apache.giraph.block_app.migration.MigrationMasterCompute.MigrationFullMasterCompute;
27  import org.apache.giraph.block_app.migration.MigrationWorkerContext.MigrationFullWorkerContext;
28  import org.apache.giraph.conf.GiraphConfiguration;
29  import org.apache.giraph.graph.Vertex;
30  import org.apache.hadoop.io.DoubleWritable;
31  import org.apache.hadoop.io.FloatWritable;
32  import org.apache.hadoop.io.LongWritable;
33  import org.apache.log4j.Logger;
34  
35  /**
36   * Demonstrates using migration library for Blocks Framework,
37   * as an drop-in replacement, without any changes.
38   */
39  public class SimpleMigrationMasterBlockFactory
40      extends MigrationFullBlockFactory {
41    @Override
42    public Block createBlock(GiraphConfiguration conf) {
43      return createMigrationAppBlock(
44          SimpleMigrationMasterComputation.class,
45          new SimpleMigrationMasterCompute(),
46          DoubleWritable.class,
47          null,
48          conf);
49    }
50  
51    @Override
52    protected Class<LongWritable> getVertexIDClass(GiraphConfiguration conf) {
53      return LongWritable.class;
54    }
55  
56    @Override
57    protected Class<DoubleWritable> getVertexValueClass(
58        GiraphConfiguration conf) {
59      return DoubleWritable.class;
60    }
61  
62    @Override
63    protected Class<FloatWritable> getEdgeValueClass(GiraphConfiguration conf) {
64      return FloatWritable.class;
65    }
66  
67    @Override
68    protected
69    Class<SimpleMigrationMasterWorkerContext> getWorkerContextValueClass(
70        GiraphConfiguration conf) {
71      return SimpleMigrationMasterWorkerContext.class;
72    }
73  
74    // Full copy from org.apache.giraph.examples.SimpleMasterComputeComputation
75    // Just extending MigrationFull drop-in replacements instead.
76  
77    /**
78     * Demonstrates a computation with a centralized part implemented via a
79     * MasterCompute.
80     */
81    public static class SimpleMigrationMasterComputation
82        extends MigrationFullBasicComputation<LongWritable, DoubleWritable,
83        FloatWritable, DoubleWritable> {
84      /** Aggregator to get values from the master to the workers */
85      public static final String SMC_AGG = "simplemastercompute.aggregator";
86  
87      /** Logger */
88      private static final Logger LOG =
89          Logger.getLogger(SimpleMigrationMasterComputation.class);
90  
91      @Override
92      public void compute(
93          Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
94          Iterable<DoubleWritable> messages) throws IOException {
95        double oldSum = getSuperstep() == 0 ? 0 : vertex.getValue().get();
96        double newValue = this.<DoubleWritable>getAggregatedValue(SMC_AGG).get();
97        double newSum = oldSum + newValue;
98        vertex.setValue(new DoubleWritable(newSum));
99        SimpleMigrationMasterWorkerContext workerContext = getWorkerContext();
100       workerContext.setFinalSum(newSum);
101       LOG.info("Current sum: " + newSum);
102     }
103   }
104 
105   /**
106    * Worker context used with {@link SimpleMigrationMasterComputation}.
107    */
108   public static class SimpleMigrationMasterWorkerContext
109       extends MigrationFullWorkerContext {
110     /** Final sum value for verification for local jobs */
111     private static double FINAL_SUM;
112 
113     @Override
114     public void preApplication()
115       throws InstantiationException, IllegalAccessException {
116     }
117 
118     @Override
119     public void preSuperstep() {
120     }
121 
122     @Override
123     public void postSuperstep() {
124     }
125 
126     @Override
127     public void postApplication() {
128     }
129 
130     public static void setFinalSum(double sum) {
131       FINAL_SUM = sum;
132     }
133 
134     public static double getFinalSum() {
135       return FINAL_SUM;
136     }
137   }
138 
139   /**
140    * MasterCompute used with {@link SimpleMigrationMasterComputation}.
141    */
142   public static class SimpleMigrationMasterCompute
143       extends MigrationFullMasterCompute {
144     @Override
145     public void compute() {
146       setAggregatedValue(SimpleMigrationMasterComputation.SMC_AGG,
147           new DoubleWritable(((double) getSuperstep()) / 2 + 1));
148       if (getSuperstep() == 10) {
149         haltComputation();
150       }
151     }
152 
153     @Override
154     public void initialize() throws InstantiationException,
155         IllegalAccessException {
156       registerAggregator(SimpleMigrationMasterComputation.SMC_AGG,
157           DoubleOverwriteAggregator.class);
158     }
159   }
160 }