/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.bsp;

import org.apache.giraph.conf.GiraphConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
3435/**36 * This InputFormat supports the BSP model by ensuring that the user specifies37 * how many splits (number of mappers) should be started simultaneously.38 * The number of splits depends on whether the master and worker processes are39 * separate. It is not meant to do any meaningful split of user-data.40 */41publicclassBspInputFormatextends InputFormat<Text, Text> {
42/** Class Logger */43privatestaticfinal Logger LOG = Logger.getLogger(BspInputFormat.class);
4445/**46 * Get the correct number of mappers based on the configuration47 *48 * @param conf Configuration to determine the number of mappers49 * @return Maximum number of tasks50 */51publicstaticint getMaxTasks(Configuration conf) {
52int maxWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0);
53boolean splitMasterWorker = GiraphConstants.SPLIT_MASTER_WORKER.get(conf);
54int maxTasks = maxWorkers;
55// if this is a YARN job, separate ZK should already be running56boolean isYarnJob = GiraphConstants.IS_PURE_YARN_JOB.get(conf);
57if (splitMasterWorker && !isYarnJob) {
58 maxTasks += 1;
59 }
60if (LOG.isDebugEnabled()) {
61 LOG.debug("getMaxTasks: Max workers = " + maxWorkers +
62", split master/worker = " + splitMasterWorker +
63", is YARN-only job = " + isYarnJob +
64", total max tasks = " + maxTasks);
65 }
66return maxTasks;
67 }
6869 @Override
70public List<InputSplit> getSplits(JobContext context)
71throws IOException, InterruptedException {
72 Configuration conf = context.getConfiguration();
73int maxTasks = getMaxTasks(conf);
74if (maxTasks <= 0) {
75thrownew InterruptedException(
76"getSplits: Cannot have maxTasks <= 0 - " + maxTasks);
77 }
78 List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
79for (int i = 0; i < maxTasks; ++i) {
80 inputSplitList.add(newBspInputSplit());
81 }
82return inputSplitList;
83 }
8485 @Override
86public RecordReader<Text, Text>
87 createRecordReader(InputSplit split, TaskAttemptContext context)
88throws IOException, InterruptedException {
89returnnewBspRecordReader();
90 }
91 }