View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.master;
20  
21  import org.apache.giraph.bsp.ApplicationState;
22  import org.apache.giraph.bsp.BspService;
23  import org.apache.giraph.bsp.CentralizedServiceMaster;
24  import org.apache.giraph.bsp.SuperstepState;
25  import org.apache.giraph.counters.GiraphTimers;
26  import org.apache.giraph.graph.Computation;
27  import org.apache.giraph.metrics.GiraphMetrics;
28  import org.apache.hadoop.io.Writable;
29  import org.apache.hadoop.io.WritableComparable;
30  import org.apache.hadoop.mapreduce.Mapper.Context;
31  import org.apache.log4j.Logger;
32  
33  import java.util.Map;
34  import java.util.Map.Entry;
35  import java.util.TreeMap;
36  
37  import static org.apache.giraph.conf.GiraphConstants.USE_SUPERSTEP_COUNTERS;
38  
39  /**
40   * Master thread that will coordinate the activities of the tasks.  It runs
41   * on all task processes, however, will only execute its algorithm if it knows
42   * it is the "leader" from ZooKeeper.
43   *
44   * @param <I> Vertex id
45   * @param <V> Vertex value
46   * @param <E> Edge value
47   */
48  @SuppressWarnings("rawtypes")
49  public class MasterThread<I extends WritableComparable, V extends Writable,
50      E extends Writable> extends Thread {
51    /** Counter group name for the Giraph timers */
52    public static final String GIRAPH_TIMERS_COUNTER_GROUP_NAME = "Giraph Timers";
53    /** Class logger */
54    private static final Logger LOG = Logger.getLogger(MasterThread.class);
55    /** Reference to shared BspService */
56    private CentralizedServiceMaster<I, V, E> bspServiceMaster = null;
57    /** Context (for counters) */
58    private final Context context;
59    /** Use superstep counters? */
60    private final boolean superstepCounterOn;
61    /** Setup seconds */
62    private double setupSecs = 0d;
63    /** Superstep timer (in seconds) map */
64    private final Map<Long, Double> superstepSecsMap =
65        new TreeMap<Long, Double>();
66  
67    /**
68     * Constructor.
69     *
70     * @param bspServiceMaster Master that already exists and setup() has
71     *        been called.
72     * @param context Context from the Mapper.
73     */
74    public MasterThread(CentralizedServiceMaster<I, V, E> bspServiceMaster,
75        Context context) {
76      super(MasterThread.class.getName());
77      this.bspServiceMaster = bspServiceMaster;
78      this.context = context;
79      GiraphTimers.init(context);
80      superstepCounterOn = USE_SUPERSTEP_COUNTERS.get(context.getConfiguration());
81    }
82  
83    /**
84     * The master algorithm.  The algorithm should be able to withstand
85     * failures and resume as necessary since the master may switch during a
86     * job.
87     */
88    @Override
89    public void run() {
90      // Algorithm:
91      // 1. Become the master
92      // 2. If desired, restart from a manual checkpoint
93      // 3. Run all supersteps until complete
94      try {
95        long startMillis = System.currentTimeMillis();
96        long initializeMillis = 0;
97        long endMillis = 0;
98        bspServiceMaster.setup();
99        SuperstepState superstepState = SuperstepState.INITIAL;
100 
101       if (bspServiceMaster.becomeMaster()) {
102         // First call to checkWorkers waits for all pending resources.
103         // If these resources are still available at subsequent calls it just
104         // reads zookeeper for the list of healthy workers.
105         bspServiceMaster.checkWorkers();
106         initializeMillis = System.currentTimeMillis();
107         GiraphTimers.getInstance().getInitializeMs().increment(
108             initializeMillis - startMillis);
109         // Attempt to create InputSplits if necessary. Bail out if that fails.
110         if (bspServiceMaster.getRestartedSuperstep() !=
111             BspService.UNSET_SUPERSTEP ||
112             (bspServiceMaster.createMappingInputSplits() != -1 &&
113                 bspServiceMaster.createVertexInputSplits() != -1 &&
114                 bspServiceMaster.createEdgeInputSplits() != -1)) {
115           long setupMillis = System.currentTimeMillis() - initializeMillis;
116           GiraphTimers.getInstance().getSetupMs().increment(setupMillis);
117           setupSecs = setupMillis / 1000.0d;
118           while (!superstepState.isExecutionComplete()) {
119             long startSuperstepMillis = System.currentTimeMillis();
120             long cachedSuperstep = bspServiceMaster.getSuperstep();
121             GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep);
122             Class<? extends Computation> computationClass =
123                 bspServiceMaster.getMasterCompute().getComputation();
124             superstepState = bspServiceMaster.coordinateSuperstep();
125             long superstepMillis = System.currentTimeMillis() -
126                 startSuperstepMillis;
127             superstepSecsMap.put(cachedSuperstep,
128                 superstepMillis / 1000.0d);
129             if (LOG.isInfoEnabled()) {
130               LOG.info("masterThread: Coordination of superstep " +
131                   cachedSuperstep + " took " +
132                   superstepMillis / 1000.0d +
133                   " seconds ended with state " + superstepState +
134                   " and is now on superstep " +
135                   bspServiceMaster.getSuperstep());
136             }
137             if (superstepCounterOn) {
138               String computationName = (computationClass == null) ?
139                   null : computationClass.getSimpleName();
140               GiraphTimers.getInstance().getSuperstepMs(cachedSuperstep,
141                   computationName).increment(superstepMillis);
142             }
143 
144             bspServiceMaster.postSuperstep();
145 
146             // If a worker failed, restart from a known good superstep
147             if (superstepState == SuperstepState.WORKER_FAILURE) {
148               bspServiceMaster.restartFromCheckpoint(
149                   bspServiceMaster.getLastGoodCheckpoint());
150             }
151             endMillis = System.currentTimeMillis();
152           }
153           bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, -1);
154         }
155       }
156       bspServiceMaster.cleanup(superstepState);
157       if (!superstepSecsMap.isEmpty()) {
158         GiraphTimers.getInstance().getShutdownMs().
159           increment(System.currentTimeMillis() - endMillis);
160         if (LOG.isInfoEnabled()) {
161           LOG.info("setup: Took " + setupSecs + " seconds.");
162         }
163         for (Entry<Long, Double> entry : superstepSecsMap.entrySet()) {
164           if (LOG.isInfoEnabled()) {
165             if (entry.getKey().longValue() ==
166                 BspService.INPUT_SUPERSTEP) {
167               LOG.info("input superstep: Took " +
168                   entry.getValue() + " seconds.");
169             } else {
170               LOG.info("superstep " + entry.getKey() + ": Took " +
171                   entry.getValue() + " seconds.");
172             }
173           }
174           context.progress();
175         }
176         if (LOG.isInfoEnabled()) {
177           LOG.info("shutdown: Took " +
178               (System.currentTimeMillis() - endMillis) /
179               1000.0d + " seconds.");
180           LOG.info("total: Took " +
181               ((System.currentTimeMillis() - initializeMillis) /
182               1000.0d) + " seconds.");
183         }
184         GiraphTimers.getInstance().getTotalMs().
185           increment(System.currentTimeMillis() - initializeMillis);
186       }
187       bspServiceMaster.postApplication();
188       // CHECKSTYLE: stop IllegalCatchCheck
189     } catch (Exception e) {
190       // CHECKSTYLE: resume IllegalCatchCheck
191       LOG.error("masterThread: Master algorithm failed with " +
192           e.getClass().getSimpleName(), e);
193       bspServiceMaster.failureCleanup(e);
194       throw new IllegalStateException(e);
195     }
196   }
197 }