1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.graph;
2021import org.apache.commons.lang3.exception.ExceptionUtils;
22import org.apache.giraph.writable.kryo.KryoWritableWrapper;
23import org.apache.hadoop.filecache.DistributedCache;
24import org.apache.hadoop.io.Writable;
25import org.apache.hadoop.io.WritableComparable;
26import org.apache.hadoop.mapreduce.Mapper;
27import org.apache.log4j.Logger;
2829import java.io.IOException;
3031/**32 * This mapper that will execute the BSP graph tasks alloted to this worker.33 * All tasks will be performed by calling the GraphTaskManager object managed by34 * this GraphMapper wrapper classs. Since this mapper will35 * not be passing data by key-value pairs through the MR framework, the36 * Mapper parameter types are irrelevant, and set to <code>Object</code> type.37 *38 * @param <I> Vertex id39 * @param <V> Vertex data40 * @param <E> Edge data41 */42 @SuppressWarnings("rawtypes")
43publicclass GraphMapper<I extends WritableComparable, V extends Writable,
44 E extends Writable> extends45 Mapper<Object, Object, Object, Object> {
46/** Class logger */47privatestaticfinal Logger LOG = Logger.getLogger(GraphMapper.class);
48/** Manage the framework-agnostic Giraph tasks for this job run */49private GraphTaskManager<I, V, E> graphTaskManager;
5051 @Override
52publicvoid setup(Context context)
53throws IOException, InterruptedException {
54// Execute all Giraph-related role(s) assigned to this compute node.55// Roles can include "master," "worker," "zookeeper," or . . . ?56 graphTaskManager = new GraphTaskManager<I, V, E>(context);
57 graphTaskManager.setup(
58 DistributedCache.getLocalCacheArchives(context.getConfiguration()));
59 }
6061/**62 * GraphMapper is designed to host the compute node in a Hadoop63 * Mapper task. The GraphTaskManager owned by GraphMapper manages all64 * framework-independent Giraph BSP operations for this job run.65 * @param key unused arg required by Mapper API66 * @param value unused arg required by Mapper API67 * @param context the Mapper#Context required to report progress68 * to the underlying cluster69 */70 @Override
71publicvoid map(Object key, Object value, Context context)
72throws IOException, InterruptedException {
73// a no-op in Giraph74 }
7576 @Override
77publicvoid cleanup(Context context)
78throws IOException, InterruptedException {
79 graphTaskManager.cleanup();
80 graphTaskManager.sendWorkerCountersAndFinishCleanup();
81 }
8283 @Override
84publicvoid run(Context context) throws IOException, InterruptedException {
85// Notify the master quicker if there is worker failure rather than86// waiting for ZooKeeper to timeout and delete the ephemeral znodes87try {
88 setup(context);
89while (context.nextKeyValue()) {
90 graphTaskManager.execute();
91 }
92 cleanup(context);
93// Checkstyle exception due to needing to dump ZooKeeper failure94// on exception95// CHECKSTYLE: stop IllegalCatch96 } catch (RuntimeException e) {
97// CHECKSTYLE: resume IllegalCatch98 byte [] exByteArray = KryoWritableWrapper.convertToByteArray(e);
99 LOG.error("Caught an unrecoverable exception " + e.getMessage(), e);
100 graphTaskManager.getJobProgressTracker().logError(
101"Exception occurred on mapper " +
102 graphTaskManager.getConf().getTaskPartition() + ": " +
103 ExceptionUtils.getStackTrace(e), exByteArray);
104 graphTaskManager.zooKeeperCleanup();
105 graphTaskManager.workerFailureCleanup();
106thrownew IllegalStateException(
107"run: Caught an unrecoverable exception " + e.getMessage(), e);
108 }
109 }
110111 }