This project has retired. For details please refer to its Attic page.
BspInputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.bsp;
20  
21  import org.apache.giraph.conf.GiraphConstants;
22  import org.apache.hadoop.conf.Configuration;
23  import org.apache.hadoop.io.Text;
24  import org.apache.hadoop.mapreduce.InputFormat;
25  import org.apache.hadoop.mapreduce.InputSplit;
26  import org.apache.hadoop.mapreduce.JobContext;
27  import org.apache.hadoop.mapreduce.RecordReader;
28  import org.apache.hadoop.mapreduce.TaskAttemptContext;
29  import org.apache.log4j.Logger;
30  
31  import java.io.IOException;
32  import java.util.ArrayList;
33  import java.util.List;
34  
35  /**
36   * This InputFormat supports the BSP model by ensuring that the user specifies
37   * how many splits (number of mappers) should be started simultaneously.
38   * The number of splits depends on whether the master and worker processes are
39   * separate.  It is not meant to do any meaningful split of user-data.
40   */
41  public class BspInputFormat extends InputFormat<Text, Text> {
42    /** Class Logger */
43    private static final Logger LOG = Logger.getLogger(BspInputFormat.class);
44  
45    /**
46     * Get the correct number of mappers based on the configuration
47     *
48     * @param conf Configuration to determine the number of mappers
49     * @return Maximum number of tasks
50     */
51    public static int getMaxTasks(Configuration conf) {
52      int maxWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0);
53      boolean splitMasterWorker = GiraphConstants.SPLIT_MASTER_WORKER.get(conf);
54      int maxTasks = maxWorkers;
55      // if this is a YARN job, separate ZK should already be running
56      boolean isYarnJob = GiraphConstants.IS_PURE_YARN_JOB.get(conf);
57      if (splitMasterWorker && !isYarnJob) {
58        maxTasks += 1;
59      }
60      if (LOG.isDebugEnabled()) {
61        LOG.debug("getMaxTasks: Max workers = " + maxWorkers +
62            ", split master/worker = " + splitMasterWorker +
63            ", is YARN-only job = " + isYarnJob +
64            ", total max tasks = " + maxTasks);
65      }
66      return maxTasks;
67    }
68  
69    @Override
70    public List<InputSplit> getSplits(JobContext context)
71      throws IOException, InterruptedException {
72      Configuration conf = context.getConfiguration();
73      int maxTasks = getMaxTasks(conf);
74      if (maxTasks <= 0) {
75        throw new InterruptedException(
76            "getSplits: Cannot have maxTasks <= 0 - " + maxTasks);
77      }
78      List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
79      for (int i = 0; i < maxTasks; ++i) {
80        inputSplitList.add(new BspInputSplit());
81      }
82      return inputSplitList;
83    }
84  
85    @Override
86    public RecordReader<Text, Text>
87    createRecordReader(InputSplit split, TaskAttemptContext context)
88      throws IOException, InterruptedException {
89      return new BspRecordReader();
90    }
91  }