```scala
private def createNonBucketedReadRDD(
    readFile: (PartitionedFile) => Iterator[InternalRow],
    selectedPartitions: Array[PartitionDirectory],
    fsRelation: HadoopFsRelation): RDD[InternalRow] = {
  val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
  val maxSplitBytes =
    FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
  logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
    s"open cost is considered as scanning $openCostInBytes bytes.")

  // Cut each selected file into splits of at most maxSplitBytes (when the file
  // format is splittable), then sort the splits by size, largest first, so the
  // bin packing below fills partitions evenly. `relation` is the enclosing
  // FileSourceScanExec's HadoopFsRelation field.
  val splitFiles = selectedPartitions.flatMap { partition =>
    partition.files.flatMap { file =>
      val filePath = file.getPath
      val isSplitable = relation.fileFormat.isSplitable(
        relation.sparkSession, relation.options, filePath)
      PartitionedFileUtil.splitFiles(
        sparkSession = relation.sparkSession,
        file = file,
        filePath = filePath,
        isSplitable = isSplitable,
        maxSplitBytes = maxSplitBytes,
        partitionValues = partition.values
      )
    }
  }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

  // Greedily bin-pack the sorted splits into read partitions.
  val partitions =
    FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

  new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
}
```
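To make the bin-packing step concrete, the sketch below reproduces the greedy strategy behind `FilePartition.getFilePartitions`. It is a minimal, self-contained illustration, not Spark's actual code: `BinPackingSketch`, `FileSplit`, and `packSplits` are hypothetical names standing in for Spark's `FilePartition` and `PartitionedFile`.

```scala
import scala.collection.mutable.ArrayBuffer

object BinPackingSketch {
  // Hypothetical stand-in for Spark's PartitionedFile.
  case class FileSplit(path: String, length: Long)

  // Greedy bin packing: walk the splits largest-first and close the current
  // partition whenever the next split would push it past maxSplitBytes.
  // Every file also charges openCostInBytes against the budget, so a
  // partition made of many tiny files still closes after a bounded count.
  def packSplits(
      splits: Seq[FileSplit],
      maxSplitBytes: Long,
      openCostInBytes: Long): Seq[Seq[FileSplit]] = {
    val partitions = ArrayBuffer.empty[Seq[FileSplit]]
    val current = ArrayBuffer.empty[FileSplit]
    var currentSize = 0L

    def closePartition(): Unit = {
      if (current.nonEmpty) partitions += current.toSeq
      current.clear()
      currentSize = 0L
    }

    splits.sortBy(-_.length).foreach { split =>
      if (currentSize + split.length > maxSplitBytes) closePartition()
      currentSize += split.length + openCostInBytes
      current += split
    }
    closePartition()
    partitions.toSeq
  }
}
```

The effect is that many small files coalesce into a few read partitions rather than spawning one task per file. Both knobs are user-tunable: `openCostInBytes` comes from `spark.sql.files.openCostInBytes`, and `maxSplitBytes` is derived by `FilePartition.maxSplitBytes` from `spark.sql.files.maxPartitionBytes` together with the total scan size and the session's default parallelism.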