/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.yarn;
1920import org.apache.giraph.conf.GiraphConfiguration;
21import org.apache.giraph.conf.GiraphConstants;
22import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23import org.apache.giraph.graph.GraphTaskManager;
2425import org.apache.giraph.io.VertexOutputFormat;
26import org.apache.hadoop.conf.Configuration;
27import org.apache.hadoop.io.Writable;
28import org.apache.hadoop.io.WritableComparable;
29import org.apache.hadoop.mapreduce.MapContext;
30import org.apache.hadoop.mapreduce.Mapper.Context;
31import org.apache.hadoop.mapreduce.OutputCommitter;
32import org.apache.hadoop.mapreduce.TaskID;
33import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
34import org.apache.hadoop.mapreduce.task.MapContextImpl;
35import org.apache.hadoop.mapreduce.TaskAttemptID;
36import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
37import org.apache.log4j.Logger;
3839import java.io.IOException;
4041/**42 * This process will execute the BSP graph tasks alloted to this YARN43 * execution container. All tasks will be performed by calling the44 * GraphTaskManager object. Since this GiraphYarnTask will45 * not be passing data by key-value pairs through the MR framework, the46 * Mapper parameter types are irrelevant, and set to <code>Object</code> type.47 *48 * @param <I> Vertex id49 * @param <V> Vertex data50 * @param <E> Edge data51 */52publicclass GiraphYarnTask<I extends WritableComparable, V extends Writable,
53 E extends Writable> {
54static {
55 Configuration.addDefaultResource(GiraphConstants.GIRAPH_YARN_CONF_FILE);
56 }
57/** Class logger */58privatestaticfinal Logger LOG = Logger.getLogger(GiraphYarnTask.class);
59/** Manage the framework-agnostic Giraph task for this job run */60private GraphTaskManager<I, V, E> graphTaskManager;
61/** Giraph task ID number must start @ index 0. Used by ZK, BSP, etc. */62privatefinalint bspTaskId;
63/** A special "dummy" override of Mapper#Context, used to deliver MRv1 deps */64private Context proxy;
65/** Configuration to hand off into Giraph, through wrapper Mapper#Context */66privateImmutableClassesGiraphConfiguration conf;
6768/**69 * Constructor. Build our DUMMY MRv1 data structures to pass to our70 * GiraphTaskManager. This allows us to continue to look the other way71 * while Giraph relies on MRv1 under the hood.72 * @param taskAttemptId the MRv1 TaskAttemptID we constructed from CLI args73 * supplied by GiraphApplicationMaster.74 */75publicGiraphYarnTask(final TaskAttemptID taskAttemptId) {
76 conf = new ImmutableClassesGiraphConfiguration<I, V, E>(
77newGiraphConfiguration());
78 bspTaskId = taskAttemptId.getTaskID().getId();
79 conf.setInt("mapred.task.partition", bspTaskId);
80 proxy = buildProxyMapperContext(taskAttemptId);
81 graphTaskManager = new GraphTaskManager<I, V, E>(proxy);
82 }
8384/**85 * Run one Giraph worker (or master) task, hosted in this execution container.86 */87publicvoid run() {
88// Notify the master quicker if there is worker failure rather than89// waiting for ZooKeeper to timeout and delete the ephemeral znodes90try {
91 graphTaskManager.setup(null); // defaults GTM to "assume fatjar mode"92 graphTaskManager.execute();
93 graphTaskManager.cleanup();
94 graphTaskManager.sendWorkerCountersAndFinishCleanup();
95 } catch (InterruptedException ie) {
96 LOG.error("run() caught an unrecoverable InterruptedException.", ie);
97 } catch (IOException ioe) {
98thrownew RuntimeException(
99"run() caught an unrecoverable IOException.", ioe);
100// CHECKSTYLE: stop IllegalCatch101 } catch (RuntimeException e) {
102// CHECKSTYLE: resume IllegalCatch103 graphTaskManager.zooKeeperCleanup();
104 graphTaskManager.workerFailureCleanup();
105thrownew RuntimeException(
106"run: Caught an unrecoverable exception " + e.getMessage(), e);
107 } finally {
108// YARN: must complete the commit of the final output, Hadoop isn't there.109 finalizeYarnJob();
110 }
111 }
112113/**114 * Without Hadoop MR to finish the consolidation of all the task output from115 * each HDFS task tmp dir, it won't get done. YARN has some job finalization116 * it must do "for us." -- AND must delete "jar cache" in HDFS too!117 */118privatevoid finalizeYarnJob() {
119if (conf.isPureYarnJob() && graphTaskManager.isMaster() &&
120 conf.getVertexOutputFormatClass() != null) {
121try {
122 LOG.info("Master is ready to commit final job output data.");
123VertexOutputFormat vertexOutputFormat =
124 conf.createWrappedVertexOutputFormat();
125 OutputCommitter outputCommitter =
126 vertexOutputFormat.getOutputCommitter(proxy);
127// now we will have our output in OUTDIR if all went well...128 outputCommitter.commitJob(proxy);
129 LOG.info("Master has committed the final job output data.");
130 } catch (InterruptedException ie) {
131 LOG.error("Interrupted while attempting to obtain " +
132"OutputCommitter.", ie);
133 } catch (IOException ioe) {
134 LOG.error("Master task's attempt to commit output has " +
135"FAILED.", ioe);
136 }
137 }
138 }
139140/**141 * Utility to generate dummy Mapper#Context for use in Giraph internals.142 * This is the "key hack" to inject MapReduce-related data structures143 * containing YARN cluster metadata (and our GiraphConf from the AppMaster)144 * into our Giraph BSP task code.145 * @param tid the TaskAttemptID to construct this Mapper#Context from.146 * @return sort of a Mapper#Context if you squint just right.147 */148private Context buildProxyMapperContext(final TaskAttemptID tid) {
149 MapContext mc = new MapContextImpl<Object, Object, Object, Object>(
150 conf, // our Configuration, populated back at the GiraphYarnClient.151 tid, // our TaskAttemptId, generated w/YARN app, container, attempt IDs152null, // RecordReader here will never be used by Giraph153null, // RecordWriter here will never be used by Giraph154null, // OutputCommitter here will never be used by Giraph155new TaskAttemptContextImpl.DummyReporter() { // goes in task logs for now156 @Override
157publicvoid setStatus(String msg) {
158 LOG.info("[STATUS: task-" + bspTaskId + "] " + msg);
159 }
160 },
161null); // Input split setting here will never be used by Giraph162163// now, we wrap our MapContext ref so we can produce a Mapper#Context164 WrappedMapper<Object, Object, Object, Object> wrappedMapper
165 = new WrappedMapper<Object, Object, Object, Object>();
166return wrappedMapper.getMapContext(mc);
167 }
168169/**170 * Task entry point.171 * @param args CLI arguments injected by GiraphApplicationMaster to hand off172 * job, task, and attempt ID's to this (and every) Giraph task.173 * Args should be: <code>AppId ContainerId AppAttemptId</code>174 */175 @SuppressWarnings("rawtypes")
176publicstaticvoid main(String[] args) {
177if (args.length != 4) {
178thrownew IllegalStateException("GiraphYarnTask could not construct " +
179"a TaskAttemptID for the Giraph job from args: " + printArgs(args));
180 }
181try {
182 GiraphYarnTask<?, ?, ?> giraphYarnTask =
183newGiraphYarnTask(getTaskAttemptID(args));
184 giraphYarnTask.run();
185// CHECKSTYLE: stop IllegalCatch186 } catch (Throwable t) {
187// CHECKSTYLE resume IllegalCatch188 LOG.error("GiraphYarnTask threw a top-level exception, failing task", t);
189 System.exit(2);
190 } // ALWAYS finish a YARN task or AppMaster with System#exit!!!191 System.exit(0);
192 }
193194/**195 * Utility to create a TaskAttemptId we can feed to our fake Mapper#Context.196 *197 * NOTE: ContainerId will serve as MR TaskID for Giraph tasks.198 * YARN container 1 is always AppMaster, so the least container id we will199 * ever get from YARN for a Giraph task is container id 2. Giraph on MapReduce200 * tasks must start at index 0. So we SUBTRACT TWO from each container id.201 *202 * @param args the command line args, fed to us by GiraphApplicationMaster.203 * @return the TaskAttemptId object, populated with YARN job data.204 */205privatestatic TaskAttemptID getTaskAttemptID(String[] args) {
206returnnew TaskAttemptID(
207 args[0], // YARN ApplicationId Cluster Timestamp208 Integer.parseInt(args[1]), // YARN ApplicationId #209 TaskID.getTaskType('m'), // Make Giraph think this is a Mapper task.210 Integer.parseInt(args[2]) - 2, // YARN ContainerId MINUS TWO (see above)211 Integer.parseInt(args[3])); // YARN AppAttemptId #212 }
213214/**215 * Utility to help log command line args in the event of an error.216 * @param args the CLI args.217 * @return a pretty-print of the input args.218 */219privatestatic String printArgs(String[] args) {
220int count = 0;
221 StringBuilder sb = new StringBuilder();
222for (String arg : args) {
223 sb.append("arg[" + (count++) + "] == " + arg + ", ");
224 }
225return sb.toString();
226 }
227 }