/**
 * Returns a Seq of the children of this node.
 * Children should not change. Immutability required for containsChild optimization.
 */
def children: Seq[BaseType]
}
/**
 * Faster version of equality which short-circuits when two treeNodes are the same instance.
 * We don't just override Object.equals, as doing so prevents the scala compiler from
 * generating case class `equals` methods.
 */
def fastEquals(other: TreeNode[_]): Boolean = {
  this.eq(other) || this == other
}
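To see why the `eq` short-circuit is worthwhile, here is a minimal stand-in for TreeNode (illustrative code only, not Spark's class): reference equality is checked first, so comparing a node with itself never triggers the potentially deep structural comparison generated for case classes.

// Minimal, illustrative stand-in for TreeNode (not Spark code).
case class Node(value: Int, children: Seq[Node] = Nil) {
  def fastEquals(other: Node): Boolean = this.eq(other) || this == other
}

object FastEqualsDemo extends App {
  val tree = Node(1, Seq(Node(2), Node(3)))
  val copy = Node(1, Seq(Node(2), Node(3)))
  println(tree.fastEquals(tree)) // true, short-circuits on eq (same instance)
  println(tree.fastEquals(copy)) // true, falls back to the generated case-class equals
}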
abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {

  /** Defines a sequence of rule batches, to be overridden by the implementation. */
  protected def batches: Seq[Batch]
  /**
   * Defines a check function that checks for structural integrity of the plan after the execution
   * of each rule. For example, we can check whether a plan is still resolved after each rule in
   * `Optimizer`, so we can catch rules that return invalid plans. The check function returns
   * `false` if the given plan doesn't pass the structural integrity check.
   */
  protected def isPlanIntegral(plan: TreeType): Boolean = true
  /**
   * Executes the rules defined by the subclass, batch by batch. Batches run serially,
   * and the rules within each batch are also applied serially.
   */
  def execute(plan: TreeType): TreeType = {
    var curPlan = plan
    val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true
      // Run until fix point (or the max number of iterations as specified in the strategy).
      while (continue) {
        // Apply the rules of the current batch in order.
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val startTime = System.nanoTime()
            val result = rule(plan) // Transform the plan with the rule.
            val runTime = System.nanoTime() - startTime
            // Run the structural integrity checker against the plan after each rule.
            if (!isPlanIntegral(result)) {
              val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
                "the structural integrity of the plan is broken."
              throw new TreeNodeException(result, message, null)
            }
            result
        }
        // The logic below implements the convergence condition for this batch.
        iteration += 1
        if (iteration > batch.strategy.maxIterations) {
          // Only log if this is a rule that is supposed to run more than once.
          if (iteration != 2) {
            val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
            if (Utils.isTesting) {
              throw new TreeNodeException(curPlan, message, null)
            } else {
              logWarning(message)
            }
          }
          continue = false
        }
        if (curPlan.fastEquals(lastPlan)) {
          logTrace(
            s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
          continue = false
        }
        lastPlan = curPlan
      }
      if (!batchStartPlan.fastEquals(curPlan)) {
        logDebug(
          s"""
             |=== Result of Batch ${batch.name} ===
             |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
          """.stripMargin)
      } else {
        logTrace(s"Batch ${batch.name} has no effect.")
      }
    }
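Setting aside the metrics and logging, the control flow of execute() is a fixed-point loop per batch: apply every rule of the batch in order, and repeat until the plan stops changing or the maximum number of iterations is reached. The following self-contained sketch reproduces that loop over a toy plan; SimpleBatch and SimpleRule are made-up names, not Spark's Batch and Rule.

object FixedPointSketch extends App {
  type Plan = List[Int]
  type SimpleRule = Plan => Plan

  final case class SimpleBatch(name: String, maxIterations: Int, rules: Seq[SimpleRule])

  def runBatch(batch: SimpleBatch, plan: Plan): Plan = {
    var curPlan = plan
    var lastPlan = curPlan
    var iteration = 1
    var continue = true
    while (continue) {
      // Apply every rule of the batch in order.
      curPlan = batch.rules.foldLeft(curPlan) { case (p, rule) => rule(p) }
      iteration += 1
      if (iteration > batch.maxIterations) continue = false
      if (curPlan == lastPlan) continue = false // fixed point reached
      lastPlan = curPlan
    }
    curPlan
  }

  // Toy rule: drop one leading zero per pass; converges once no leading zero remains.
  val dropLeadingZero: SimpleRule = p => if (p.headOption.contains(0)) p.tail else p
  println(runBatch(SimpleBatch("CleanZeros", 10, Seq(dropLeadingZero)), List(0, 0, 1, 2)))
  // List(1, 2)
}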
  /** Name for this rule, automatically inferred based on class name. */
  val ruleName: String = {
    val className = getClass.getName
    if (className endsWith "$") className.dropRight(1) else className
  }
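The trailing "$" is stripped because rules are usually Scala objects, and the runtime class name of an object carries that suffix. A quick illustration (MyRule and RuleNameDemo are made-up demo names):

object MyRule
object RuleNameDemo extends App {
  val className = MyRule.getClass.getName
  val ruleName  = if (className endsWith "$") className.dropRight(1) else className
  println(className) // e.g. "MyRule$" (possibly package-qualified)
  println(ruleName)  // "MyRule"
}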
  /**
   * Wrap a Dataset action to track the QueryExecution and time cost, then report to the
   * user-registered callback functions.
   */
  private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
    try {
      qe.executedPlan.foreach { plan =>
        plan.resetMetrics()
      }
      val start = System.nanoTime()
      val result = SQLExecution.withNewExecutionId(sparkSession, qe) {
        action(qe.executedPlan)
      }
      val end = System.nanoTime()
      sparkSession.listenerManager.onSuccess(name, qe, end - start)
      result
    } catch {
      case e: Throwable =>
        sparkSession.listenerManager.onFailure(name, qe, e)
        throw e
    }
  }
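Stripped of the Spark-specific pieces (QueryExecution, SQLExecution, the listener manager), withAction is the familiar "time an action, notify listeners, rethrow on failure" pattern. A generic sketch of that pattern follows; the callback signatures are assumptions for the demo, not Spark's API.

object WithActionSketch extends App {
  def withTiming[U](name: String)(action: => U)(
      onSuccess: (String, Long) => Unit,
      onFailure: (String, Throwable) => Unit): U = {
    val start = System.nanoTime()
    try {
      val result = action
      onSuccess(name, System.nanoTime() - start) // report elapsed time on success
      result
    } catch {
      case e: Throwable =>
        onFailure(name, e)                       // report the failure, then rethrow
        throw e
    }
  }

  val doubled = withTiming("collect")(Seq(1, 2, 3).map(_ * 2))(
    (n, ns) => println(s"$n succeeded in ${ns / 1000} us"),
    (n, e)  => println(s"$n failed: ${e.getMessage}"))
  println(doubled)
}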
  /**
   * Collect all elements from a spark plan.
   */
  private def collectFromPlan(plan: SparkPlan): Array[T] = {
    // This projection writes output to an `InternalRow`, which means applying this projection is
    // not thread-safe. Here we create the projection inside this method to make `Dataset`
    // thread-safe.
    val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
    plan.executeCollect().map { row =>
      // The row returned by SafeProjection is `SpecificInternalRow`, which ignores the data type
      // parameter of its `get` method, so it's safe to use null here.
      objProj(row).get(0, null).asInstanceOf[T]
    }
  }
  /**
   * Recursively marks all nodes in this plan tree as analyzed.
   * This method is only called by CheckAnalysis.
   */
  private[catalyst] def setAnalyzed(): Unit = {
    if (!_analyzed) {
      _analyzed = true
      children.foreach(_.setAnalyzed())
    }
  }
  /**
   * Recursively transforms the expression trees of this plan, skipping nodes that have
   * already been analyzed.
   */
  def resolveExpressions(r: PartialFunction[Expression, Expression]): LogicalPlan = {
    resolveOperators { case p => p.transformExpressions(r) }
  }
  /**
   * Returns a copy of this node where `rule` has been recursively applied to the tree;
   * if `rule` does not apply to a node, that node is returned unchanged. This is similar
   * to `transform`, but it skips subtrees that have already been marked as analyzed.
   *
   * @param rule the function used to transform this node and its children
   */
  def resolveOperators(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
    resolveOperatorsDown(rule)
  }
  protected def assertNotAnalysisRule(): Unit = {
    if (Utils.isTesting && // Only enforced when Spark is running unit tests.
        AnalysisHelper.inAnalyzer.get > 0 &&
        AnalysisHelper.resolveOperatorDepth.get == 0) {
      throw new RuntimeException("This method should not be called in the analyzer")
    }
  }
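The key behaviour of resolveOperators/resolveOperatorsDown is that it stops descending as soon as it meets an analyzed subtree, so work done by earlier analyzer rules is not repeated. The simplified sketch below shows the idea; PlanNode is an illustrative stand-in, not Spark's LogicalPlan.

final case class PlanNode(name: String, analyzed: Boolean, children: Seq[PlanNode] = Nil) {
  def resolveOperatorsDown(rule: PartialFunction[PlanNode, PlanNode]): PlanNode = {
    if (analyzed) this // skip an already-analyzed subtree entirely
    else {
      val applied = rule.applyOrElse(this, identity[PlanNode])
      applied.copy(children = applied.children.map(_.resolveOperatorsDown(rule)))
    }
  }
}

object ResolveOperatorsDemo extends App {
  val plan = PlanNode("Project", analyzed = false, children = Seq(
    PlanNode("SubqueryAlias", analyzed = true),      // already analyzed: left untouched
    PlanNode("UnresolvedRelation", analyzed = false)))

  val resolved = plan.resolveOperatorsDown {
    case p @ PlanNode("UnresolvedRelation", _, _) => p.copy(name = "Relation", analyzed = true)
  }
  println(resolved)
}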
  def checkAnalysis(plan: LogicalPlan): Unit = {
    // We transform up and order the rules so as to catch the first possible failure instead
    // of the result of cascading resolution failures.
    plan.foreachUp {
      ...
      ...
    }
    extendedCheckRules.foreach(_(plan))
    plan.foreachUp {
      case o if !o.resolved => failAnalysis(s"unresolved operator ${o.simpleString}")
      case _ =>
    }
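foreachUp visits children before their parents, which is exactly why the comment says the first possible failure is caught instead of a cascade of follow-on failures: the deepest unresolved operator is reported before the operators that merely inherit its failure. A small sketch of that traversal order (SimpleNode is a made-up stand-in for LogicalPlan):

final case class SimpleNode(name: String, resolved: Boolean, children: Seq[SimpleNode] = Nil) {
  def foreachUp(f: SimpleNode => Unit): Unit = {
    children.foreach(_.foreachUp(f)) // visit children first
    f(this)                          // then the node itself
  }
}

object ForeachUpDemo extends App {
  val plan =
    SimpleNode("Project", resolved = false, children = Seq(
      SimpleNode("Filter", resolved = false, children = Seq(
        SimpleNode("UnresolvedRelation", resolved = false)))))

  // Prints UnresolvedRelation first, then Filter, then Project.
  plan.foreachUp { n => if (!n.resolved) println(s"unresolved operator ${n.name}") }
}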
  def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
    // Obviously a lot to do here still...
    // Collect physical plan candidates.
    val candidates = strategies.iterator.flatMap(_(plan))
    // The candidates may contain placeholders marked as [[planLater]],
    // so try to replace them by their child plans.
    val plans = candidates.flatMap { candidate =>
      val placeholders = collectPlaceholders(candidate)
      if (placeholders.isEmpty) {
        // Take the candidate as is because it does not contain placeholders.
        Iterator(candidate)
      } else {
        // Plan the logical plan marked as [[planLater]] and replace the placeholders.
        placeholders.iterator.foldLeft(Iterator(candidate)) {
          case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
            // Plan the logical plan for the placeholder.
            val childPlans = this.plan(logicalPlan)
            candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
              childPlans.map { childPlan =>
                // Replace the placeholder by the child plan.
                candidateWithPlaceholders.transformUp {
                  case p if p.eq(placeholder) => childPlan
                }
              }
            }
        }
      }
    }
    val pruned = prunePlans(plans)
    assert(pruned.hasNext, s"No plan for $plan")
    pruned
  }
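The foldLeft over the placeholders effectively builds the cross product of the candidate with every child plan produced for each placeholder. A simplified sketch of that expansion, with plain strings standing in for physical plans (all names are illustrative only):

object PlaceholderSketch extends App {
  val candidate = "Join(PLACEHOLDER_L, PLACEHOLDER_R)"
  val placeholders = Seq(
    ("PLACEHOLDER_L", Seq("ScanA", "IndexScanA")), // child plans for the left side
    ("PLACEHOLDER_R", Seq("ScanB"))                // child plans for the right side
  )

  // Each step substitutes one placeholder in every candidate produced so far.
  val plans = placeholders.foldLeft(Iterator(candidate)) {
    case (candidates, (placeholder, childPlans)) =>
      candidates.flatMap { c => childPlans.iterator.map(child => c.replace(placeholder, child)) }
  }
  plans.foreach(println)
  // Join(ScanA, ScanB)
  // Join(IndexScanA, ScanB)
}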
SELECT avg(total) AS avgScore
FROM (
  SELECT student.id,
         (100 + 80 + score.mathScore + score.englishScore) AS total
  FROM student
  JOIN score ON (student.id = score.studentId)
  WHERE student.age < 35
) tmp
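For reference, one possible way to run this query and inspect the plans it produces from a Spark application or spark-shell; the table schemas and sample rows below are assumptions chosen to match the query, not part of the original example.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("demo").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical sample data for the student and score tables used in the query.
Seq((1, "alice", 20), (2, "bob", 40))
  .toDF("id", "name", "age").createOrReplaceTempView("student")
Seq((1, 90, 85), (2, 70, 60))
  .toDF("studentId", "mathScore", "englishScore").createOrReplaceTempView("score")

val df = spark.sql("""
  SELECT avg(total) AS avgScore
  FROM (
    SELECT student.id,
           (100 + 80 + score.mathScore + score.englishScore) AS total
    FROM student
    JOIN score ON (student.id = score.studentId)
    WHERE student.age < 35
  ) tmp
""")
df.explain(true) // prints the parsed, analyzed, optimized, and physical plans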