1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.job;
2021import com.google.common.collect.ImmutableList;
22import org.apache.giraph.bsp.BspInputFormat;
23import org.apache.giraph.conf.GiraphConfiguration;
24import org.apache.giraph.conf.GiraphConstants;
25import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26import org.apache.giraph.graph.GraphMapper;
27import org.apache.hadoop.conf.Configuration;
28import org.apache.hadoop.ipc.Client;
29import org.apache.hadoop.mapreduce.Job;
30import org.apache.log4j.Logger;
3132import java.io.IOException;
/**
 * Generates an appropriate internal {@link Job} for using Giraph in Hadoop.
 * Uses composition to avoid unwanted {@link Job} methods from exposure
 * to the user.
 */
public class GiraphJob {
  static {
    // Load Giraph-specific defaults before any Configuration is resolved
    Configuration.addDefaultResource("giraph-site.xml");
  }

  /** Class logger */
  private static final Logger LOG = Logger.getLogger(GiraphJob.class);
  /** Internal delegated job to proxy interface requests for Job */
  private final DelegatedJob delegatedJob;
  /** Name of the job */
  private String jobName;
  /** Helper configuration from the job */
  private final GiraphConfiguration giraphConfiguration;
5253/**54 * Delegated job that simply passes along the class GiraphConfiguration.55 */56privateclassDelegatedJobextends Job {
57/** Ensure that for job initiation the super.getConfiguration() is used */58privateboolean jobInited = false;
5960/**61 * Constructor62 *63 * @param conf Configuration64 * @throws IOException65 */66DelegatedJob(Configuration conf) throws IOException {
67super(conf);
68 }
6970 @Override
71public Configuration getConfiguration() {
72if (jobInited) {
73return giraphConfiguration;
74 } else {
75returnsuper.getConfiguration();
76 }
77 }
78 }
  /**
   * Constructor that will instantiate the configuration
   *
   * @param jobName User-defined job name
   * @throws IOException
   */
  public GiraphJob(String jobName) throws IOException {
    // Delegate with a freshly created, default GiraphConfiguration
    this(new GiraphConfiguration(), jobName);
  }
  /**
   * Constructor.
   *
   * @param configuration User-defined configuration
   * @param jobName User-defined job name
   * @throws IOException
   */
  public GiraphJob(Configuration configuration,
                   String jobName) throws IOException {
    // Wrap the plain Hadoop Configuration in a GiraphConfiguration
    this(new GiraphConfiguration(configuration), jobName);
  }
101102/**103 * Constructor.104 *105 * @param giraphConfiguration User-defined configuration106 * @param jobName User-defined job name107 * @throws IOException108 */109publicGiraphJob(GiraphConfiguration giraphConfiguration,
110 String jobName) throws IOException {
111this.jobName = jobName;
112this.giraphConfiguration = giraphConfiguration;
113this.delegatedJob = newDelegatedJob(giraphConfiguration);
114 }
115116public String getJobName() {
117return jobName;
118 }
  /**
   * Set the user-defined name of this job.
   *
   * @param jobName Name of the job
   */
  public void setJobName(String jobName) {
    this.jobName = jobName;
  }
123124/**125 * Get the configuration from the internal job.126 *127 * @return Configuration used by the job.128 */129publicGiraphConfiguration getConfiguration() {
130return giraphConfiguration;
131 }
  /**
   * Be very cautious when using this method as it returns the internal job
   * of {@link GiraphJob}.  This should only be used for methods that require
   * access to the actual {@link Job}, i.e. FileInputFormat#addInputPath().
   *
   * @return Internal job that will actually be submitted to Hadoop.
   */
  public Job getInternalJob() {
    // Flip the flag first so DelegatedJob#getConfiguration() now returns
    // the shared GiraphConfiguration instead of the superclass copy
    delegatedJob.jobInited = true;
    return delegatedJob;
  }
144145/**146 * Check if the configuration is local. If it is local, do additional147 * checks due to the restrictions of LocalJobRunner. This checking is148 * performed here because the local job runner is MRv1-configured.149 *150 * @param conf Configuration151 */152privatestaticvoid checkLocalJobRunnerConfiguration(
153ImmutableClassesGiraphConfiguration conf) {
154 String jobTracker = conf.get("mapred.job.tracker", null);
155if (!jobTracker.equals("local")) {
156// Nothing to check157return;
158 }
159160int maxWorkers = conf.getMaxWorkers();
161if (maxWorkers != 1) {
162thrownew IllegalArgumentException(
163"checkLocalJobRunnerConfiguration: When using " +
164"LocalJobRunner, must have only one worker since " +
165"only 1 task at a time!");
166 }
167if (conf.getSplitMasterWorker()) {
168thrownew IllegalArgumentException(
169"checkLocalJobRunnerConfiguration: When using " +
170"LocalJobRunner, you cannot run in split master / worker " +
171"mode since there is only 1 task at a time!");
172 }
173 }
174175/**176 * Check whether a specified int conf value is set and if not, set it.177 *178 * @param param Conf value to check179 * @param defaultValue Assign to value if not set180 */181privatevoid setIntConfIfDefault(String param, int defaultValue) {
182if (giraphConfiguration.getInt(param, Integer.MIN_VALUE) ==
183 Integer.MIN_VALUE) {
184 giraphConfiguration.setInt(param, defaultValue);
185 }
186 }
  /**
   * Runs the actual graph application through Hadoop Map-Reduce.
   *
   * @param verbose If true, provide verbose output, false otherwise
   * @return True if success, false otherwise
   * @throws ClassNotFoundException
   * @throws InterruptedException
   * @throws IOException
   */
  public final boolean run(boolean verbose)
    throws IOException, InterruptedException, ClassNotFoundException {
    // Most users won't hit this hopefully and can set it higher if desired
    setIntConfIfDefault("mapreduce.job.counters.limit", 512);

    // Capacity scheduler-specific settings.  These should be enough for
    // a reasonable Giraph job
    setIntConfIfDefault("mapred.job.map.memory.mb", 1024);
    setIntConfIfDefault("mapred.job.reduce.memory.mb", 0);

    // Speculative execution doesn't make sense for Giraph
    giraphConfiguration.setBoolean(
        "mapred.map.tasks.speculative.execution", false);

    // Set the ping interval to 5 minutes instead of one minute
    // (DEFAULT_PING_INTERVAL)
    Client.setPingInterval(giraphConfiguration, 60000 * 5);

    // Should work in MAPREDUCE-1938 to let the user jars/classes
    // get loaded first
    giraphConfiguration.setBoolean("mapreduce.user.classpath.first", true);
    giraphConfiguration.setBoolean("mapreduce.job.user.classpath.first", true);

    // If the checkpoint frequency is 0 (no failure handling), set the max
    // tasks attempts to be 1 to encourage faster failure of unrecoverable jobs
    if (giraphConfiguration.getCheckpointFrequency() == 0) {
      int oldMaxTaskAttempts = giraphConfiguration.getMaxTaskAttempts();
      giraphConfiguration.setMaxTaskAttempts(1);
      if (LOG.isInfoEnabled()) {
        LOG.info("run: Since checkpointing is disabled (default), " +
            "do not allow any task retries (setting " +
            GiraphConstants.MAX_TASK_ATTEMPTS.getKey() + " = 1, " +
            "old value = " + oldMaxTaskAttempts + ")");
      }
    }

    // Set the job properties, check them, and submit the job
    ImmutableClassesGiraphConfiguration conf =
        new ImmutableClassesGiraphConfiguration(giraphConfiguration);
    checkLocalJobRunnerConfiguration(conf);

    int tryCount = 0;
    // The retry checker decides whether a failed attempt is resubmitted
    GiraphJobRetryChecker retryChecker = conf.getJobRetryChecker();
    while (true) {
      GiraphJobObserver jobObserver = conf.getJobObserver();

      // Optional progress tracking; null when tracking is disabled
      JobProgressTrackerService jobProgressTrackerService =
          DefaultJobProgressTrackerService.createJobProgressTrackerService(
              conf, jobObserver);
      ClientThriftServer clientThriftServer = null;
      if (jobProgressTrackerService != null) {
        // Expose the progress tracker service over Thrift
        clientThriftServer = new ClientThriftServer(
            conf, ImmutableList.of(jobProgressTrackerService));
      }

      tryCount++;
      // A fresh Hadoop Job must be built for every submission attempt
      Job submittedJob = new Job(conf, jobName);
      if (submittedJob.getJar() == null) {
        submittedJob.setJarByClass(getClass());
      }
      // Giraph is map-only: zero reducers, all work done in GraphMapper
      submittedJob.setNumReduceTasks(0);
      submittedJob.setMapperClass(GraphMapper.class);
      submittedJob.setInputFormatClass(BspInputFormat.class);
      submittedJob.setOutputFormatClass(
          GiraphConstants.HADOOP_OUTPUT_FORMAT_CLASS.get(conf));
      if (jobProgressTrackerService != null) {
        jobProgressTrackerService.setJob(submittedJob);
      }

      jobObserver.launchingJob(submittedJob);
      submittedJob.submit();
      if (LOG.isInfoEnabled()) {
        LOG.info("Tracking URL: " + submittedJob.getTrackingURL());
        LOG.info(
            "Waiting for resources... Job will start only when it gets all " +
                (conf.getMinWorkers() + 1) + " mappers");
      }
      jobObserver.jobRunning(submittedJob);
      HaltApplicationUtils.printHaltInfo(submittedJob, conf);

      boolean passed = submittedJob.waitForCompletion(verbose);
      // Shut down per-attempt services before deciding on retry
      if (jobProgressTrackerService != null) {
        jobProgressTrackerService.stop(passed);
      }
      if (clientThriftServer != null) {
        clientThriftServer.stopThriftServer();
      }

      jobObserver.jobFinished(submittedJob, passed);

      if (!passed) {
        // A failed attempt may be restartable from an earlier checkpoint;
        // if so, record the restart id and resubmit immediately
        String restartFrom = retryChecker.shouldRestartCheckpoint(submittedJob);
        if (restartFrom != null) {
          GiraphConstants.RESTART_JOB_ID.set(conf, restartFrom);
          continue;
        }
      }

      // Stop on success, or when the retry checker gives up
      if (passed || !retryChecker.shouldRetry(submittedJob, tryCount)) {
        return passed;
      }
      if (LOG.isInfoEnabled()) {
        LOG.info("run: Retrying job, " + tryCount + " try");
      }
    }
  }
303 }