This project has retired. For details please refer to its
Attic page.
DefaultJobProgressTrackerService xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.job;
20
21 import org.apache.giraph.conf.GiraphConfiguration;
22 import org.apache.giraph.conf.GiraphConstants;
23 import org.apache.giraph.conf.IntConfOption;
24 import org.apache.giraph.counters.CustomCounter;
25 import org.apache.giraph.counters.GiraphCountersThriftStruct;
26 import org.apache.giraph.master.MasterProgress;
27 import org.apache.giraph.utils.ThreadUtils;
28 import org.apache.giraph.worker.WorkerProgress;
29 import org.apache.hadoop.mapreduce.Job;
30 import org.apache.log4j.Logger;
31
32 import java.io.IOException;
33 import java.util.Collections;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.atomic.AtomicReference;
38
39
40
41
42 public class DefaultJobProgressTrackerService
43 implements JobProgressTrackerService {
44
45 public static final IntConfOption MAX_ALLOWED_TIME_WITHOUT_PROGRESS_MS =
46 new IntConfOption(
47 "giraph.maxAllowedTimeWithoutProgressMs",
48 3 * 60 * 60 * 1000,
49 "Max time job is allowed to not make progress before getting killed");
50
51 private static final Logger LOG =
52 Logger.getLogger(JobProgressTrackerService.class);
53
54 private static final int UPDATE_MILLISECONDS = 10 * 1000;
55
56
57 private GiraphConfiguration conf;
58
59 private GiraphJobObserver jobObserver;
60
61 private Thread writerThread;
62
63 private volatile boolean finished = false;
64
65 private int mappersStarted;
66
67 private long lastTimeMappersStartedLogged;
68
69 private final Map<Integer, WorkerProgress> workerProgresses =
70 new ConcurrentHashMap<>();
71
72 private final AtomicReference<MasterProgress> masterProgress =
73 new AtomicReference<>(new MasterProgress());
74
75 private Job job;
76
77 @Override
78 public void init(GiraphConfiguration conf, GiraphJobObserver jobObserver) {
79 this.conf = conf;
80 this.jobObserver = jobObserver;
81
82 if (LOG.isInfoEnabled()) {
83 LOG.info("Waiting for job to start... (this may take a minute)");
84 }
85 startWriterThread();
86 }
87
88
89
90
91 private void startWriterThread() {
92 writerThread = ThreadUtils.startThread(new Runnable() {
93 @Override
94 public void run() {
95 long lastTimeProgressChanged = -1;
96 long maxAllowedTimeWithoutProgress =
97 MAX_ALLOWED_TIME_WITHOUT_PROGRESS_MS.get(conf);
98 CombinedWorkerProgress lastProgress = null;
99 while (!finished) {
100 if (mappersStarted == conf.getMaxMappers() &&
101 !workerProgresses.isEmpty()) {
102
103 CombinedWorkerProgress combinedWorkerProgress =
104 new CombinedWorkerProgress(workerProgresses.values(),
105 masterProgress.get(), conf);
106 if (LOG.isInfoEnabled()) {
107 LOG.info(combinedWorkerProgress.toString());
108 }
109
110 if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) {
111 break;
112 }
113
114 if (!canFinishInTime(conf, job, combinedWorkerProgress)) {
115 killJobWithMessage("Killing the job because it won't " +
116 "complete in max allotted time: " +
117 GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf) / 1000 +
118 "s");
119 }
120
121 if (lastProgress == null ||
122 combinedWorkerProgress.madeProgressFrom(lastProgress)) {
123 lastProgress = combinedWorkerProgress;
124 lastTimeProgressChanged = System.currentTimeMillis();
125 } else if (lastTimeProgressChanged +
126 maxAllowedTimeWithoutProgress < System.currentTimeMillis()) {
127
128 killJobWithMessage(
129 "Killing the job because it didn't make progress for " +
130 maxAllowedTimeWithoutProgress / 1000 + "s");
131 break;
132 }
133 }
134 if (!ThreadUtils.trySleep(UPDATE_MILLISECONDS)) {
135 break;
136 }
137 }
138 }
139 }, "progress-writer");
140 }
141
142
143
144
145
146
147
148
149 protected boolean canFinishInTime(GiraphConfiguration conf, Job job,
150 CombinedWorkerProgress progress) {
151
152 return true;
153 }
154
155
156
157
158
159
160
161
162 protected boolean killJobWithMessage(String message) {
163 try {
164 if (job.isComplete()) {
165 LOG.info("Job " + job.getJobID() + " is already done");
166 return false;
167 } else {
168 LOG.error(message);
169 job.killJob();
170 return true;
171 }
172 } catch (IOException e) {
173 LOG.error("Failed to kill the job", e);
174 return false;
175 }
176 }
177
178 @Override
179 public void setJob(Job job) {
180 this.job = job;
181 }
182
183
184
185
186
187 protected void jobGotAllMappers() {
188 jobObserver.jobGotAllMappers(job);
189 final long maxAllowedJobTimeMs =
190 GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf);
191 if (maxAllowedJobTimeMs > 0) {
192
193 ThreadUtils.startThread(new Runnable() {
194 @Override
195 public void run() {
196 if (ThreadUtils.trySleep(maxAllowedJobTimeMs)) {
197 killJobWithMessage("Killing the job because it took longer than " +
198 maxAllowedJobTimeMs + " milliseconds");
199 }
200 }
201 }, "job-runtime-observer");
202 }
203 }
204
205 @Override
206 public synchronized void mapperStarted() {
207 mappersStarted++;
208 if (LOG.isInfoEnabled()) {
209 if (mappersStarted == conf.getMaxMappers()) {
210 LOG.info("Got all " + mappersStarted + " mappers");
211 jobGotAllMappers();
212 } else {
213 if (System.currentTimeMillis() - lastTimeMappersStartedLogged >
214 UPDATE_MILLISECONDS) {
215 lastTimeMappersStartedLogged = System.currentTimeMillis();
216 LOG.info("Got " + mappersStarted + " but needs " +
217 conf.getMaxMappers() + " mappers");
218 }
219 }
220 }
221 }
222
223 @Override
224 public void logInfo(String logLine) {
225 if (LOG.isInfoEnabled()) {
226 LOG.info(logLine);
227 }
228 }
229
230 @Override
231 public void
232 logError(String logLine, byte [] exByteArray) {
233 LOG.error(logLine);
234 }
235
236 @Override
237 public void logFailure(String reason) {
238 LOG.fatal(reason);
239 finished = true;
240 writerThread.interrupt();
241 }
242
243 @Override
244 public void updateProgress(WorkerProgress workerProgress) {
245 workerProgresses.put(workerProgress.getTaskId(), workerProgress);
246 }
247
248 @Override
249 public void updateMasterProgress(MasterProgress masterProgress) {
250 this.masterProgress.set(masterProgress);
251 }
252
253 @Override
254 public void sendMasterCounters(GiraphCountersThriftStruct giraphCounters) {
255 if (LOG.isInfoEnabled()) {
256 List<CustomCounter> counterList = giraphCounters.getCounters();
257 Collections.sort(counterList);
258 for (CustomCounter customCounter : counterList) {
259 LOG.info(String.format("%s: %s: %d%n", customCounter.getGroupName(),
260 customCounter.getCounterName(), customCounter.getValue()));
261 }
262 }
263 }
264
265 @Override
266 public void stop(boolean succeeded) {
267 finished = true;
268 writerThread.interrupt();
269 if (LOG.isInfoEnabled()) {
270 LOG.info("Job " + (succeeded ? "finished successfully" : "failed") +
271 ", cleaning up...");
272 }
273 }
274
275
276
277
278
279
280
281
282 public static JobProgressTrackerService createJobProgressTrackerService(
283 GiraphConfiguration conf, GiraphJobObserver jobObserver) {
284 if (!conf.trackJobProgressOnClient()) {
285 return null;
286 }
287
288 JobProgressTrackerService jobProgressTrackerService =
289 GiraphConstants.JOB_PROGRESS_TRACKER_SERVICE_CLASS.newInstance(conf);
290 jobProgressTrackerService.init(conf, jobObserver);
291 return jobProgressTrackerService;
292 }
293 }