1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.examples.scc;
1920import org.apache.giraph.aggregators.BooleanOrAggregator;
21import org.apache.giraph.aggregators.IntOverwriteAggregator;
22import org.apache.giraph.master.DefaultMasterCompute;
23import org.apache.hadoop.io.BooleanWritable;
24import org.apache.hadoop.io.IntWritable;
2526/**27 * This master compute keeps track of what phase is being currently executed by28 * the Strongly Connected Components computation. The phases comprehend the29 * following: 1 - Transpose (comprehends 2 supersteps, one to propagate parent30 * vertices ids and another one to store them by their respective children) 2 -31 * Trimming (this phase can happen multiple times) 3 - Forward Traversal 4 -32 * Backward Traversal33 */34publicclassSccPhaseMasterComputeextendsDefaultMasterCompute {
3536/**37 * Aggregator that stores the current phase38 */39publicstaticfinal String PHASE = "scccompute.phase";
4041/**42 * Flags whether a new maximum was found in the Forward Traversal phase43 */44publicstaticfinal String NEW_MAXIMUM = "scccompute.max";
4546/**47 * Flags whether a vertex converged in the Backward Traversal phase48 */49publicstaticfinal String CONVERGED = "scccompute.converged";
5051/**52 * Enumerates the possible phases of the algorithm.53 */54public enum Phases {
55/** Tranpose and Trimming phases **/56 TRANSPOSE, TRIMMING,
57/** Maximum id propagation **/58 FORWARD_TRAVERSAL,
59/** Vertex convergence in SCC **/60 BACKWARD_TRAVERSAL_START, BACKWARD_TRAVERSAL_REST
61 };
6263 @Override
64publicvoid initialize() throws InstantiationException,
65 IllegalAccessException {
66 registerPersistentAggregator(PHASE, IntOverwriteAggregator.class);
67 registerAggregator(NEW_MAXIMUM, BooleanOrAggregator.class);
68 registerAggregator(CONVERGED, BooleanOrAggregator.class);
69 }
7071 @Override
72publicvoid compute() {
73if (getSuperstep() == 0) {
74 setPhase(Phases.TRANSPOSE);
75 } else {
76Phases currPhase = getPhase();
77switch (currPhase) {
78case TRANSPOSE:
79 setPhase(Phases.TRIMMING);
80break;
81case TRIMMING :
82 setPhase(Phases.FORWARD_TRAVERSAL);
83break;
84case FORWARD_TRAVERSAL :
85 BooleanWritable newMaxFound = getAggregatedValue(NEW_MAXIMUM);
86// If no new maximum value was found it means the propagation87// converged, so we can move to the next phase88if (!newMaxFound.get()) {
89 setPhase(Phases.BACKWARD_TRAVERSAL_START);
90 }
91break;
92case BACKWARD_TRAVERSAL_START :
93 setPhase(Phases.BACKWARD_TRAVERSAL_REST);
94break;
95case BACKWARD_TRAVERSAL_REST :
96 BooleanWritable converged = getAggregatedValue(CONVERGED);
97if (!converged.get()) {
98 setPhase(Phases.TRANSPOSE);
99 }
100break;
101default :
102break;
103 }
104 }
105 }
106107/**108 * Sets the next phase of the algorithm.109 * @param phase110 * Next phase.111 */112privatevoid setPhase(Phases phase) {
113 setAggregatedValue(PHASE, new IntWritable(phase.ordinal()));
114 }
115116/**117 * Get current phase.118 * @return Current phase as enumerator.119 */120privatePhases getPhase() {
121 IntWritable phaseInt = getAggregatedValue(PHASE);
122return getPhase(phaseInt);
123 }
124125/**126 * Helper function to convert from internal aggregated value to a Phases127 * enumerator.128 * @param phaseInt129 * An integer that matches a position in the Phases enumerator.130 * @return A Phases' item for the given position.131 */132publicstaticPhases getPhase(IntWritable phaseInt) {
133return Phases.values()[phaseInt.get()];
134 }
135136 }