This project has retired. For details please refer to its Attic page.
RandomWalkVertexMasterCompute xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.examples;
20  
21  import org.apache.giraph.aggregators.DoubleSumAggregator;
22  import org.apache.giraph.aggregators.LongSumAggregator;
23  import org.apache.giraph.master.DefaultMasterCompute;
24  import org.apache.hadoop.io.DoubleWritable;
25  import org.apache.hadoop.io.LongWritable;
26  import org.apache.log4j.Logger;
27  
28  /**
29   * Master compute associated with {@link RandomWalkComputation}. It handles
30   * dangling nodes.
31   */
32  public class RandomWalkVertexMasterCompute extends DefaultMasterCompute {
33  
34    /** threshold for the L1 norm of the state vector difference  */
35    static final double CONVERGENCE_THRESHOLD = 0.00001;
36  
37    /** logger */
38    private static final Logger LOG =
39        Logger.getLogger(RandomWalkVertexMasterCompute.class);
40  
41    @Override
42    public void compute() {
43      double danglingContribution =
44          this.<DoubleWritable>getAggregatedValue(
45              RandomWalkComputation.CUMULATIVE_DANGLING_PROBABILITY).get();
46      double cumulativeProbability =
47          this.<DoubleWritable>getAggregatedValue(
48              RandomWalkComputation.CUMULATIVE_PROBABILITY).get();
49      double l1NormOfStateDiff =
50          this.<DoubleWritable>getAggregatedValue(
51              RandomWalkComputation.L1_NORM_OF_PROBABILITY_DIFFERENCE).get();
52      long numDanglingVertices =
53          this.<LongWritable>getAggregatedValue(
54              RandomWalkComputation.NUM_DANGLING_VERTICES).get();
55  
56      LOG.info("[Superstep " + getSuperstep() + "] Dangling contribution = " +
57          danglingContribution + ", number of dangling vertices = " +
58          numDanglingVertices + ", cumulative probability = " +
59          cumulativeProbability + ", L1 Norm of state vector difference = " +
60          l1NormOfStateDiff);
61  
62      // Convergence check: halt once the L1 norm of the difference between the
63      // state vectors fall below the threshold
64      if (getSuperstep() > 1 && l1NormOfStateDiff < CONVERGENCE_THRESHOLD) {
65        haltComputation();
66      }
67    }
68  
69    @Override
70    public void initialize() throws InstantiationException,
71        IllegalAccessException {
72      registerAggregator(RandomWalkComputation.NUM_DANGLING_VERTICES,
73          LongSumAggregator.class);
74      registerAggregator(RandomWalkComputation.CUMULATIVE_DANGLING_PROBABILITY,
75          DoubleSumAggregator.class);
76      registerAggregator(RandomWalkComputation.CUMULATIVE_PROBABILITY,
77          DoubleSumAggregator.class);
78      registerAggregator(RandomWalkComputation.L1_NORM_OF_PROBABILITY_DIFFERENCE,
79          DoubleSumAggregator.class);
80    }
81  }