/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  package org.apache.giraph.yarn;
19  
20  import org.apache.giraph.conf.GiraphConfiguration;
21  import org.apache.giraph.conf.GiraphConstants;
22  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23  import org.apache.giraph.graph.GraphTaskManager;
24  
25  import org.apache.giraph.io.VertexOutputFormat;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.io.Writable;
28  import org.apache.hadoop.io.WritableComparable;
29  import org.apache.hadoop.mapreduce.MapContext;
30  import org.apache.hadoop.mapreduce.Mapper.Context;
31  import org.apache.hadoop.mapreduce.OutputCommitter;
32  import org.apache.hadoop.mapreduce.TaskID;
33  import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
34  import org.apache.hadoop.mapreduce.task.MapContextImpl;
35  import org.apache.hadoop.mapreduce.TaskAttemptID;
36  import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
37  import org.apache.log4j.Logger;
38  
39  import java.io.IOException;
40  
41  /**
42   * This process will execute the BSP graph tasks alloted to this YARN
43   * execution container. All tasks will be performed by calling the
44   * GraphTaskManager object. Since this GiraphYarnTask will
45   * not be passing data by key-value pairs through the MR framework, the
46   * Mapper parameter types are irrelevant, and set to <code>Object</code> type.
47   *
48   * @param <I> Vertex id
49   * @param <V> Vertex data
50   * @param <E> Edge data
51   */
52  public class GiraphYarnTask<I extends WritableComparable, V extends Writable,
53      E extends Writable> {
54    static {
55      Configuration.addDefaultResource(GiraphConstants.GIRAPH_YARN_CONF_FILE);
56    }
57    /** Class logger */
58    private static final Logger LOG = Logger.getLogger(GiraphYarnTask.class);
59    /** Manage the framework-agnostic Giraph task for this job run */
60    private GraphTaskManager<I, V, E> graphTaskManager;
61    /** Giraph task ID number must start @ index 0. Used by ZK, BSP, etc. */
62    private final int bspTaskId;
63    /** A special "dummy" override of Mapper#Context, used to deliver MRv1 deps */
64    private Context proxy;
65    /** Configuration to hand off into Giraph, through wrapper Mapper#Context */
66    private ImmutableClassesGiraphConfiguration conf;
67  
68    /**
69     * Constructor. Build our DUMMY MRv1 data structures to pass to our
70     * GiraphTaskManager. This allows us to continue to look the other way
71     * while Giraph relies on MRv1 under the hood.
72     * @param taskAttemptId the MRv1 TaskAttemptID we constructed from CLI args
73     *                      supplied by GiraphApplicationMaster.
74     */
75    public GiraphYarnTask(final TaskAttemptID taskAttemptId) {
76      conf = new ImmutableClassesGiraphConfiguration<I, V, E>(
77        new GiraphConfiguration());
78      bspTaskId = taskAttemptId.getTaskID().getId();
79      conf.setInt("mapred.task.partition", bspTaskId);
80      proxy = buildProxyMapperContext(taskAttemptId);
81      graphTaskManager = new GraphTaskManager<I, V, E>(proxy);
82    }
83  
84    /**
85     * Run one Giraph worker (or master) task, hosted in this execution container.
86     */
87    public void run() {
88      // Notify the master quicker if there is worker failure rather than
89      // waiting for ZooKeeper to timeout and delete the ephemeral znodes
90      try {
91        graphTaskManager.setup(null); // defaults GTM to "assume fatjar mode"
92        graphTaskManager.execute();
93        graphTaskManager.cleanup();
94      } catch (InterruptedException ie) {
95        LOG.error("run() caught an unrecoverable InterruptedException.", ie);
96      } catch (IOException ioe) {
97        throw new RuntimeException(
98          "run() caught an unrecoverable IOException.", ioe);
99        // CHECKSTYLE: stop IllegalCatch
100     } catch (RuntimeException e) {
101       // CHECKSTYLE: resume IllegalCatch
102       graphTaskManager.zooKeeperCleanup();
103       graphTaskManager.workerFailureCleanup();
104       throw new RuntimeException(
105         "run: Caught an unrecoverable exception " + e.getMessage(), e);
106     } finally {
107       // YARN: must complete the commit of the final output, Hadoop isn't there.
108       finalizeYarnJob();
109     }
110   }
111 
112   /**
113    * Without Hadoop MR to finish the consolidation of all the task output from
114    * each HDFS task tmp dir, it won't get done. YARN has some job finalization
115    * it must do "for us." -- AND must delete "jar cache" in HDFS too!
116    */
117   private void finalizeYarnJob() {
118     if (conf.isPureYarnJob() && graphTaskManager.isMaster() &&
119       conf.getVertexOutputFormatClass() != null) {
120       try {
121         LOG.info("Master is ready to commit final job output data.");
122         VertexOutputFormat vertexOutputFormat =
123           conf.createWrappedVertexOutputFormat();
124         OutputCommitter outputCommitter =
125           vertexOutputFormat.getOutputCommitter(proxy);
126         // now we will have our output in OUTDIR if all went well...
127         outputCommitter.commitJob(proxy);
128         LOG.info("Master has committed the final job output data.");
129       } catch (InterruptedException ie) {
130         LOG.error("Interrupted while attempting to obtain " +
131           "OutputCommitter.", ie);
132       } catch (IOException ioe) {
133         LOG.error("Master task's attempt to commit output has " +
134           "FAILED.", ioe);
135       }
136     }
137   }
138 
139   /**
140    * Utility to generate dummy Mapper#Context for use in Giraph internals.
141    * This is the "key hack" to inject MapReduce-related data structures
142    * containing YARN cluster metadata (and our GiraphConf from the AppMaster)
143    * into our Giraph BSP task code.
144    * @param tid the TaskAttemptID to construct this Mapper#Context from.
145    * @return sort of a Mapper#Context if you squint just right.
146    */
147   private Context buildProxyMapperContext(final TaskAttemptID tid) {
148     MapContext mc = new MapContextImpl<Object, Object, Object, Object>(
149       conf, // our Configuration, populated back at the GiraphYarnClient.
150       tid,  // our TaskAttemptId, generated w/YARN app, container, attempt IDs
151       null, // RecordReader here will never be used by Giraph
152       null, // RecordWriter here will never be used by Giraph
153       null, // OutputCommitter here will never be used by Giraph
154       new TaskAttemptContextImpl.DummyReporter() { // goes in task logs for now
155         @Override
156         public void setStatus(String msg) {
157           LOG.info("[STATUS: task-" + bspTaskId + "] " + msg);
158         }
159       },
160       null); // Input split setting here will never be used by Giraph
161 
162     // now, we wrap our MapContext ref so we can produce a Mapper#Context
163     WrappedMapper<Object, Object, Object, Object> wrappedMapper
164       = new WrappedMapper<Object, Object, Object, Object>();
165     return wrappedMapper.getMapContext(mc);
166   }
167 
168   /**
169    * Task entry point.
170    * @param args CLI arguments injected by GiraphApplicationMaster to hand off
171    *             job, task, and attempt ID's to this (and every) Giraph task.
172    *             Args should be: <code>AppId ContainerId AppAttemptId</code>
173    */
174   @SuppressWarnings("rawtypes")
175   public static void main(String[] args) {
176     if (args.length != 4) {
177       throw new IllegalStateException("GiraphYarnTask could not construct " +
178         "a TaskAttemptID for the Giraph job from args: " + printArgs(args));
179     }
180     try {
181       GiraphYarnTask<?, ?, ?> giraphYarnTask =
182         new GiraphYarnTask(getTaskAttemptID(args));
183       giraphYarnTask.run();
184       // CHECKSTYLE: stop IllegalCatch
185     } catch (Throwable t) {
186       // CHECKSTYLE resume IllegalCatch
187       LOG.error("GiraphYarnTask threw a top-level exception, failing task", t);
188       System.exit(2);
189     } // ALWAYS finish a YARN task or AppMaster with System#exit!!!
190     System.exit(0);
191   }
192 
193   /**
194    * Utility to create a TaskAttemptId we can feed to our fake Mapper#Context.
195    *
196    * NOTE: ContainerId will serve as MR TaskID for Giraph tasks.
197    * YARN container 1 is always AppMaster, so the least container id we will
198    * ever get from YARN for a Giraph task is container id 2. Giraph on MapReduce
199    * tasks must start at index 0. So we SUBTRACT TWO from each container id.
200    *
201    * @param args the command line args, fed to us by GiraphApplicationMaster.
202    * @return the TaskAttemptId object, populated with YARN job data.
203    */
204   private static TaskAttemptID getTaskAttemptID(String[] args) {
205     return new TaskAttemptID(
206       args[0], // YARN ApplicationId Cluster Timestamp
207       Integer.parseInt(args[1]), // YARN ApplicationId #
208       TaskID.getTaskType('m'),  // Make Giraph think this is a Mapper task.
209       Integer.parseInt(args[2]) - 2, // YARN ContainerId MINUS TWO (see above)
210       Integer.parseInt(args[3])); // YARN AppAttemptId #
211   }
212 
213   /**
214    * Utility to help log command line args in the event of an error.
215    * @param args the CLI args.
216    * @return a pretty-print of the input args.
217    */
218   private static String printArgs(String[] args) {
219     int count = 0;
220     StringBuilder sb = new StringBuilder();
221     for (String arg : args) {
222       sb.append("arg[" + (count++) + "] == " + arg + ", ");
223     }
224     return sb.toString();
225   }
226 }