How Spark Controls the Number of Map Tasks

The article "Hive Optimization Series 1: How MR & Hive Control the Number of Map and Reduce Tasks" explained how the numbers of map and reduce tasks are determined in Hadoop and Hive. In this article we look at how map tasks are produced in the physical plan of a file-based table scan in Spark.

FileSourceScanExec

How it works:

  1. First, iterate over every file and cut each splittable file into splits no larger than the computed maximum split size (a simplified sketch of this splitting step follows this list);
  2. Sort the resulting splits by size in descending order, then pack the smaller splits into partitions, using the same maximum split size as the bin capacity; the number of resulting partitions is the number of map tasks.
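
The following is a minimal, self-contained sketch of the splitting step only; the file length, maxSplitBytes value, and object name are made up, and it mirrors just the (offset, length) arithmetic that PartitionedFileUtil.splitFiles performs for a splittable file:

object SplitFilesSketch extends App {
  // Made-up numbers: one 300-byte splittable file cut into chunks of at most maxSplitBytes.
  val fileLength    = 300L
  val maxSplitBytes = 128L

  // Each split is described by its (offset, length) range within the file.
  val splits = (0L until fileLength by maxSplitBytes).map { offset =>
    (offset, math.min(maxSplitBytes, fileLength - offset))
  }

  // Prints: offset=0 length=128, offset=128 length=128, offset=256 length=44
  splits.foreach { case (offset, len) => println(s"offset=$offset length=$len") }
}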

Take the createNonBucketedReadRDD() method as an example:

private def createNonBucketedReadRDD(
    readFile: (PartitionedFile) => Iterator[InternalRow],
    selectedPartitions: Array[PartitionDirectory],
    fsRelation: HadoopFsRelation): RDD[InternalRow] = {
  val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
  val maxSplitBytes =
    FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
  logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
    s"open cost is considered as scanning $openCostInBytes bytes.")

  val splitFiles = selectedPartitions.flatMap { partition =>
    partition.files.flatMap { file =>
      // getPath() is very expensive so we only want to call it once in this block:
      val filePath = file.getPath
      val isSplitable = relation.fileFormat.isSplitable(
        relation.sparkSession, relation.options, filePath)
      PartitionedFileUtil.splitFiles(
        sparkSession = relation.sparkSession,
        file = file,
        filePath = filePath,
        isSplitable = isSplitable,
        maxSplitBytes = maxSplitBytes,
        partitionValues = partition.values
      )
    }
  }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

  val partitions =
    FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

  // Create the FileScanRDD from the packed partitions
  new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
}
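
The effect of this bin packing can be observed from the partition count of the scan RDD. Below is a minimal sketch, assuming a Parquet table at the hypothetical path /path/to/table; the two settings are the configs used in the code above, with arbitrary example values:

// Tune the split size and open cost before planning the scan (example values).
spark.conf.set("spark.sql.files.maxPartitionBytes", "64MB")
spark.conf.set("spark.sql.files.openCostInBytes", "4MB")

// Hypothetical path; replace with an actual table location.
val df = spark.read.parquet("/path/to/table")

// Each partition of the scan RDD corresponds to one map task.
println(df.rdd.getNumPartitions)

Lowering spark.sql.files.maxPartitionBytes generally yields more, smaller partitions (more map tasks); raising it yields fewer.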

Splitting files

FilePartition

def maxSplitBytes(
    sparkSession: SparkSession,
    selectedPartitions: Seq[PartitionDirectory]): Long = {
  // spark.sql.files.maxPartitionBytes
  val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
  // spark.sql.files.openCostInBytes
  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
  // spark.sql.files.minPartitionNum
  val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
    .getOrElse(sparkSession.sparkContext.defaultParallelism)
  // Compute the target split size (bytesPerCore)
  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
  val bytesPerCore = totalBytes / minPartitionNum

  Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
}

Key points:

  1. A split will not be too large; the upper bound is spark.sql.files.maxPartitionBytes;

  2. A split will not be too small; the lower bound is spark.sql.files.openCostInBytes;

  3. The target split size is $\frac{\sum_{i=1}^{n}(\text{fileSize}_i + \text{openCost})}{\text{minPartitionNum}}$, where minPartitionNum falls back to the default parallelism; the result is then clamped by the two bounds above.
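
As a worked example of this formula (a standalone sketch with made-up numbers, not Spark API): with the default 128 MB maxPartitionBytes, 4 MB openCostInBytes, a parallelism of 8, and ten 50 MB files, the result is min(128 MB, max(4 MB, 10 × (50 MB + 4 MB) / 8)) = 67.5 MB:

object MaxSplitBytesExample extends App {
  // Made-up inputs mirroring the logic of FilePartition.maxSplitBytes
  val defaultMaxSplitBytes = 128L * 1024 * 1024              // spark.sql.files.maxPartitionBytes (default 128MB)
  val openCostInBytes      = 4L * 1024 * 1024                // spark.sql.files.openCostInBytes (default 4MB)
  val minPartitionNum      = 8L                              // falls back to defaultParallelism
  val fileSizes            = Seq.fill(10)(50L * 1024 * 1024) // ten 50MB files

  val totalBytes    = fileSizes.map(_ + openCostInBytes).sum
  val bytesPerCore  = totalBytes / minPartitionNum
  val maxSplitBytes = math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))

  // Prints 70778880 bytes, i.e. 67 MB (67.5 MB rounded down by integer division)
  println(s"maxSplitBytes = $maxSplitBytes bytes (~${maxSplitBytes / 1024 / 1024} MB)")
}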

Packing files into partitions

FilePartition

def getFilePartitions(
    sparkSession: SparkSession,
    partitionedFiles: Seq[PartitionedFile],
    maxSplitBytes: Long): Seq[FilePartition] = {
  val partitions = new ArrayBuffer[FilePartition]
  val currentFiles = new ArrayBuffer[PartitionedFile]
  var currentSize = 0L

  /** Close the current partition and move to the next. */
  def closePartition(): Unit = {
    if (currentFiles.nonEmpty) {
      // Copy to a new Array.
      val newPartition = FilePartition(partitions.size, currentFiles.toArray)
      partitions += newPartition
    }
    currentFiles.clear()
    currentSize = 0
  }

  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
  // Assign files to partitions using the "Next Fit Decreasing" heuristic
  partitionedFiles.foreach { file =>
    // maxSplitBytes caps the partition size: once adding this file would exceed it, close the current partition.
    // Note: the open cost of the current file is not counted in this check.
    if (currentSize + file.length > maxSplitBytes) {
      closePartition()
    }
    // Add the given file to the current partition.
    // Note: the open cost is counted when the file is accumulated into the current partition.
    currentSize += file.length + openCostInBytes
    currentFiles += file
  }
  closePartition()
  partitions.toSeq
}
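
To make the packing step concrete, here is a simplified, self-contained sketch of the same "Next Fit Decreasing" idea on plain (name, length) pairs; all names and sizes are made up and the Spark types are deliberately left out:

object NextFitDecreasingSketch extends App {
  import scala.collection.mutable.ArrayBuffer

  // Made-up splits (already cut to at most maxSplitBytes), lengths in bytes
  val splits = Seq("a" -> 60L, "b" -> 50L, "c" -> 30L, "d" -> 10L, "e" -> 5L)
  val maxSplitBytes   = 64L
  val openCostInBytes = 4L

  val partitions   = ArrayBuffer.empty[Seq[String]]
  val currentFiles = ArrayBuffer.empty[String]
  var currentSize  = 0L

  def closePartition(): Unit = {
    if (currentFiles.nonEmpty) partitions += currentFiles.toSeq
    currentFiles.clear()
    currentSize = 0L
  }

  // Sort descending, then pack with "next fit": close the bin when the next split no longer fits
  splits.sortBy(_._2)(Ordering[Long].reverse).foreach { case (name, length) =>
    if (currentSize + length > maxSplitBytes) closePartition()
    currentSize += length + openCostInBytes
    currentFiles += name
  }
  closePartition()

  // Each resulting partition becomes one map task
  partitions.zipWithIndex.foreach { case (p, i) => println(s"partition $i: ${p.mkString(", ")}") }
}

With these numbers the splits pack into three partitions ([a], [b], [c, d, e]), i.e. three map tasks.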