This project has retired. For details please refer to its Attic page.
RandomWalkComputation xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.examples;
20  
21  import org.apache.giraph.graph.BasicComputation;
22  import org.apache.giraph.edge.Edge;
23  import org.apache.giraph.graph.Vertex;
24  import org.apache.hadoop.io.DoubleWritable;
25  import org.apache.hadoop.io.LongWritable;
26  import org.apache.hadoop.io.Writable;
27  
28  import java.io.IOException;
29  
30  /**
31   * Base class for executing a random walk on a graph
32   *
33   * @param <E> edge type
34   */
35  public abstract class RandomWalkComputation<E extends Writable>
36      extends BasicComputation<LongWritable, DoubleWritable, E, DoubleWritable> {
37    /** Configuration parameter for the number of supersteps to execute */
38    static final String MAX_SUPERSTEPS = RandomWalkComputation.class.getName() +
39        ".maxSupersteps";
40    /** Configuration parameter for the teleportation probability */
41    static final String TELEPORTATION_PROBABILITY = RandomWalkComputation.class
42        .getName() + ".teleportationProbability";
43    /** Name of aggregator for the probability assigned to dangling vertices */
44    static final String CUMULATIVE_DANGLING_PROBABILITY =
45        RandomWalkComputation.class.getName() + ".cumulativeDanglingProbability";
46    /** Name of aggregator for the probability assigned to all vertices */
47    static final String CUMULATIVE_PROBABILITY = RandomWalkComputation.class
48        .getName() + ".cumulativeProbability";
49      /** Name of aggregator for the number of dangling vertices */
50    static final String NUM_DANGLING_VERTICES = RandomWalkComputation.class
51        .getName() + ".numDanglingVertices";
52    /** Name of aggregator for the L1 norm of the probability difference, used
53     * for convergence detection */
54    static final String L1_NORM_OF_PROBABILITY_DIFFERENCE =
55        RandomWalkComputation.class.getName() + ".l1NormOfProbabilityDifference";
56    /** Reusable {@link DoubleWritable} instance to avoid object instantiation */
57    private final DoubleWritable doubleWritable = new DoubleWritable();
58    /** Reusable {@link LongWritable} for counting dangling vertices */
59    private final LongWritable one = new LongWritable(1);
60  
61    /**
62     * Compute an initial probability value for the vertex. Per default,
63     * we start with a uniform distribution.
64     * @return The initial probability value.
65     */
66    protected double initialProbability() {
67      return 1.0 / getTotalNumVertices();
68    }
69  
70    /**
71     * Compute the probability of transitioning to a neighbor vertex
72     * @param vertex Vertex
73     * @param stateProbability current steady state probability of the vertex
74     * @param edge edge to neighbor
75     * @return the probability of transitioning to a neighbor vertex
76     */
77    protected abstract double transitionProbability(
78        Vertex<LongWritable, DoubleWritable, E> vertex,
79        double stateProbability,
80        Edge<LongWritable, E> edge);
81  
82    /**
83     * Perform a single step of a random walk computation.
84     * @param vertex Vertex
85     * @param messages Messages received in the previous step.
86     * @param teleportationProbability Probability of teleporting to another
87     *          vertex.
88     * @return The new probability value.
89     */
90    protected abstract double recompute(
91        Vertex<LongWritable, DoubleWritable, E> vertex,
92        Iterable<DoubleWritable> messages,
93        double teleportationProbability);
94  
95    /**
96     * Returns the cumulative probability from dangling vertices.
97     * @return The cumulative probability from dangling vertices.
98     */
99    protected double getDanglingProbability() {
100     return this.<DoubleWritable>getAggregatedValue(
101         RandomWalkComputation.CUMULATIVE_DANGLING_PROBABILITY).get();
102   }
103 
104   /**
105    * Returns the cumulative probability from dangling vertices.
106    * @return The cumulative probability from dangling vertices.
107    */
108   protected double getPreviousCumulativeProbability() {
109     return this.<DoubleWritable>getAggregatedValue(
110         RandomWalkComputation.CUMULATIVE_PROBABILITY).get();
111   }
112 
113   @Override
114   public void compute(
115       Vertex<LongWritable, DoubleWritable, E> vertex,
116       Iterable<DoubleWritable> messages) throws IOException {
117     double stateProbability;
118 
119     if (getSuperstep() > 0) {
120 
121       double previousStateProbability = vertex.getValue().get();
122       stateProbability =
123           recompute(vertex, messages, teleportationProbability());
124 
125       // Important: rescale for numerical stability
126       stateProbability /= getPreviousCumulativeProbability();
127 
128       doubleWritable.set(Math.abs(stateProbability - previousStateProbability));
129       aggregate(L1_NORM_OF_PROBABILITY_DIFFERENCE, doubleWritable);
130 
131     } else {
132       stateProbability = initialProbability();
133     }
134 
135     vertex.getValue().set(stateProbability);
136 
137     aggregate(CUMULATIVE_PROBABILITY, vertex.getValue());
138 
139     // Compute dangling node contribution for next superstep
140     if (vertex.getNumEdges() == 0) {
141       aggregate(NUM_DANGLING_VERTICES, one);
142       aggregate(CUMULATIVE_DANGLING_PROBABILITY, vertex.getValue());
143     }
144 
145     if (getSuperstep() < maxSupersteps()) {
146       for (Edge<LongWritable, E> edge : vertex.getEdges()) {
147         double transitionProbability =
148             transitionProbability(vertex, stateProbability, edge);
149         doubleWritable.set(transitionProbability);
150         sendMessage(edge.getTargetVertexId(), doubleWritable);
151       }
152     } else {
153       vertex.voteToHalt();
154     }
155   }
156 
157   /**
158    * Reads the number of supersteps to execute from the configuration
159    * @return number of supersteps to execute
160    */
161   private int maxSupersteps() {
162     return ((RandomWalkWorkerContext) getWorkerContext()).getMaxSupersteps();
163   }
164 
165   /**
166    * Reads the teleportation probability from the configuration
167    * @return teleportation probability
168    */
169   protected double teleportationProbability() {
170     return ((RandomWalkWorkerContext) getWorkerContext())
171         .getTeleportationProbability();
172   }
173 }