View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.graph;
20  
21  import org.apache.commons.lang3.exception.ExceptionUtils;
22  import org.apache.giraph.writable.kryo.KryoWritableWrapper;
23  import org.apache.hadoop.filecache.DistributedCache;
24  import org.apache.hadoop.io.Writable;
25  import org.apache.hadoop.io.WritableComparable;
26  import org.apache.hadoop.mapreduce.Mapper;
27  import org.apache.log4j.Logger;
28  
29  import java.io.IOException;
30  
31  /**
32   * This mapper that will execute the BSP graph tasks alloted to this worker.
33   * All tasks will be performed by calling the GraphTaskManager object managed by
34   * this GraphMapper wrapper classs. Since this mapper will
35   * not be passing data by key-value pairs through the MR framework, the
36   * Mapper parameter types are irrelevant, and set to <code>Object</code> type.
37   *
38   * @param <I> Vertex id
39   * @param <V> Vertex data
40   * @param <E> Edge data
41   */
42  @SuppressWarnings("rawtypes")
43  public class GraphMapper<I extends WritableComparable, V extends Writable,
44      E extends Writable> extends
45      Mapper<Object, Object, Object, Object> {
46    /** Class logger */
47    private static final Logger LOG = Logger.getLogger(GraphMapper.class);
48    /** Manage the framework-agnostic Giraph tasks for this job run */
49    private GraphTaskManager<I, V, E> graphTaskManager;
50  
51    @Override
52    public void setup(Context context)
53      throws IOException, InterruptedException {
54      // Execute all Giraph-related role(s) assigned to this compute node.
55      // Roles can include "master," "worker," "zookeeper," or . . . ?
56      graphTaskManager = new GraphTaskManager<I, V, E>(context);
57      graphTaskManager.setup(
58        DistributedCache.getLocalCacheArchives(context.getConfiguration()));
59    }
60  
61    /**
62     * GraphMapper is designed to host the compute node in a Hadoop
63     * Mapper task. The GraphTaskManager owned by GraphMapper manages all
64     * framework-independent Giraph BSP operations for this job run.
65     * @param key unused arg required by Mapper API
66     * @param value unused arg required by Mapper API
67     * @param context the Mapper#Context required to report progress
68     *                to the underlying cluster
69     */
70    @Override
71    public void map(Object key, Object value, Context context)
72      throws IOException, InterruptedException {
73      // a no-op in Giraph
74    }
75  
76    @Override
77    public void cleanup(Context context)
78      throws IOException, InterruptedException {
79      graphTaskManager.cleanup();
80    }
81  
82    @Override
83    public void run(Context context) throws IOException, InterruptedException {
84      // Notify the master quicker if there is worker failure rather than
85      // waiting for ZooKeeper to timeout and delete the ephemeral znodes
86      try {
87        setup(context);
88        while (context.nextKeyValue()) {
89          graphTaskManager.execute();
90        }
91        cleanup(context);
92        // Checkstyle exception due to needing to dump ZooKeeper failure
93        // on exception
94        // CHECKSTYLE: stop IllegalCatch
95      } catch (RuntimeException e) {
96        // CHECKSTYLE: resume IllegalCatch
97        byte [] exByteArray = KryoWritableWrapper.convertToByteArray(e);
98        LOG.error("Caught an unrecoverable exception " + e.getMessage(), e);
99        graphTaskManager.getJobProgressTracker().logError(
100           "Exception occurred on mapper " +
101               graphTaskManager.getConf().getTaskPartition() + ": " +
102               ExceptionUtils.getStackTrace(e), exByteArray);
103       graphTaskManager.zooKeeperCleanup();
104       graphTaskManager.workerFailureCleanup();
105       throw new IllegalStateException(
106           "run: Caught an unrecoverable exception " + e.getMessage(), e);
107     }
108   }
109 
110 }