```scala
private def createNonBucketedReadRDD(
    readFile: (PartitionedFile) => Iterator[InternalRow],
    selectedPartitions: Array[PartitionDirectory],
    fsRelation: HadoopFsRelation): RDD[InternalRow] = {
  val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
  val maxSplitBytes =
    FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
  logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
    s"open cost is considered as scanning $openCostInBytes bytes.")

  val splitFiles = selectedPartitions.flatMap { partition =>
    partition.files.flatMap { file =>
      // getPath() is very expensive so we only want to call it once in this block:
      val filePath = file.getPath
      val isSplitable = relation.fileFormat.isSplitable(
        relation.sparkSession, relation.options, filePath)
      PartitionedFileUtil.splitFiles(
        sparkSession = relation.sparkSession,
        file = file,
        filePath = filePath,
        isSplitable = isSplitable,
        maxSplitBytes = maxSplitBytes,
        partitionValues = partition.values
      )
    }
  }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

  val partitions =
    FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

  new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
}
```
```scala
def getFilePartitions(
    sparkSession: SparkSession,
    partitionedFiles: Seq[PartitionedFile],
    maxSplitBytes: Long): Seq[FilePartition] = {
  val partitions = new ArrayBuffer[FilePartition]
  val currentFiles = new ArrayBuffer[PartitionedFile]
  var currentSize = 0L

  /** Close the current partition and move to the next. */
  def closePartition(): Unit = {
    if (currentFiles.nonEmpty) {
      // Copy to a new Array.
      val newPartition = FilePartition(partitions.size, currentFiles.toArray)
      partitions += newPartition
    }
    currentFiles.clear()
    currentSize = 0
  }

  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
  // Assign files to partitions using "Next Fit Decreasing".
  partitionedFiles.foreach { file =>
    // maxSplitBytes caps the partition size: once the next file would push the
    // partition past the threshold, a new partition is started.
    // Note that the open cost is NOT counted in this check.
    if (currentSize + file.length > maxSplitBytes) {
      closePartition()
    }
    // Add the given file to the current partition.
    // Note that the open cost IS charged when the file is added to the current partition.
    currentSize += file.length + openCostInBytes
    currentFiles += file
  }
  closePartition()
  partitions.toSeq
}
```
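To make the packing rule above concrete, here is a minimal standalone sketch (not Spark source; the split lengths, maxSplitBytes, and openCostInBytes values are made up for illustration) that applies the same "Next Fit Decreasing" logic to a handful of split sizes:

```scala
import scala.collection.mutable.ArrayBuffer

object NextFitDecreasingSketch extends App {
  // Hypothetical values: 128 MB target partition size, 4 MB open cost per file.
  val maxSplitBytes   = 128L * 1024 * 1024
  val openCostInBytes = 4L * 1024 * 1024

  // Made-up split lengths (MB converted to bytes), as if produced by splitFiles.
  val splitSizes: Seq[Long] = Seq(200L, 90L, 60L, 40L, 30L, 5L).map(_ * 1024 * 1024)

  val partitions   = ArrayBuffer[Seq[Long]]()
  val currentFiles = ArrayBuffer[Long]()
  var currentSize  = 0L

  def closePartition(): Unit = {
    if (currentFiles.nonEmpty) partitions += currentFiles.toSeq
    currentFiles.clear()
    currentSize = 0L
  }

  // Same rule as getFilePartitions: visit splits in decreasing size order, close the
  // current partition once the next split would exceed maxSplitBytes (open cost not
  // counted in the check), and charge each added split its length plus the open cost.
  splitSizes.sortBy(-_).foreach { size =>
    if (currentSize + size > maxSplitBytes) closePartition()
    currentSize += size + openCostInBytes
    currentFiles += size
  }
  closePartition()

  partitions.zipWithIndex.foreach { case (files, i) =>
    println(s"partition $i: ${files.map(_ / 1024 / 1024).mkString(" MB, ")} MB")
  }
}
```

With these numbers the sketch prints four partitions: 200 MB alone, 90 MB alone, 60 MB + 40 MB, and 30 MB + 5 MB; the oversized 200 MB split ends up in a partition of its own.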
For the grouping sets listed after the GROUPING SETS keyword, a grouping operation is performed once per set. This is equivalent to aggregating by each grouping set separately and then combining the results with UNION ALL.
Note: for Hive compatibility, Spark also accepts the form GROUP BY ... GROUPING SETS (...). The GROUP BY expressions are usually ignored, but if the GROUP BY clause contains expressions that do not appear in the GROUPING SETS expressions, those extra expressions are added to the grouping expressions and their value is always NULL, e.g. SELECT a, b, c FROM ... GROUP BY a, b, c GROUPING SETS (a, b).
| GROUPING SETS | Equivalent form |
| --- | --- |
| GROUP BY GROUPING SETS((a), (b))<br>GROUP BY GROUPING SETS(a, b) | GROUP BY a UNION ALL GROUP BY b |
| GROUP BY a, b GROUPING SETS(a, b) | SELECT a, NULL ... GROUP BY a UNION ALL SELECT NULL, b ... GROUP BY b |
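As a quick sanity check of the equivalence in the table, both forms can be run against a tiny dataset. The sketch below is illustrative only: the local SparkSession, the sales view, and its columns are assumptions introduced here, not taken from the original text.

```scala
import org.apache.spark.sql.SparkSession

object GroupingSetsDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("grouping-sets").getOrCreate()
  import spark.implicits._

  // Tiny made-up dataset.
  Seq(("north", "tv", 10L), ("north", "phone", 5L), ("south", "tv", 7L))
    .toDF("region", "product", "amount")
    .createOrReplaceTempView("sales")

  // GROUPING SETS form.
  spark.sql(
    """SELECT region, product, SUM(amount) AS total
      |FROM sales
      |GROUP BY region, product GROUPING SETS ((region), (product))""".stripMargin).show()

  // Equivalent UNION ALL form: the column missing from each grouping set becomes NULL.
  spark.sql(
    """SELECT region, NULL AS product, SUM(amount) AS total FROM sales GROUP BY region
      |UNION ALL
      |SELECT NULL AS region, product, SUM(amount) AS total FROM sales GROUP BY product""".stripMargin).show()

  spark.stop()
}
```

Both queries return the same rows (up to ordering): per-region totals with product NULL, and per-product totals with region NULL.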
ROLLUP
```sql
GROUP BY group_expression1 [, group_expression2, ...] WITH ROLLUP
GROUP BY ROLLUP(grouping_set1 [, grouping_set2, ...])
```
| ROLLUP | Equivalent form |
| --- | --- |
| GROUP BY a, b WITH ROLLUP<br>GROUP BY ROLLUP(a, b) | GROUP BY GROUPING SETS((a, b), (a), ()) |
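The same kind of check works for ROLLUP; the fragment below assumes the spark session and sales view created in the GROUPING SETS sketch above.

```scala
// Assumes the `spark` session and `sales` temp view from the GROUPING SETS sketch above.
val withRollup = spark.sql(
  "SELECT region, product, SUM(amount) AS total FROM sales GROUP BY ROLLUP(region, product)")
val expandedRollup = spark.sql(
  "SELECT region, product, SUM(amount) AS total FROM sales " +
    "GROUP BY region, product GROUPING SETS((region, product), (region), ())")

// Both return the (region, product) rows, the per-region subtotals, and the grand total ().
withRollup.show()
expandedRollup.show()
```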
CUBE
```sql
GROUP BY group_expression1 [, group_expression2, ...] WITH CUBE
GROUP BY CUBE(grouping_set1 [, grouping_set2, ...])
```
| CUBE | Equivalent form |
| --- | --- |
| GROUP BY a, b WITH CUBE<br>GROUP BY CUBE(a, b) | GROUP BY GROUPING SETS((a, b), (a), (b), ()) |
| GROUP BY CUBE(a, b, (a, c)) | GROUP BY GROUPING SETS((a, b, c), (a, b), (a, c), (b, a, c), (a), (b), (a, c), ()) |
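Likewise for CUBE, the first row of the table can be verified directly (again assuming the spark session and sales view from the GROUPING SETS sketch):

```scala
// Assumes the `spark` session and `sales` temp view from the GROUPING SETS sketch above.
val withCube = spark.sql(
  "SELECT region, product, SUM(amount) AS total FROM sales GROUP BY CUBE(region, product)")
val expandedCube = spark.sql(
  "SELECT region, product, SUM(amount) AS total FROM sales " +
    "GROUP BY region, product GROUPING SETS((region, product), (region), (product), ())")

// Both add the per-product subtotals on top of what ROLLUP produces.
withCube.show()
expandedCube.show()
```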
```sql
sql> SET TIME ZONE 'UTC';
sql> SELECT to_timestamp('2022-05-15 19:00:00', 'yyyy-MM-dd HH:mm:ss');
2022-05-15T19:00:00.000+0000
```
Example 2: the input time string carries no time zone information, so it is parsed using the current session time zone by default.
```sql
sql> SET TIME ZONE 'Asia/Shanghai';
sql> SELECT to_timestamp('2022-05-15 19:00:00', 'yyyy-MM-dd HH:mm:ss');
2022-05-15T19:00:00.000+0800
```
Example 3: the input time string carries explicit time zone information (+0800); with the session time zone set to UTC, the result is displayed in UTC.
```sql
sql> SET TIME ZONE 'UTC';
sql> SELECT to_timestamp('2022-05-15T19:00:00.000+0800', "yyyy-MM-dd'T'HH:mm:ss.SSSZ");
2022-05-15T11:00:00.000+0000
```
Example 4: the input time string carries explicit time zone information (+0800); with the session time zone set to Asia/Shanghai, the displayed value remains at +0800.
```sql
sql> SET TIME ZONE 'Asia/Shanghai';
sql> SELECT to_timestamp('2022-05-15T19:00:00.000+0800', "yyyy-MM-dd'T'HH:mm:ss.SSSZ");
2022-05-15T19:00:00.000+0800
```