• SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(1)


    背景

    本文基于 SPARK 3.3.0
    从一个unit test来探究SPARK Codegen的逻辑,

      test("SortAggregate should be included in WholeStageCodegen") {
        val df = spark.range(10).agg(max(col("id")), avg(col("id")))
        withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") {
          val plan = df.queryExecution.executedPlan
          assert(plan.exists(p =>
            p.isInstanceOf[WholeStageCodegenExec] &&
              p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortAggregateExec]))
          assert(df.collect() === Array(Row(9, 4.5)))
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    分析

    执行计划的真实面目

    Spark的全代码流程,网上和代码中都有提及,如下:

      WholeStageCodegen       Plan A               FakeInput        Plan B
      =========================================================================
     
      -> execute()
          |
       doExecute() --------->   inputRDDs() -------> inputRDDs() ------> execute()
          |
          +----------------->   produce()
                                  |
                               doProduce()  -------> produce()
                                                        |
                                                     doProduce()
                                                        |
                              doConsume() <--------- consume()
                                  |
       doConsume()  <--------  consume()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    整体逻辑都知道,道理都懂,但是里面涉及到了好多细节,就拿以上的例子来说,会生成如下的执行计划:

    *(2) SortAggregate(key=[], functions=[max(id#0L), avg(id#0L)], output=[max(id)#5L, avg(id)#6])
    +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#13]
       +- *(1) SortAggregate(key=[], functions=[partial_max(id#0L), partial_avg(id#0L)], output=[max#12L, sum#13, count#14L])
          +- *(1) Range (0, 10, step=1, splits=2)
    
    • 1
    • 2
    • 3
    • 4

    但是实际在缕代码的时候,你就会发现这个全代码的逻辑根本就缕不通,那是因为CollapseCodegenStages规则会加WholeStageCodegenExecInputAdapter物理计划,加了以后的计划为:

    WholeStageCodegen
    *(2) SortAggregate(key=[], functions=[max(id#0L), avg(id#0L)], output=[max(id)#5L, avg(id)#6])
      InputAdapter
    +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#13]
        WholeStageCodegen
       +- *(1) SortAggregate(key=[], functions=[partial_max(id#0L), partial_avg(id#0L)], output=[max#12L, sum#13, count#14L])
          +- *(1) Range (0, 10, step=1, splits=2)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    注意:类似有*(1),*(2)这种符号的,表明是有全代码生成的,而为什么在物理计划的时候没有显示 WholeStageCodegenExecInputAdapter 计划是因为该两个计划重写了generateTreeString方法:
    WholeStageCodegenExec 重写为如下:

    override def generateTreeString(
          depth: Int,
          lastChildren: Seq[Boolean],
          append: String => Unit,
          verbose: Boolean,
          prefix: String = "",
          addSuffix: Boolean = false,
          maxFields: Int,
          printNodeId: Boolean,
          indent: Int = 0): Unit = {
        child.generateTreeString(
          depth,
          lastChildren,
          append,
          verbose,
          if (printNodeId) "* " else s"*($codegenStageId) ",
          false,
          maxFields,
          printNodeId,
          indent)
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    这里的*($codegenStageId) 就是上面所说的*(1),*(2),而这里的数字1和2代表者不同的两个代码生成阶段,因为Exchange不支持代码生成,所以被隔离成了两个代码生成。而*($codegenStageId) 作为子计划的前缀传递到了下游。
    InputAdapter 重写为如下:

     override def generateTreeString(
          depth: Int,
          lastChildren: Seq[Boolean],
          append: String => Unit,
          verbose: Boolean,
          prefix: String = "",
          addSuffix: Boolean = false,
          maxFields: Int,
          printNodeId: Boolean,
          indent: Int = 0): Unit = {
        child.generateTreeString(
          depth,
          lastChildren,
          append,
          verbose,
          prefix = "",
          addSuffix = false,
          maxFields,
          printNodeId,
          indent)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    看到这里的prefix = “”,所以单纯从执行计划看是没有任何迹象能表明存在着InputAdapter计划.

    所以说,我们最后应该看到的数据流应为:
    第一阶段wholeStageCodegen:

     WholeStageCodegenExec      SortAggregateExec(Partial)     RangeExec        
      =========================================================================
     
      -> execute()
          |
       doExecute() --------->   inputRDDs() -----------------> inputRDDs() 
          |
       doCodeGen()
          |
          +----------------->   produce()
                                  |
                               doProduce() 
                                  |
                               doProduceWithoutKeys() -------> produce()
                                                                  |
                                                              doProduce()
                                                                  |
                               doConsume()<------------------- consume()
                                  |
                               doConsumeWithoutKeys()
                                  |并不是doConsumeWithoutKeys调用consume,而是由doProduceWithoutKeys调用
       doConsume()  <--------  consume()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    第二阶段wholeStageCodegen:

     WholeStageCodegenExec      SortAggregateExec(Final)      InputAdapter       ShuffleExchangeExec        
      ====================================================================================
     
      -> execute()
          |
       doExecute() --------->   inputRDDs() -----------------> inputRDDs() -------> execute()
          |                                                                            |
       doCodeGen()                                                                  doExecute()     
          |                                                                            |
          +----------------->   produce()                                           ShuffledRowRDD
                                  |
                               doProduce() 
                                  |
                               doProduceWithoutKeys() -------> produce()
                                                                  |
                                                              doProduce()
                                                                  |
                               doConsume() <------------------- consume()
                                  |
                               doConsumeWithoutKeys()
                                  |并不是doConsumeWithoutKeys调用consume,而是由doProduceWithoutKeys调用
       doConsume()  <--------  consume()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
  • 相关阅读:
    【深度强化学习】Python:OpenAI Gym-CarRacing 自动驾驶 | 提供项目完整代码 | 车道检测功能 | 路径训练功能 | 车辆控制功能
    计算机毕业设计ssm+vue基本微信小程序的心理服务平台
    2 线程池-ThreadPoolExector分析
    GEO振弦式钢筋计的组装
    汽车企业售后业务数字化转型,究竟有多卷
    卷积神经网络总结
    Python tkinter -- 第18章 画布控件之图像(image)
    LinkedList相较于Arravlist的特点/优化(面试笔记总结速记)
    第四周学习报告
    Mac的Vim配置
  • 原文地址:https://blog.csdn.net/monkeyboy_tech/article/details/126655719