This project has retired. For details please refer to its Attic page.
GiraphJob xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.job;
20  
21  import com.google.common.collect.ImmutableList;
22  import org.apache.giraph.bsp.BspInputFormat;
23  import org.apache.giraph.conf.GiraphConfiguration;
24  import org.apache.giraph.conf.GiraphConstants;
25  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26  import org.apache.giraph.graph.GraphMapper;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.ipc.Client;
29  import org.apache.hadoop.mapreduce.Job;
30  import org.apache.log4j.Logger;
31  
32  import java.io.IOException;
33  
34  /**
35   * Generates an appropriate internal {@link Job} for using Giraph in Hadoop.
36   * Uses composition to avoid unwanted {@link Job} methods from exposure
37   * to the user.
38   */
39  public class GiraphJob {
40    static {
41      Configuration.addDefaultResource("giraph-site.xml");
42    }
43  
44    /** Class logger */
45    private static final Logger LOG = Logger.getLogger(GiraphJob.class);
46    /** Internal delegated job to proxy interface requests for Job */
47    private final DelegatedJob delegatedJob;
48    /** Name of the job */
49    private String jobName;
50    /** Helper configuration from the job */
51    private final GiraphConfiguration giraphConfiguration;
52  
53    /**
54     * Delegated job that simply passes along the class GiraphConfiguration.
55     */
56    private class DelegatedJob extends Job {
57      /** Ensure that for job initiation the super.getConfiguration() is used */
58      private boolean jobInited = false;
59  
60      /**
61       * Constructor
62       *
63       * @param conf Configuration
64       * @throws IOException
65       */
66      DelegatedJob(Configuration conf) throws IOException {
67        super(conf);
68      }
69  
70      @Override
71      public Configuration getConfiguration() {
72        if (jobInited) {
73          return giraphConfiguration;
74        } else {
75          return super.getConfiguration();
76        }
77      }
78    }
79  
80    /**
81     * Constructor that will instantiate the configuration
82     *
83     * @param jobName User-defined job name
84     * @throws IOException
85     */
86    public GiraphJob(String jobName) throws IOException {
87      this(new GiraphConfiguration(), jobName);
88    }
89  
90    /**
91     * Constructor.
92     *
93     * @param configuration User-defined configuration
94     * @param jobName User-defined job name
95     * @throws IOException
96     */
97    public GiraphJob(Configuration configuration,
98                     String jobName) throws IOException {
99      this(new GiraphConfiguration(configuration), jobName);
100   }
101 
102   /**
103    * Constructor.
104    *
105    * @param giraphConfiguration User-defined configuration
106    * @param jobName User-defined job name
107    * @throws IOException
108    */
109   public GiraphJob(GiraphConfiguration giraphConfiguration,
110                    String jobName) throws IOException {
111     this.jobName = jobName;
112     this.giraphConfiguration = giraphConfiguration;
113     this.delegatedJob = new DelegatedJob(giraphConfiguration);
114   }
115 
116   public String getJobName() {
117     return jobName;
118   }
119 
120   public void setJobName(String jobName) {
121     this.jobName = jobName;
122   }
123 
124   /**
125    * Get the configuration from the internal job.
126    *
127    * @return Configuration used by the job.
128    */
129   public GiraphConfiguration getConfiguration() {
130     return giraphConfiguration;
131   }
132 
133   /**
134    * Be very cautious when using this method as it returns the internal job
135    * of {@link GiraphJob}.  This should only be used for methods that require
136    * access to the actual {@link Job}, i.e. FileInputFormat#addInputPath().
137    *
138    * @return Internal job that will actually be submitted to Hadoop.
139    */
140   public Job getInternalJob() {
141     delegatedJob.jobInited = true;
142     return delegatedJob;
143   }
144 
145   /**
146    * Check if the configuration is local.  If it is local, do additional
147    * checks due to the restrictions of LocalJobRunner. This checking is
148    * performed here because the local job runner is MRv1-configured.
149    *
150    * @param conf Configuration
151    */
152   private static void checkLocalJobRunnerConfiguration(
153       ImmutableClassesGiraphConfiguration conf) {
154     String jobTracker = conf.get("mapred.job.tracker", null);
155     if (!jobTracker.equals("local")) {
156       // Nothing to check
157       return;
158     }
159 
160     int maxWorkers = conf.getMaxWorkers();
161     if (maxWorkers != 1) {
162       throw new IllegalArgumentException(
163           "checkLocalJobRunnerConfiguration: When using " +
164               "LocalJobRunner, must have only one worker since " +
165           "only 1 task at a time!");
166     }
167     if (conf.getSplitMasterWorker()) {
168       throw new IllegalArgumentException(
169           "checkLocalJobRunnerConfiguration: When using " +
170               "LocalJobRunner, you cannot run in split master / worker " +
171           "mode since there is only 1 task at a time!");
172     }
173   }
174 
175   /**
176    * Check whether a specified int conf value is set and if not, set it.
177    *
178    * @param param Conf value to check
179    * @param defaultValue Assign to value if not set
180    */
181   private void setIntConfIfDefault(String param, int defaultValue) {
182     if (giraphConfiguration.getInt(param, Integer.MIN_VALUE) ==
183         Integer.MIN_VALUE) {
184       giraphConfiguration.setInt(param, defaultValue);
185     }
186   }
187 
188   /**
189    * Runs the actual graph application through Hadoop Map-Reduce.
190    *
191    * @param verbose If true, provide verbose output, false otherwise
192    * @return True if success, false otherwise
193    * @throws ClassNotFoundException
194    * @throws InterruptedException
195    * @throws IOException
196    */
197   public final boolean run(boolean verbose)
198     throws IOException, InterruptedException, ClassNotFoundException {
199     // Most users won't hit this hopefully and can set it higher if desired
200     setIntConfIfDefault("mapreduce.job.counters.limit", 512);
201 
202     // Capacity scheduler-specific settings.  These should be enough for
203     // a reasonable Giraph job
204     setIntConfIfDefault("mapred.job.map.memory.mb", 1024);
205     setIntConfIfDefault("mapred.job.reduce.memory.mb", 0);
206 
207     // Speculative execution doesn't make sense for Giraph
208     giraphConfiguration.setBoolean(
209         "mapred.map.tasks.speculative.execution", false);
210 
211     // Set the ping interval to 5 minutes instead of one minute
212     // (DEFAULT_PING_INTERVAL)
213     Client.setPingInterval(giraphConfiguration, 60000 * 5);
214 
215     // Should work in MAPREDUCE-1938 to let the user jars/classes
216     // get loaded first
217     giraphConfiguration.setBoolean("mapreduce.user.classpath.first", true);
218     giraphConfiguration.setBoolean("mapreduce.job.user.classpath.first", true);
219 
220     // If the checkpoint frequency is 0 (no failure handling), set the max
221     // tasks attempts to be 1 to encourage faster failure of unrecoverable jobs
222     if (giraphConfiguration.getCheckpointFrequency() == 0) {
223       int oldMaxTaskAttempts = giraphConfiguration.getMaxTaskAttempts();
224       giraphConfiguration.setMaxTaskAttempts(1);
225       if (LOG.isInfoEnabled()) {
226         LOG.info("run: Since checkpointing is disabled (default), " +
227             "do not allow any task retries (setting " +
228             GiraphConstants.MAX_TASK_ATTEMPTS.getKey() + " = 1, " +
229             "old value = " + oldMaxTaskAttempts + ")");
230       }
231     }
232 
233     // Set the job properties, check them, and submit the job
234     ImmutableClassesGiraphConfiguration conf =
235         new ImmutableClassesGiraphConfiguration(giraphConfiguration);
236     checkLocalJobRunnerConfiguration(conf);
237 
238     int tryCount = 0;
239     GiraphJobRetryChecker retryChecker = conf.getJobRetryChecker();
240     while (true) {
241       GiraphJobObserver jobObserver = conf.getJobObserver();
242 
243       JobProgressTrackerService jobProgressTrackerService =
244           DefaultJobProgressTrackerService.createJobProgressTrackerService(
245               conf, jobObserver);
246       ClientThriftServer clientThriftServer = null;
247       if (jobProgressTrackerService != null) {
248         clientThriftServer = new ClientThriftServer(
249             conf, ImmutableList.of(jobProgressTrackerService));
250       }
251 
252       tryCount++;
253       Job submittedJob = new Job(conf, jobName);
254       if (submittedJob.getJar() == null) {
255         submittedJob.setJarByClass(getClass());
256       }
257       submittedJob.setNumReduceTasks(0);
258       submittedJob.setMapperClass(GraphMapper.class);
259       submittedJob.setInputFormatClass(BspInputFormat.class);
260       submittedJob.setOutputFormatClass(
261           GiraphConstants.HADOOP_OUTPUT_FORMAT_CLASS.get(conf));
262       if (jobProgressTrackerService != null) {
263         jobProgressTrackerService.setJob(submittedJob);
264       }
265 
266       jobObserver.launchingJob(submittedJob);
267       submittedJob.submit();
268       if (LOG.isInfoEnabled()) {
269         LOG.info("Tracking URL: " + submittedJob.getTrackingURL());
270         LOG.info(
271             "Waiting for resources... Job will start only when it gets all " +
272                 (conf.getMinWorkers() + 1) + " mappers");
273       }
274       jobObserver.jobRunning(submittedJob);
275       HaltApplicationUtils.printHaltInfo(submittedJob, conf);
276 
277       boolean passed = submittedJob.waitForCompletion(verbose);
278       if (jobProgressTrackerService != null) {
279         jobProgressTrackerService.stop(passed);
280       }
281       if (clientThriftServer != null) {
282         clientThriftServer.stopThriftServer();
283       }
284 
285       jobObserver.jobFinished(submittedJob, passed);
286 
287       if (!passed) {
288         String restartFrom = retryChecker.shouldRestartCheckpoint(submittedJob);
289         if (restartFrom != null) {
290           GiraphConstants.RESTART_JOB_ID.set(conf, restartFrom);
291           continue;
292         }
293       }
294 
295       if (passed || !retryChecker.shouldRetry(submittedJob, tryCount)) {
296         return passed;
297       }
298       if (LOG.isInfoEnabled()) {
299         LOG.info("run: Retrying job, " + tryCount + " try");
300       }
301     }
302   }
303 }