1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.examples;
2021import org.apache.giraph.graph.BasicComputation;
22import org.apache.giraph.edge.Edge;
23import org.apache.giraph.graph.Vertex;
24import org.apache.hadoop.io.DoubleWritable;
25import org.apache.hadoop.io.LongWritable;
26import org.apache.hadoop.io.Writable;
2728import java.io.IOException;
2930/**31 * Base class for executing a random walk on a graph32 *33 * @param <E> edge type34 */35publicabstractclass RandomWalkComputation<E extends Writable>
36extends BasicComputation<LongWritable, DoubleWritable, E, DoubleWritable> {
37/** Configuration parameter for the number of supersteps to execute */38staticfinal String MAX_SUPERSTEPS = RandomWalkComputation.class.getName() +
39".maxSupersteps";
40/** Configuration parameter for the teleportation probability */41staticfinal String TELEPORTATION_PROBABILITY = RandomWalkComputation.class42 .getName() + ".teleportationProbability";
43/** Name of aggregator for the probability assigned to dangling vertices */44staticfinal String CUMULATIVE_DANGLING_PROBABILITY =
45 RandomWalkComputation.class.getName() + ".cumulativeDanglingProbability";
46/** Name of aggregator for the probability assigned to all vertices */47staticfinal String CUMULATIVE_PROBABILITY = RandomWalkComputation.class48 .getName() + ".cumulativeProbability";
49/** Name of aggregator for the number of dangling vertices */50staticfinal String NUM_DANGLING_VERTICES = RandomWalkComputation.class51 .getName() + ".numDanglingVertices";
52/** Name of aggregator for the L1 norm of the probability difference, used53 * for convergence detection */54staticfinal String L1_NORM_OF_PROBABILITY_DIFFERENCE =
55 RandomWalkComputation.class.getName() + ".l1NormOfProbabilityDifference";
56/** Reusable {@link DoubleWritable} instance to avoid object instantiation */57privatefinal DoubleWritable doubleWritable = new DoubleWritable();
58/** Reusable {@link LongWritable} for counting dangling vertices */59privatefinal LongWritable one = new LongWritable(1);
6061/**62 * Compute an initial probability value for the vertex. Per default,63 * we start with a uniform distribution.64 * @return The initial probability value.65 */66protecteddouble initialProbability() {
67return 1.0 / getTotalNumVertices();
68 }
6970/**71 * Compute the probability of transitioning to a neighbor vertex72 * @param vertex Vertex73 * @param stateProbability current steady state probability of the vertex74 * @param edge edge to neighbor75 * @return the probability of transitioning to a neighbor vertex76 */77protectedabstractdouble transitionProbability(
78 Vertex<LongWritable, DoubleWritable, E> vertex,
79double stateProbability,
80 Edge<LongWritable, E> edge);
8182/**83 * Perform a single step of a random walk computation.84 * @param vertex Vertex85 * @param messages Messages received in the previous step.86 * @param teleportationProbability Probability of teleporting to another87 * vertex.88 * @return The new probability value.89 */90protectedabstractdouble recompute(
91 Vertex<LongWritable, DoubleWritable, E> vertex,
92 Iterable<DoubleWritable> messages,
93double teleportationProbability);
9495/**96 * Returns the cumulative probability from dangling vertices.97 * @return The cumulative probability from dangling vertices.98 */99protecteddouble getDanglingProbability() {
100returnthis.<DoubleWritable>getAggregatedValue(
101 RandomWalkComputation.CUMULATIVE_DANGLING_PROBABILITY).get();
102 }
103104/**105 * Returns the cumulative probability from dangling vertices.106 * @return The cumulative probability from dangling vertices.107 */108protecteddouble getPreviousCumulativeProbability() {
109returnthis.<DoubleWritable>getAggregatedValue(
110 RandomWalkComputation.CUMULATIVE_PROBABILITY).get();
111 }
112113 @Override
114publicvoid compute(
115 Vertex<LongWritable, DoubleWritable, E> vertex,
116 Iterable<DoubleWritable> messages) throws IOException {
117double stateProbability;
118119if (getSuperstep() > 0) {
120121double previousStateProbability = vertex.getValue().get();
122 stateProbability =
123 recompute(vertex, messages, teleportationProbability());
124125// Important: rescale for numerical stability126 stateProbability /= getPreviousCumulativeProbability();
127128 doubleWritable.set(Math.abs(stateProbability - previousStateProbability));
129 aggregate(L1_NORM_OF_PROBABILITY_DIFFERENCE, doubleWritable);
130131 } else {
132 stateProbability = initialProbability();
133 }
134135 vertex.getValue().set(stateProbability);
136137 aggregate(CUMULATIVE_PROBABILITY, vertex.getValue());
138139// Compute dangling node contribution for next superstep140if (vertex.getNumEdges() == 0) {
141 aggregate(NUM_DANGLING_VERTICES, one);
142 aggregate(CUMULATIVE_DANGLING_PROBABILITY, vertex.getValue());
143 }
144145if (getSuperstep() < maxSupersteps()) {
146for (Edge<LongWritable, E> edge : vertex.getEdges()) {
147double transitionProbability =
148 transitionProbability(vertex, stateProbability, edge);
149 doubleWritable.set(transitionProbability);
150 sendMessage(edge.getTargetVertexId(), doubleWritable);
151 }
152 } else {
153 vertex.voteToHalt();
154 }
155 }
156157/**158 * Reads the number of supersteps to execute from the configuration159 * @return number of supersteps to execute160 */161privateint maxSupersteps() {
162return ((RandomWalkWorkerContext) getWorkerContext()).getMaxSupersteps();
163 }
164165/**166 * Reads the teleportation probability from the configuration167 * @return teleportation probability168 */169protecteddouble teleportationProbability() {
170return ((RandomWalkWorkerContext) getWorkerContext())
171 .getTeleportationProbability();
172 }
173 }