Hive优化系列1:MR&Hive如何控制map和reduce任务数量
TODO
Map任务数量
MapReduce任务
首先,我们先介绍下InputFormat
接口,该接口中定义了两个方法:
getSplits()
:定义了如何将MR作业的输入文件划分成分片的逻辑,而一个逻辑分片实际会由一个Mapper
任务进行驱动;getRecordReader()
/createRecordReader()
:返回一个分片的记录读取器;
具体定义如下:
1 | // hadoop 1.x |
1 | // hadoop 2.x |
有了上面的知识,我们来学习下具体的InputFormat
的分片逻辑。
FileInputFormat
FileInputFormat
作为一个抽象类,在Hadoop 1.x和2.x中分别位于包org.apache.hadoop.mapred
和org.apache.hadoop.mapreduce.lib.input
内。其具体子类有:
TextInputFormat
KeyValueTextInputFormat
SequenceFileInputFormat
FixedLengthInputFormat
getSplits方法
该方法的逻辑比较简单,它遍历每个文件,根据每个文件是否可切分进行分片:
- 对于不可切分的文件格式,一个文件划分一个分片;
- 对于可切分的文件格式,根据
computeSplitSize()
计算的分片大小,对该文件进行分片;
computeSplitSize方法
Hadoop 1.x的computeSplitSize()
如下,它计算的逻辑分片大小满足:
- 分片不会过小,由minSize=
mapreduce.input.fileinputformat.split.minsize
控制; - 分片不会过大,不会超过该文件的块大小和目标分片大小;
- 其中,目标分片大小(goalSize) = 输入文件总大小 / 期望的map数,其中
期望的map数
在Hadoop 1.x中可以通过JobConf.setNumMapTasks()
设置;
1 | // hadoop 1.x |
Hadoop 2.x的computeSplitSize()
如下,它的逻辑与Hadoop 1.x基本一致,只是使用了maxSize
而不再计算目标分片大小了:
- minSize=
mapreduce.input.fileinputformat.split.minsize
- maxSize=
mapreduce.input.fileinputformat.split.maxsize
1 | // hadoop 2.x |
CombineFileInputFormat
CombineFileInputFormat
作为FileInputFormat
的子类,它重写了getSplits()
了,其子类有:
CombineTextInputFormat
CombineSequenceFileInputFormat
getSplits方法
- 该方法首先会校验如下参数:
minSizeNode
=mapreduce.input.fileinputformat.split.minsize.per.node
minSizeRack
=mapreduce.input.fileinputformat.split.minsize.per.rack
maxSize
=mapreduce.input.fileinputformat.split.maxsize
并且,需要满足:minSizeNode
<= minSizeRack
<= maxSize
;
- 针对每个
MultiPathFilter
,对该filter下的输入文件列表调用getMoreSplits()
分配分片;
getMoreSplits方法
- 该方法首先会遍历文件列表,将记账信息填入如下Map中:
1 | // key: 机器节点,value:该节点包含的所有块数据 |
- 调用
createSplits()
执行真正的分片逻辑;
createSplits方法
- 对于每个节点,对该节点上未分配的数据块进行逐个累计,累计值达到
maxSize
则创建一个分片,之后不再该节点继续进行分配而是跳至下一个节点执行分配(这样的目的是使得分配到分片更加均匀),如图中M-1
和M-3
; - 节点上剩余数据块累计值如果大于等于
minSizeNode
,则创建一个分片,如图中M-2
; - 经过步骤1、2后完成了节点内数据块合并,这之后某些节点上仍然存有较小数据量的数据块,如图中所示;
- 接下来要进行机架内的合并处理:
- 对于每个机架,对该机架内未分配的数据进行累计,累计值达到
maxSize
则创建一个分片,如图中M-4
; - 机架内剩余数据块累计值如果大于等于
minSizeRack
,则创建一个分片,如图中M-5
; - 经过步骤5、6后完成了机架内数据块合并,这之后某些节点上仍然存有较小数据量的数据块,这些数据块称为Overflow Blocks,如图中所示;
- 最后是合并这些Overflow Blocks,累计值每达到
maxSize
则创建一个分片,如图中M-6
;
Hive SQL
HiveInputFormat
CombineHiveInputFormat
Reduce任务数量
MapReduce任务
API | 配置 |
---|---|
Job.setNumReduceTasks(int tasks) | mapreduce.job.reduces |
Hive SQL
配置项 | 含义 | 默认值 |
---|---|---|
hive.exec.reducers.bytes.per.reducer | 每个Reducer所处理的输入文件大小 | 256 * 1000 * 1000(256Mb) |
hive.exec.reducers.max | Reducer最大数量 | 1009 |
MapRedTask.setNumberOfReducers()
参考
- https://blog.csdn.net/u013332124/article/details/97373278
- https://blog.csdn.net/u013332124/article/details/100177553?ivk_sa=1026860c
- https://blog.csdn.net/u013332124/article/details/97373278
- https://www.cnblogs.com/kwzblog/p/9542333.html
- https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
- https://mp.weixin.qq.com/s/QcWqHmaq1bFAvOzDhbynrQ