This project has retired. For details please refer to its Attic page.
SccPhaseMasterCompute xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.examples.scc;
19  
20  import org.apache.giraph.aggregators.BooleanOrAggregator;
21  import org.apache.giraph.aggregators.IntOverwriteAggregator;
22  import org.apache.giraph.master.DefaultMasterCompute;
23  import org.apache.hadoop.io.BooleanWritable;
24  import org.apache.hadoop.io.IntWritable;
25  
26  /**
27   * This master compute keeps track of what phase is being currently executed by
28   * the Strongly Connected Components computation. The phases comprehend the
29   * following: 1 - Transpose (comprehends 2 supersteps, one to propagate parent
30   * vertices ids and another one to store them by their respective children) 2 -
31   * Trimming (this phase can happen multiple times) 3 - Forward Traversal 4 -
32   * Backward Traversal
33   */
34  public class SccPhaseMasterCompute extends DefaultMasterCompute {
35  
36    /**
37     * Aggregator that stores the current phase
38     */
39    public static final String PHASE = "scccompute.phase";
40  
41    /**
42     * Flags whether a new maximum was found in the Forward Traversal phase
43     */
44    public static final String NEW_MAXIMUM = "scccompute.max";
45  
46    /**
47     * Flags whether a vertex converged in the Backward Traversal phase
48     */
49    public static final String CONVERGED = "scccompute.converged";
50  
51    /**
52     * Enumerates the possible phases of the algorithm.
53     */
54    public enum Phases {
55      /** Tranpose and Trimming phases **/
56      TRANSPOSE, TRIMMING,
57      /** Maximum id propagation **/
58      FORWARD_TRAVERSAL,
59      /** Vertex convergence in SCC **/
60      BACKWARD_TRAVERSAL_START, BACKWARD_TRAVERSAL_REST
61    };
62  
63    @Override
64    public void initialize() throws InstantiationException,
65        IllegalAccessException {
66      registerPersistentAggregator(PHASE, IntOverwriteAggregator.class);
67      registerAggregator(NEW_MAXIMUM, BooleanOrAggregator.class);
68      registerAggregator(CONVERGED, BooleanOrAggregator.class);
69    }
70  
71    @Override
72    public void compute() {
73      if (getSuperstep() == 0) {
74        setPhase(Phases.TRANSPOSE);
75      } else {
76        Phases currPhase = getPhase();
77        switch (currPhase) {
78        case TRANSPOSE:
79          setPhase(Phases.TRIMMING);
80          break;
81        case TRIMMING :
82          setPhase(Phases.FORWARD_TRAVERSAL);
83          break;
84        case FORWARD_TRAVERSAL :
85          BooleanWritable newMaxFound = getAggregatedValue(NEW_MAXIMUM);
86          // If no new maximum value was found it means the propagation
87          // converged, so we can move to the next phase
88          if (!newMaxFound.get()) {
89            setPhase(Phases.BACKWARD_TRAVERSAL_START);
90          }
91          break;
92        case BACKWARD_TRAVERSAL_START :
93          setPhase(Phases.BACKWARD_TRAVERSAL_REST);
94          break;
95        case BACKWARD_TRAVERSAL_REST :
96          BooleanWritable converged = getAggregatedValue(CONVERGED);
97          if (!converged.get()) {
98            setPhase(Phases.TRANSPOSE);
99          }
100         break;
101       default :
102         break;
103       }
104     }
105   }
106 
107   /**
108    * Sets the next phase of the algorithm.
109    * @param phase
110    *          Next phase.
111    */
112   private void setPhase(Phases phase) {
113     setAggregatedValue(PHASE, new IntWritable(phase.ordinal()));
114   }
115 
116   /**
117    * Get current phase.
118    * @return Current phase as enumerator.
119    */
120   private Phases getPhase() {
121     IntWritable phaseInt = getAggregatedValue(PHASE);
122     return getPhase(phaseInt);
123   }
124 
125   /**
126    * Helper function to convert from internal aggregated value to a Phases
127    * enumerator.
128    * @param phaseInt
129    *          An integer that matches a position in the Phases enumerator.
130    * @return A Phases' item for the given position.
131    */
132   public static Phases getPhase(IntWritable phaseInt) {
133     return Phases.values()[phaseInt.get()];
134   }
135 
136 }