View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.job;
20  
21  import org.apache.giraph.conf.GiraphConfiguration;
22  import org.apache.giraph.conf.GiraphConstants;
23  import org.apache.giraph.conf.IntConfOption;
24  import org.apache.giraph.master.MasterProgress;
25  import org.apache.giraph.utils.ThreadUtils;
26  import org.apache.giraph.worker.WorkerProgress;
27  import org.apache.hadoop.mapreduce.Job;
28  import org.apache.log4j.Logger;
29  
30  import java.io.IOException;
31  import java.util.Map;
32  import java.util.concurrent.ConcurrentHashMap;
33  import java.util.concurrent.atomic.AtomicReference;
34  
35  /**
36   * Default implementation of JobProgressTrackerService
37   */
38  public class DefaultJobProgressTrackerService
39      implements JobProgressTrackerService {
40    /** Max time job is allowed to not make progress before getting killed */
41    public static final IntConfOption MAX_ALLOWED_TIME_WITHOUT_PROGRESS_MS =
42        new IntConfOption(
43            "giraph.maxAllowedTimeWithoutProgressMs",
44            3 * 60 * 60 * 1000, // Allow 3h
45            "Max time job is allowed to not make progress before getting killed");
46    /** Class logger */
47    private static final Logger LOG =
48        Logger.getLogger(JobProgressTrackerService.class);
49    /** How often to print job's progress */
50    private static final int UPDATE_MILLISECONDS = 10 * 1000;
51  
52    /** Configuration */
53    private GiraphConfiguration conf;
54    /** Giraph job callback */
55    private GiraphJobObserver jobObserver;
56    /** Thread which periodically writes job's progress */
57    private Thread writerThread;
58    /** Whether application is finished */
59    private volatile boolean finished = false;
60    /** Number of mappers which the job got */
61    private int mappersStarted;
62    /** Last time number of mappers started was logged */
63    private long lastTimeMappersStartedLogged;
64    /** Map of worker progresses */
65    private final Map<Integer, WorkerProgress> workerProgresses =
66        new ConcurrentHashMap<>();
67    /** Master progress */
68    private final AtomicReference<MasterProgress> masterProgress =
69        new AtomicReference<>(new MasterProgress());
70    /** Job */
71    private Job job;
72  
73    @Override
74    public void init(GiraphConfiguration conf, GiraphJobObserver jobObserver) {
75      this.conf = conf;
76      this.jobObserver = jobObserver;
77  
78      if (LOG.isInfoEnabled()) {
79        LOG.info("Waiting for job to start... (this may take a minute)");
80      }
81      startWriterThread();
82    }
83  
84    /**
85     * Start the thread which writes progress periodically
86     */
87    private void startWriterThread() {
88      writerThread = ThreadUtils.startThread(new Runnable() {
89        @Override
90        public void run() {
91          long lastTimeProgressChanged = -1;
92          long maxAllowedTimeWithoutProgress =
93              MAX_ALLOWED_TIME_WITHOUT_PROGRESS_MS.get(conf);
94          CombinedWorkerProgress lastProgress = null;
95          while (!finished) {
96            if (mappersStarted == conf.getMaxWorkers() + 1 &&
97                !workerProgresses.isEmpty()) {
98              // Combine and log
99              CombinedWorkerProgress combinedWorkerProgress =
100                 new CombinedWorkerProgress(workerProgresses.values(),
101                     masterProgress.get(), conf);
102             if (LOG.isInfoEnabled()) {
103               LOG.info(combinedWorkerProgress.toString());
104             }
105             // Check if application is done
106             if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) {
107               break;
108             }
109 
110             if (!canFinishInTime(conf, job, combinedWorkerProgress)) {
111               killJobWithMessage("Killing the job because it won't " +
112                 "complete in max allotted time: " +
113                 GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf) / 1000 +
114                 "s");
115             }
116 
117             if (lastProgress == null ||
118                 combinedWorkerProgress.madeProgressFrom(lastProgress)) {
119               lastProgress = combinedWorkerProgress;
120               lastTimeProgressChanged = System.currentTimeMillis();
121             } else if (lastTimeProgressChanged +
122                 maxAllowedTimeWithoutProgress < System.currentTimeMillis()) {
123               // Job didn't make progress in too long, killing it
124               killJobWithMessage(
125                   "Killing the job because it didn't make progress for " +
126                       maxAllowedTimeWithoutProgress / 1000 + "s");
127               break;
128             }
129           }
130           if (!ThreadUtils.trySleep(UPDATE_MILLISECONDS)) {
131             break;
132           }
133         }
134       }
135     }, "progress-writer");
136   }
137 
138   /**
139    * Determine if the job will finish in allotted time
140    * @param conf Giraph configuration
141    * @param job Job
142    * @param progress Combined worker progress
143    * @return true it the job can finish in allotted time, false otherwise
144    */
145   protected boolean canFinishInTime(GiraphConfiguration conf, Job job,
146       CombinedWorkerProgress progress) {
147     // No defaut implementation.
148     return true;
149   }
150 
151   /**
152    * Kill job with message describing why it's being killed
153    *
154    * @param message Message describing why job is being killed
155    * @return True iff job was killed successfully, false if job was already
156    * done or kill failed
157    */
158   protected boolean killJobWithMessage(String message) {
159     try {
160       if (job.isComplete()) {
161         LOG.info("Job " + job.getJobID() + " is already done");
162         return false;
163       } else {
164         LOG.error(message);
165         job.killJob();
166         return true;
167       }
168     } catch (IOException e) {
169       LOG.error("Failed to kill the job", e);
170       return false;
171     }
172   }
173 
174   @Override
175   public void setJob(Job job) {
176     this.job = job;
177   }
178 
179   /**
180    * Called when job got all mappers, used to check MAX_ALLOWED_JOB_TIME_MS
181    * and potentially start a thread which will kill the job after this time
182    */
183   protected void jobGotAllMappers() {
184     jobObserver.jobGotAllMappers(job);
185     final long maxAllowedJobTimeMs =
186         GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf);
187     if (maxAllowedJobTimeMs > 0) {
188       // Start a thread which will kill the job if running for too long
189       ThreadUtils.startThread(new Runnable() {
190         @Override
191         public void run() {
192           if (ThreadUtils.trySleep(maxAllowedJobTimeMs)) {
193             killJobWithMessage("Killing the job because it took longer than " +
194                 maxAllowedJobTimeMs + " milliseconds");
195           }
196         }
197       }, "job-runtime-observer");
198     }
199   }
200 
201   @Override
202   public synchronized void mapperStarted() {
203     mappersStarted++;
204     if (LOG.isInfoEnabled()) {
205       if (mappersStarted == conf.getMaxWorkers() + 1) {
206         LOG.info("Got all " + mappersStarted + " mappers");
207         jobGotAllMappers();
208       } else {
209         if (System.currentTimeMillis() - lastTimeMappersStartedLogged >
210             UPDATE_MILLISECONDS) {
211           lastTimeMappersStartedLogged = System.currentTimeMillis();
212           LOG.info("Got " + mappersStarted + " but needs " +
213               (conf.getMaxWorkers() + 1) + " mappers");
214         }
215       }
216     }
217   }
218 
219   @Override
220   public void logInfo(String logLine) {
221     if (LOG.isInfoEnabled()) {
222       LOG.info(logLine);
223     }
224   }
225 
226   @Override
227   public void
228   logError(String logLine, byte [] exByteArray) {
229     LOG.error(logLine);
230   }
231 
232   @Override
233   public void logFailure(String reason) {
234     LOG.fatal(reason);
235     finished = true;
236     writerThread.interrupt();
237   }
238 
239   @Override
240   public void updateProgress(WorkerProgress workerProgress) {
241     workerProgresses.put(workerProgress.getTaskId(), workerProgress);
242   }
243 
244   @Override
245   public void updateMasterProgress(MasterProgress masterProgress) {
246     this.masterProgress.set(masterProgress);
247   }
248 
249   @Override
250   public void stop(boolean succeeded) {
251     finished = true;
252     writerThread.interrupt();
253     if (LOG.isInfoEnabled()) {
254       LOG.info("Job " + (succeeded ? "finished successfully" : "failed") +
255           ", cleaning up...");
256     }
257   }
258 
259   /**
260    * Create job progress server on job client if enabled in configuration.
261    *
262    * @param conf        Configuration
263    * @param jobObserver Giraph job callbacks
264    * @return JobProgressTrackerService
265    */
266   public static JobProgressTrackerService createJobProgressTrackerService(
267       GiraphConfiguration conf, GiraphJobObserver jobObserver) {
268     if (!conf.trackJobProgressOnClient()) {
269       return null;
270     }
271 
272     JobProgressTrackerService jobProgressTrackerService =
273         GiraphConstants.JOB_PROGRESS_TRACKER_CLASS.newInstance(conf);
274     jobProgressTrackerService.init(conf, jobObserver);
275     return jobProgressTrackerService;
276   }
277 }