This project has retired. For details please refer to its Attic page.
MasterThread xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.master;
20  
21  import org.apache.giraph.bsp.ApplicationState;
22  import org.apache.giraph.bsp.BspService;
23  import org.apache.giraph.bsp.CentralizedServiceMaster;
24  import org.apache.giraph.bsp.SuperstepState;
25  import org.apache.giraph.counters.GiraphTimers;
26  import org.apache.giraph.graph.Computation;
27  import org.apache.giraph.metrics.GiraphMetrics;
28  import org.apache.hadoop.io.Writable;
29  import org.apache.hadoop.io.WritableComparable;
30  import org.apache.hadoop.mapreduce.Mapper.Context;
31  import org.apache.log4j.Logger;
32  
33  import java.util.Map;
34  import java.util.Map.Entry;
35  import java.util.TreeMap;
36  
37  import static org.apache.giraph.conf.GiraphConstants.SPLIT_MASTER_WORKER;
38  import static org.apache.giraph.conf.GiraphConstants.USE_SUPERSTEP_COUNTERS;
39  
40  /**
41   * Master thread that will coordinate the activities of the tasks.  It runs
42   * on all task processes, however, will only execute its algorithm if it knows
43   * it is the "leader" from ZooKeeper.
44   *
45   * @param <I> Vertex id
46   * @param <V> Vertex value
47   * @param <E> Edge value
48   */
49  @SuppressWarnings("rawtypes")
50  public class MasterThread<I extends WritableComparable, V extends Writable,
51      E extends Writable> extends Thread {
52    /** Counter group name for the Giraph timers */
53    public static final String GIRAPH_TIMERS_COUNTER_GROUP_NAME = "Giraph Timers";
54    /** Class logger */
55    private static final Logger LOG = Logger.getLogger(MasterThread.class);
56    /** Reference to shared BspService */
57    private CentralizedServiceMaster<I, V, E> bspServiceMaster = null;
58    /** Context (for counters) */
59    private final Context context;
60    /** Use superstep counters? */
61    private final boolean superstepCounterOn;
62    /** Are master and worker split or not? */
63    private final boolean splitMasterWorker;
64    /** Setup seconds */
65    private double setupSecs = 0d;
66    /** Superstep timer (in seconds) map */
67    private final Map<Long, Double> superstepSecsMap =
68        new TreeMap<Long, Double>();
69  
70    /**
71     * Constructor.
72     *
73     * @param bspServiceMaster Master that already exists and setup() has
74     *        been called.
75     * @param context Context from the Mapper.
76     */
77    public MasterThread(CentralizedServiceMaster<I, V, E> bspServiceMaster,
78        Context context) {
79      super(MasterThread.class.getName());
80      this.bspServiceMaster = bspServiceMaster;
81      this.context = context;
82      GiraphTimers.init(context);
83      superstepCounterOn = USE_SUPERSTEP_COUNTERS.get(context.getConfiguration());
84      splitMasterWorker = SPLIT_MASTER_WORKER.get(context.getConfiguration());
85    }
86  
87    /**
88     * The master algorithm.  The algorithm should be able to withstand
89     * failures and resume as necessary since the master may switch during a
90     * job.
91     */
92    @Override
93    public void run() {
94      // Algorithm:
95      // 1. Become the master
96      // 2. If desired, restart from a manual checkpoint
97      // 3. Run all supersteps until complete
98      try {
99        long startMillis = System.currentTimeMillis();
100       long initializeMillis = 0;
101       long endMillis = 0;
102       bspServiceMaster.setup();
103       SuperstepState superstepState = SuperstepState.INITIAL;
104 
105       if (bspServiceMaster.becomeMaster()) {
106         // First call to checkWorkers waits for all pending resources.
107         // If these resources are still available at subsequent calls it just
108         // reads zookeeper for the list of healthy workers.
109         bspServiceMaster.checkWorkers();
110         initializeMillis = System.currentTimeMillis();
111         GiraphTimers.getInstance().getInitializeMs().increment(
112             initializeMillis - startMillis);
113         // Attempt to create InputSplits if necessary. Bail out if that fails.
114         if (bspServiceMaster.getRestartedSuperstep() !=
115             BspService.UNSET_SUPERSTEP ||
116             (bspServiceMaster.createMappingInputSplits() != -1 &&
117                 bspServiceMaster.createVertexInputSplits() != -1 &&
118                 bspServiceMaster.createEdgeInputSplits() != -1)) {
119           long setupMillis = System.currentTimeMillis() - initializeMillis;
120           GiraphTimers.getInstance().getSetupMs().increment(setupMillis);
121           setupSecs = setupMillis / 1000.0d;
122           while (!superstepState.isExecutionComplete()) {
123             long startSuperstepMillis = System.currentTimeMillis();
124             long cachedSuperstep = bspServiceMaster.getSuperstep();
125             // If master and worker are running together, worker will call reset
126             if (splitMasterWorker) {
127               GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep);
128             }
129             Class<? extends Computation> computationClass =
130                 bspServiceMaster.getMasterCompute().getComputation();
131             superstepState = bspServiceMaster.coordinateSuperstep();
132             long superstepMillis = System.currentTimeMillis() -
133                 startSuperstepMillis;
134             superstepSecsMap.put(cachedSuperstep,
135                 superstepMillis / 1000.0d);
136             if (LOG.isInfoEnabled()) {
137               LOG.info("masterThread: Coordination of superstep " +
138                   cachedSuperstep + " took " +
139                   superstepMillis / 1000.0d +
140                   " seconds ended with state " + superstepState +
141                   " and is now on superstep " +
142                   bspServiceMaster.getSuperstep());
143             }
144             if (superstepCounterOn) {
145               String computationName = (computationClass == null) ?
146                   null : computationClass.getSimpleName();
147               GiraphTimers.getInstance().getSuperstepMs(cachedSuperstep,
148                   computationName).increment(superstepMillis);
149             }
150             bspServiceMaster.addGiraphTimersAndSendCounters(cachedSuperstep);
151 
152             bspServiceMaster.postSuperstep();
153 
154             // If a worker failed, restart from a known good superstep
155             if (superstepState == SuperstepState.WORKER_FAILURE) {
156               bspServiceMaster.restartFromCheckpoint(
157                   bspServiceMaster.getLastGoodCheckpoint());
158             }
159             endMillis = System.currentTimeMillis();
160           }
161           bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, -1);
162         }
163       }
164       bspServiceMaster.cleanup(superstepState);
165       if (!superstepSecsMap.isEmpty()) {
166         GiraphTimers.getInstance().getShutdownMs().
167           increment(System.currentTimeMillis() - endMillis);
168         if (LOG.isInfoEnabled()) {
169           LOG.info("setup: Took " + setupSecs + " seconds.");
170         }
171         for (Entry<Long, Double> entry : superstepSecsMap.entrySet()) {
172           if (LOG.isInfoEnabled()) {
173             if (entry.getKey().longValue() ==
174                 BspService.INPUT_SUPERSTEP) {
175               LOG.info("input superstep: Took " +
176                   entry.getValue() + " seconds.");
177             } else {
178               LOG.info("superstep " + entry.getKey() + ": Took " +
179                   entry.getValue() + " seconds.");
180             }
181           }
182           context.progress();
183         }
184         if (LOG.isInfoEnabled()) {
185           LOG.info("shutdown: Took " +
186               (System.currentTimeMillis() - endMillis) /
187               1000.0d + " seconds.");
188           LOG.info("total: Took " +
189               ((System.currentTimeMillis() - initializeMillis) /
190               1000.0d) + " seconds.");
191         }
192         GiraphTimers.getInstance().getTotalMs().
193           increment(System.currentTimeMillis() - initializeMillis);
194       }
195       bspServiceMaster.addGiraphTimersAndSendCounters(
196               bspServiceMaster.getSuperstep());
197       bspServiceMaster.postApplication();
198       // CHECKSTYLE: stop IllegalCatchCheck
199     } catch (Exception e) {
200       // CHECKSTYLE: resume IllegalCatchCheck
201       LOG.error("masterThread: Master algorithm failed with " +
202           e.getClass().getSimpleName(), e);
203       bspServiceMaster.failureCleanup(e);
204       throw new IllegalStateException(e);
205     }
206   }
207 }