Hive优化系列1:MR&Hive如何控制map和reduce任务数量

TODO

Map任务数量

MapReduce任务

首先,我们先介绍下InputFormat接口,该接口中定义了两个方法:

  1. getSplits():定义了如何将MR作业的输入文件划分成分片的逻辑,而一个逻辑分片实际会由一个Mapper任务进行驱动;
  2. getRecordReader()/createRecordReader():返回一个分片的记录读取器;

具体定义如下:

1
2
3
4
5
6
7
8
// hadoop 1.x
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException;
}
1
2
3
4
5
6
7
8
9
10
11
12
// hadoop 2.x
public abstract class InputFormat<K, V> {
public abstract
List<InputSplit> getSplits(JobContext context
) throws IOException, InterruptedException;

public abstract
RecordReader<K,V> createRecordReader(InputSplit split,
TaskAttemptContext context
) throws IOException,
InterruptedException;
}

有了上面的知识,我们来学习下具体的InputFormat的分片逻辑。

FileInputFormat

FileInputFormat作为一个抽象类,在Hadoop 1.x和2.x中分别位于包org.apache.hadoop.mapredorg.apache.hadoop.mapreduce.lib.input内。其具体子类有:

  • TextInputFormat

  • KeyValueTextInputFormat

  • SequenceFileInputFormat

  • FixedLengthInputFormat

getSplits方法

该方法的逻辑比较简单,它遍历每个文件,根据每个文件是否可切分进行分片:

  • 对于不可切分的文件格式,一个文件划分一个分片;
  • 对于可切分的文件格式,根据computeSplitSize()计算的分片大小,对该文件进行分片;

computeSplitSize方法

Hadoop 1.x的computeSplitSize()如下,它计算的逻辑分片大小满足:

  1. 分片不会过小,由minSize=mapreduce.input.fileinputformat.split.minsize控制;
  2. 分片不会过大,不会超过该文件的块大小和目标分片大小;
  3. 其中,目标分片大小(goalSize) = 输入文件总大小 / 期望的map数,其中期望的map数在Hadoop 1.x中可以通过 JobConf.setNumMapTasks()设置;
1
2
3
4
5
// hadoop 1.x
protected long computeSplitSize(long goalSize, long minSize,
long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}

Hadoop 2.x的computeSplitSize()如下,它的逻辑与Hadoop 1.x基本一致,只是使用了maxSize而不再计算目标分片大小了:

  1. minSize=mapreduce.input.fileinputformat.split.minsize
  2. maxSize=mapreduce.input.fileinputformat.split.maxsize
1
2
3
4
5
// hadoop 2.x
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}

CombineFileInputFormat

CombineFileInputFormat作为FileInputFormat的子类,它重写了getSplits()了,其子类有:

  • CombineTextInputFormat

  • CombineSequenceFileInputFormat

getSplits方法

  1. 该方法首先会校验如下参数:
    • minSizeNode=mapreduce.input.fileinputformat.split.minsize.per.node
    • minSizeRack=mapreduce.input.fileinputformat.split.minsize.per.rack
    • maxSize=mapreduce.input.fileinputformat.split.maxsize

并且,需要满足:minSizeNode <= minSizeRack <= maxSize

  1. 针对每个MultiPathFilter,对该filter下的输入文件列表调用getMoreSplits()分配分片;

getMoreSplits方法

  1. 该方法首先会遍历文件列表,将记账信息填入如下Map中:
1
2
3
4
5
6
7
8
9
10
11
// key: 机器节点,value:该节点包含的所有块数据
HashMap<String, List<OneBlockInfo>> rackToBlocks =
new HashMap<String, List<OneBlockInfo>>();

// mapping from a block to the nodes on which it has replicas
HashMap<OneBlockInfo, String[]> blockToNodes =
new HashMap<OneBlockInfo, String[]>();

// mapping from a node to the list of blocks that it contains
HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
new HashMap<String, Set<OneBlockInfo>>();
  1. 调用createSplits()执行真正的分片逻辑;

createSplits方法

  1. 对于每个节点,对该节点上未分配的数据块进行逐个累计,累计值达到maxSize则创建一个分片,之后不再该节点继续进行分配而是跳至下一个节点执行分配(这样的目的是使得分配到分片更加均匀),如图中M-1M-3
  2. 节点上剩余数据块累计值如果大于等于minSizeNode,则创建一个分片,如图中M-2
  3. 经过步骤1、2后完成了节点内数据块合并,这之后某些节点上仍然存有较小数据量的数据块,如图中所示;
  4. 接下来要进行机架内的合并处理:
  5. 对于每个机架,对该机架内未分配的数据进行累计,累计值达到maxSize则创建一个分片,如图中M-4
  6. 机架内剩余数据块累计值如果大于等于minSizeRack,则创建一个分片,如图中M-5
  7. 经过步骤5、6后完成了机架内数据块合并,这之后某些节点上仍然存有较小数据量的数据块,这些数据块称为Overflow Blocks,如图中所示;
  8. 最后是合并这些Overflow Blocks,累计值每达到maxSize则创建一个分片,如图中M-6

Hive SQL

HiveInputFormat

CombineHiveInputFormat

Reduce任务数量

MapReduce任务

API 配置
Job.setNumReduceTasks(int tasks) mapreduce.job.reduces

Hive SQL

配置项 含义 默认值
hive.exec.reducers.bytes.per.reducer 每个Reducer所处理的输入文件大小 256 * 1000 * 1000(256Mb)
hive.exec.reducers.max Reducer最大数量 1009

MapRedTask.setNumberOfReducers()

参考

  1. https://blog.csdn.net/u013332124/article/details/97373278
  2. https://blog.csdn.net/u013332124/article/details/100177553?ivk_sa=1026860c
  3. https://blog.csdn.net/u013332124/article/details/97373278
  4. https://www.cnblogs.com/kwzblog/p/9542333.html
  5. https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
  6. https://mp.weixin.qq.com/s/QcWqHmaq1bFAvOzDhbynrQ