This project has retired. For details please refer to its Attic page.
SimpleMasterComputeComputation xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.examples;
20  
21  import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
22  import org.apache.giraph.graph.BasicComputation;
23  import org.apache.giraph.master.DefaultMasterCompute;
24  import org.apache.giraph.graph.Vertex;
25  import org.apache.giraph.worker.WorkerContext;
26  import org.apache.hadoop.io.DoubleWritable;
27  import org.apache.hadoop.io.FloatWritable;
28  import org.apache.hadoop.io.LongWritable;
29  import org.apache.log4j.Logger;
30  
31  import java.io.IOException;
32  
33  /**
34   * Demonstrates a computation with a centralized part implemented via a
35   * MasterCompute.
36   */
37  public class SimpleMasterComputeComputation extends BasicComputation<
38      LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
39    /** Aggregator to get values from the master to the workers */
40    public static final String SMC_AGG = "simplemastercompute.aggregator";
41    /** Logger */
42    private static final Logger LOG =
43        Logger.getLogger(SimpleMasterComputeComputation.class);
44  
45    @Override
46    public void compute(
47        Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
48        Iterable<DoubleWritable> messages) throws IOException {
49      double oldSum = getSuperstep() == 0 ? 0 : vertex.getValue().get();
50      double newValue = this.<DoubleWritable>getAggregatedValue(SMC_AGG).get();
51      double newSum = oldSum + newValue;
52      vertex.setValue(new DoubleWritable(newSum));
53      SimpleMasterComputeWorkerContext workerContext = getWorkerContext();
54      workerContext.setFinalSum(newSum);
55      LOG.info("Current sum: " + newSum);
56    }
57  
58    /**
59     * Worker context used with {@link SimpleMasterComputeComputation}.
60     */
61    public static class SimpleMasterComputeWorkerContext
62        extends WorkerContext {
63      /** Final sum value for verification for local jobs */
64      private static double FINAL_SUM;
65  
66      @Override
67      public void preApplication()
68        throws InstantiationException, IllegalAccessException {
69      }
70  
71      @Override
72      public void preSuperstep() {
73      }
74  
75      @Override
76      public void postSuperstep() {
77      }
78  
79      @Override
80      public void postApplication() {
81      }
82  
83      public static void setFinalSum(double sum) {
84        FINAL_SUM = sum;
85      }
86  
87      public static double getFinalSum() {
88        return FINAL_SUM;
89      }
90    }
91  
92    /**
93     * MasterCompute used with {@link SimpleMasterComputeComputation}.
94     */
95    public static class SimpleMasterCompute
96        extends DefaultMasterCompute {
97      @Override
98      public void compute() {
99        setAggregatedValue(SMC_AGG,
100           new DoubleWritable(((double) getSuperstep()) / 2 + 1));
101       if (getSuperstep() == 10) {
102         haltComputation();
103       }
104     }
105 
106     @Override
107     public void initialize() throws InstantiationException,
108         IllegalAccessException {
109       registerAggregator(SMC_AGG, DoubleOverwriteAggregator.class);
110     }
111   }
112 }