View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.job;
20  
21  import org.apache.giraph.conf.GiraphConfiguration;
22  import org.apache.giraph.conf.GiraphConstants;
23  import org.apache.giraph.worker.WorkerProgress;
24  import org.apache.hadoop.mapreduce.Job;
25  import org.apache.log4j.Logger;
26  
27  import java.io.IOException;
28  import java.util.Map;
29  import java.util.concurrent.ConcurrentHashMap;
30  
31  /**
32   * Default implementation of JobProgressTrackerService
33   */
34  public class DefaultJobProgressTrackerService
35      implements JobProgressTrackerService {
36    /** Class logger */
37    private static final Logger LOG =
38        Logger.getLogger(JobProgressTrackerService.class);
39    /** How often to print job's progress */
40    private static final int UPDATE_MILLISECONDS = 10 * 1000;
41  
42    /** Configuration */
43    private GiraphConfiguration conf;
44    /** Giraph job callback */
45    private GiraphJobObserver jobObserver;
46    /** Thread which periodically writes job's progress */
47    private Thread writerThread;
48    /** Whether application is finished */
49    private volatile boolean finished = false;
50    /** Number of mappers which the job got */
51    private int mappersStarted;
52    /** Last time number of mappers started was logged */
53    private long lastTimeMappersStartedLogged;
54    /** Map of worker progresses */
55    private final Map<Integer, WorkerProgress> workerProgresses =
56        new ConcurrentHashMap<>();
57    /** Job */
58    private Job job;
59  
60    @Override
61    public void init(GiraphConfiguration conf, GiraphJobObserver jobObserver) {
62      this.conf = conf;
63      this.jobObserver = jobObserver;
64  
65      if (LOG.isInfoEnabled()) {
66        LOG.info("Waiting for job to start... (this may take a minute)");
67      }
68      startWriterThread();
69    }
70  
71    /**
72     * Start the thread which writes progress periodically
73     */
74    private void startWriterThread() {
75      writerThread = new Thread(new Runnable() {
76        @Override
77        public void run() {
78          while (!finished) {
79            if (mappersStarted == conf.getMaxWorkers() + 1 &&
80                !workerProgresses.isEmpty()) {
81              // Combine and log
82              CombinedWorkerProgress combinedWorkerProgress =
83                  new CombinedWorkerProgress(workerProgresses.values(), conf);
84              if (LOG.isInfoEnabled()) {
85                LOG.info(combinedWorkerProgress.toString());
86              }
87              // Check if application is done
88              if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) {
89                break;
90              }
91            }
92            try {
93              Thread.sleep(UPDATE_MILLISECONDS);
94            } catch (InterruptedException e) {
95              if (LOG.isInfoEnabled()) {
96                LOG.info("Progress thread interrupted");
97              }
98              break;
99            }
100         }
101       }
102     });
103     writerThread.setDaemon(true);
104     writerThread.start();
105   }
106 
107   @Override
108   public void setJob(Job job) {
109     this.job = job;
110   }
111 
112   /**
113    * Called when job got all mappers, used to check MAX_ALLOWED_JOB_TIME_MS
114    * and potentially start a thread which will kill the job after this time
115    */
116   private void jobGotAllMappers() {
117     jobObserver.jobGotAllMappers(job);
118     final long maxAllowedJobTimeMs =
119         GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf);
120     if (maxAllowedJobTimeMs > 0) {
121       // Start a thread which will kill the job if running for too long
122       Thread killThread = new Thread(new Runnable() {
123         @Override
124         public void run() {
125           try {
126             Thread.sleep(maxAllowedJobTimeMs);
127             try {
128               LOG.warn("Killing job because it took longer than " +
129                   maxAllowedJobTimeMs + " milliseconds");
130               job.killJob();
131             } catch (IOException e) {
132               LOG.warn("Failed to kill job", e);
133             }
134           } catch (InterruptedException e) {
135             if (LOG.isDebugEnabled()) {
136               LOG.debug("Thread checking for jobs max allowed time " +
137                   "interrupted");
138             }
139           }
140         }
141       });
142       killThread.setDaemon(true);
143       killThread.start();
144     }
145   }
146 
147   @Override
148   public synchronized void mapperStarted() {
149     mappersStarted++;
150     if (LOG.isInfoEnabled()) {
151       if (mappersStarted == conf.getMaxWorkers() + 1) {
152         LOG.info("Got all " + mappersStarted + " mappers");
153         jobGotAllMappers();
154       } else {
155         if (System.currentTimeMillis() - lastTimeMappersStartedLogged >
156             UPDATE_MILLISECONDS) {
157           lastTimeMappersStartedLogged = System.currentTimeMillis();
158           LOG.info("Got " + mappersStarted + " but needs " +
159               (conf.getMaxWorkers() + 1) + " mappers");
160         }
161       }
162     }
163   }
164 
165   @Override
166   public void logInfo(String logLine) {
167     if (LOG.isInfoEnabled()) {
168       LOG.info(logLine);
169     }
170   }
171 
172   @Override
173   public void logError(String logLine) {
174     LOG.error(logLine);
175   }
176 
177   @Override
178   public void logFailure(String reason) {
179     LOG.fatal(reason);
180     finished = true;
181     writerThread.interrupt();
182   }
183 
184   @Override
185   public void updateProgress(WorkerProgress workerProgress) {
186     workerProgresses.put(workerProgress.getTaskId(), workerProgress);
187   }
188 
189   @Override
190   public void stop(boolean succeeded) {
191     finished = true;
192     writerThread.interrupt();
193     if (LOG.isInfoEnabled()) {
194       LOG.info("Job " + (succeeded ? "finished successfully" : "failed") +
195           ", cleaning up...");
196     }
197   }
198 
199   /**
200    * Create job progress server on job client if enabled in configuration.
201    *
202    * @param conf        Configuration
203    * @param jobObserver Giraph job callbacks
204    * @return JobProgressTrackerService
205    */
206   public static JobProgressTrackerService createJobProgressTrackerService(
207       GiraphConfiguration conf, GiraphJobObserver jobObserver) {
208     if (!conf.trackJobProgressOnClient()) {
209       return null;
210     }
211 
212     JobProgressTrackerService jobProgressTrackerService =
213         GiraphConstants.JOB_PROGRESS_TRACKER_CLASS.newInstance(conf);
214     jobProgressTrackerService.init(conf, jobObserver);
215     return jobProgressTrackerService;
216   }
217 }