This project has retired. For details please refer to its Attic page.
DefaultJobProgressTrackerService xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.job;
20  
21  import org.apache.giraph.conf.GiraphConfiguration;
22  import org.apache.giraph.conf.GiraphConstants;
23  import org.apache.giraph.conf.IntConfOption;
24  import org.apache.giraph.counters.CustomCounter;
25  import org.apache.giraph.counters.GiraphCountersThriftStruct;
26  import org.apache.giraph.master.MasterProgress;
27  import org.apache.giraph.utils.ThreadUtils;
28  import org.apache.giraph.worker.WorkerProgress;
29  import org.apache.hadoop.mapreduce.Job;
30  import org.apache.log4j.Logger;
31  
32  import java.io.IOException;
33  import java.util.Collections;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.concurrent.ConcurrentHashMap;
37  import java.util.concurrent.atomic.AtomicReference;
38  
39  /**
40   * Default implementation of JobProgressTrackerService
41   */
42  public class DefaultJobProgressTrackerService
43      implements JobProgressTrackerService {
44    /** Max time job is allowed to not make progress before getting killed */
45    public static final IntConfOption MAX_ALLOWED_TIME_WITHOUT_PROGRESS_MS =
46        new IntConfOption(
47            "giraph.maxAllowedTimeWithoutProgressMs",
48            3 * 60 * 60 * 1000, // Allow 3h
49            "Max time job is allowed to not make progress before getting killed");
50    /** Class logger */
51    private static final Logger LOG =
52        Logger.getLogger(JobProgressTrackerService.class);
53    /** How often to print job's progress */
54    private static final int UPDATE_MILLISECONDS = 10 * 1000;
55  
56    /** Configuration */
57    private GiraphConfiguration conf;
58    /** Giraph job callback */
59    private GiraphJobObserver jobObserver;
60    /** Thread which periodically writes job's progress */
61    private Thread writerThread;
62    /** Whether application is finished */
63    private volatile boolean finished = false;
64    /** Number of mappers which the job got */
65    private int mappersStarted;
66    /** Last time number of mappers started was logged */
67    private long lastTimeMappersStartedLogged;
68    /** Map of worker progresses */
69    private final Map<Integer, WorkerProgress> workerProgresses =
70        new ConcurrentHashMap<>();
71    /** Master progress */
72    private final AtomicReference<MasterProgress> masterProgress =
73        new AtomicReference<>(new MasterProgress());
74    /** Job */
75    private Job job;
76  
77    @Override
78    public void init(GiraphConfiguration conf, GiraphJobObserver jobObserver) {
79      this.conf = conf;
80      this.jobObserver = jobObserver;
81  
82      if (LOG.isInfoEnabled()) {
83        LOG.info("Waiting for job to start... (this may take a minute)");
84      }
85      startWriterThread();
86    }
87  
88    /**
89     * Start the thread which writes progress periodically
90     */
91    private void startWriterThread() {
92      writerThread = ThreadUtils.startThread(new Runnable() {
93        @Override
94        public void run() {
95          long lastTimeProgressChanged = -1;
96          long maxAllowedTimeWithoutProgress =
97              MAX_ALLOWED_TIME_WITHOUT_PROGRESS_MS.get(conf);
98          CombinedWorkerProgress lastProgress = null;
99          while (!finished) {
100           if (mappersStarted == conf.getMaxMappers() &&
101               !workerProgresses.isEmpty()) {
102             // Combine and log
103             CombinedWorkerProgress combinedWorkerProgress =
104                 new CombinedWorkerProgress(workerProgresses.values(),
105                     masterProgress.get(), conf);
106             if (LOG.isInfoEnabled()) {
107               LOG.info(combinedWorkerProgress.toString());
108             }
109             // Check if application is done
110             if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) {
111               break;
112             }
113 
114             if (!canFinishInTime(conf, job, combinedWorkerProgress)) {
115               killJobWithMessage("Killing the job because it won't " +
116                 "complete in max allotted time: " +
117                 GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf) / 1000 +
118                 "s");
119             }
120 
121             if (lastProgress == null ||
122                 combinedWorkerProgress.madeProgressFrom(lastProgress)) {
123               lastProgress = combinedWorkerProgress;
124               lastTimeProgressChanged = System.currentTimeMillis();
125             } else if (lastTimeProgressChanged +
126                 maxAllowedTimeWithoutProgress < System.currentTimeMillis()) {
127               // Job didn't make progress in too long, killing it
128               killJobWithMessage(
129                   "Killing the job because it didn't make progress for " +
130                       maxAllowedTimeWithoutProgress / 1000 + "s");
131               break;
132             }
133           }
134           if (!ThreadUtils.trySleep(UPDATE_MILLISECONDS)) {
135             break;
136           }
137         }
138       }
139     }, "progress-writer");
140   }
141 
142   /**
143    * Determine if the job will finish in allotted time
144    * @param conf Giraph configuration
145    * @param job Job
146    * @param progress Combined worker progress
147    * @return true it the job can finish in allotted time, false otherwise
148    */
149   protected boolean canFinishInTime(GiraphConfiguration conf, Job job,
150       CombinedWorkerProgress progress) {
151     // No defaut implementation.
152     return true;
153   }
154 
155   /**
156    * Kill job with message describing why it's being killed
157    *
158    * @param message Message describing why job is being killed
159    * @return True iff job was killed successfully, false if job was already
160    * done or kill failed
161    */
162   protected boolean killJobWithMessage(String message) {
163     try {
164       if (job.isComplete()) {
165         LOG.info("Job " + job.getJobID() + " is already done");
166         return false;
167       } else {
168         LOG.error(message);
169         job.killJob();
170         return true;
171       }
172     } catch (IOException e) {
173       LOG.error("Failed to kill the job", e);
174       return false;
175     }
176   }
177 
178   @Override
179   public void setJob(Job job) {
180     this.job = job;
181   }
182 
183   /**
184    * Called when job got all mappers, used to check MAX_ALLOWED_JOB_TIME_MS
185    * and potentially start a thread which will kill the job after this time
186    */
187   protected void jobGotAllMappers() {
188     jobObserver.jobGotAllMappers(job);
189     final long maxAllowedJobTimeMs =
190         GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf);
191     if (maxAllowedJobTimeMs > 0) {
192       // Start a thread which will kill the job if running for too long
193       ThreadUtils.startThread(new Runnable() {
194         @Override
195         public void run() {
196           if (ThreadUtils.trySleep(maxAllowedJobTimeMs)) {
197             killJobWithMessage("Killing the job because it took longer than " +
198                 maxAllowedJobTimeMs + " milliseconds");
199           }
200         }
201       }, "job-runtime-observer");
202     }
203   }
204 
205   @Override
206   public synchronized void mapperStarted() {
207     mappersStarted++;
208     if (LOG.isInfoEnabled()) {
209       if (mappersStarted == conf.getMaxMappers()) {
210         LOG.info("Got all " + mappersStarted + " mappers");
211         jobGotAllMappers();
212       } else {
213         if (System.currentTimeMillis() - lastTimeMappersStartedLogged >
214             UPDATE_MILLISECONDS) {
215           lastTimeMappersStartedLogged = System.currentTimeMillis();
216           LOG.info("Got " + mappersStarted + " but needs " +
217               conf.getMaxMappers() + " mappers");
218         }
219       }
220     }
221   }
222 
223   @Override
224   public void logInfo(String logLine) {
225     if (LOG.isInfoEnabled()) {
226       LOG.info(logLine);
227     }
228   }
229 
230   @Override
231   public void
232   logError(String logLine, byte [] exByteArray) {
233     LOG.error(logLine);
234   }
235 
236   @Override
237   public void logFailure(String reason) {
238     LOG.fatal(reason);
239     finished = true;
240     writerThread.interrupt();
241   }
242 
243   @Override
244   public void updateProgress(WorkerProgress workerProgress) {
245     workerProgresses.put(workerProgress.getTaskId(), workerProgress);
246   }
247 
248   @Override
249   public void updateMasterProgress(MasterProgress masterProgress) {
250     this.masterProgress.set(masterProgress);
251   }
252 
253   @Override
254   public void sendMasterCounters(GiraphCountersThriftStruct giraphCounters) {
255     if (LOG.isInfoEnabled()) {
256       List<CustomCounter> counterList = giraphCounters.getCounters();
257       Collections.sort(counterList);
258       for (CustomCounter customCounter : counterList) {
259         LOG.info(String.format("%s: %s: %d%n", customCounter.getGroupName(),
260               customCounter.getCounterName(), customCounter.getValue()));
261       }
262     }
263   }
264 
265   @Override
266   public void stop(boolean succeeded) {
267     finished = true;
268     writerThread.interrupt();
269     if (LOG.isInfoEnabled()) {
270       LOG.info("Job " + (succeeded ? "finished successfully" : "failed") +
271           ", cleaning up...");
272     }
273   }
274 
275   /**
276    * Create job progress server on job client if enabled in configuration.
277    *
278    * @param conf        Configuration
279    * @param jobObserver Giraph job callbacks
280    * @return JobProgressTrackerService
281    */
282   public static JobProgressTrackerService createJobProgressTrackerService(
283       GiraphConfiguration conf, GiraphJobObserver jobObserver) {
284     if (!conf.trackJobProgressOnClient()) {
285       return null;
286     }
287 
288     JobProgressTrackerService jobProgressTrackerService =
289         GiraphConstants.JOB_PROGRESS_TRACKER_SERVICE_CLASS.newInstance(conf);
290     jobProgressTrackerService.init(conf, jobObserver);
291     return jobProgressTrackerService;
292   }
293 }