Spark SQL: The DataSource Framework

(Unfinished)

DataFrameReader

DataFrameReader is the user-facing entry point for reading data (spark.read). The path-taking form of its load() method applies to file-based data sources, i.e. those that end up being resolved to a HadoopFsRelation:

def load(paths: String*): DataFrame = {
  // The Hive data source can only be read as catalog tables, not as raw files.
  if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    throw new AnalysisException("Hive data source can only be used with tables, you can not " +
      "read files of Hive data source directly.")
  }

  // Resolve `source` (an alias or a fully qualified class name) to the provider class.
  val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
  if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    val ds = cls.newInstance().asInstanceOf[DataSourceV2]
    if (ds.isInstanceOf[ReadSupport]) {
      // Data Source V2 with read support: build a DataSourceV2Relation directly.
      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
        ds = ds, conf = sparkSession.sessionState.conf)
      val pathsOption = {
        val objectMapper = new ObjectMapper()
        DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
      }
      Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
        ds, sessionOptions ++ extraOptions.toMap + pathsOption,
        userSpecifiedSchema = userSpecifiedSchema))
    } else {
      loadV1Source(paths: _*)
    }
  } else {
    // V1 path: build a DataSource and resolve it to a BaseRelation.
    loadV1Source(paths: _*)
  }
}
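For orientation, here is how load() is typically reached from user code: format() sets the `source` field that load() resolves, and the shorthand readers such as json() delegate to format(...).load(...). The paths and options below are made up for illustration, assuming an existing SparkSession named `spark`.

// Hypothetical paths and options; both calls go through load() above.
val parquetDF = spark.read.format("parquet").load("/tmp/events")
val jsonDF    = spark.read.option("multiLine", "true").json("/tmp/events.json")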

org/apache/spark/sql/execution/datasources

The DataSource class

It is the central class of Spark SQL's pluggable data source framework: it acts as the bridge between a concrete data source implementation and the Spark SQL framework, and is essentially a description of that concrete implementation. Its main responsibilities are:

  1. carrying the parameters that describe the underlying data source;

  2. resolving that description into the concrete implementation used in the query plan, e.g. via resolveRelation();

  3. writing data out through the external library.

    From a user's point of view, a DataSource is created explicitly through org.apache.spark.sql.DataFrameReader or CREATE TABLE ... USING. The class is also used when descriptions read back from the metastore are resolved into concrete implementations.

A few concepts to keep distinct:

  1. The data sources themselves (i.e. the data source implementation classes, the provider classes), which create the relations they are responsible for;
  2. The DataSource class, which only describes a data source and resolves that description to a provider class and a relation (a conceptual sketch follows below).
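As a conceptual sketch of the second point (Spark 2.4 internals): DataSource is a private[sql] case class, so the snippet below only illustrates what DataFrameReader's V1 path does; it is not something user code would write, and the path and options are made up.

// What spark.read.format("csv").option("header", "true").load("/tmp/people.csv")
// roughly turns into on the V1 path (illustrative only; DataSource is not public API):
//
//   DataSource(
//     sparkSession = spark,
//     className = "csv",                      // resolved to CSVFileFormat by lookupDataSource
//     paths = Seq("/tmp/people.csv"),
//     userSpecifiedSchema = None,
//     options = Map("header" -> "true")
//   ).resolveRelation()                       // yields a HadoopFsRelation for a file source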

resolveRelation()

Its job is to build the resolved BaseRelation:

def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
  // A new instance of the provider class is created on every call.
  val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
    // TODO: Throw when too much is given.
    case (dataSource: SchemaRelationProvider, Some(schema)) =>
      dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
    case (dataSource: RelationProvider, None) =>
      dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
    case (_: SchemaRelationProvider, None) =>
      throw new AnalysisException(s"A schema needs to be specified when using $className.")
    case (dataSource: RelationProvider, Some(schema)) =>
      val baseRelation =
        dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
      if (baseRelation.schema != schema) {
        throw new AnalysisException(s"$className does not allow user-specified schemas.")
      }
      baseRelation

    // We are reading from the results of a streaming query. Load files from the metadata log
    // instead of listing them using HDFS APIs.
    case (format: FileFormat, _)
        if FileStreamSink.hasMetadata(
          caseInsensitiveOptions.get("path").toSeq ++ paths,
          sparkSession.sessionState.newHadoopConf()) =>
      ... ...
      HadoopFsRelation(
        fileCatalog,
        partitionSchema = fileCatalog.partitionSchema,
        dataSchema = dataSchema,
        bucketSpec = None,
        format,
        caseInsensitiveOptions)(sparkSession)

    // This is a non-streaming file based datasource.
    case (format: FileFormat, _) =>
      val globbedPaths =
        checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
      val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions &&
        catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
        catalogTable.get.partitionColumnNames.nonEmpty
      val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) {
        val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
        val index = new CatalogFileIndex(
          sparkSession,
          catalogTable.get,
          catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
        (index, catalogTable.get.dataSchema, catalogTable.get.partitionSchema)
      } else {
        val index = createInMemoryFileIndex(globbedPaths)
        val (resultDataSchema, resultPartitionSchema) =
          getOrInferFileFormatSchema(format, Some(index))
        (index, resultDataSchema, resultPartitionSchema)
      }

      HadoopFsRelation(
        fileCatalog,
        partitionSchema = partitionSchema,
        dataSchema = dataSchema.asNullable,
        bucketSpec = bucketSpec,
        format,
        caseInsensitiveOptions)(sparkSession)

    case _ =>
      throw new AnalysisException(
        s"$className is not a valid Spark SQL Data Source.")
  }

  // Validation checks (omitted)

  relation
}
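To make the branches concrete, here are two hedged examples of which arm a typical read ends up in (connection details and paths are made up):

// JdbcRelationProvider is a RelationProvider, so a JDBC read without a user schema
// takes the (RelationProvider, None) arm.
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/testdb")
  .option("dbtable", "people")
  .load()

// CSVFileFormat is a FileFormat, so a plain file read takes the non-streaming
// FileFormat arm and is resolved to a HadoopFsRelation.
val csvDF = spark.read
  .format("csv")
  .option("header", "true")
  .load("/tmp/people.csv")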

The DataSource object

For this companion object we are mainly interested in lookupDataSource(provider: String, conf: SQLConf): Class[_]:

def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
  val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
    case name if name.equalsIgnoreCase("orc") &&
        conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
      classOf[OrcFileFormat].getCanonicalName
    case name if name.equalsIgnoreCase("orc") &&
        conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
      "org.apache.spark.sql.hive.orc.OrcFileFormat"
    case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
      "org.apache.spark.sql.avro.AvroFileFormat"
    case name => name
  }
  val provider2 = s"$provider1.DefaultSource"
  val loader = Utils.getContextOrSparkClassLoader
  // Load every class registered under the DataSourceRegister interface.
  val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)

  try {
    // First, look among the registered DataSourceRegister implementations for one
    // whose alias matches provider1.
    serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
      // The requested provider matches none of the registered aliases.
      case Nil =>
        try {
          // Load the classes named by provider1 and provider2 directly.
          Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
            case Success(dataSource) =>
              // Found the data source using fully qualified path
              dataSource
            case Failure(error) =>
              ... ...
          }
        } catch {
          ... ...
        }
      case head :: Nil =>
        // Exactly one registered alias matches.
        head.getClass
      case sources =>
        // Multiple registered aliases match. If exactly one of the matching classes starts
        // with "org.apache.spark", fall back to that internal Spark class.
        val sourceNames = sources.map(_.getClass.getName)
        val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
        if (internalSources.size == 1) {
          logWarning(s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " +
            s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
          internalSources.head.getClass
        } else {
          throw new AnalysisException(s"Multiple sources found for $provider1 " +
            s"(${sourceNames.mkString(", ")}), please specify the fully qualified class name.")
        }
    }
  } catch {
    ... ...
  }
}
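A rough illustration of the resolution order follows. lookupDataSource lives in the private[sql] DataSource companion object, so this shows how Spark itself calls it rather than a public API, and the custom class name is hypothetical.

// 1. "parquet" matches the shortName() of ParquetFileFormat, registered via DataSourceRegister.
// 2. "com.example.MyProvider" matches no alias, so the loader tries that class name directly,
//    and then "com.example.MyProvider.DefaultSource" as a fallback.
// Illustrative calls, assuming access to the internal API:
//   DataSource.lookupDataSource("parquet", spark.sessionState.conf)
//   DataSource.lookupDataSource("com.example.MyProvider", spark.sessionState.conf)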

Some of the built-in implementations registered through DataSourceRegister:

CSVFileFormat
JsonFileFormat
OrcFileFormat
ParquetFileFormat
TextFileFormat
LibSVMFileFormat

HiveFileFormat

ImageFileFormat

ConsoleSinkProvider
JdbcRelationProvider
RateStreamProvider
TextSocketSourceProvider
KafkaSourceProvider

org/apache/spark/sql/sources

DataSourceRegister

A data source implements this trait so that it can register an alias (a short name) for itself.
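A minimal sketch of the registration mechanism (the provider class and alias are made up): the class implements shortName(), and its fully qualified name must also be listed in a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file on the classpath so that the ServiceLoader call in lookupDataSource can discover it.

import org.apache.spark.sql.sources.DataSourceRegister

// Hypothetical provider; after registration users can write spark.read.format("myformat")
// instead of the fully qualified class name. A real provider would also mix in one of the
// provider traits below (e.g. RelationProvider) to actually create relations.
class MyFormatProvider extends DataSourceRegister {
  override def shortName(): String = "myformat"
}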

Data source provider interfaces

A data source is also referred to as a data source provider.

RelationProvider

Creates a BaseRelation for a given set of options; the schema is determined by the data source itself.

SchemaRelationProvider

Creates a BaseRelation for a given set of options together with a user-specified schema.

StreamSourceProvider

Creates a streaming Source for Structured Streaming reads.

CreatableRelationProvider

The write-side interface: saves a DataFrame to the data source and returns a relation representing the written data.

StreamSinkProvider

Creates a streaming Sink for Structured Streaming writes.
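For comparison, the createRelation signatures of the main provider traits as of Spark 2.4 (excerpted from the org.apache.spark.sql.sources package from memory; treat as a reference sketch, imports omitted):

// (excerpted signatures, org.apache.spark.sql.sources)
trait RelationProvider {
  // Schema is decided by the data source itself.
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

trait SchemaRelationProvider {
  // The caller supplies the schema.
  def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType): BaseRelation
}

trait CreatableRelationProvider {
  // Write-side entry point: persist `data` and return a relation over the result.
  def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation
}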

BaseRelation

It represents a collection of tuples with a known schema. Concrete subclasses must be able to produce their data's schema as a StructType, and must also mix in one of the Scan traits, each of which defines a different abstract method used at execution time.
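The abstract surface of BaseRelation is small; a sketch of its key members as of Spark 2.4 (member set reproduced from memory, shown for reference, imports omitted):

// (excerpted members, org.apache.spark.sql.sources.BaseRelation)
abstract class BaseRelation {
  def sqlContext: SQLContext
  def schema: StructType

  // Estimated size used by the planner (e.g. for broadcast decisions); defaults to a large value.
  def sizeInBytes: Long = sqlContext.conf.defaultSizeInBytes

  // Whether rows from this relation still need conversion to Spark's internal row format.
  def needConversion: Boolean = true

  // Filters this relation cannot handle itself and that Spark must re-evaluate.
  def unhandledFilters(filters: Array[Filter]): Array[Filter] = filters
}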

TableScan

A BaseRelation that produces all of its tuples as an RDD of Row objects.
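A minimal TableScan sketch; the relation and its data are invented for illustration.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// A toy relation that produces the integers 0 until n as single-column rows.
class RangeRelation(val sqlContext: SQLContext, n: Int) extends BaseRelation with TableScan {

  override def schema: StructType =
    StructType(StructField("id", IntegerType, nullable = false) :: Nil)

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(0 until n).map(Row(_))
}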

PrunedScan

A BaseRelation that can eliminate unneeded columns before producing its tuples as an RDD of Row objects.

PrunedFilteredScan

A BaseRelation that can eliminate unneeded columns and filter using the provided predicates before producing the matching tuples as an RDD of Row objects.
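A PrunedFilteredScan sketch over an in-memory collection (the relation, its data, and the handled filter are all invented): only an EqualTo filter on id is applied here; any other filter is left for Spark to re-evaluate on top of the returned rows.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Toy relation over (id, name) pairs that prunes columns and pushes down EqualTo("id", ...).
class PeopleRelation(val sqlContext: SQLContext, data: Seq[(Int, String)])
    extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType =
    StructType(StructField("id", IntegerType) :: StructField("name", StringType) :: Nil)

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // Keep rows matching the filters we understand; ignore the rest (Spark rechecks them).
    val kept = data.filter { case (id, _) =>
      filters.forall {
        case EqualTo("id", value) => id == value
        case _                    => true
      }
    }
    // Emit only the requested columns, in the requested order.
    val rows = kept.map { case (id, name) =>
      Row.fromSeq(requiredColumns.toSeq.map {
        case "id"   => id
        case "name" => name
      })
    }
    sqlContext.sparkContext.parallelize(rows)
  }
}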

CatalystScan

An experimental variant of the scan interfaces: instead of the stable Filter/column-name API it receives the raw Catalyst expressions (required columns as attributes, filters as expressions), so it is tied to Catalyst internals and offers no stability guarantees across releases.

InsertableRelation

A BaseRelation into which data can be inserted.
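A toy InsertableRelation sketch; the relation and its driver-side buffer are invented, and a real implementation would write to external storage instead of collecting to the driver.

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
import org.apache.spark.sql.types.StructType

// Toy relation that accumulates inserted rows in a driver-side buffer.
class BufferRelation(val sqlContext: SQLContext, val schema: StructType)
    extends BaseRelation with InsertableRelation {

  val buffer: ArrayBuffer[Row] = ArrayBuffer.empty[Row]

  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    if (overwrite) buffer.clear()
    buffer ++= data.collect()   // fine for a toy example; never collect real datasets like this
  }
}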

NOTEs

This article is based on Spark 2.4.3.

References

  1. https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-DataSource.html