1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.examples;
2021import com.google.common.base.Preconditions;
22import org.apache.giraph.edge.Edge;
23import org.apache.giraph.graph.Vertex;
24import org.apache.giraph.utils.MathUtils;
25import org.apache.hadoop.io.DoubleWritable;
26import org.apache.hadoop.io.LongWritable;
2728/**29 * Executes "RandomWalkWithRestart", a random walk on the graph which is biased30 * towards a source vertex. The resulting probabilities of staying at a given31 * vertex can be interpreted as a measure of proximity to the source vertex.32 */33publicclassRandomWalkWithRestartComputation34extends RandomWalkComputation<DoubleWritable> {
3536/** Configuration parameter for the source vertex */37staticfinal String SOURCE_VERTEX = RandomWalkWithRestartComputation.class38 .getName() + ".sourceVertex";
3940/**41 * Checks whether the currently executed vertex is the source vertex42 * @param vertex Vertex43 * @return is the currently executed vertex the source vertex?44 */45privateboolean isSourceVertex(Vertex<LongWritable, ?, ?> vertex) {
46return ((RandomWalkWorkerContext) getWorkerContext()).isSource(
47 vertex.getId().get());
48 }
4950/**51 * Returns the number of source vertices.52 * @return The number of source vertices.53 */54privateint numSourceVertices() {
55return ((RandomWalkWorkerContext) getWorkerContext()).numSources();
56 }
5758 @Override
59protecteddouble transitionProbability(
60 Vertex<LongWritable, DoubleWritable, DoubleWritable>
61 vertex,
62double stateProbability, Edge<LongWritable, DoubleWritable> edge) {
63return stateProbability * edge.getValue().get();
64 }
6566 @Override
67protecteddouble recompute(
68 Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
69 Iterable<DoubleWritable> transitionProbabilities,
70double teleportationProbability) {
71int numSourceVertices = numSourceVertices();
72 Preconditions.checkState(numSourceVertices > 0, "No source vertex found");
7374double stateProbability = MathUtils.sum(transitionProbabilities);
75// Add the contribution of dangling nodes (weakly preferential76// implementation: dangling nodes redistribute uniformly)77 stateProbability += getDanglingProbability() / getTotalNumVertices();
78// The random walk might teleport back to one of the source vertexes79 stateProbability *= 1 - teleportationProbability;
80if (isSourceVertex(vertex)) {
81 stateProbability += teleportationProbability / numSourceVertices;
82 }
83return stateProbability;
84 }
85 }