View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.graph;
20  
21  import org.apache.commons.lang3.exception.ExceptionUtils;
22  import org.apache.hadoop.filecache.DistributedCache;
23  import org.apache.hadoop.io.Writable;
24  import org.apache.hadoop.io.WritableComparable;
25  import org.apache.hadoop.mapreduce.Mapper;
26  import org.apache.log4j.Logger;
27  
28  import java.io.IOException;
29  
30  /**
31   * This mapper that will execute the BSP graph tasks alloted to this worker.
32   * All tasks will be performed by calling the GraphTaskManager object managed by
33   * this GraphMapper wrapper classs. Since this mapper will
34   * not be passing data by key-value pairs through the MR framework, the
35   * Mapper parameter types are irrelevant, and set to <code>Object</code> type.
36   *
37   * @param <I> Vertex id
38   * @param <V> Vertex data
39   * @param <E> Edge data
40   */
41  @SuppressWarnings("rawtypes")
42  public class GraphMapper<I extends WritableComparable, V extends Writable,
43      E extends Writable> extends
44      Mapper<Object, Object, Object, Object> {
45    /** Class logger */
46    private static final Logger LOG = Logger.getLogger(GraphMapper.class);
47    /** Manage the framework-agnostic Giraph tasks for this job run */
48    private GraphTaskManager<I, V, E> graphTaskManager;
49  
50    @Override
51    public void setup(Context context)
52      throws IOException, InterruptedException {
53      // Execute all Giraph-related role(s) assigned to this compute node.
54      // Roles can include "master," "worker," "zookeeper," or . . . ?
55      graphTaskManager = new GraphTaskManager<I, V, E>(context);
56      graphTaskManager.setup(
57        DistributedCache.getLocalCacheArchives(context.getConfiguration()));
58  
59      // Setting the default handler for uncaught exceptions.
60      Thread.setDefaultUncaughtExceptionHandler(
61          graphTaskManager.createUncaughtExceptionHandler());
62    }
63  
64    /**
65     * GraphMapper is designed to host the compute node in a Hadoop
66     * Mapper task. The GraphTaskManager owned by GraphMapper manages all
67     * framework-independent Giraph BSP operations for this job run.
68     * @param key unused arg required by Mapper API
69     * @param value unused arg required by Mapper API
70     * @param context the Mapper#Context required to report progress
71     *                to the underlying cluster
72     */
73    @Override
74    public void map(Object key, Object value, Context context)
75      throws IOException, InterruptedException {
76      // a no-op in Giraph
77    }
78  
79    @Override
80    public void cleanup(Context context)
81      throws IOException, InterruptedException {
82      graphTaskManager.cleanup();
83    }
84  
85    @Override
86    public void run(Context context) throws IOException, InterruptedException {
87      // Notify the master quicker if there is worker failure rather than
88      // waiting for ZooKeeper to timeout and delete the ephemeral znodes
89      try {
90        setup(context);
91        while (context.nextKeyValue()) {
92          graphTaskManager.execute();
93        }
94        cleanup(context);
95        // Checkstyle exception due to needing to dump ZooKeeper failure
96        // on exception
97        // CHECKSTYLE: stop IllegalCatch
98      } catch (RuntimeException e) {
99        // CHECKSTYLE: resume IllegalCatch
100       LOG.error("Caught an unrecoverable exception " + e.getMessage(), e);
101       graphTaskManager.getJobProgressTracker().logError(
102           "Exception occurred on mapper " +
103               graphTaskManager.getConf().getTaskPartition() + ": " +
104               ExceptionUtils.getStackTrace(e));
105       graphTaskManager.zooKeeperCleanup();
106       graphTaskManager.workerFailureCleanup();
107       throw new IllegalStateException(
108           "run: Caught an unrecoverable exception " + e.getMessage(), e);
109     }
110   }
111 
112 }