GiraphYarnTask xref

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.yarn;

import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.io.VertexOutputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.apache.hadoop.mapreduce.task.MapContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

import org.apache.log4j.Logger;

import java.io.IOException;

/**
 * This process will execute the BSP graph tasks allotted to this YARN
 * execution container. All tasks will be performed by calling the
 * GraphTaskManager object. Since this GiraphYarnTask will
 * not be passing data by key-value pairs through the MR framework, the
 * Mapper parameter types are irrelevant, and set to <code>Object</code> type.
 *
 * @param <I> Vertex id
 * @param <V> Vertex data
 * @param <E> Edge data
 */
public class GiraphYarnTask<I extends WritableComparable, V extends Writable,
    E extends Writable> {
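  // Registering the Giraph-on-YARN resource as a default makes its settings
  // visible to every Configuration created in this JVM, including the one
  // built in the constructor below.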
  static {
    Configuration.addDefaultResource(GiraphConstants.GIRAPH_YARN_CONF_FILE);
  }
  /** Class logger */
  private static final Logger LOG = Logger.getLogger(GiraphYarnTask.class);
  /** Manage the framework-agnostic Giraph task for this job run */
  private GraphTaskManager<I, V, E> graphTaskManager;
  /** Giraph task ID number must start @ index 0. Used by ZK, BSP, etc. */
  private final int bspTaskId;
  /** A special "dummy" override of Mapper#Context, used to deliver MRv1 deps */
  private Context proxy;
  /** Configuration to hand off into Giraph, through wrapper Mapper#Context */
  private ImmutableClassesGiraphConfiguration conf;

  /**
   * Constructor. Build our DUMMY MRv1 data structures to pass to our
   * GraphTaskManager. This allows us to continue to look the other way
   * while Giraph relies on MRv1 under the hood.
   * @param taskAttemptId the MRv1 TaskAttemptID we constructed from CLI args
   *                      supplied by GiraphApplicationMaster.
   */
  public GiraphYarnTask(final TaskAttemptID taskAttemptId) {
    conf = new ImmutableClassesGiraphConfiguration<I, V, E>(
      new GiraphConfiguration());
    bspTaskId = taskAttemptId.getTaskID().getId();
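    // Giraph's internals read the MRv1 "mapred.task.partition" property to
    // learn this task's index, so hand-set the value Hadoop would normally
    // supply.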
    conf.setInt("mapred.task.partition", bspTaskId);
    proxy = buildProxyMapperContext(taskAttemptId);
    graphTaskManager = new GraphTaskManager<I, V, E>(proxy);
  }

  /**
   * Run one Giraph worker (or master) task, hosted in this execution container.
   */
  public void run() {
    // Notify the master sooner if a worker fails, rather than waiting for
    // ZooKeeper to time out and delete the ephemeral znodes
    try {
      graphTaskManager.setup(null); // defaults GTM to "assume fatjar mode"
      graphTaskManager.execute();
      graphTaskManager.cleanup();
      graphTaskManager.sendWorkerCountersAndFinishCleanup();
    } catch (InterruptedException ie) {
      LOG.error("run() caught an unrecoverable InterruptedException.", ie);
    } catch (IOException ioe) {
      throw new RuntimeException(
        "run() caught an unrecoverable IOException.", ioe);
      // CHECKSTYLE: stop IllegalCatch
    } catch (RuntimeException e) {
      // CHECKSTYLE: resume IllegalCatch
      graphTaskManager.zooKeeperCleanup();
      graphTaskManager.workerFailureCleanup();
      throw new RuntimeException(
        "run: Caught an unrecoverable exception " + e.getMessage(), e);
    } finally {
      // On YARN, we must complete the commit of the final output ourselves;
      // Hadoop MR is not here to do it for us.
      finalizeYarnJob();
    }
  }

  /**
   * Without Hadoop MR to consolidate the task output from each HDFS task tmp
   * dir, that work won't get done. Under pure YARN this job finalization must
   * be performed here, and the "jar cache" in HDFS must be deleted too.
   */
  private void finalizeYarnJob() {
    if (conf.isPureYarnJob() && graphTaskManager.isMaster() &&
      conf.getVertexOutputFormatClass() != null) {
      try {
        LOG.info("Master is ready to commit final job output data.");
        VertexOutputFormat vertexOutputFormat =
          conf.createWrappedVertexOutputFormat();
        OutputCommitter outputCommitter =
          vertexOutputFormat.getOutputCommitter(proxy);
        // now we will have our output in OUTDIR if all went well...
        outputCommitter.commitJob(proxy);
        LOG.info("Master has committed the final job output data.");
      } catch (InterruptedException ie) {
        LOG.error("Interrupted while attempting to obtain " +
          "OutputCommitter.", ie);
      } catch (IOException ioe) {
        LOG.error("Master task's attempt to commit output has " +
          "FAILED.", ioe);
      }
    }
  }

  /**
   * Utility to generate a dummy Mapper#Context for use in Giraph internals.
   * This is the "key hack" to inject MapReduce-related data structures
   * containing YARN cluster metadata (and our GiraphConf from the AppMaster)
   * into our Giraph BSP task code.
   * @param tid the TaskAttemptID to construct this Mapper#Context from.
   * @return sort of a Mapper#Context if you squint just right.
   */
  private Context buildProxyMapperContext(final TaskAttemptID tid) {
    MapContext mc = new MapContextImpl<Object, Object, Object, Object>(
      conf, // our Configuration, populated back at the GiraphYarnClient.
      tid,  // our TaskAttemptID, generated w/YARN app, container, attempt IDs
      null, // RecordReader here will never be used by Giraph
      null, // RecordWriter here will never be used by Giraph
      null, // OutputCommitter here will never be used by Giraph
      new TaskAttemptContextImpl.DummyReporter() { // goes in task logs for now
        @Override
        public void setStatus(String msg) {
          LOG.info("[STATUS: task-" + bspTaskId + "] " + msg);
        }
      },
      null); // Input split setting here will never be used by Giraph

    // now, we wrap our MapContext ref so we can produce a Mapper#Context
    WrappedMapper<Object, Object, Object, Object> wrappedMapper
      = new WrappedMapper<Object, Object, Object, Object>();
    return wrappedMapper.getMapContext(mc);
  }

  /**
   * Task entry point.
   * @param args CLI arguments injected by GiraphApplicationMaster to hand off
   *             job, task, and attempt IDs to this (and every) Giraph task.
   *             Args should be:
   *             <code>AppClusterTimestamp AppId ContainerId AppAttemptId</code>
   */
  @SuppressWarnings("rawtypes")
  public static void main(String[] args) {
    if (args.length != 4) {
      throw new IllegalStateException("GiraphYarnTask could not construct " +
        "a TaskAttemptID for the Giraph job from args: " + printArgs(args));
    }
    try {
      GiraphYarnTask<?, ?, ?> giraphYarnTask =
        new GiraphYarnTask(getTaskAttemptID(args));
      giraphYarnTask.run();
      // CHECKSTYLE: stop IllegalCatch
    } catch (Throwable t) {
      // CHECKSTYLE: resume IllegalCatch
      LOG.error("GiraphYarnTask threw a top-level exception, failing task", t);
      System.exit(2);
    } // ALWAYS finish a YARN task or AppMaster with System#exit!!!
    System.exit(0);
  }
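
  // Note (command shape is illustrative, not from the source): given the four
  // args above, GiraphApplicationMaster launches each task container roughly as
  //   java ... org.apache.giraph.yarn.GiraphYarnTask \
  //     <app cluster timestamp> <app id> <container id> <app attempt id>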

  /**
   * Utility to create a TaskAttemptID we can feed to our fake Mapper#Context.
   *
   * NOTE: ContainerId will serve as the MR TaskID for Giraph tasks.
   * YARN container 1 is always the AppMaster, so the lowest container id we
   * will ever get from YARN for a Giraph task is container id 2. Giraph task
   * IDs must start at index 0, as they do on MapReduce. So we SUBTRACT TWO
   * from each container id.
   *
   * @param args the command line args, fed to us by GiraphApplicationMaster.
   * @return the TaskAttemptID object, populated with YARN job data.
   */
  private static TaskAttemptID getTaskAttemptID(String[] args) {
    return new TaskAttemptID(
      args[0], // YARN ApplicationId Cluster Timestamp
      Integer.parseInt(args[1]), // YARN ApplicationId #
      TaskID.getTaskType('m'),  // Make Giraph think this is a Mapper task.
      Integer.parseInt(args[2]) - 2, // YARN ContainerId MINUS TWO (see above)
      Integer.parseInt(args[3])); // YARN AppAttemptId #
  }
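
  // Worked example (identifiers invented for illustration): CLI args
  // {"1410112890000", "3", "2", "0"} describe the first worker container
  // (id 2) of application attempt 0, and yield roughly
  // attempt_1410112890000_0003_m_000000_0, i.e. BSP task id 2 - 2 = 0.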

  /**
   * Utility to help log command line args in the event of an error.
   * @param args the CLI args.
   * @return a pretty-print of the input args.
   */
  private static String printArgs(String[] args) {
    int count = 0;
    StringBuilder sb = new StringBuilder();
    for (String arg : args) {
      sb.append("arg[" + (count++) + "] == " + arg + ", ");
    }
    return sb.toString();
  }
}
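
For readers who want to experiment with the proxy-context trick outside of
Giraph, the sketch below wires up the same Hadoop MRv2 classes that
buildProxyMapperContext uses (MapContextImpl, WrappedMapper, and
TaskAttemptContextImpl.DummyReporter). It is a minimal illustration, not part
of the Giraph source: the class name ProxyContextDemo and all job identifiers
are invented, and the null arguments stand in for the reader, writer,
committer, and split that the context's consumers never touch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.apache.hadoop.mapreduce.task.MapContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class ProxyContextDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Invented identifiers: cluster timestamp, app id 3, task 0, attempt 0.
    TaskAttemptID tid =
      new TaskAttemptID("1410112890000", 3, TaskType.MAP, 0, 0);
    MapContext<Object, Object, Object, Object> mc =
      new MapContextImpl<Object, Object, Object, Object>(
        conf, tid,
        null, // RecordReader: never consulted through the proxy
        null, // RecordWriter: likewise unused
        null, // OutputCommitter: likewise unused
        new TaskAttemptContextImpl.DummyReporter(), // logs-only reporter
        null); // InputSplit: likewise unused
    // Wrap the MapContext so callers that demand a Mapper#Context are happy.
    Mapper<Object, Object, Object, Object>.Context proxy =
      new WrappedMapper<Object, Object, Object, Object>().getMapContext(mc);
    System.out.println("Built proxy context for " + proxy.getTaskAttemptID());
  }
}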