1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.giraph.yarn;
19
20 import org.apache.giraph.conf.GiraphConfiguration;
21 import org.apache.giraph.conf.GiraphConstants;
22 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23 import org.apache.giraph.graph.GraphTaskManager;
24
25 import org.apache.giraph.io.VertexOutputFormat;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.io.Writable;
28 import org.apache.hadoop.io.WritableComparable;
29 import org.apache.hadoop.mapreduce.MapContext;
30 import org.apache.hadoop.mapreduce.Mapper.Context;
31 import org.apache.hadoop.mapreduce.OutputCommitter;
32 import org.apache.hadoop.mapreduce.TaskID;
33 import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
34 import org.apache.hadoop.mapreduce.task.MapContextImpl;
35 import org.apache.hadoop.mapreduce.TaskAttemptID;
36 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
37 import org.apache.log4j.Logger;
38
39 import java.io.IOException;
40
41 /**
42 * This process will execute the BSP graph tasks alloted to this YARN
43 * execution container. All tasks will be performed by calling the
44 * GraphTaskManager object. Since this GiraphYarnTask will
45 * not be passing data by key-value pairs through the MR framework, the
46 * Mapper parameter types are irrelevant, and set to <code>Object</code> type.
47 *
48 * @param <I> Vertex id
49 * @param <V> Vertex data
50 * @param <E> Edge data
51 */
52 public class GiraphYarnTask<I extends WritableComparable, V extends Writable,
53 E extends Writable> {
54 static {
55 Configuration.addDefaultResource(GiraphConstants.GIRAPH_YARN_CONF_FILE);
56 }
57 /** Class logger */
58 private static final Logger LOG = Logger.getLogger(GiraphYarnTask.class);
59 /** Manage the framework-agnostic Giraph task for this job run */
60 private GraphTaskManager<I, V, E> graphTaskManager;
61 /** Giraph task ID number must start @ index 0. Used by ZK, BSP, etc. */
62 private final int bspTaskId;
63 /** A special "dummy" override of Mapper#Context, used to deliver MRv1 deps */
64 private Context proxy;
65 /** Configuration to hand off into Giraph, through wrapper Mapper#Context */
66 private ImmutableClassesGiraphConfiguration conf;
67
68 /**
69 * Constructor. Build our DUMMY MRv1 data structures to pass to our
70 * GiraphTaskManager. This allows us to continue to look the other way
71 * while Giraph relies on MRv1 under the hood.
72 * @param taskAttemptId the MRv1 TaskAttemptID we constructed from CLI args
73 * supplied by GiraphApplicationMaster.
74 */
75 public GiraphYarnTask(final TaskAttemptID taskAttemptId) {
76 conf = new ImmutableClassesGiraphConfiguration<I, V, E>(
77 new GiraphConfiguration());
78 bspTaskId = taskAttemptId.getTaskID().getId();
79 conf.setInt("mapred.task.partition", bspTaskId);
80 proxy = buildProxyMapperContext(taskAttemptId);
81 graphTaskManager = new GraphTaskManager<I, V, E>(proxy);
82 }
83
84 /**
85 * Run one Giraph worker (or master) task, hosted in this execution container.
86 */
87 public void run() {
88 // Notify the master quicker if there is worker failure rather than
89 // waiting for ZooKeeper to timeout and delete the ephemeral znodes
90 try {
91 graphTaskManager.setup(null); // defaults GTM to "assume fatjar mode"
92 graphTaskManager.execute();
93 graphTaskManager.cleanup();
94 graphTaskManager.sendWorkerCountersAndFinishCleanup();
95 } catch (InterruptedException ie) {
96 LOG.error("run() caught an unrecoverable InterruptedException.", ie);
97 } catch (IOException ioe) {
98 throw new RuntimeException(
99 "run() caught an unrecoverable IOException.", ioe);
100 // CHECKSTYLE: stop IllegalCatch
101 } catch (RuntimeException e) {
102 // CHECKSTYLE: resume IllegalCatch
103 graphTaskManager.zooKeeperCleanup();
104 graphTaskManager.workerFailureCleanup();
105 throw new RuntimeException(
106 "run: Caught an unrecoverable exception " + e.getMessage(), e);
107 } finally {
108 // YARN: must complete the commit of the final output, Hadoop isn't there.
109 finalizeYarnJob();
110 }
111 }
112
113 /**
114 * Without Hadoop MR to finish the consolidation of all the task output from
115 * each HDFS task tmp dir, it won't get done. YARN has some job finalization
116 * it must do "for us." -- AND must delete "jar cache" in HDFS too!
117 */
118 private void finalizeYarnJob() {
119 if (conf.isPureYarnJob() && graphTaskManager.isMaster() &&
120 conf.getVertexOutputFormatClass() != null) {
121 try {
122 LOG.info("Master is ready to commit final job output data.");
123 VertexOutputFormat vertexOutputFormat =
124 conf.createWrappedVertexOutputFormat();
125 OutputCommitter outputCommitter =
126 vertexOutputFormat.getOutputCommitter(proxy);
127 // now we will have our output in OUTDIR if all went well...
128 outputCommitter.commitJob(proxy);
129 LOG.info("Master has committed the final job output data.");
130 } catch (InterruptedException ie) {
131 LOG.error("Interrupted while attempting to obtain " +
132 "OutputCommitter.", ie);
133 } catch (IOException ioe) {
134 LOG.error("Master task's attempt to commit output has " +
135 "FAILED.", ioe);
136 }
137 }
138 }
139
140 /**
141 * Utility to generate dummy Mapper#Context for use in Giraph internals.
142 * This is the "key hack" to inject MapReduce-related data structures
143 * containing YARN cluster metadata (and our GiraphConf from the AppMaster)
144 * into our Giraph BSP task code.
145 * @param tid the TaskAttemptID to construct this Mapper#Context from.
146 * @return sort of a Mapper#Context if you squint just right.
147 */
148 private Context buildProxyMapperContext(final TaskAttemptID tid) {
149 MapContext mc = new MapContextImpl<Object, Object, Object, Object>(
150 conf, // our Configuration, populated back at the GiraphYarnClient.
151 tid, // our TaskAttemptId, generated w/YARN app, container, attempt IDs
152 null, // RecordReader here will never be used by Giraph
153 null, // RecordWriter here will never be used by Giraph
154 null, // OutputCommitter here will never be used by Giraph
155 new TaskAttemptContextImpl.DummyReporter() { // goes in task logs for now
156 @Override
157 public void setStatus(String msg) {
158 LOG.info("[STATUS: task-" + bspTaskId + "] " + msg);
159 }
160 },
161 null); // Input split setting here will never be used by Giraph
162
163 // now, we wrap our MapContext ref so we can produce a Mapper#Context
164 WrappedMapper<Object, Object, Object, Object> wrappedMapper
165 = new WrappedMapper<Object, Object, Object, Object>();
166 return wrappedMapper.getMapContext(mc);
167 }
168
169 /**
170 * Task entry point.
171 * @param args CLI arguments injected by GiraphApplicationMaster to hand off
172 * job, task, and attempt ID's to this (and every) Giraph task.
173 * Args should be: <code>AppId ContainerId AppAttemptId</code>
174 */
175 @SuppressWarnings("rawtypes")
176 public static void main(String[] args) {
177 if (args.length != 4) {
178 throw new IllegalStateException("GiraphYarnTask could not construct " +
179 "a TaskAttemptID for the Giraph job from args: " + printArgs(args));
180 }
181 try {
182 GiraphYarnTask<?, ?, ?> giraphYarnTask =
183 new GiraphYarnTask(getTaskAttemptID(args));
184 giraphYarnTask.run();
185 // CHECKSTYLE: stop IllegalCatch
186 } catch (Throwable t) {
187 // CHECKSTYLE resume IllegalCatch
188 LOG.error("GiraphYarnTask threw a top-level exception, failing task", t);
189 System.exit(2);
190 } // ALWAYS finish a YARN task or AppMaster with System#exit!!!
191 System.exit(0);
192 }
193
194 /**
195 * Utility to create a TaskAttemptId we can feed to our fake Mapper#Context.
196 *
197 * NOTE: ContainerId will serve as MR TaskID for Giraph tasks.
198 * YARN container 1 is always AppMaster, so the least container id we will
199 * ever get from YARN for a Giraph task is container id 2. Giraph on MapReduce
200 * tasks must start at index 0. So we SUBTRACT TWO from each container id.
201 *
202 * @param args the command line args, fed to us by GiraphApplicationMaster.
203 * @return the TaskAttemptId object, populated with YARN job data.
204 */
205 private static TaskAttemptID getTaskAttemptID(String[] args) {
206 return new TaskAttemptID(
207 args[0], // YARN ApplicationId Cluster Timestamp
208 Integer.parseInt(args[1]), // YARN ApplicationId #
209 TaskID.getTaskType('m'), // Make Giraph think this is a Mapper task.
210 Integer.parseInt(args[2]) - 2, // YARN ContainerId MINUS TWO (see above)
211 Integer.parseInt(args[3])); // YARN AppAttemptId #
212 }
213
214 /**
215 * Utility to help log command line args in the event of an error.
216 * @param args the CLI args.
217 * @return a pretty-print of the input args.
218 */
219 private static String printArgs(String[] args) {
220 int count = 0;
221 StringBuilder sb = new StringBuilder();
222 for (String arg : args) {
223 sb.append("arg[" + (count++) + "] == " + arg + ", ");
224 }
225 return sb.toString();
226 }
227 }