View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.master;
20  
21  import org.apache.giraph.bsp.ApplicationState;
22  import org.apache.giraph.bsp.BspService;
23  import org.apache.giraph.bsp.CentralizedServiceMaster;
24  import org.apache.giraph.bsp.SuperstepState;
25  import org.apache.giraph.counters.GiraphTimers;
26  import org.apache.giraph.graph.Computation;
27  import org.apache.giraph.metrics.GiraphMetrics;
28  import org.apache.hadoop.io.Writable;
29  import org.apache.hadoop.io.WritableComparable;
30  import org.apache.hadoop.mapreduce.Mapper.Context;
31  import org.apache.log4j.Logger;
32  
33  import java.util.Map;
34  import java.util.Map.Entry;
35  import java.util.TreeMap;
36  
37  import static org.apache.giraph.conf.GiraphConstants.SPLIT_MASTER_WORKER;
38  import static org.apache.giraph.conf.GiraphConstants.USE_SUPERSTEP_COUNTERS;
39  
40  /**
41   * Master thread that will coordinate the activities of the tasks.  It runs
42   * on all task processes, however, will only execute its algorithm if it knows
43   * it is the "leader" from ZooKeeper.
44   *
45   * @param <I> Vertex id
46   * @param <V> Vertex value
47   * @param <E> Edge value
48   */
49  @SuppressWarnings("rawtypes")
50  public class MasterThread<I extends WritableComparable, V extends Writable,
51      E extends Writable> extends Thread {
52    /** Counter group name for the Giraph timers */
53    public static final String GIRAPH_TIMERS_COUNTER_GROUP_NAME = "Giraph Timers";
54    /** Class logger */
55    private static final Logger LOG = Logger.getLogger(MasterThread.class);
56    /** Reference to shared BspService */
57    private CentralizedServiceMaster<I, V, E> bspServiceMaster = null;
58    /** Context (for counters) */
59    private final Context context;
60    /** Use superstep counters? */
61    private final boolean superstepCounterOn;
62    /** Are master and worker split or not? */
63    private final boolean splitMasterWorker;
64    /** Setup seconds */
65    private double setupSecs = 0d;
66    /** Superstep timer (in seconds) map */
67    private final Map<Long, Double> superstepSecsMap =
68        new TreeMap<Long, Double>();
69  
70    /**
71     * Constructor.
72     *
73     * @param bspServiceMaster Master that already exists and setup() has
74     *        been called.
75     * @param context Context from the Mapper.
76     */
77    public MasterThread(CentralizedServiceMaster<I, V, E> bspServiceMaster,
78        Context context) {
79      super(MasterThread.class.getName());
80      this.bspServiceMaster = bspServiceMaster;
81      this.context = context;
82      GiraphTimers.init(context);
83      superstepCounterOn = USE_SUPERSTEP_COUNTERS.get(context.getConfiguration());
84      splitMasterWorker = SPLIT_MASTER_WORKER.get(context.getConfiguration());
85    }
86  
87    /**
88     * The master algorithm.  The algorithm should be able to withstand
89     * failures and resume as necessary since the master may switch during a
90     * job.
91     */
92    @Override
93    public void run() {
94      // Algorithm:
95      // 1. Become the master
96      // 2. If desired, restart from a manual checkpoint
97      // 3. Run all supersteps until complete
98      try {
99        long startMillis = System.currentTimeMillis();
100       long initializeMillis = 0;
101       long endMillis = 0;
102       bspServiceMaster.setup();
103       SuperstepState superstepState = SuperstepState.INITIAL;
104 
105       if (bspServiceMaster.becomeMaster()) {
106         // First call to checkWorkers waits for all pending resources.
107         // If these resources are still available at subsequent calls it just
108         // reads zookeeper for the list of healthy workers.
109         bspServiceMaster.checkWorkers();
110         initializeMillis = System.currentTimeMillis();
111         GiraphTimers.getInstance().getInitializeMs().increment(
112             initializeMillis - startMillis);
113         // Attempt to create InputSplits if necessary. Bail out if that fails.
114         if (bspServiceMaster.getRestartedSuperstep() !=
115             BspService.UNSET_SUPERSTEP ||
116             (bspServiceMaster.createMappingInputSplits() != -1 &&
117                 bspServiceMaster.createVertexInputSplits() != -1 &&
118                 bspServiceMaster.createEdgeInputSplits() != -1)) {
119           long setupMillis = System.currentTimeMillis() - initializeMillis;
120           GiraphTimers.getInstance().getSetupMs().increment(setupMillis);
121           setupSecs = setupMillis / 1000.0d;
122           while (!superstepState.isExecutionComplete()) {
123             long startSuperstepMillis = System.currentTimeMillis();
124             long cachedSuperstep = bspServiceMaster.getSuperstep();
125             // If master and worker are running together, worker will call reset
126             if (splitMasterWorker) {
127               GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep);
128             }
129             Class<? extends Computation> computationClass =
130                 bspServiceMaster.getMasterCompute().getComputation();
131             superstepState = bspServiceMaster.coordinateSuperstep();
132             long superstepMillis = System.currentTimeMillis() -
133                 startSuperstepMillis;
134             superstepSecsMap.put(cachedSuperstep,
135                 superstepMillis / 1000.0d);
136             if (LOG.isInfoEnabled()) {
137               LOG.info("masterThread: Coordination of superstep " +
138                   cachedSuperstep + " took " +
139                   superstepMillis / 1000.0d +
140                   " seconds ended with state " + superstepState +
141                   " and is now on superstep " +
142                   bspServiceMaster.getSuperstep());
143             }
144             if (superstepCounterOn) {
145               String computationName = (computationClass == null) ?
146                   null : computationClass.getSimpleName();
147               GiraphTimers.getInstance().getSuperstepMs(cachedSuperstep,
148                   computationName).increment(superstepMillis);
149             }
150 
151             bspServiceMaster.postSuperstep();
152 
153             // If a worker failed, restart from a known good superstep
154             if (superstepState == SuperstepState.WORKER_FAILURE) {
155               bspServiceMaster.restartFromCheckpoint(
156                   bspServiceMaster.getLastGoodCheckpoint());
157             }
158             endMillis = System.currentTimeMillis();
159           }
160           bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, -1);
161         }
162       }
163       bspServiceMaster.cleanup(superstepState);
164       if (!superstepSecsMap.isEmpty()) {
165         GiraphTimers.getInstance().getShutdownMs().
166           increment(System.currentTimeMillis() - endMillis);
167         if (LOG.isInfoEnabled()) {
168           LOG.info("setup: Took " + setupSecs + " seconds.");
169         }
170         for (Entry<Long, Double> entry : superstepSecsMap.entrySet()) {
171           if (LOG.isInfoEnabled()) {
172             if (entry.getKey().longValue() ==
173                 BspService.INPUT_SUPERSTEP) {
174               LOG.info("input superstep: Took " +
175                   entry.getValue() + " seconds.");
176             } else {
177               LOG.info("superstep " + entry.getKey() + ": Took " +
178                   entry.getValue() + " seconds.");
179             }
180           }
181           context.progress();
182         }
183         if (LOG.isInfoEnabled()) {
184           LOG.info("shutdown: Took " +
185               (System.currentTimeMillis() - endMillis) /
186               1000.0d + " seconds.");
187           LOG.info("total: Took " +
188               ((System.currentTimeMillis() - initializeMillis) /
189               1000.0d) + " seconds.");
190         }
191         GiraphTimers.getInstance().getTotalMs().
192           increment(System.currentTimeMillis() - initializeMillis);
193       }
194       bspServiceMaster.postApplication();
195       // CHECKSTYLE: stop IllegalCatchCheck
196     } catch (Exception e) {
197       // CHECKSTYLE: resume IllegalCatchCheck
198       LOG.error("masterThread: Master algorithm failed with " +
199           e.getClass().getSimpleName(), e);
200       bspServiceMaster.failureCleanup(e);
201       throw new IllegalStateException(e);
202     }
203   }
204 }