Spark SQL: Principles of the Catalyst Optimizer Engine

The previous article, "Spark SQL: SparkSession Overview", described the overall framework of SparkSession. This article takes a detailed look at the most important part of Spark SQL: the Catalyst optimizer engine.

Overview

From the perspective of the source code, Spark SQL consists of the following four modules:

catalyst
core
hive
hive-thriftserver

At its core, Catalyst defines a general framework for representing trees and manipulating them with rules. On top of this framework sit libraries specific to relational query processing (for example, expressions and logical query plans), as well as several sets of rules that handle the different phases of query execution.

Catalyst also offers several public extension points, including external data sources and user-defined types.

Data Model

Trees and Nodes

The main data type in Catalyst is a tree composed of nodes. Each node has a node type and zero or more children. Concrete node types are defined as subclasses of the TreeNode class. Nodes are immutable and are manipulated using functional transformations.

The concrete TreeNode types differ at each stage of the SQL/DataFrame translation. Catalyst has node types for expressions, data types, logical operators, and physical operators.

TreeNode

The abstract class TreeNode represents a node in the tree; each node may contain child nodes, forming a tree structure. It defines a series of transformation operations over the tree, such as:

  • find(), foreach(), etc.;
  • map(), flatMap(), transform(), etc.;
  • collect(), etc.
abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
self: BaseType =>

val origin: Origin = CurrentOrigin.get

/**
* Returns a Seq of the children of this node.
* Children should not change. Immutability required for containsChild optimization
*/
def children: Seq[BaseType]
}
/**
* Applies the function to this node first, then recursively to its children
*/
def foreach(f: BaseType => Unit): Unit = {
f(this)
children.foreach(_.foreach(f))
}

/**
* Returns a copy of this tree where `rule` has been recursively applied to it and its children.
* If `rule` does not apply to a given node, that node is left unchanged.
* Note: `rule` is a partial function.
*/
def transform(rule: PartialFunction[BaseType, BaseType]): BaseType = {
transformDown(rule)
}

/**
* Faster version of equality which short-circuits when two treeNodes are the same instance.
* We don't just override Object.equals, as doing so prevents the scala compiler from
* generating case class `equals` methods
*/
def fastEquals(other: TreeNode[_]): Boolean = {
this.eq(other) || this == other
}
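
As a quick illustration of transform, here is a minimal sketch against the Spark 2.4 Catalyst expression API (the expression classes used here are covered in the Expression section below); the partial function only rewrites Add nodes whose operands are both integer literals:

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Add, Expression, Literal}

// x + (1 + 2), built directly from Catalyst expression nodes
val expr: Expression = Add(UnresolvedAttribute("x"), Add(Literal(1), Literal(2)))

// Fold additions of two integer literals; non-matching nodes are skipped automatically
val folded = expr.transform {
  case Add(Literal(a: Int, _), Literal(b: Int, _)) => Literal(a + b)
}
// folded == Add(UnresolvedAttribute("x"), Literal(3))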

Query Plans (QueryPlan)

QueryPlan is the base class of LogicalPlan and SparkPlan, which represent logical plans and physical plans respectively. A QueryPlan represents a relational operator of a structured query, and the output of its children serves as its input.

// The Attributes output by this node, i.e. the projection of the query result
def output: Seq[Attribute]
lazy val schema: StructType = StructType.fromAttributes(output)

// The set of attributes that the children feed into this operator node
def inputSet: AttributeSet =
AttributeSet(children.flatMap(_.asInstanceOf[QueryPlan[PlanType]].output))

// All attributes used by this operator node
val allAttributes: AttributeSeq = children.flatMap(_.output)

// Returns the expressions that appear in this operator node
final def expressions: Seq[Expression]
// The set of all attributes referenced by this operator's expressions
def references: AttributeSet = AttributeSet(expressions.flatMap(_.references))

// All subqueries of this node
def subqueries: Seq[PlanType]
def innerChildren: Seq[QueryPlan[_]] = subqueries
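
To see these properties on a concrete plan, here is a small sketch (assuming the LocalRelation and Project operators from spark-catalyst; the attribute names are made up):

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.types.{IntegerType, StringType}

val id   = AttributeReference("id", IntegerType)()
val name = AttributeReference("name", StringType)()
val relation = LocalRelation(id, name)    // leaf node, output = [id, name]

val project = Project(Seq(id), relation)  // keep only `id`

project.output      // Seq(id)
project.inputSet    // AttributeSet(id, name): what the child provides
project.references  // AttributeSet(id): what this operator's expressions use
project.schema      // StructType with a single nullable field: id (IntegerType)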

Logical Plans (LogicalPlan)

A LogicalPlan represents a tree of logical operators; see the unresolved, analyzed, and optimized logical plans in the example section below.

The code lives in the spark-catalyst module; in particular:

  1. org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala defines the basic logical operators, such as:

    • Project, Distinct, Aggregate
    • Filter
    • Join, Except, Intersect
    • Sort, Limit
    • InsertIntoTable, InsertIntoDir
  2. org/apache/spark/sql/catalyst/plans/logical/object.scala

  3. org/apache/spark/sql/catalyst/plans/logical/hints.scala

  4. org/apache/spark/sql/catalyst/plans/logical/Statistics.scala

Physical Plans (SparkPlan)

See the physical plan in the example section below.

The code lives in the Spark SQL core module (sql/core); in particular:

  1. org/apache/spark/sql/execution/basicPhysicalOperators.scala
  2. org/apache/spark/sql/execution/DataSourceScanExec.scala
  3. org/apache/spark/sql/execution/WholeStageCodegenExec.scala
  4. org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
  5. org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala

Expressions (Expression)

An Expression represents an expression tree. For example, for the expression x + (1 + 2), the expression tree written in Scala is:

Add(Attribute(x), Add(Literal(1), Literal(2)))

The code lives in the org/apache/spark/sql/catalyst/expressions package of the spark-catalyst module; in particular:

  1. org/apache/spark/sql/catalyst/expressions/literals.scala

    • Literal
  2. org/apache/spark/sql/catalyst/expressions/arithmetic.scala

    • Add, Subtract, Multiply, Divide
    • Pmod
    • Abs, Greatest, Least, etc.
  3. org/apache/spark/sql/catalyst/expressions/mathExpressions.scala

    • Ceil, Floor, Cos, Log, etc.
  4. org/apache/spark/sql/catalyst/expressions/predicates.scala

    • Not, In, And, Or, EqualTo, LessThan, etc.
  5. org/apache/spark/sql/catalyst/expressions/stringExpressions.scala

    • ConcatWs, Upper, Lower, etc.
  6. org/apache/spark/sql/catalyst/expressions/Cast.scala

    • Cast
  7. org/apache/spark/sql/catalyst/expressions/namedExpressions.scala

    • AttributeReference, PrettyAttribute, OuterReference
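
A few of these expressions can be constructed and evaluated directly from Scala; a small sketch (literal-only expressions need no input row):

import org.apache.spark.sql.catalyst.expressions.{Add, Literal, Upper}

Add(Literal(1), Literal(2)).eval()   // 3: all operands are literals, no input row needed
Upper(Literal("spark")).eval()       // UTF8String "SPARK"
Literal("spark").dataType            // StringType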

Rules

A rule can be seen as a function from one tree to another. Although a rule may run arbitrary code on its input tree, the most common approach is to use a set of pattern-matching functions to find subtrees and replace them with a particular structure.

In Catalyst, trees provide a transform method, which recursively applies a pattern-matching function to every node of the tree and transforms the nodes that match. The pattern-matching expression passed to transform is a partial function, so it only matches the subset of tree shapes that a given rule cares about. Catalyst tests which parts of the tree a rule applies to, automatically skips non-matching nodes, and descends into their subtrees. This means a rule only needs to reason about the tree shapes its optimization applies to, not about those that do not match; as a result, rules do not have to be modified when new kinds of operators are added to the system.

In practice, a rule may need to be executed several times to fully transform a tree. Catalyst therefore groups rules into batches and executes each batch until it reaches a fixed point, that is, until applying the batch's rules no longer changes the tree.

RuleExecutor

The abstract class RuleExecutor is responsible for executing batches of rules to transform a TreeNode.

abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
/** Defines a sequence of rule batches, to be overridden by the implementation. */
protected def batches: Seq[Batch]

/**
* Defines a check function that checks for structural integrity of the plan after the execution
* of each rule. For example, we can check whether a plan is still resolved after each rule in
* `Optimizer`, so we can catch rules that return invalid plans. The check function returns
* `false` if the given plan doesn't pass the structural integrity check.
*/
protected def isPlanIntegral(plan: TreeType): Boolean = true

/**
* Executes the batches of rules defined by the subclass. Batches run serially, and the rules within a batch also run serially.
*/
def execute(plan: TreeType): TreeType = {
var curPlan = plan
val queryExecutionMetrics = RuleExecutor.queryExecutionMeter

batches.foreach { batch =>
val batchStartPlan = curPlan
var iteration = 1
var lastPlan = curPlan
var continue = true

// Run until fix point (or the max number of iterations as specified in the strategy).
while (continue) {
// Apply the rules of the current batch in order
curPlan = batch.rules.foldLeft(curPlan) {
case (plan, rule) =>
val startTime = System.nanoTime()
val result = rule(plan) // apply the rule to transform the plan
val runTime = System.nanoTime() - startTime

// If the rule actually changed the plan, result will not equal plan
if (!result.fastEquals(plan)) {
queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
logTrace(
s"""
|=== Applying Rule ${rule.ruleName} ===
|${sideBySide(plan.treeString, result.treeString).mkString("\n")}
""".stripMargin)
}
queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
queryExecutionMetrics.incNumExecution(rule.ruleName)

// Run the structural integrity checker against the plan after each rule.
if (!isPlanIntegral(result)) {
val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
"the structural integrity of the plan is broken."
throw new TreeNodeException(result, message, null)
}

result
}

// The following logic implements the convergence condition for this batch
iteration += 1
if (iteration > batch.strategy.maxIterations) {
// Only log if this is a rule that is supposed to run more than once.
if (iteration != 2) {
val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
if (Utils.isTesting) {
throw new TreeNodeException(curPlan, message, null)
} else {
logWarning(message)
}
}
continue = false
}

if (curPlan.fastEquals(lastPlan)) {
logTrace(
s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
continue = false
}
lastPlan = curPlan
}

if (!batchStartPlan.fastEquals(curPlan)) {
logDebug(
s"""
|=== Result of Batch ${batch.name} ===
|${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
""".stripMargin)
} else {
logTrace(s"Batch ${batch.name} has no effect.")
}
}

curPlan
}
}

Rule

abstract class Rule[TreeType <: TreeNode[_]] extends Logging {

/** Name for this rule, automatically inferred based on class name. */
val ruleName: String = {
val className = getClass.getName
if (className endsWith "$") className.dropRight(1) else className
}

def apply(plan: TreeType): TreeType
}

The concrete rules used in each phase are defined in the following places:

  1. Rules of the analysis phase:
    • org.apache.spark.sql.catalyst.analysis.Analyzer#batches
  2. Rules of the logical optimization phase:
    • org.apache.spark.sql.execution.SparkOptimizer#defaultBatches
  3. Rules of the physical planning phase:
    • org/apache/spark/sql/execution/subquery.scala
    • org/apache/spark/sql/execution/WholeStageCodegenExec.scala
    • org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
    • org/apache/spark/sql/execution/exchange/Exchange.scala
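
As a sketch of how these pieces fit together (the rule and executor names here, RemoveNoopFilter and DemoOptimizer, are made up for illustration), a custom rule and executor can be wired up the same way the Analyzer and Optimizer define their batches:

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

// Hypothetical rule: remove Filter operators whose condition is the literal `true`.
object RemoveNoopFilter extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(Literal(true, _), child) => child
  }
}

// Hypothetical executor: a single batch that runs to fixed point (at most 100 iterations).
object DemoOptimizer extends RuleExecutor[LogicalPlan] {
  override protected def batches: Seq[Batch] =
    Batch("Demo", FixedPoint(100), RemoveNoopFilter) :: Nil
}

// DemoOptimizer.execute(somePlan) applies the batch repeatedly until the plan stops changing.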

Dataset & DataFrame

Dataset

/**
* Wrap a Dataset action to track the QueryExecution and time cost, then report to the
* user-registered callback functions.
*/
private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
try {
qe.executedPlan.foreach { plan =>
plan.resetMetrics()
}
val start = System.nanoTime()
val result = SQLExecution.withNewExecutionId(sparkSession, qe) {
action(qe.executedPlan)
}
val end = System.nanoTime()
sparkSession.listenerManager.onSuccess(name, qe, end - start)
result
} catch {
case e: Throwable =>
sparkSession.listenerManager.onFailure(name, qe, e)
throw e
}
}

/**
* Collect all elements from a spark plan.
*/
private def collectFromPlan(plan: SparkPlan): Array[T] = {
// This projection writes output to a `InternalRow`, which means applying this projection is not
// thread-safe. Here we create the projection inside this method to make `Dataset` thread-safe.
val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
plan.executeCollect().map { row =>
// The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
// parameter of its `get` method, so it's safe to use null here.
objProj(row).get(0, null).asInstanceOf[T]
}
}
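
The callbacks that withAction reports to can be observed by registering a QueryExecutionListener; a small sketch, assuming a SparkSession named spark and the Spark 2.4 listener API:

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"action $funcName took ${durationNs / 1e6} ms")
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"action $funcName failed: ${exception.getMessage}")
})

// Any subsequent action, e.g. df.collect(), goes through withAction and triggers the callbacks.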

DataType

TODO

Execution Process

Key Interfaces

Before walking through the logic of each phase, let's first introduce a few important interfaces.

QueryExecution

This class describes the key workflow involved in executing a relational query:

  1. The constructor argument logical: LogicalPlan is the unanalyzed (parsed) logical plan;
  2. analyzed: LogicalPlan is the analyzed logical plan;
  3. optimizedPlan: LogicalPlan is the optimized logical plan;
  4. executedPlan: SparkPlan is the physical plan (see the sketch below).
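
These plans can be inspected directly from any Dataset; a small sketch, assuming a SparkSession named spark:

val df = spark.range(10).where("id > 5").selectExpr("id * 2 AS doubled")

val qe = df.queryExecution
qe.logical        // unresolved (parsed) logical plan
qe.analyzed       // analyzed logical plan
qe.optimizedPlan  // optimized logical plan
qe.executedPlan   // physical plan (SparkPlan)

df.explain(true)  // prints all of the above plans at once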

SQL Parsing Phase (parser)

A SQL statement is first parsed by the parser module into a syntax tree, which is called the Unresolved Logical Plan.

The code for this phase lives in the org.apache.spark.sql.catalyst.parser package.
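
The parser can be invoked directly as well; a small sketch using CatalystSqlParser (the session's configured parser is also reachable via spark.sessionState.sqlParser):

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Returns an unresolved LogicalPlan containing an 'UnresolvedRelation for `student`
val parsed = CatalystSqlParser.parsePlan("SELECT id FROM student WHERE age < 35")

// Individual expressions can be parsed as well
val condition = CatalystSqlParser.parseExpression("age < 35")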

Analysis Phase (analysis)

(Sequence diagram: the flow of the analysis phase.)

AnalysisHelper

Every logical plan mixes in this trait. It defines a flag, analyzed, which is set to true once the tree has been through the analysis process. It also provides a set of resolve methods that do not recurse into sub-plans whose analyzed flag is already true; rules in the analyzer should use these resolve methods instead of the transform-style methods defined on TreeNode/QueryPlan. To prevent the use of the transform methods, this trait overrides the transform* methods so that they throw an exception when called from the analyzer.


/**
* Recursively marks all nodes in this plan tree as analyzed. This method is only called by CheckAnalysis.
*/
private[catalyst] def setAnalyzed(): Unit = {
if (!_analyzed) {
_analyzed = true
children.foreach(_.setAnalyzed())
}
}

/**
* Recursively transforms the expressions of the tree, skipping nodes that have already been analyzed
*/
def resolveExpressions(r: PartialFunction[Expression, Expression]): LogicalPlan = {
resolveOperators {
case p => p.transformExpressions(r)
}
}

/**
* Returns a copy of this node where the rule has been recursively applied to the tree; if the rule does not apply to this node, the node itself is returned.
* This method is similar to transform, but it skips sub-trees that have been marked as analyzed.
*
* @param rule the function use to transform this nodes children
*/
def resolveOperators(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
resolveOperatorsDown(rule)
}

/**
* Returns a copy of this node where the rule is first applied recursively to its children and then to the node itself (post-order, bottom-up).
* If the rule does not apply to any node, the tree is left unchanged.
*/
def resolveOperatorsUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
if (!analyzed) {
AnalysisHelper.allowInvokingTransformsInAnalyzer {
// 1. Recursively apply the rule to this node's children and return a copy of this node over the transformed children
val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))
// 2. Compare this node with the new copy
// 2.1 If they are equal, the rule did not change any subtree, so apply the rule directly to this node
if (self fastEquals afterRuleOnChildren) {
CurrentOrigin.withOrigin(origin) {
rule.applyOrElse(self, identity[LogicalPlan])
}
} else {
// 2.2 Otherwise the rule changed some subtree, so apply the rule to the new copy of this node
CurrentOrigin.withOrigin(origin) {
rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
}
}
}
} else {
self
}
}

/** Same as resolveOperatorsUp, but applied top-down */
def resolveOperatorsDown(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
if (!analyzed) {
AnalysisHelper.allowInvokingTransformsInAnalyzer {
// Apply the rule to this node first
val afterRule = CurrentOrigin.withOrigin(origin) {
rule.applyOrElse(self, identity[LogicalPlan])
}

// Check if unchanged and then possibly return old copy to avoid gc churn.
if (self fastEquals afterRule) {
mapChildren(_.resolveOperatorsDown(rule))
} else {
afterRule.mapChildren(_.resolveOperatorsDown(rule))
}
}
} else {
self
}
}

The transform* methods are guarded by assertNotAnalysisRule(); as soon as one of them is used from the analyzer, an exception is thrown:

protected def assertNotAnalysisRule(): Unit = {
if (Utils.isTesting && // only enforced while Spark is running its unit tests
AnalysisHelper.inAnalyzer.get > 0 &&
AnalysisHelper.resolveOperatorDepth.get == 0) {
throw new RuntimeException("This method should not be called in the analyzer")
}
}

/**
* The analyzer should use resolveOperatorsDown() instead
*/
override def transformDown(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
assertNotAnalysisRule()
super.transformDown(rule)
}

/**
* The analyzer should use resolveOperators() instead
*/
override def transformUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
assertNotAnalysisRule()
super.transformUp(rule)
}

/**
* The analyzer should use resolveExpressions() instead
*/
override def transformAllExpressions(rule: PartialFunction[Expression, Expression]): this.type = {
assertNotAnalysisRule()
super.transformAllExpressions(rule)
}

Analyzer

The analyzer for logical query plans. Using the information in the SessionCatalog, it turns UnresolvedAttribute and UnresolvedRelation nodes into fully typed objects. The attachment shows how the example SQL, after the Analyzer has applied a series of rules, finally becomes the analyzed logical plan.

class Analyzer(
catalog: SessionCatalog,
conf: SQLConf,
maxIterations: Int)
extends RuleExecutor[LogicalPlan] with CheckAnalysis {
... ...
}

The following are the rule batches defined by the analyzer:

lazy val batches: Seq[Batch] = Seq(
Batch("Hints", fixedPoint,
new ResolveHints.ResolveBroadcastHints(conf),
ResolveHints.ResolveCoalesceHints,
ResolveHints.RemoveAllHints),
Batch("Simple Sanity Check", Once,
LookupFunctions),
Batch("Substitution", fixedPoint,
CTESubstitution,
WindowsSubstitution,
EliminateUnions,
new SubstituteUnresolvedOrdinals(conf)),
Batch("Resolution", fixedPoint,
ResolveTableValuedFunctions ::
ResolveRelations ::
ResolveReferences ::
ResolveCreateNamedStruct ::
ResolveDeserializer ::
ResolveNewInstance ::
ResolveUpCast ::
ResolveGroupingAnalytics ::
ResolvePivot ::
ResolveOrdinalInOrderByAndGroupBy ::
ResolveAggAliasInGroupBy ::
ResolveMissingReferences ::
ExtractGenerator ::
ResolveGenerate ::
ResolveFunctions ::
ResolveAliases ::
ResolveSubquery ::
ResolveSubqueryColumnAliases ::
ResolveWindowOrder ::
ResolveWindowFrame ::
ResolveNaturalAndUsingJoin ::
ResolveOutputRelation ::
ExtractWindowExpressions ::
GlobalAggregates ::
ResolveAggregateFunctions ::
TimeWindowing ::
ResolveInlineTables(conf) ::
ResolveHigherOrderFunctions(catalog) ::
ResolveLambdaVariables(conf) ::
ResolveTimeZone(conf) ::
ResolveRandomSeed ::
TypeCoercion.typeCoercionRules(conf) ++
extendedResolutionRules : _*), // extension rules; see how BaseSessionStateBuilder builds the analyzer
Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*), // extension rules; see how BaseSessionStateBuilder builds the analyzer
Batch("View", Once,
AliasViewChild(conf)),
Batch("Nondeterministic", Once,
PullOutNondeterministic),
Batch("UDF", Once,
HandleNullInputsForUDF),
Batch("FixNullability", Once,
FixNullability),
Batch("Subquery", Once,
UpdateOuterReferences),
Batch("Cleanup", fixedPoint,
CleanupAliases)
)

CheckAnalysis

This trait checks the analyzed logical plan; the main logic is defined in the checkAnalysis() method.

def checkAnalysis(plan: LogicalPlan): Unit = {
// We transform up and order the rules so as to catch the first possible failure instead
// of the result of cascading resolution failures.
plan.foreachUp {
... ...
}
extendedCheckRules.foreach(_(plan))
plan.foreachUp {
case o if !o.resolved => failAnalysis(s"unresolved operator ${o.simpleString}")
case _ =>
}

plan.setAnalyzed()
}
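
Putting the pieces together, the analyzer can be invoked on a parsed plan the same way QueryExecution does; a small sketch, assuming a SparkSession named spark:

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

val parsed = CatalystSqlParser.parsePlan("SELECT id + 1 AS next_id FROM range(3)")

// executeAndCheck = RuleExecutor.execute over the batches above, followed by checkAnalysis
val analyzed = spark.sessionState.analyzer.executeAndCheck(parsed)
analyzed.resolved  // true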

Logical Optimization Phase (optimizer)

(Sequence diagram: the flow of the logical optimization phase.)

SparkOptimizer

The attachment shows how the example's unoptimized logical plan, after SparkOptimizer has applied a series of rules, finally becomes the optimized logical plan.

class SparkOptimizer(
catalog: SessionCatalog,
experimentalMethods: ExperimentalMethods)
extends Optimizer(catalog) {

override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
Batch("Extract Python UDFs", Once,
Seq(ExtractPythonUDFFromAggregate, ExtractPythonUDFs): _*) :+
Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
Batch("Parquet Schema Pruning", Once, ParquetSchemaPruning)) ++
postHocOptimizationBatches :+
Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
... ...
}
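
The optimizer is just another RuleExecutor, so it can be applied to an analyzed plan directly; a small sketch continuing the example from the analysis phase:

// `analyzed` is the analyzed LogicalPlan from the sketch in the analysis section
val optimized = spark.sessionState.optimizer.execute(analyzed)
// The defaultBatches above are applied batch by batch until each reaches its fixed point.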

Physical Planning Phase (planning)

(Sequence diagram: the flow of the physical planning phase.)

QueryPlanner

A QueryPlanner converts a LogicalPlan into one or more physical plans using strategies. Subclasses are responsible for specifying a list of GenericStrategy objects, each of which can return a list of candidate physical plans. If a strategy is unable to plan all of the remaining operators in the logical plan tree, it calls GenericStrategy.planLater() to return a placeholder object; these placeholders are collected and later filled in using the other available strategies.

def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
// Obviously a lot to do here still...

// Collect physical plan candidates.
val candidates = strategies.iterator.flatMap(_(plan))

// The candidates may contain placeholders marked as [[planLater]],
// so try to replace them by their child plans.
val plans = candidates.flatMap { candidate =>
val placeholders = collectPlaceholders(candidate)

if (placeholders.isEmpty) {
// Take the candidate as is because it does not contain placeholders.
Iterator(candidate)
} else {
// Plan the logical plan marked as [[planLater]] and replace the placeholders.
placeholders.iterator.foldLeft(Iterator(candidate)) {
case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
// Plan the logical plan for the placeholder.
val childPlans = this.plan(logicalPlan)

candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
childPlans.map { childPlan =>
// Replace the placeholder by the child plan
candidateWithPlaceholders.transformUp {
case p if p.eq(placeholder) => childPlan
}
}
}
}
}
}

val pruned = prunePlans(plans)
assert(pruned.hasNext, s"No plan for $plan")
pruned
}

SparkStrategies

This abstract class is only responsible for defining the list of strategy objects.

SparkStrategy

A strategy object converts a logical plan into zero or more SparkPlans.

SparkPlanner

override def strategies: Seq[Strategy] =
experimentalMethods.extraStrategies ++
extraPlanningStrategies ++ (
PythonEvals ::
DataSourceV2Strategy ::
FileSourceStrategy ::
DataSourceStrategy(conf) ::
SpecialLimits ::
Aggregation ::
Window ::
JoinSelection ::
InMemoryScans ::
BasicOperators :: Nil)
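
QueryExecution turns the optimized logical plan into a SparkPlan by running it through this planner; a sketch of the equivalent call (ReturnAnswer marks the root of the query so that top-level strategies such as SpecialLimits can apply):

import org.apache.spark.sql.catalyst.plans.logical.ReturnAnswer
import org.apache.spark.sql.execution.SparkPlan

// `optimized` is the optimized LogicalPlan from the previous sketch
val sparkPlan: SparkPlan =
  spark.sessionState.planner.plan(ReturnAnswer(optimized)).next()

// QueryExecution then runs preparation rules (e.g. EnsureRequirements, CollapseCodegenStages)
// on this plan to obtain the final executedPlan.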

SparkPlan

Code Generation Phase

TODO

Example

SQL Parsing

SELECT avg(total) AS avgScore
FROM (
SELECT
student.id,
( 100 + 80 + score.mathScore + score.englishScore ) AS total
FROM student
JOIN score ON (student.id = score.studentId)
WHERE student.age < 35
) tmp
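
The plans below can be reproduced with roughly the following setup; a sketch in which the CSV paths and column names are taken from the plan output (a header row is assumed):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("catalyst-demo").getOrCreate()

spark.read.option("header", "true").csv("../student.csv")   // columns: id, name, age
  .createOrReplaceTempView("student")
spark.read.option("header", "true").csv("../score.csv")     // columns: studentId, mathScore, englishScore
  .createOrReplaceTempView("score")

val sql = "..." // the query shown above
spark.sql(sql).explain(true)  // prints the parsed, analyzed, optimized and physical plans
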
== Parsed Logical Plan ==
'Project ['avg('total) AS avgScore#33]
+- 'SubqueryAlias `tmp`
+- 'Project ['student.id, (((100 + 80) + 'score.mathScore) + 'score.englishScore) AS total#32]
+- 'Filter ('student.age < 35)
+- 'Join Inner, ('student.id = 'score.studentId)
:- 'UnresolvedRelation `student`
+- 'UnresolvedRelation `score`

Analysis Phase

The attachment shows how the example SQL, after the Analyzer has applied a series of rules, finally becomes the following analyzed logical plan.

== Analyzed Logical Plan ==
avgScore: double
Aggregate [avg(total#32) AS avgScore#33]
+- SubqueryAlias `tmp`
+- Project [id#10, ((cast((100 + 80) as double) + cast(mathScore#27 as double)) + cast(englishScore#28 as double)) AS total#32]
+- Filter (cast(age#12 as int) < 35)
+- Join Inner, (id#10 = studentId#26)
:- SubqueryAlias `student`
: +- Relation[id#10,name#11,age#12] csv
+- SubqueryAlias `score`
+- Relation[studentId#26,mathScore#27,englishScore#28] csv

Logical Optimization Phase

The attachment shows how the unoptimized logical plan above, after SparkOptimizer has applied a series of rules, finally becomes the following optimized logical plan.

== Optimized Logical Plan ==
Aggregate [avg(total#32) AS avgScore#33]
+- Project [((180.0 + cast(mathScore#27 as double)) + cast(englishScore#28 as double)) AS total#32]
+- Join Inner, (id#10 = studentId#26)
:- Project [id#10]
: +- Filter ((isnotnull(age#12) && (cast(age#12 as int) < 35)) && isnotnull(id#10))
: +- Relation[id#10,name#11,age#12] csv
+- Filter isnotnull(studentId#26)
+- Relation[studentId#26,mathScore#27,englishScore#28] csv

Physical Plan

The first plan below is the SparkPlan produced by the planner; the second is the final executedPlan after the preparation rules have run (note the added Exchange operators and the *(n) whole-stage code generation markers).

HashAggregate(keys=[], functions=[avg(total#32)], output=[avgScore#33])
+- HashAggregate(keys=[], functions=[partial_avg(total#32)], output=[sum#37, count#38L])
+- Project [((180.0 + cast(mathScore#27 as double)) + cast(englishScore#28 as double)) AS total#32]
+- BroadcastHashJoin [id#10], [studentId#26], Inner, BuildLeft
:- Project [id#10]
: +- Filter ((isnotnull(age#12) && (cast(age#12 as int) < 35)) && isnotnull(id#10))
: +- FileScan csv [id#10,age#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[../student.csv], PartitionFilters: [], PushedFilters: [IsNotNull(age), IsNotNull(id)], ReadSchema: struct<id:string,age:string>
+- Project [studentId#26, mathScore#27, englishScore#28]
+- Filter isnotnull(studentId#26)
+- FileScan csv [studentId#26,mathScore#27,englishScore#28] Batched: false, Format: CSV, Location: InMemoryFileIndex[../score.csv], PartitionFilters: [], PushedFilters: [IsNotNull(studentId)], ReadSchema: struct<studentId:string,mathScore:string,englishScore:string>
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[avg(total#32)], output=[avgScore#33])
+- Exchange SinglePartition
+- *(2) HashAggregate(keys=[], functions=[partial_avg(total#32)], output=[sum#37, count#38L])
+- *(2) Project [((180.0 + cast(mathScore#27 as double)) + cast(englishScore#28 as double)) AS total#32]
+- *(2) BroadcastHashJoin [id#10], [studentId#26], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
: +- *(1) Project [id#10]
: +- *(1) Filter ((isnotnull(age#12) && (cast(age#12 as int) < 35)) && isnotnull(id#10))
: +- *(1) FileScan csv [id#10,age#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:../student.csv], PartitionFilters: [], PushedFilters: [IsNotNull(age), IsNotNull(id)], ReadSchema: struct<id:string,age:string>
+- *(2) Project [studentId#26, mathScore#27, englishScore#28]
+- *(2) Filter isnotnull(studentId#26)
+- *(2) FileScan csv [studentId#26,mathScore#27,englishScore#28] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:../score.csv], PartitionFilters: [], PushedFilters: [IsNotNull(studentId)], ReadSchema: struct<studentId:string,mathScore:string,englishScore:string>

Code Generation Phase

TODO

Notes

This article is based on Spark 2.4.3.
