This project has retired. For details please refer to its
Attic page.
MasterThread xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.master;
20
21 import org.apache.giraph.bsp.ApplicationState;
22 import org.apache.giraph.bsp.BspService;
23 import org.apache.giraph.bsp.CentralizedServiceMaster;
24 import org.apache.giraph.bsp.SuperstepState;
25 import org.apache.giraph.counters.GiraphTimers;
26 import org.apache.giraph.graph.Computation;
27 import org.apache.giraph.metrics.GiraphMetrics;
28 import org.apache.hadoop.io.Writable;
29 import org.apache.hadoop.io.WritableComparable;
30 import org.apache.hadoop.mapreduce.Mapper.Context;
31 import org.apache.log4j.Logger;
32
33 import java.util.Map;
34 import java.util.Map.Entry;
35 import java.util.TreeMap;
36
37 import static org.apache.giraph.conf.GiraphConstants.SPLIT_MASTER_WORKER;
38 import static org.apache.giraph.conf.GiraphConstants.USE_SUPERSTEP_COUNTERS;
39
40
41
42
43
44
45
46
47
48
49 @SuppressWarnings("rawtypes")
50 public class MasterThread<I extends WritableComparable, V extends Writable,
51 E extends Writable> extends Thread {
52
53 public static final String GIRAPH_TIMERS_COUNTER_GROUP_NAME = "Giraph Timers";
54
55 private static final Logger LOG = Logger.getLogger(MasterThread.class);
56
57 private CentralizedServiceMaster<I, V, E> bspServiceMaster = null;
58
59 private final Context context;
60
61 private final boolean superstepCounterOn;
62
63 private final boolean splitMasterWorker;
64
65 private double setupSecs = 0d;
66
67 private final Map<Long, Double> superstepSecsMap =
68 new TreeMap<Long, Double>();
69
70
71
72
73
74
75
76
77 public MasterThread(CentralizedServiceMaster<I, V, E> bspServiceMaster,
78 Context context) {
79 super(MasterThread.class.getName());
80 this.bspServiceMaster = bspServiceMaster;
81 this.context = context;
82 GiraphTimers.init(context);
83 superstepCounterOn = USE_SUPERSTEP_COUNTERS.get(context.getConfiguration());
84 splitMasterWorker = SPLIT_MASTER_WORKER.get(context.getConfiguration());
85 }
86
87
88
89
90
91
92 @Override
93 public void run() {
94
95
96
97
98 try {
99 long startMillis = System.currentTimeMillis();
100 long initializeMillis = 0;
101 long endMillis = 0;
102 bspServiceMaster.setup();
103 SuperstepState superstepState = SuperstepState.INITIAL;
104
105 if (bspServiceMaster.becomeMaster()) {
106
107
108
109 bspServiceMaster.checkWorkers();
110 initializeMillis = System.currentTimeMillis();
111 GiraphTimers.getInstance().getInitializeMs().increment(
112 initializeMillis - startMillis);
113
114 if (bspServiceMaster.getRestartedSuperstep() !=
115 BspService.UNSET_SUPERSTEP ||
116 (bspServiceMaster.createMappingInputSplits() != -1 &&
117 bspServiceMaster.createVertexInputSplits() != -1 &&
118 bspServiceMaster.createEdgeInputSplits() != -1)) {
119 long setupMillis = System.currentTimeMillis() - initializeMillis;
120 GiraphTimers.getInstance().getSetupMs().increment(setupMillis);
121 setupSecs = setupMillis / 1000.0d;
122 while (!superstepState.isExecutionComplete()) {
123 long startSuperstepMillis = System.currentTimeMillis();
124 long cachedSuperstep = bspServiceMaster.getSuperstep();
125
126 if (splitMasterWorker) {
127 GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep);
128 }
129 Class<? extends Computation> computationClass =
130 bspServiceMaster.getMasterCompute().getComputation();
131 superstepState = bspServiceMaster.coordinateSuperstep();
132 long superstepMillis = System.currentTimeMillis() -
133 startSuperstepMillis;
134 superstepSecsMap.put(cachedSuperstep,
135 superstepMillis / 1000.0d);
136 if (LOG.isInfoEnabled()) {
137 LOG.info("masterThread: Coordination of superstep " +
138 cachedSuperstep + " took " +
139 superstepMillis / 1000.0d +
140 " seconds ended with state " + superstepState +
141 " and is now on superstep " +
142 bspServiceMaster.getSuperstep());
143 }
144 if (superstepCounterOn) {
145 String computationName = (computationClass == null) ?
146 null : computationClass.getSimpleName();
147 GiraphTimers.getInstance().getSuperstepMs(cachedSuperstep,
148 computationName).increment(superstepMillis);
149 }
150 bspServiceMaster.addGiraphTimersAndSendCounters(cachedSuperstep);
151
152 bspServiceMaster.postSuperstep();
153
154
155 if (superstepState == SuperstepState.WORKER_FAILURE) {
156 bspServiceMaster.restartFromCheckpoint(
157 bspServiceMaster.getLastGoodCheckpoint());
158 }
159 endMillis = System.currentTimeMillis();
160 }
161 bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, -1);
162 }
163 }
164 bspServiceMaster.cleanup(superstepState);
165 if (!superstepSecsMap.isEmpty()) {
166 GiraphTimers.getInstance().getShutdownMs().
167 increment(System.currentTimeMillis() - endMillis);
168 if (LOG.isInfoEnabled()) {
169 LOG.info("setup: Took " + setupSecs + " seconds.");
170 }
171 for (Entry<Long, Double> entry : superstepSecsMap.entrySet()) {
172 if (LOG.isInfoEnabled()) {
173 if (entry.getKey().longValue() ==
174 BspService.INPUT_SUPERSTEP) {
175 LOG.info("input superstep: Took " +
176 entry.getValue() + " seconds.");
177 } else {
178 LOG.info("superstep " + entry.getKey() + ": Took " +
179 entry.getValue() + " seconds.");
180 }
181 }
182 context.progress();
183 }
184 if (LOG.isInfoEnabled()) {
185 LOG.info("shutdown: Took " +
186 (System.currentTimeMillis() - endMillis) /
187 1000.0d + " seconds.");
188 LOG.info("total: Took " +
189 ((System.currentTimeMillis() - initializeMillis) /
190 1000.0d) + " seconds.");
191 }
192 GiraphTimers.getInstance().getTotalMs().
193 increment(System.currentTimeMillis() - initializeMillis);
194 }
195 bspServiceMaster.addGiraphTimersAndSendCounters(
196 bspServiceMaster.getSuperstep());
197 bspServiceMaster.postApplication();
198
199 } catch (Exception e) {
200
201 LOG.error("masterThread: Master algorithm failed with " +
202 e.getClass().getSimpleName(), e);
203 bspServiceMaster.failureCleanup(e);
204 throw new IllegalStateException(e);
205 }
206 }
207 }