1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.examples;
2021import org.apache.giraph.aggregators.DoubleSumAggregator;
22import org.apache.giraph.aggregators.LongSumAggregator;
23import org.apache.giraph.master.DefaultMasterCompute;
24import org.apache.hadoop.io.DoubleWritable;
25import org.apache.hadoop.io.LongWritable;
26import org.apache.log4j.Logger;
2728/**29 * Master compute associated with {@link RandomWalkComputation}. It handles30 * dangling nodes.31 */32publicclassRandomWalkVertexMasterComputeextendsDefaultMasterCompute {
3334/** threshold for the L1 norm of the state vector difference */35staticfinaldouble CONVERGENCE_THRESHOLD = 0.00001;
3637/** logger */38privatestaticfinal Logger LOG =
39 Logger.getLogger(RandomWalkVertexMasterCompute.class);
4041 @Override
42publicvoid compute() {
43double danglingContribution =
44this.<DoubleWritable>getAggregatedValue(
45 RandomWalkComputation.CUMULATIVE_DANGLING_PROBABILITY).get();
46double cumulativeProbability =
47this.<DoubleWritable>getAggregatedValue(
48 RandomWalkComputation.CUMULATIVE_PROBABILITY).get();
49double l1NormOfStateDiff =
50this.<DoubleWritable>getAggregatedValue(
51 RandomWalkComputation.L1_NORM_OF_PROBABILITY_DIFFERENCE).get();
52long numDanglingVertices =
53this.<LongWritable>getAggregatedValue(
54 RandomWalkComputation.NUM_DANGLING_VERTICES).get();
5556 LOG.info("[Superstep " + getSuperstep() + "] Dangling contribution = " +
57 danglingContribution + ", number of dangling vertices = " +
58 numDanglingVertices + ", cumulative probability = " +
59 cumulativeProbability + ", L1 Norm of state vector difference = " +
60 l1NormOfStateDiff);
6162// Convergence check: halt once the L1 norm of the difference between the63// state vectors fall below the threshold64if (getSuperstep() > 1 && l1NormOfStateDiff < CONVERGENCE_THRESHOLD) {
65 haltComputation();
66 }
67 }
6869 @Override
70publicvoid initialize() throws InstantiationException,
71 IllegalAccessException {
72 registerAggregator(RandomWalkComputation.NUM_DANGLING_VERTICES,
73 LongSumAggregator.class);
74 registerAggregator(RandomWalkComputation.CUMULATIVE_DANGLING_PROBABILITY,
75 DoubleSumAggregator.class);
76 registerAggregator(RandomWalkComputation.CUMULATIVE_PROBABILITY,
77 DoubleSumAggregator.class);
78 registerAggregator(RandomWalkComputation.L1_NORM_OF_PROBABILITY_DIFFERENCE,
79 DoubleSumAggregator.class);
80 }
81 }