Spark SQL: An Overview of SparkSession

SparkSession is the entry point to Spark SQL. This article takes a detailed look at SparkSession and its internals.

Overview

The class diagram above shows the components of SparkSession; let's walk through it. The key components are SparkSessionExtensions, SharedState, SessionState, and Catalog. Specifically:

  1. SharedState and SessionState are responsible for holding "state": SharedState holds the state shared across SparkSessions under the same SQLContext, while SessionState holds the state specific to a single SparkSession.
  2. The gray area on the left of the diagram shows the internals of SharedState; the classes in orange form the catalog system, which is the same catalog that SessionState reaches through the SessionCatalog API.
  3. The light-blue area on the right of the diagram shows how BaseSessionStateBuilder assembles a SessionState.
  4. Catalog is the interface exposed to API users for accessing the underlying external catalog system. SparkSession implements it with CatalogImpl, which goes through the SessionCatalog in SessionState to reach the external catalog (see the sketch after this list).
  5. SparkSessionExtensions is the container for extension points; extensions are injected into it in the form of xxxBuilder functions.
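
As a quick illustration of item 4, the user-facing Catalog API is reachable from any SparkSession via spark.catalog, backed by CatalogImpl. A minimal sketch (assuming an existing SparkSession named spark):

// List databases and tables through the public Catalog API (implemented by CatalogImpl)
spark.catalog.listDatabases().show()
spark.catalog.listTables("default").show()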

SparkSessionExtensions

The container for extension points. Through this interface users can inject Analyzer rules, Optimizer rules, planning strategies, and a custom parser into a SparkSession.

Inside SparkSessionExtensions

/**
 * The injected extension points are builder functions of the following types.
 */
type RuleBuilder = SparkSession => Rule[LogicalPlan]
type CheckRuleBuilder = SparkSession => LogicalPlan => Unit
type StrategyBuilder = SparkSession => Strategy
type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface

/**
 * Analyzer rules executed as part of the resolution phase of analysis.
 */
private[this] val resolutionRuleBuilders = mutable.Buffer.empty[RuleBuilder]

/**
 * Analyzer rules executed after the resolution phase.
 */
private[this] val postHocResolutionRuleBuilders = mutable.Buffer.empty[RuleBuilder]

/**
 * Check rules: executed after the analysis phase to detect problems in the LogicalPlan.
 */
private[this] val checkRuleBuilders = mutable.Buffer.empty[CheckRuleBuilder]

/**
 * Rules executed during the operator optimization batch. Optimizer rules improve the quality of
 * the analyzed logical plan; they should never change the result of the LogicalPlan.
 */
private[this] val optimizerRules = mutable.Buffer.empty[RuleBuilder]

/**
 * Strategies used to convert a LogicalPlan into an executable
 * org.apache.spark.sql.execution.SparkPlan.
 */
private[this] val plannerStrategyBuilders = mutable.Buffer.empty[StrategyBuilder]

/**
 * Parser builders: wrap or replace the session's ParserInterface.
 */
private[this] val parserBuilders = mutable.Buffer.empty[ParserBuilder]

Injecting extension points

Custom extension points are injected into a SparkSession instance through the SparkSession.Builder.withExtensions() method. The skeleton looks like this; a more concrete sketch follows it.

SparkSession.builder()
  .master("...")
  .config("...", true)
  .withExtensions { extensions =>
    extensions.injectResolutionRule { session =>
      ...
    }
    extensions.injectParser { (session, parser) =>
      ...
    }
  }
  .getOrCreate()
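
For a more concrete, minimal sketch, here is a do-nothing optimizer rule injected through the extensions container (the rule name and its behavior are purely illustrative, not part of Spark):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// An optimizer rule that leaves the plan unchanged, purely for illustration
case class NoopRule(spark: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

val spark = SparkSession.builder()
  .master("local[*]")
  .withExtensions { extensions =>
    // matches the RuleBuilder type: SparkSession => Rule[LogicalPlan]
    extensions.injectOptimizerRule(session => NoopRule(session))
  }
  .getOrCreate()

The same injection logic can also be packaged in a class implementing Function1[SparkSessionExtensions, Unit] and activated through the spark.sql.extensions configuration, which is convenient when you do not control how the SparkSession is built.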

SparkSession

The SparkSession class

lazy val sessionState: SessionState = {
  parentSessionState
    .map(_.clone(this))    // a forked session clones its parent's state
    .getOrElse {
      // otherwise build a fresh SessionState via the configured SessionStateBuilder
      val state = SparkSession.instantiateSessionState(
        SparkSession.sessionStateClassName(sparkContext.conf),
        self)
      initialSessionOptions.foreach { case (k, v) => state.conf.setConfString(k, v) }
      state
    }
}
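
Because SessionState is per-session while SparkContext and SharedState are shared, forking a session yields isolated session-level state (SQL conf, temporary views, registered UDFs). A minimal sketch (assuming an existing SparkSession named spark):

val spark2 = spark.newSession()                     // same SparkContext and SharedState, fresh SessionState
spark.conf.set("spark.sql.shuffle.partitions", "8")
spark2.conf.get("spark.sql.shuffle.partitions")     // still the default; the setting above is not shared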

Utility methods on the SparkSession companion object:

private val HIVE_SESSION_STATE_BUILDER_CLASS_NAME =
  "org.apache.spark.sql.hive.HiveSessionStateBuilder"

// Resolve the SessionStateBuilder class name from the spark.sql.catalogImplementation setting
private def sessionStateClassName(conf: SparkConf): String = {
  conf.get(CATALOG_IMPLEMENTATION) match {
    case "hive" => HIVE_SESSION_STATE_BUILDER_CLASS_NAME
    case "in-memory" => classOf[SessionStateBuilder].getCanonicalName
  }
}

private def instantiateSessionState(
    className: String,
    sparkSession: SparkSession): SessionState = {
  try {
    // invoke `new [Hive]SessionStateBuilder(SparkSession, Option[SessionState])`
    val clazz = Utils.classForName(className)
    val ctor = clazz.getConstructors.head
    ctor.newInstance(sparkSession, None).asInstanceOf[BaseSessionStateBuilder].build()
  } catch {
    case NonFatal(e) =>
      throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
  }
}
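
In practice the spark.sql.catalogImplementation setting is usually driven by SparkSession.Builder.enableHiveSupport(), which sets it to "hive" and therefore makes the code above pick HiveSessionStateBuilder. A minimal sketch:

val spark = SparkSession.builder()
  .master("local[*]")
  .enableHiveSupport()   // equivalent to setting spark.sql.catalogImplementation=hive
  .getOrCreate()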

SharedState

// Resolve the metastore warehouse path
val warehousePath: String = {
  // Load hive-site.xml into hadoopConfiguration
  val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
  if (configFile != null) {
    logInfo(s"loading hive config file: $configFile")
    sparkContext.hadoopConfiguration.addResource(configFile)
  }

  // hive.metastore.warehouse.dir only stays in hadoopConf
  sparkContext.conf.remove("hive.metastore.warehouse.dir")
  // Set the Hive metastore warehouse path to the one we use
  val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
  if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) {
    // hive.metastore.warehouse.dir is set but spark.sql.warehouse.dir is not:
    // copy hive.metastore.warehouse.dir into spark.sql.warehouse.dir
    sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
    logInfo(s"${WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
      s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " +
      s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
    hiveWarehouseDir
  } else {
    // otherwise spark.sql.warehouse.dir takes precedence and overrides hive.metastore.warehouse.dir
    val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH)
    logInfo(s"Setting hive.metastore.warehouse.dir ('$hiveWarehouseDir') to the value of " +
      s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
    sparkContext.hadoopConfiguration.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
    sparkWarehouseDir
  }
}
logInfo(s"Warehouse path is '$warehousePath'.")


/**
 * Caches query results.
 */
val cacheManager: CacheManager = new CacheManager

/**
* A status store to query SQL status/metrics of this Spark application, based on SQL-specific
* [[org.apache.spark.scheduler.SparkListenerEvent]]s.
*/
val statusStore: SQLAppStatusStore = {
  val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
  val listener = new SQLAppStatusListener(sparkContext.conf, kvStore, live = true)
  sparkContext.listenerBus.addToStatusQueue(listener)
  val statusStore = new SQLAppStatusStore(kvStore, Some(listener))
  sparkContext.ui.foreach(new SQLTab(statusStore, _))
  statusStore
}

/**
 * The catalog system, which manages the interaction with the external system.
 */
lazy val externalCatalog: ExternalCatalogWithListener = {
  // Instantiate the external catalog implementation
  val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
    SharedState.externalCatalogClassName(sparkContext.conf),
    sparkContext.conf,
    sparkContext.hadoopConfiguration)

  val defaultDbDefinition = CatalogDatabase(
    SessionCatalog.DEFAULT_DATABASE,
    "default database",
    CatalogUtils.stringToURI(warehousePath), // the metastore warehouse path
    Map())
  // Create the default database through the catalog API if it does not exist yet
  if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
    // There may be another Spark application creating the default database at the same time,
    // so set `ignoreIfExists = true` to avoid a `DatabaseAlreadyExists` exception.
    externalCatalog.createDatabase(defaultDbDefinition, ignoreIfExists = true)
  }

  // Wrap to provide catalog events
  val wrapped = new ExternalCatalogWithListener(externalCatalog)

  // Make sure we propagate external catalog events to the spark listener bus
  wrapped.addListener(new ExternalCatalogEventListener {
    override def onEvent(event: ExternalCatalogEvent): Unit = {
      sparkContext.listenerBus.post(event)
    }
  })

  wrapped
}

/**
 * Manager of global temporary views.
 */
lazy val globalTempViewManager: GlobalTempViewManager = {
  // The system-reserved database (spark.sql.globalTempDatabase, "global_temp" by default)
  // must not already exist in the metastore
  val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
  if (externalCatalog.databaseExists(globalTempDB)) {
    throw new SparkException(
      s"$globalTempDB is a system preserved database, please rename your existing database " +
        "to resolve the name conflict, or set a different value for " +
        s"${GLOBAL_TEMP_DATABASE.key}, and launch your Spark application again.")
  }
  new GlobalTempViewManager(globalTempDB)
}
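
Global temporary views live in this reserved database and are visible to every session of the same application. A minimal sketch (assuming an existing DataFrame named df):

df.createGlobalTempView("people")                                   // registered under the global_temp database
spark.sql("SELECT * FROM global_temp.people").show()                // visible in the current session
spark.newSession().sql("SELECT * FROM global_temp.people").show()   // and in any other session of this application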

BaseSessionStateBuilder

A concrete BaseSessionStateBuilder is created for each SparkSession; it provides the build() method that constructs the SessionState.


/**
 * Registry of functions: starts from the built-in functions and records user-registered ones.
 */
protected lazy val functionRegistry: FunctionRegistry = {
  parentState.map(_.functionRegistry).getOrElse(FunctionRegistry.builtin).clone()
}

/**
 * Can be used to define custom optimizer rules and custom planning strategies.
 */
protected lazy val experimentalMethods: ExperimentalMethods = {
  parentState.map(_.experimentalMethods.clone()).getOrElse(new ExperimentalMethods)
}

/**
 * SQL parser: extracts expressions, plans, and table identifiers from SQL text.
 */
protected lazy val sqlParser: ParserInterface = {
  // Build the ParserInterface on top of SparkSqlParser, applying any injected parser extensions
  extensions.buildParser(session, new SparkSqlParser(conf))
}

/**
 * Catalog: manages the state of tables and databases.
 */
protected lazy val catalog: SessionCatalog = {
  val catalog = new SessionCatalog(
    () => session.sharedState.externalCatalog,
    () => session.sharedState.globalTempViewManager,
    functionRegistry,
    conf,
    SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
    sqlParser,
    resourceLoader)
  parentState.foreach(_.catalog.copyStateTo(catalog))
  catalog
}

/**
 * Exposed to users for registering user-defined functions.
 * Note 1: user-defined functions must be deterministic.
 * Note 2: relies on the functionRegistry field above.
 */
protected def udfRegistration: UDFRegistration = new UDFRegistration(functionRegistry)

/**
 * Logical query plan analyzer: resolves unresolved attributes and relations.
 */
protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
  override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
    new FindDataSourceTable(session) +:
      new ResolveSQLOnFile(session) +:
      customResolutionRules

  override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
    PreprocessTableCreation(session) +:
      PreprocessTableInsertion(conf) +:
      DataSourceAnalysis(conf) +:
      customPostHocResolutionRules

  override val extendedCheckRules: Seq[LogicalPlan => Unit] =
    PreWriteCheck +:
      PreReadCheck +:
      HiveOnlyCheck +:
      customCheckRules
}

/**
 * Logical query plan optimizer.
 */
protected def optimizer: Optimizer = {
  new SparkOptimizer(catalog, experimentalMethods) {
    override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
      super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
  }
}

/**
 * Planner: converts the optimized logical plan into a physical plan.
 */
protected def planner: SparkPlanner = {
  new SparkPlanner(session.sparkContext, conf, experimentalMethods) { // ExperimentalMethods
    override def extraPlanningStrategies: Seq[Strategy] =
      super.extraPlanningStrategies ++ customPlanningStrategies
  }
}

/**
 * Creates the QueryExecution object.
 */
protected def createQueryExecution: LogicalPlan => QueryExecution = { plan =>
  new QueryExecution(session, plan)
}

/**
 * Builds the SessionState from the components above.
 */
def build(): SessionState = {
  new SessionState(
    session.sharedState,
    conf,
    experimentalMethods,
    functionRegistry,
    udfRegistration,
    () => catalog,
    sqlParser,
    () => analyzer,
    () => optimizer,
    planner,
    streamingQueryManager,
    listenerManager,
    () => resourceLoader,
    createQueryExecution,
    createClone)
}

SessionState

(omitted)

SessionCatalog

The internal catalog actually used by a SparkSession. This class acts as a proxy to the underlying metastore, forwarding calls to an ExternalCatalog for execution. It is also responsible for managing the temporary views and functions owned by the SparkSession.

All database and partition operations in this class interact directly with the underlying catalog; partition operations only apply to metastore tables.

There are two kinds of tables: temporary views and metastore tables. Temporary views are isolated between sessions and do not belong to any database; metastore tables belong to a database and can be used across sessions, because their metadata is persisted in the underlying catalog.

Likewise, there are two kinds of functions: temporary functions and metastore functions. Temporary functions are isolated between sessions and do not belong to any database; metastore functions belong to a database and can be used across sessions, because their metadata is persisted in the underlying catalog.
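
A quick illustration of the table distinction (assuming an existing DataFrame named df and a persistent catalog such as Hive):

df.createOrReplaceTempView("people_tmp")   // session-local, kept in SessionCatalog's tempViews map
df.write.saveAsTable("people")             // persisted through ExternalCatalog, usable across sessions

spark.newSession().sql("SELECT * FROM people").show()   // works: metastore table
// spark.newSession().sql("SELECT * FROM people_tmp")   // would fail: the temp view is not visible there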


protected val tempViews = new mutable.HashMap[String, LogicalPlan]

/**
 * Creates a session-local temporary view.
 */
def createTempView(
    name: String,
    tableDefinition: LogicalPlan,
    overrideIfExists: Boolean): Unit = synchronized {
  val table = formatTableName(name)
  if (tempViews.contains(table) && !overrideIfExists) {
    throw new TempTableAlreadyExistsException(name)
  }
  tempViews.put(table, tableDefinition)
}

/**
 * Creates a global temporary view.
 */
def createGlobalTempView(
    name: String,
    viewDefinition: LogicalPlan,
    overrideIfExists: Boolean): Unit = {
  globalTempViewManager.create(formatTableName(name), viewDefinition, overrideIfExists)
}

/**
 * Alter the definition of a local/global temp view matching the given name, returns true if a
 * temp view is matched and altered, false otherwise.
 */
def alterTempViewDefinition(
    name: TableIdentifier,
    viewDefinition: LogicalPlan): Boolean = synchronized {
  val viewName = formatTableName(name.table)
  if (name.database.isEmpty) {
    if (tempViews.contains(viewName)) {
      createTempView(viewName, viewDefinition, overrideIfExists = true)
      true
    } else {
      false
    }
  } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
    globalTempViewManager.update(viewName, viewDefinition)
  } else {
    false
  }
}

/**
 * Returns the session-local temporary view.
 */
def getTempView(name: String): Option[LogicalPlan] = synchronized {
  tempViews.get(formatTableName(name))
}

/**
 * Returns the global temporary view.
 */
def getGlobalTempView(name: String): Option[LogicalPlan] = {
  globalTempViewManager.get(formatTableName(name))
}

/**
 * Drops the session-local temporary view.
 */
def dropTempView(name: String): Boolean = synchronized {
  tempViews.remove(formatTableName(name)).isDefined
}

/**
 * Drops the global temporary view.
 */
def dropGlobalTempView(name: String): Boolean = {
  globalTempViewManager.remove(formatTableName(name))
}

ExternalCatalog

The management interface for the system catalog (functions, partitions, tables, databases). It is the layer that actually interacts with the external system (e.g. Hive, or an in-memory implementation).

Note: this interface only handles non-temporary items; temporary items such as temporary tables should not be handled through it.

It has three concrete implementations:

  • InMemoryCatalog: stores the system catalog in memory. It is meant for testing and exploration and is not recommended for production, but it is the default implementation;
  • HiveExternalCatalog: persists the system catalog in the Hive metastore;
  • ExternalCatalogWithListener: a wrapper that delegates each call to the underlying ExternalCatalog implementation (selected by spark.sql.catalogImplementation) and additionally publishes catalog events to listeners;

FunctionRegistry

The catalog that manages user-defined functions; each SparkSession owns its own FunctionRegistry.

Let's look at the utility members of the FunctionRegistry companion object:

// `expressions` contains all of Spark's built-in functions (most entries elided here)
val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
  // misc non-aggregate functions

  // math functions

  // aggregate functions

  // string functions

  // datetime functions

  // collection functions

  CreateStruct.registryEntry,

  // misc functions

  // grouping sets

  // window functions

  // predicates

  // comparison operators

  // bitwise

  // json

  // cast

  // Cast aliases
)

val builtin: SimpleFunctionRegistry = {
  val fr = new SimpleFunctionRegistry
  expressions.foreach {
    // register every built-in function into the SimpleFunctionRegistry
    // (BaseSessionStateBuilder later clones this registry for each session)
    case (name, (info, builder)) => fr.registerFunction(FunctionIdentifier(name), info, builder)
  }
  fr
}

SimpleFunctionRegistry

// Holds the registered functions
private val functionBuilders =
  new mutable.HashMap[FunctionIdentifier, (ExpressionInfo, FunctionBuilder)]
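
From the user's point of view this is where spark.udf.register ends up: UDFRegistration writes the function into the session's SimpleFunctionRegistry. A minimal sketch:

spark.udf.register("plus_one", (x: Int) => x + 1)   // stored in this session's function registry
spark.sql("SELECT plus_one(41)").show()

// A new session clones the built-in registry, so the UDF is not visible there:
// spark.newSession().sql("SELECT plus_one(41)")    // would fail to resolve the function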

GlobalTempViewManager

TODO

ExperimentalMethods

Can be used to define custom optimizer rules and custom planning strategies.
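
These hooks are exposed to user code through spark.experimental. A minimal sketch, reusing the illustrative NoopRule from the extensions example above (a custom Strategy would be appended to extraStrategies in the same way):

// Append a custom optimizer rule for this session only
spark.experimental.extraOptimizations = Seq(NoopRule(spark))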

ParserInterface

TODO

QueryExecution

This class describes the main workflow involved in executing a relational query.
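
The intermediate plans it holds can be inspected on any Dataset. A minimal sketch:

val df = spark.range(10).filter("id > 5")
val qe = df.queryExecution
println(qe.analyzed)        // analyzed logical plan
println(qe.optimizedPlan)   // optimized logical plan
println(qe.executedPlan)    // physical plan
// or simply: df.explain(true)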

Notes

This article is based on Spark 2.4.3.