Hudi Spark SQL Source Code Study Notes - CTAS


    Preface

    The previous article, Hudi Spark SQL Source Code Study Notes - Create Table, walked through the execution logic of Create Table. This article continues with CTAS. The reason for covering CTAS is that, while working on a PR I submitted, I found that the CTAS logic differs between Spark 2 and Spark 3.2.1 and ends up in different Hudi implementation classes, so this article analyzes the two versions separately.

    Differences

    First, a summary of how the overall flow differs between Spark 2 and Spark 3.2.1:

    Spark2: visitCreateTable->CreateTable->CreateHoodieTableAsSelectCommand.run
    Spark3.2.1: prerequisite: spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog is configured; if it is not configured, the flow is the same as Spark2
    visitCreateTable->CreateTableAsSelectStatement->isV2Provider->true->CreateTableAsSelect->HoodieCatalog.createHoodieTable
    visitCreateTable->CreateTableAsSelectStatement->isV2Provider->false->CreateTable->CreateHoodieTableAsSelectCommand.run

    There are two key points that make Spark 3.2.1 differ from Spark 2:

    • 1. The configuration spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
    • 2. isV2Provider("hudi") returns true

    If either condition is not met, the Spark 3.2.1 logic is the same as Spark 2. The PR that introduced HoodieCatalog and made hudi a V2 provider is: [HUDI-3254] Introduce HoodieCatalog to manage tables for Spark Datasource V2
    The latest code on master has since changed isV2Provider("hudi") for Spark 3.2.1 back to false, so the Spark 2 and Spark 3.2.1 logic is consistent again; see PR: [HUDI-4178] Addressing performance regressions in Spark DataSourceV2 Integration
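
    As a quick sanity check, you can look at which implementation the current session has registered for spark_catalog; if it is not HoodieCatalog, the Spark 3.2.1 CTAS flow described below falls back to the same route as Spark 2. A minimal sketch (not part of the Hudi test code):

    // Sketch: print the catalog implementation registered for spark_catalog, if any.
    val catalogImpl = spark.conf.getOption("spark.sql.catalog.spark_catalog")
    println(catalogImpl.getOrElse("<default V2SessionCatalog>"))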

    Version

    Hudi: https://github.com/apache/hudi/pull/5592. This article debugs and analyzes the code corresponding to that PR, because it was while contributing that PR that I discovered the CTAS logic differs between Spark 3.2.1 and Spark 2.

    Example Code

    As before, we directly use the test statement from TestCreateTable in the source code:

    spark.sql(
            s"""
               | create table $tableName using hudi
               | partitioned by (dt)
               | tblproperties(
               |    hoodie.database.name = "databaseName",
               |    hoodie.table.name = "tableName",
               |    primaryKey = 'id',
               |    preCombineField = 'ts',
               |    hoodie.datasource.write.operation = 'upsert',
               |    type = '$tableType'
               | )
               | AS
               | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt, 1000 as ts
             """.stripMargin
          )
    

    One thing worth mentioning is how the spark session here is created, because it matters when analyzing the Spark 3.2.1 logic, so it is shown here first:

      protected lazy val spark: SparkSession = SparkSession.builder()
        .master("local[1]")
        .appName("hoodie sql test")
        .withExtensions(new HoodieSparkSessionExtension)
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("hoodie.insert.shuffle.parallelism", "4")
        .config("hoodie.upsert.shuffle.parallelism", "4")
        .config("hoodie.delete.shuffle.parallelism", "4")
        .config("spark.sql.warehouse.dir", sparkWareHouse.getCanonicalPath)
        .config("spark.sql.session.timeZone", "CTT")
        .config(sparkConf())
        .getOrCreate()
    
       def sparkConf(): SparkConf = {
        val sparkConf = new SparkConf()
        if (HoodieSparkUtils.gteqSpark3_2) {
          sparkConf.set("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
        }
        sparkConf
      }   
    

    Printing the Execution Plan

    As in the previous article, we first print the plan to make the analysis easier:

    config("spark.sql.planChangeLog.level", "INFO")
    val df = spark.sql(ctasSql)
    df.explain(true)
    

    The difference from the previous article is the added configuration "spark.sql.planChangeLog.level" = "INFO". It was not added in the previous article but is added here because this configuration only exists since Spark 3.1.0, so it has no effect on the Spark 2 code; it is, however, quite useful when analyzing the Spark 3.2.1 execution plan. Also note that with this configuration enabled, the information is printed via logBasedOnLevel(message). Three methods call logBasedOnLevel: logRule, which prints oldPlan and newPlan if a rule takes effect; logBatch, which prints the plan before and after a Batch; and logMetrics, which prints the overall metrics. However, none of these methods is called inside planner.plan, so this is of no use for analyzing which strategies take effect, but it is very useful for analyzing which rules take effect during the analysis phase.

      private def logBasedOnLevel(f: => String): Unit = {
        logLevel match {
          case "TRACE" => logTrace(f)
          case "DEBUG" => logDebug(f)
          case "INFO" => logInfo(f)
          case "WARN" => logWarning(f)
          case "ERROR" => logError(f)
          case _ => logTrace(f)
        }
      }
      def logRule(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
        if (!newPlan.fastEquals(oldPlan)) {
          if (logRules.isEmpty || logRules.get.contains(ruleName)) {
            def message(): String = {
              s"""
                 |=== Applying Rule $ruleName ===
                 |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
               """.stripMargin
            }
    
            logBasedOnLevel(message)
          }
        }
      }
      def logBatch(batchName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
        if (logBatches.isEmpty || logBatches.get.contains(batchName)) {
          def message(): String = {
            if (!oldPlan.fastEquals(newPlan)) {
              s"""
                 |=== Result of Batch $batchName ===
                 |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
              """.stripMargin
            } else {
              s"Batch $batchName has no effect."
            }
          }
    
          logBasedOnLevel(message)
        }
      }
      def logMetrics(metrics: QueryExecutionMetrics): Unit = {
        val totalTime = metrics.time / NANOS_PER_SECOND.toDouble
        val totalTimeEffective = metrics.timeEffective / NANOS_PER_SECOND.toDouble
        val message =
          s"""
             |=== Metrics of Executed Rules ===
             |Total number of runs: ${metrics.numRuns}
             |Total time: $totalTime seconds
             |Total number of effective runs: ${metrics.numEffectiveRuns}
             |Total time of effective runs: $totalTimeEffective seconds
          """.stripMargin
    
        logBasedOnLevel(message)
      }  
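
    If the full INFO output is too noisy, the same logger can be narrowed down to specific rules or batches. A minimal sketch (assuming Spark 3.1+, where the companion configs spark.sql.planChangeLog.rules and spark.sql.planChangeLog.batches exist); the rule and batch names used here are the ones that appear in the Spark 3.2.1 logs later in this article:

    // Sketch: only log the rule/batch we care about for this CTAS analysis.
    spark.conf.set("spark.sql.planChangeLog.level", "INFO")
    spark.conf.set("spark.sql.planChangeLog.rules",
      "org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog")
    spark.conf.set("spark.sql.planChangeLog.batches", "Resolution")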
    

    Spark2

    The Spark 2 logic is largely the same as in the previous article. Since it has already been summarized there, this article only covers the differences; if you have grasped the logic of the previous article, the CTAS logic is fairly simple.

    Printed Output

    == Parsed Logical Plan ==
    'CreateTable `h0`, ErrorIfExists
    +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
       +- OneRowRelation
    
    == Analyzed Logical Plan ==
    CreateHoodieTableAsSelectCommand `h0`, ErrorIfExists
       +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
          +- OneRowRelation
    
    == Optimized Logical Plan ==
    CreateHoodieTableAsSelectCommand `h0`, ErrorIfExists
       +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
          +- OneRowRelation
    
    == Physical Plan ==
    Execute CreateHoodieTableAsSelectCommand
       +- CreateHoodieTableAsSelectCommand `h0`, ErrorIfExists
             +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
                +- OneRowRelation
    

    singleStatement

    Following the logic from the previous article, this CTAS statement likewise corresponds to SqlBase.g4 in the Spark source:

    singleStatement
        : statement EOF
        ;
    
    statement
        : query                                                            #statementDefault
        | USE db=identifier                                                #use
        | CREATE DATABASE (IF NOT EXISTS)? identifier
            (COMMENT comment=STRING)? locationSpec?
            (WITH DBPROPERTIES tablePropertyList)?                         #createDatabase
        | ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList     #setDatabaseProperties
        | DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)?      #dropDatabase
        | createTableHeader ('(' colTypeList ')')? tableProvider
            ((OPTIONS options=tablePropertyList) |
            (PARTITIONED BY partitionColumnNames=identifierList) |
            bucketSpec |
            locationSpec |
            (COMMENT comment=STRING) |
            (TBLPROPERTIES tableProps=tablePropertyList))*
            (AS? query)?                                                   #createTable
        | createTableHeader ('(' columns=colTypeList ')')?
            ((COMMENT comment=STRING) |
            (PARTITIONED BY '(' partitionColumns=colTypeList ')') |
            bucketSpec |
            skewSpec |
            rowFormat |
            createFileFormat |
            locationSpec |
            (TBLPROPERTIES tableProps=tablePropertyList))*
            (AS? query)?                                                   #createHiveTable
            ......
    
    tableProvider
        : USING qualifiedName
        ;
    

    The difference here is that query is non-empty ((AS? query)), so visitCreateTable returns CreateTable(tableDesc, mode, Some(query)):

      override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
        val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
        if (external) {
          operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
        }
    
        checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
        checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
        checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
        checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
        checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
        checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
    
        val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
        // provider is hudi
        val provider = ctx.tableProvider.qualifiedName.getText
        val schema = Option(ctx.colTypeList()).map(createSchema)
        val partitionColumnNames =
          Option(ctx.partitionColumnNames)
            .map(visitIdentifierList(_).toArray)
            .getOrElse(Array.empty[String])
        val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
        val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
    
        val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
        val storage = DataSource.buildStorageFormatFromOptions(options)
    
        if (location.isDefined && storage.locationUri.isDefined) {
          throw new ParseException(
            "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " +
              "you can only specify one of them.", ctx)
        }
        val customLocation = storage.locationUri.orElse(location.map(CatalogUtils.stringToURI))
    
        val tableType = if (customLocation.isDefined) {
          CatalogTableType.EXTERNAL
        } else {
          CatalogTableType.MANAGED
        }
    
        val tableDesc = CatalogTable(
          identifier = table,
          tableType = tableType,
          storage = storage.copy(locationUri = customLocation),
          schema = schema.getOrElse(new StructType),
          provider = Some(provider),
          partitionColumnNames = partitionColumnNames,
          bucketSpec = bucketSpec,
          properties = properties,
          comment = Option(ctx.comment).map(string))
    
        // Determine the storage mode.
        val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
    
        if (ctx.query != null) {
          // Get the backing query.
          val query = plan(ctx.query)
    
          if (temp) {
            operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx)
          }
    
          // Don't allow explicit specification of schema for CTAS
          if (schema.nonEmpty) {
            operationNotAllowed(
              "Schema may not be specified in a Create Table As Select (CTAS) statement",
              ctx)
          }
          CreateTable(tableDesc, mode, Some(query))
        } else {
          if (temp) {
            if (ifNotExists) {
              operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
            }
    
            logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " +
              "CREATE TEMPORARY VIEW ... USING ... instead")
            // Unlike CREATE TEMPORARY VIEW USING, CREATE TEMPORARY TABLE USING does not support
            // IF NOT EXISTS. Users are not allowed to replace the existing temp table.
            CreateTempViewUsing(table, schema, replace = false, global = false, provider, options)
          } else {
            CreateTable(tableDesc, mode, None)
          }
        }
      }
    

    analysis

    Then, during the analysis phase, it is matched in the apply method of HoodieAnalysis, one of Hudi's custom rules in customResolutionRules:

    case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
      with SparkAdapterSupport {
    
      override def apply(plan: LogicalPlan): LogicalPlan = {
        plan match {
          // Convert to MergeIntoHoodieTableCommand
          case m @ MergeIntoTable(target, _, _, _, _)
            if m.resolved && sparkAdapter.isHoodieTable(target, sparkSession) =>
              MergeIntoHoodieTableCommand(m)
    
          // Convert to UpdateHoodieTableCommand
          case u @ UpdateTable(table, _, _)
            if u.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
              UpdateHoodieTableCommand(u)
    
          // Convert to DeleteHoodieTableCommand
          case d @ DeleteFromTable(table, _)
            if d.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
              DeleteHoodieTableCommand(d)
    
          // Convert to InsertIntoHoodieTableCommand
          case l if sparkAdapter.isInsertInto(l) =>
            val (table, partition, query, overwrite, _) = sparkAdapter.getInsertIntoChildren(l).get
            table match {
              case relation: LogicalRelation if sparkAdapter.isHoodieTable(relation, sparkSession) =>
                new InsertIntoHoodieTableCommand(relation, query, partition, overwrite)
              case _ =>
                l
            }
    
          // Convert to CreateHoodieTableAsSelectCommand
          case CreateTable(table, mode, Some(query))
            if query.resolved && sparkAdapter.isHoodieTable(table) =>
              CreateHoodieTableAsSelectCommand(table, mode, query)
    
          // Convert to CompactionHoodieTableCommand
          case CompactionTable(table, operation, options)
            if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
            val tableId = getTableIdentifier(table)
            val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
            CompactionHoodieTableCommand(catalogTable, operation, options)
          // Convert to CompactionHoodiePathCommand
          case CompactionPath(path, operation, options) =>
            CompactionHoodiePathCommand(path, operation, options)
          // Convert to CompactionShowOnTable
          case CompactionShowOnTable(table, limit)
            if sparkAdapter.isHoodieTable(table, sparkSession) =>
            val tableId = getTableIdentifier(table)
            val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
            CompactionShowHoodieTableCommand(catalogTable, limit)
          // Convert to CompactionShowHoodiePathCommand
          case CompactionShowOnPath(path, limit) =>
            CompactionShowHoodiePathCommand(path, limit)
          // Convert to HoodieCallProcedureCommand
          case c@CallCommand(_, _) =>
            val procedure: Option[Procedure] = loadProcedure(c.name)
            val input = buildProcedureArgs(c.args)
            if (procedure.nonEmpty) {
              CallProcedureHoodieCommand(procedure.get, input)
            } else {
              c
            }
          case _ => plan
        }
      }
    

    Here it is converted to CreateHoodieTableAsSelectCommand, which, like CreateHoodieTableCommand, is a subclass of Command:

    case class CreateHoodieTableAsSelectCommand(
       table: CatalogTable,
       mode: SaveMode,
       query: LogicalPlan) extends HoodieLeafRunnableCommand {
      override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
    
      override def run(sparkSession: SparkSession): Seq[Row] = {
        assert(table.tableType != CatalogTableType.VIEW)
        assert(table.provider.isDefined)
    
        val sessionState = sparkSession.sessionState
        val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
        val tableIdentWithDB = table.identifier.copy(database = Some(db))
        val tableName = tableIdentWithDB.unquotedString
    
        if (sessionState.catalog.tableExists(tableIdentWithDB)) {
          assert(mode != SaveMode.Overwrite,
            s"Expect the table $tableName has been dropped when the save mode is Overwrite")
    
          if (mode == SaveMode.ErrorIfExists) {
            throw new RuntimeException(s"Table $tableName already exists. You need to drop it first.")
          }
          if (mode == SaveMode.Ignore) {
            // Since the table already exists and the save mode is Ignore, we will just return.
            // scalastyle:off
            return Seq.empty
            // scalastyle:on
          }
        }
        ......
    

    So eventually ExecutedCommandExec.executeCollect is called, which triggers the run method overridden by CreateHoodieTableAsSelectCommand and runs Hudi's own logic. Hudi's own logic can be traced with a debugger in the Hudi source code and is not summarized in this article.
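
    The pattern behind this is the same one we will see again for Spark 3's V2CommandExec below: executeCollect references a lazy result, and forcing that result is what invokes the command's run exactly once. A simplified, self-contained sketch of the pattern (illustrative names, not the verbatim Spark source):

    // Sketch of the eager command-execution pattern: executeCollect() forces the
    // lazy sideEffectResult, which is what calls the command's run() exactly once.
    trait SketchRunnableCommand { def run(): Seq[String] }

    case class SketchExecutedCommandExec(cmd: SketchRunnableCommand) {
      private lazy val sideEffectResult: Seq[String] = cmd.run()
      def executeCollect(): Array[String] = sideEffectResult.toArray
    }

    // run() is only triggered when executeCollect() is called.
    val exec = SketchExecutedCommandExec(new SketchRunnableCommand {
      override def run(): Seq[String] = Seq("table created")
    })
    println(exec.executeCollect().mkString)  // prints: table created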

    Spark3.2.1

    Hudi supports multiple Spark versions, with Spark 2.4.4 as the default. To use Spark 3.2.1, you can build the package with the following command:

    mvn clean package -DskipTests -Dspark3.2 -Dscala-2.12
    

    To debug with Spark 3.2.1, you can first package or install locally with the command above and create a new test project that references the package you built, or you can configure a Spark 3.2.1 environment in IDEA directly inside the Hudi source, which is somewhat more troublesome. I used the second approach. As before, only the key differences are covered here.

    Printed Output

    The output for Spark 3.2.1 is a bit more complete than for Spark 2. We can see that the AtomicCreateTableAsSelect in the final Physical Plan is related to HoodieCatalog:

    == Parsed Logical Plan ==
    'CreateTableAsSelectStatement [h0], [dt], [hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], hudi, false, false
    +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
       +- OneRowRelation
    
    == Analyzed Logical Plan ==
    CreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@6dbbdf92, default.h0, [dt], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], false
    +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
       +- OneRowRelation
    
    == Optimized Logical Plan ==
    CommandResult AtomicCreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@6dbbdf92, default.h0, [dt], Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, owner=dongkelun01, type=cow, preCombineField=ts], [], false
       +- CreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@6dbbdf92, default.h0, [dt], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], false
          +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
             +- OneRowRelation
    
    == Physical Plan ==
    CommandResult <empty>
       +- AtomicCreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@6dbbdf92, default.h0, [dt], Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, owner=dongkelun01, type=cow, preCombineField=ts], [], false
          +- *(1) Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
             +- *(1) Scan OneRowRelation[]
    

    PlanChangeLogger produces a lot of output: which rules had no effect, which rules took effect, and how they took effect. Since there is a lot of it, only a small part is pasted here:

    7720 [ScalaTest-run-running-TestCreateTable] INFO  org.apache.spark.sql.catalyst.rules.PlanChangeLogger  - Batch Substitution has no effect.
    7721 [ScalaTest-run-running-TestCreateTable] INFO  org.apache.spark.sql.catalyst.rules.PlanChangeLogger  - Batch Disable Hints has no effect.
    7724 [ScalaTest-run-running-TestCreateTable] INFO  org.apache.spark.sql.catalyst.rules.PlanChangeLogger  - Batch Hints has no effect.
    7728 [ScalaTest-run-running-TestCreateTable] INFO  org.apache.spark.sql.catalyst.rules.PlanChangeLogger  - Batch Simple Sanity Check has no effect.
    8309 [ScalaTest-run-running-TestCreateTable] INFO  org.apache.spark.sql.catalyst.rules.PlanChangeLogger  - 
    === Applying Rule org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog ===
    !'CreateTableAsSelectStatement [h0], [dt], [hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], hudi, false, false   CreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@c3719e5, default.h0, [dt], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], false
     +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]                                                                                                                                   +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
        +- OneRowRelation                                                                                                                                                                                                       +- OneRowRelation
               
    8331 [ScalaTest-run-running-TestCreateTable] INFO  org.apache.spark.sql.catalyst.rules.PlanChangeLogger  - 
    === Result of Batch Resolution ===
    !'CreateTableAsSelectStatement [h0], [dt], [hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], hudi, false, false   CreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@c3719e5, default.h0, [dt], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], false
     +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]                                                                                                                                   +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
        +- OneRowRelation                                                                                                                                                                                                       +- OneRowRelation
              
    8334 [ScalaTest-run-running-TestCreateTable] INFO  org.apache.spark.sql.catalyst.rules.PlanChangeLogger  - Batch Remove TempResolvedColumn has no effect.
    ......
    === Metrics of Executed Rules ===
    Total number of runs: 141
    Total time: 0.6459626 seconds
    Total number of effective runs: 2
    Total time of effective runs: 0.302878 seconds 
    

    Because the lines are long and wrap, the side-by-side comparison of oldPlan and newPlan is not very obvious, but you can roughly see the before/after change, or debug and compare it yourself.

    ANTLR

    The previous article mentioned that Hudi has three g4 files: one under the hudi-spark module and the other two under the hudi-spark2 module. Likewise, the hudi-spark3 module also has two g4 files with the same names, but their contents differ from the Spark 2 ones. Specifically:

    • HoodieSqlCommon.g4 under the hudi-spark module

    • SqlBase.g4 under the hudi-spark3 module, copied from SqlBase.g4 in the Spark 3.2.0 source

    • HoodieSqlBase.g4 under the hudi-spark3 module, which imports the SqlBase.g4 above

    HoodieSqlBase.g4

    grammar HoodieSqlBase;
    
    import SqlBase;
    
    singleStatement
        : statement EOF
        ;
    
    statement
        : query                                                            #queryStatement
        | ctes? dmlStatementNoWith                                         #dmlStatement
        | createTableHeader ('(' colTypeList ')')? tableProvider?
            createTableClauses
            (AS? query)?                                                   #createTable
        | .*?                                                              #passThrough
        ;
    

    parsing

    As before, parsePlan first calls HoodieCommonSqlParser.parsePlan, which is common code and the same as in Spark 2; builder.visit returns null here, so it falls through to sparkExtendedParser.parsePlan:

      private lazy val builder = new HoodieSqlCommonAstBuilder(session, delegate)
      private lazy val sparkExtendedParser = sparkAdapter.createExtendedSparkParser
        .map(_(session, delegate)).getOrElse(delegate)
    
      override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
        builder.visit(parser.singleStatement()) match {
          case plan: LogicalPlan => plan
          case _=> sparkExtendedParser.parsePlan(sqlText)
        }
      }
    

    The previous article mentioned that the Spark 2 sparkExtendedParser is HoodieSpark2ExtendedSqlParser, so what is it for Spark 3? It is simple, and the logic is the same as for Spark 2; let's take a look:

      private lazy val sparkExtendedParser = sparkAdapter.createExtendedSparkParser
        .map(_(session, delegate)).getOrElse(delegate)
    
      lazy val sparkAdapter: SparkAdapter = {
        val adapterClass = if (HoodieSparkUtils.isSpark3_2) {
          "org.apache.spark.sql.adapter.Spark3_2Adapter"
        } else if (HoodieSparkUtils.isSpark3_0 || HoodieSparkUtils.isSpark3_1) {
          "org.apache.spark.sql.adapter.Spark3_1Adapter"
        } else {
          "org.apache.spark.sql.adapter.Spark2Adapter"
        }
        getClass.getClassLoader.loadClass(adapterClass)
          .newInstance().asInstanceOf[SparkAdapter]
      }
    

    From the code above we know that sparkAdapter is Spark3_2Adapter; next let's look at Spark3_2Adapter.createExtendedSparkParser:

      override def createExtendedSparkParser: Option[(SparkSession, ParserInterface) => ParserInterface] = {
        Some(
          (spark: SparkSession, delegate: ParserInterface) => new HoodieSpark3_2ExtendedSqlParser(spark, delegate)
        )
      }
    

    So the Spark 3 sparkExtendedParser here is HoodieSpark3_2ExtendedSqlParser, and we then reach HoodieSpark3_2ExtendedSqlParser.parsePlan, where the logic differs from Spark 2:

      override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
        builder.visit(parser.singleStatement()) match {
          case plan: LogicalPlan => plan
          case _=> delegate.parsePlan(sqlText)
        }
      }
    
      protected def parse[T](command: String)(toResult: HoodieSqlBaseParser => T): T = {
        logDebug(s"Parsing command: $command")
    
        val lexer = new HoodieSqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
        lexer.removeErrorListeners()
        lexer.addErrorListener(ParseErrorListener)
    
        val tokenStream = new CommonTokenStream(lexer)
        val parser = new HoodieSqlBaseParser(tokenStream)
        parser.addParseListener(PostProcessor)
        parser.removeErrorListeners()
        parser.addErrorListener(ParseErrorListener)
    //    parser.legacy_setops_precedence_enabled = conf.setOpsPrecedenceEnforced
        parser.legacy_exponent_literal_as_decimal_enabled = conf.exponentLiteralAsDecimalEnabled
        parser.SQL_standard_keyword_behavior = conf.ansiEnabled
    
        try {
          try {
            // first, try parsing with potentially faster SLL mode
            parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
            toResult(parser)
          }
          catch {
            case e: ParseCancellationException =>
              // if we fail, parse with LL mode
              tokenStream.seek(0) // rewind input stream
              parser.reset()
    
              // Try Again.
              parser.getInterpreter.setPredictionMode(PredictionMode.LL)
              toResult(parser)
          }
        }
        catch {
          case e: ParseException if e.command.isDefined =>
            throw e
          case e: ParseException =>
            throw e.withCommand(command)
          case e: AnalysisException =>
            val position = Origin(e.line, e.startPosition)
            throw new ParseException(Option(command), e.message, position, position)
        }
      }
    

    We can see that the parser here is likewise HoodieSqlBaseParser, but for builder.visit(parser.singleStatement()), Spark 3 behaves differently from Spark 2. Why? Let's continue:
    In Spark 3's HoodieSqlBase.g4 we can see that statement does have a #createTable alternative:

        | createTableHeader ('(' colTypeList ')')? tableProvider?
            createTableClauses
            (AS? query)?                                                   #createTable
    

    Here createTableHeader, tableProvider, AS and query all come from SqlBase.g4, so this CTAS statement matches. As discussed before, parser.singleStatement() eventually leads to builder.visitCreateTable; the difference is that the builder here is HoodieSpark3_2ExtendedSqlAstBuilder, so we need to look at how its visitCreateTable differs:

      override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
        val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
    
        val columns = Option(ctx.colTypeList()).map(visitColTypeList).getOrElse(Nil)
        val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
        val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
          visitCreateTableClauses(ctx.createTableClauses())
    
        if (provider.isDefined && serdeInfo.isDefined) {
          operationNotAllowed(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx)
        }
    
        if (temp) {
          val asSelect = if (ctx.query == null) "" else " AS ..."
          operationNotAllowed(
            s"CREATE TEMPORARY TABLE ...$asSelect, use CREATE TEMPORARY VIEW instead", ctx)
        }
    
        val partitioning = partitionExpressions(partTransforms, partCols, ctx)
    
        Option(ctx.query).map(plan) match {
          case Some(_) if columns.nonEmpty =>
            operationNotAllowed(
              "Schema may not be specified in a Create Table As Select (CTAS) statement",
              ctx)
    
          case Some(_) if partCols.nonEmpty =>
            // non-reference partition columns are not allowed because schema can't be specified
            operationNotAllowed(
              "Partition column types may not be specified in Create Table As Select (CTAS)",
              ctx)
    
          case Some(query) =>
            CreateTableAsSelectStatement(
              table, query, partitioning, bucketSpec, properties, provider, options, location, comment,
              writeOptions = Map.empty, serdeInfo, external = external, ifNotExists = ifNotExists)
    
          case _ =>
            // Note: table schema includes both the table columns list and the partition columns
            // with data type.
            val schema = StructType(columns ++ partCols)
            CreateTableStatement(table, schema, partitioning, bucketSpec, properties, provider,
              options, location, comment, serdeInfo, external = external, ifNotExists = ifNotExists)
        }
      }
    

    Here it matches case Some(query) and returns CreateTableAsSelectStatement. This is the difference from Spark 2 (or rather, from visitCreateTable in the Spark source), which returns CreateTable(tableDesc, mode, Some(query)). So where is CreateTableAsSelectStatement handled?

    ResolveCatalogs and ResolveSessionCatalog

    Two rule classes match CreateTableAsSelectStatement: ResolveCatalogs is defined in the Analyzer's batches, while ResolveSessionCatalog is defined in the extendedResolutionRules overridden in BaseSessionStateBuilder.analyzer (from the PlanChangeLogger output we know that the one that actually takes effect is ResolveSessionCatalog):

    override def batches: Seq[Batch] = Seq(
        Batch("Substitution", fixedPoint,
          // This rule optimizes `UpdateFields` expression chains so looks more like optimization rule.
          // However, when manipulating deeply nested schema, `UpdateFields` expression tree could be
          // very complex and make analysis impossible. Thus we need to optimize `UpdateFields` early
          // at the beginning of analysis.
          OptimizeUpdateFields,
          CTESubstitution,
          WindowsSubstitution,
          EliminateUnions,
          SubstituteUnresolvedOrdinals),
        Batch("Disable Hints", Once,
          new ResolveHints.DisableHints),
        Batch("Hints", fixedPoint,
          ResolveHints.ResolveJoinStrategyHints,
          ResolveHints.ResolveCoalesceHints),
        Batch("Simple Sanity Check", Once,
          LookupFunctions),
        Batch("Resolution", fixedPoint,
          ResolveTableValuedFunctions(v1SessionCatalog) ::
          ResolveNamespace(catalogManager) ::
          new ResolveCatalogs(catalogManager) ::
          ResolveUserSpecifiedColumns ::
          ResolveInsertInto ::
          ResolveRelations ::
          ......
          extendedResolutionRules : _*),
          .......
    
      protected def analyzer: Analyzer = new Analyzer(catalogManager) {
        override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
          new FindDataSourceTable(session) +:
            new ResolveSQLOnFile(session) +:
            new FallBackFileSourceV2(session) +:
            ResolveEncodersInScalaAgg +:
            new ResolveSessionCatalog(catalogManager) +:
            ResolveWriteToStream +:
            customResolutionRules
    
        override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
          DetectAmbiguousSelfJoin +:
            PreprocessTableCreation(session) +:
            PreprocessTableInsertion +:
            DataSourceAnalysis +:
            customPostHocResolutionRules
    
        override val extendedCheckRules: Seq[LogicalPlan => Unit] =
          PreWriteCheck +:
            PreReadCheck +:
            HiveOnlyCheck +:
            TableCapabilityCheck +:
            CommandCheck +:
            customCheckRules
      }
    

    Their respective apply methods are:

    class ResolveCatalogs(val catalogManager: CatalogManager)
      extends Rule[LogicalPlan] with LookupCatalog {
      import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
      import org.apache.spark.sql.connector.catalog.CatalogV2Util._
    
      override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
        case c @ CreateTableStatement(
             NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
          CreateV2Table(
            catalog.asTableCatalog,
            tbl.asIdentifier,
            c.tableSchema,
            // convert the bucket spec and add it as a transform
            c.partitioning ++ c.bucketSpec.map(_.asTransform),
            convertTableProperties(c),
            ignoreIfExists = c.ifNotExists)
    
        case c @ CreateTableAsSelectStatement(
             NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
          CreateTableAsSelect(
            catalog.asTableCatalog,
            tbl.asIdentifier,
            // convert the bucket spec and add it as a transform
            c.partitioning ++ c.bucketSpec.map(_.asTransform),
            c.asSelect,
            convertTableProperties(c),
            writeOptions = c.writeOptions,
            ignoreIfExists = c.ifNotExists)
    
    class ResolveSessionCatalog(val catalogManager: CatalogManager)
      extends Rule[LogicalPlan] with LookupCatalog {
      import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
      import org.apache.spark.sql.connector.catalog.CatalogV2Util._
      import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
    
      override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
        case AddColumns(ResolvedV1TableIdentifier(ident), cols) =>
          cols.foreach { c =>
            assertTopLevelColumn(c.name, "AlterTableAddColumnsCommand")
            if (!c.nullable) {
              throw QueryCompilationErrors.addColumnWithV1TableCannotSpecifyNotNullError
            }
          }
          AlterTableAddColumnsCommand(ident.asTableIdentifier, cols.map(convertToStructField))
    
        ......
    
        case c @ CreateTableAsSelectStatement(
             SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
          val (storageFormat, provider) = getStorageFormatAndProvider(
            c.provider, c.options, c.location, c.serde, ctas = true)
          if (!isV2Provider(provider)) {
            val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
              c.partitioning, c.bucketSpec, c.properties, provider, c.location,
              c.comment, storageFormat, c.external)
            val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
            CreateTable(tableDesc, mode, Some(c.asSelect))
          } else {
            CreateTableAsSelect(
              catalog.asTableCatalog,
              tbl.asIdentifier,
              // convert the bucket spec and add it as a transform
              c.partitioning ++ c.bucketSpec.map(_.asTransform),
              c.asSelect,
              convertTableProperties(c),
              writeOptions = c.writeOptions,
              ignoreIfExists = c.ifNotExists)
          }
        ......
    

    We can see that both rules try to match CreateTableAsSelectStatement; the difference is that one matches NonSessionCatalogAndTable while the other matches SessionCatalogAndTable. Judging by the names, their checks are opposites, so exactly one of them will match. What are their concrete implementations? Let's take SessionCatalogAndTable as an example. We know that when Scala matches against such an object it calls its unapply method; the nameParts parameter here is the first argument of CreateTableAsSelectStatement, which, as seen in visitCreateTable above, is the table returned by visitCreateTableHeader(ctx.createTableHeader):

      object SessionCatalogAndTable {
        def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match {
          case SessionCatalogAndIdentifier(catalog, ident) =>
            Some(catalog -> ident.asMultipartIdentifier)
          case _ => None
        }
      }
    
      object SessionCatalogAndIdentifier {
    
        def unapply(parts: Seq[String]): Option[(CatalogPlugin, Identifier)] = parts match {
          case CatalogAndIdentifier(catalog, ident) if CatalogV2Util.isSessionCatalog(catalog) =>
            Some(catalog, ident)
          case _ => None
        }
      }
    
        object CatalogAndIdentifier {
        import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
    
        private val globalTempDB = SQLConf.get.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
    
        def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Identifier)] = {
          assert(nameParts.nonEmpty)
          if (nameParts.length == 1) { // nameParts.length == 1 means only a table name was given, with no database name
            Some((currentCatalog, Identifier.of(catalogManager.currentNamespace, nameParts.head)))
          } else if (nameParts.head.equalsIgnoreCase(globalTempDB)) { // nameParts.head is the database name; check whether it equals globalTempDB
            // Conceptually global temp views are in a special reserved catalog. However, the v2 catalog
            // API does not support view yet, and we have to use v1 commands to deal with global temp
            // views. To simplify the implementation, we put global temp views in a special namespace
            // in the session catalog. The special namespace has higher priority during name resolution.
            // For example, if the name of a custom catalog is the same with `GLOBAL_TEMP_DATABASE`,
            // this custom catalog can't be accessed.
            Some((catalogManager.v2SessionCatalog, nameParts.asIdentifier))
          } else {
            try {
              // Otherwise, obtain the catalog via catalogManager.catalog(nameParts.head)
              Some((catalogManager.catalog(nameParts.head), nameParts.tail.asIdentifier))
            } catch {
              case _: CatalogNotFoundException =>
                Some((currentCatalog, nameParts.asIdentifier))
            }
          }
        }
      } 
    
        def currentCatalog: CatalogPlugin = catalogManager.currentCatalog 
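
    The pattern matches above work because Scala calls each extractor object's unapply method. A minimal, self-contained illustration of that mechanism (not Hudi or Spark code):

    // Minimal extractor sketch: matching `TwoPartName(db, table)` simply calls
    // TwoPartName.unapply(parts) and succeeds when it returns Some(...), which is
    // exactly how SessionCatalogAndTable / NonSessionCatalogAndTable are used above.
    object TwoPartName {
      def unapply(parts: Seq[String]): Option[(String, String)] = parts match {
        case Seq(db, table) => Some(db -> table)
        case _ => None
      }
    }

    Seq("default", "h0") match {
      case TwoPartName(db, table) => println(s"db=$db, table=$table")  // db=default, table=h0
      case _ => println("no match")
    }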
    
    

    Eventually CatalogAndIdentifier.unapply is called. Since our CREATE TABLE statement has no database qualifier, nameParts here is Seq(tableName), i.e. its length equals 1, so it returns Some((currentCatalog, Identifier.of(catalogManager.currentNamespace, nameParts.head))), whose value is Some((HoodieCatalog, Identifier.of(default, tableName))). The reason it is HoodieCatalog is related to the configuration we mentioned at the beginning:

       def sparkConf(): SparkConf = {
        val sparkConf = new SparkConf()
        if (HoodieSparkUtils.gteqSpark3_2) {
          sparkConf.set("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
        }
        sparkConf
      }   
    

    Next, look at CatalogV2Util.isSessionCatalog:

      def isSessionCatalog(catalog: CatalogPlugin): Boolean = {
        catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)
      }
    
    private[sql] object CatalogManager {
      val SESSION_CATALOG_NAME: String = "spark_catalog"
    }
    

    Here HoodieCatalog.name() is implemented in V2SessionCatalog:

    class V2SessionCatalog(catalog: SessionCatalog)
      extends TableCatalog with SupportsNamespaces with SQLConfHelper {
      import V2SessionCatalog._
    
      override val defaultNamespace: Array[String] = Array("default")
    
      override def name: String = CatalogManager.SESSION_CATALOG_NAME
    

    So CatalogV2Util.isSessionCatalog(catalog) is true. NonSessionCatalogAndTable is exactly the opposite: !CatalogV2Util.isSessionCatalog(catalog) returns false.

     object NonSessionCatalogAndTable {
        def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match {
          case NonSessionCatalogAndIdentifier(catalog, ident) =>
            Some(catalog -> ident.asMultipartIdentifier)
          case _ => None
        }
      }
    
        object NonSessionCatalogAndIdentifier {
        def unapply(parts: Seq[String]): Option[(CatalogPlugin, Identifier)] = parts match {
          case CatalogAndIdentifier(catalog, ident) if !CatalogV2Util.isSessionCatalog(catalog) =>
            Some(catalog, ident)
          case _ => None
        }
      }
    

    So the match ultimately succeeds in ResolveSessionCatalog:

    case c @ CreateTableAsSelectStatement(
             SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
          val (storageFormat, provider) = getStorageFormatAndProvider(
            c.provider, c.options, c.location, c.serde, ctas = true)
          if (!isV2Provider(provider)) {
            val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
              c.partitioning, c.bucketSpec, c.properties, provider, c.location,
              c.comment, storageFormat, c.external)
            val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
            CreateTable(tableDesc, mode, Some(c.asSelect))
          } else {
            CreateTableAsSelect(
              catalog.asTableCatalog,
              tbl.asIdentifier,
              // convert the bucket spec and add it as a transform
              c.partitioning ++ c.bucketSpec.map(_.asTransform),
              c.asSelect,
              convertTableProperties(c),
              writeOptions = c.writeOptions,
              ignoreIfExists = c.ifNotExists)
          }
    

    The logic here checks whether the provider is a V2 provider: if not, it returns CreateTable; if so, it returns CreateTableAsSelect. The logic for deciding whether something is a V2 provider is fairly involved, so we skip it for now and cover it later. In the code version used here, hudi is a V2 provider, so CreateTableAsSelect is returned. This is the key difference from Spark 2: if it were not a V2 provider, CreateTable would be returned, just like in Spark 2. As we will see below, CreateTableAsSelect eventually uses HoodieCatalog to create the Hudi table, whereas CreateTable, as we already know from the Spark 2 discussion above, eventually calls CreateHoodieTableAsSelectCommand.

    case class CreateTableAsSelect(
        catalog: TableCatalog,
        tableName: Identifier,
        partitioning: Seq[Transform],
        query: LogicalPlan,
        properties: Map[String, String],
        writeOptions: Map[String, String],
        ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan {
    
    trait UnaryCommand extends Command with UnaryLike[LogicalPlan]      
    

    So where is CreateTableAsSelect converted, and where does it finally invoke Hudi's logic?

    planning

      @transient private[sql] val logicalPlan: LogicalPlan = {
        val plan = queryExecution.commandExecuted
        if (sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {
          val dsIds = plan.getTagValue(Dataset.DATASET_ID_TAG).getOrElse(new HashSet[Long])
          dsIds.add(id)
          plan.setTagValue(Dataset.DATASET_ID_TAG, dsIds)
        }
        plan
      }
    
      lazy val commandExecuted: LogicalPlan = mode match {
        case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)
        case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)
        case CommandExecutionMode.SKIP => analyzed
      }
    
      private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown {
        case c: Command =>
          val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)
          val result = SQLExecution.withNewExecutionId(qe, Some(commandExecutionName(c))) {
            qe.executedPlan.executeCollect()
          }
          CommandResult(
            qe.analyzed.output,
            qe.commandExecuted,
            qe.executedPlan,
            result)
        case other => other
      }
    

    CreateTableAsSelect is also of type Command. Just like the Spark 2 case in the previous article, Spark 3 also triggers the subsequent planning phase via the logicalPlan variable when a new Dataset is constructed.

    DataSourceV2Strategy

    Among SparkPlanner's strategies there is a DataSourceV2Strategy:

    class SparkPlanner(val session: SparkSession, val experimentalMethods: ExperimentalMethods)
      extends SparkStrategies with SQLConfHelper {
    
      def numPartitions: Int = conf.numShufflePartitions
    
      override def strategies: Seq[Strategy] =
        experimentalMethods.extraStrategies ++
          extraPlanningStrategies ++ (
          LogicalQueryStageStrategy ::
          PythonEvals ::
          new DataSourceV2Strategy(session) ::
          FileSourceStrategy ::
          DataSourceStrategy ::
          SpecialLimits ::
          Aggregation ::
          Window ::
          JoinSelection ::
          InMemoryScans ::
          SparkScripts ::
          BasicOperators :: Nil)
    

    Let's look at its apply method:

      override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
        case PhysicalOperation(project, filters,
        ......
        case WriteToDataSourceV2(relationOpt, writer, query, customMetrics) =>
          val invalidateCacheFunc: () => Unit = () => relationOpt match {
            case Some(r) => session.sharedState.cacheManager.uncacheQuery(session, r, cascade = true)
            case None => ()
          }
          WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil
    
        case CreateV2Table(catalog, ident, schema, parts, props, ifNotExists) =>
          val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
          CreateTableExec(catalog, ident, schema, parts, propsWithOwner, ifNotExists) :: Nil
    
        case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) =>
          val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
          val writeOptions = new CaseInsensitiveStringMap(options.asJava)
          catalog match {
            case staging: StagingTableCatalog =>
              AtomicCreateTableAsSelectExec(staging, ident, parts, query, planLater(query),
                propsWithOwner, writeOptions, ifNotExists) :: Nil
            case _ =>
              CreateTableAsSelectExec(catalog, ident, parts, query, planLater(query),
                propsWithOwner, writeOptions, ifNotExists) :: Nil
          }
        ......
    
    

    Here CreateTableAsSelect is matched, and depending on the catalog type it returns either AtomicCreateTableAsSelectExec or CreateTableAsSelectExec, so we need to check whether the catalog is a StagingTableCatalog.

    HoodieCatalog

    class HoodieCatalog extends DelegatingCatalogExtension
      with StagingTableCatalog
      with SparkAdapterSupport
      with ProvidesHoodieConfig {
    
      val spark: SparkSession = SparkSession.active
    
      override def stageCreate(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
        if (sparkAdapter.isHoodieTable(properties)) {
          HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_CREATE)
        } else {
          BasicStagedTable(
            ident,
            super.createTable(ident, schema, partitions, properties),
            this)
        }
      }
    
      override def stageReplace(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
        if (sparkAdapter.isHoodieTable(properties)) {
          HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_REPLACE)
        } else {
          super.dropTable(ident)
          BasicStagedTable(
            ident,
            super.createTable(ident, schema, partitions, properties),
            this)
        }
      }
    
      override def stageCreateOrReplace(ident: Identifier,
                                        schema: StructType,
                                        partitions: Array[Transform],
                                        properties: util.Map[String, String]): StagedTable = {
        if (sparkAdapter.isHoodieTable(properties)) {
          HoodieStagedTable(
            ident, this, schema, partitions, properties, TableCreationMode.CREATE_OR_REPLACE)
        } else {
          try super.dropTable(ident) catch {
            case _: NoSuchTableException => // ignore the exception
          }
          BasicStagedTable(
            ident,
            super.createTable(ident, schema, partitions, properties),
            this)
        }
      }
    
    

    From the definition of HoodieCatalog we know that it implements StagingTableCatalog, so AtomicCreateTableAsSelectExec is returned.

    AtomicCreateTableAsSelectExec

    case class AtomicCreateTableAsSelectExec(
        catalog: StagingTableCatalog,
        ident: Identifier,
        partitioning: Seq[Transform],
        plan: LogicalPlan,
        query: SparkPlan,
        properties: Map[String, String],
        writeOptions: CaseInsensitiveStringMap,
        ifNotExists: Boolean) extends TableWriteExecHelper {
    
      override protected def run(): Seq[InternalRow] = {
        if (catalog.tableExists(ident)) {
          if (ifNotExists) {
            return Nil
          }
    
          throw QueryCompilationErrors.tableAlreadyExistsError(ident)
        }
        val schema = CharVarcharUtils.getRawSchema(query.schema).asNullable
        val stagedTable = catalog.stageCreate(
          ident, schema, partitioning.toArray, properties.asJava)
        writeToTable(catalog, stagedTable, writeOptions, ident)
      }
    
    

    We can see that AtomicCreateTableAsSelectExec has a run method, so where is this run method triggered from?

    V2CommandExec

    In fact, AtomicCreateTableAsSelectExec is a subclass of V2CommandExec:

    case class AtomicCreateTableAsSelectExec(
        catalog: StagingTableCatalog,
        ident: Identifier,
        partitioning: Seq[Transform],
        plan: LogicalPlan,
        query: SparkPlan,
        properties: Map[String, String],
        writeOptions: CaseInsensitiveStringMap,
        ifNotExists: Boolean) extends TableWriteExecHelper {
    
    private[v2] trait TableWriteExecHelper extends V2TableWriteExec with SupportsV1Write {
    
    trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
    
    

    The executeCollect method of AtomicCreateTableAsSelectExec is implemented in V2CommandExec:

    abstract class V2CommandExec extends SparkPlan {
    
      /**
       * Abstract method that each concrete command needs to implement to compute the result.
       */
      protected def run(): Seq[InternalRow]
    
      /**
       * The value of this field can be used as the contents of the corresponding RDD generated from
       * the physical plan of this command.
       */
      private lazy val result: Seq[InternalRow] = run()
    
      /**
       * The `execute()` method of all the physical command classes should reference `result`
       * so that the command can be executed eagerly right after the command query is created.
       */
      override def executeCollect(): Array[InternalRow] = result.toArray
    

    The eagerlyExecuteCommands shown earlier calls executeCollect, which in turn calls the run method of AtomicCreateTableAsSelectExec. So is this run method where the Hudi-specific logic lives?

      override protected def run(): Seq[InternalRow] = {
        if (catalog.tableExists(ident)) {
          if (ifNotExists) {
            return Nil
          }
    
          throw QueryCompilationErrors.tableAlreadyExistsError(ident)
        }
        val schema = CharVarcharUtils.getRawSchema(query.schema).asNullable
        val stagedTable = catalog.stageCreate(
          ident, schema, partitioning.toArray, properties.asJava)
        writeToTable(catalog, stagedTable, writeOptions, ident)
      }
    
    

    There are two places here that involve HoodieCatalog. The first is catalog.stageCreate, which checks whether the table is a Hudi table and, if so, returns a HoodieStagedTable, so the stagedTable here is a HoodieStagedTable:

      override def stageCreate(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
        if (sparkAdapter.isHoodieTable(properties)) {
          HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_CREATE)
        } else {
          BasicStagedTable(
            ident,
            super.createTable(ident, schema, partitions, properties),
            this)
        }
      }
    

    The second is writeToTable, which is implemented in the parent trait TableWriteExecHelper:

    private[v2] trait TableWriteExecHelper extends V2TableWriteExec with SupportsV1Write {
      protected def writeToTable(
          catalog: TableCatalog,
          table: Table,
          writeOptions: CaseInsensitiveStringMap,
          ident: Identifier): Seq[InternalRow] = {
        Utils.tryWithSafeFinallyAndFailureCallbacks({
          table match {
            case table: SupportsWrite =>
              val info = LogicalWriteInfoImpl(
                queryId = UUID.randomUUID().toString,
                query.schema,
                writeOptions)
              val writeBuilder = table.newWriteBuilder(info)
    
              val write = writeBuilder.build()
              val writtenRows = write match {
                case v1: V1Write => writeWithV1(v1.toInsertableRelation)
                case v2 => writeWithV2(v2.toBatch)
              }
    
              table match {
                case st: StagedTable => st.commitStagedChanges()
                case _ =>
              }
              writtenRows
    
            case _ =>
              // Table does not support writes - staged changes are also rolled back below if table
              // is staging.
              throw QueryExecutionErrors.unsupportedTableWritesError(ident)
          }
        })(catchBlock = {
          table match {
            // Failure rolls back the staged writes and metadata changes.
            case st: StagedTable => st.abortStagedChanges()
            case _ => catalog.dropTable(ident)
          }
        })
      }
    }
    

    Both HoodieStagedTable and BasicStagedTable implement SupportsWrite as well as StagedTable, so the outer match hits the SupportsWrite case first and the inner match then hits StagedTable, finally calling commitStagedChanges. Other details such as writeBuilder and writeWithV1 are not covered in this article.

    case class HoodieStagedTable(ident: Identifier,
                                 catalog: HoodieCatalog,
                                 override val schema: StructType,
                                 partitions: Array[Transform],
                                 override val properties: util.Map[String, String],
                                 mode: TableCreationMode) extends StagedTable with SupportsWrite {
    
      private var sourceQuery: Option[DataFrame] = None
      private var writeOptions: Map[String, String] = Map.empty
    
      override def commitStagedChanges(): Unit = {
        val props = new util.HashMap[String, String]()
        val optionsThroughProperties = properties.asScala.collect {
          case (k, _) if k.startsWith("option.") => k.stripPrefix("option.")
        }.toSet
        val sqlWriteOptions = new util.HashMap[String, String]()
        properties.asScala.foreach { case (k, v) =>
          if (!k.startsWith("option.") && !optionsThroughProperties.contains(k)) {
            props.put(k, v)
          } else if (optionsThroughProperties.contains(k)) {
            sqlWriteOptions.put(k, v)
          }
        }
        if (writeOptions.isEmpty && !sqlWriteOptions.isEmpty) {
          writeOptions = sqlWriteOptions.asScala.toMap
        }
        props.putAll(properties)
        props.put("hoodie.table.name", ident.name())
        props.put(RECORDKEY_FIELD.key, properties.get("primaryKey"))
        catalog.createHoodieTable(ident, schema, partitions, props, writeOptions, sourceQuery, mode)
      }
    
      override def name(): String = ident.name()
    
      override def abortStagedChanges(): Unit = {
        clearTablePath(properties.get("location"), catalog.spark.sparkContext.hadoopConfiguration)
      }
    
      private def clearTablePath(tablePath: String, conf: Configuration): Unit = {
        val path = new Path(tablePath)
        val fs = path.getFileSystem(conf)
        fs.delete(path, true)
      }
    
      override def capabilities(): util.Set[TableCapability] = Set(TableCapability.V1_BATCH_WRITE).asJava
    
      override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
        writeOptions = info.options.asCaseSensitiveMap().asScala.toMap
        new HoodieV1WriteBuilder
      }
    
      /*
       * WriteBuilder for creating a Hoodie table.
       */
      private class HoodieV1WriteBuilder extends WriteBuilder {
        override def build(): V1Write = new V1Write {
          override def toInsertableRelation(): InsertableRelation = {
            new InsertableRelation {
              override def insert(data: DataFrame, overwrite: Boolean): Unit = {
                sourceQuery = Option(data)
              }
            }
          }
        }
      }
    
    }
    
    

    The commitStagedChanges method calls catalog.createHoodieTable to create the Hudi table, which is where Hudi's own logic is implemented. This is why the CTAS changes in the PR mentioned at the beginning also touch the HoodieCatalog class:

      def createHoodieTable(ident: Identifier,
                            schema: StructType,
                            partitions: Array[Transform],
                            allTableProperties: util.Map[String, String],
                            writeOptions: Map[String, String],
                            sourceQuery: Option[DataFrame],
                            operation: TableCreationMode): Table = {
    
        val (partitionColumns, maybeBucketSpec) = convertTransforms(partitions)
        val newSchema = schema
        val newPartitionColumns = partitionColumns
        val newBucketSpec = maybeBucketSpec
    
        val isByPath = isPathIdentifier(ident)
    
        val location = if (isByPath) Option(ident.name()) else Option(allTableProperties.get("location"))
        val id = ident.asTableIdentifier
    
        val locUriOpt = location.map(CatalogUtils.stringToURI)
        val existingTableOpt = getExistingTableIfExists(id)
        val loc = locUriOpt
          .orElse(existingTableOpt.flatMap(_.storage.locationUri))
          .getOrElse(spark.sessionState.catalog.defaultTablePath(id))
        val storage = DataSource.buildStorageFormatFromOptions(writeOptions.--(needFilterProps))
          .copy(locationUri = Option(loc))
        val tableType =
          if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED
        val commentOpt = Option(allTableProperties.get("comment"))
    
        val tablePropertiesNew = new util.HashMap[String, String](allTableProperties)
    

    isV2Provider

    Let's take a look at the isV2Provider method:

      private def isV2Provider(provider: String): Boolean = {
        // Return earlier since `lookupDataSourceV2` may fail to resolve provider "hive" to
        // `HiveFileFormat`, when running tests in sql/core.
        if (DDLUtils.isHiveTable(Some(provider))) return false
        DataSource.lookupDataSourceV2(provider, conf) match {
          // TODO(SPARK-28396): Currently file source v2 can't work with tables.
          case Some(_: FileDataSourceV2) => false
          case Some(_) => true
          case _ => false
        }
      }
    
      def isHiveTable(provider: Option[String]): Boolean = {
        provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == HIVE_PROVIDER
      }
      val HIVE_PROVIDER = "hive"  
    

    Here the provider parameter is hudi, so isHiveTable returns false. Next let's look at lookupDataSourceV2:

      /**
       * Returns an optional [[TableProvider]] instance for the given provider. It returns None if
       * there is no corresponding Data Source V2 implementation, or the provider is configured to
       * fallback to Data Source V1 code path.
       */
      def lookupDataSourceV2(provider: String, conf: SQLConf): Option[TableProvider] = {
        val useV1Sources = conf.getConf(SQLConf.USE_V1_SOURCE_LIST).toLowerCase(Locale.ROOT)
          .split(",").map(_.trim)
        val cls = lookupDataSource(provider, conf)
        cls.newInstance() match {
          case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => None
          case t: TableProvider
              if !useV1Sources.contains(cls.getCanonicalName.toLowerCase(Locale.ROOT)) =>
            Some(t)
          case _ => None
        }
      }
    
        val USE_V1_SOURCE_LIST = buildConf("spark.sql.sources.useV1SourceList")
        .internal()
        .doc("A comma-separated list of data source short names or fully qualified data source " +
          "implementation class names for which Data Source V2 code path is disabled. These data " +
          "sources will fallback to Data Source V1 code path.")
        .version("3.0.0")
        .stringConf
        .createWithDefault("avro,csv,json,kafka,orc,parquet,text")
    

    useV1Sources defaults to "avro,csv,json,kafka,orc,parquet,text". Moving on to lookupDataSource: here provider1 = hudi and provider2 = hudi.DefaultSource. It then loads every META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file and collects the classes registered there; the Hudi-related ones include org.apache.hudi.DefaultSource, org.apache.hudi.Spark3DefaultSource, org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat, etc. It then keeps only the implementations whose shortName equals hudi; only Spark3DefaultSource matches, so Spark3DefaultSource is returned:

      /** Given a provider name, look up the data source class definition. */
      def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
        // provider1 = hudi
        val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
          case name if name.equalsIgnoreCase("orc") &&
              conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
            classOf[OrcDataSourceV2].getCanonicalName
          case name if name.equalsIgnoreCase("orc") &&
              conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
            "org.apache.spark.sql.hive.orc.OrcFileFormat"
          case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
            "org.apache.spark.sql.avro.AvroFileFormat"
          case name => name
        }
        // provider2 = hudi.DefaultSource
        val provider2 = s"$provider1.DefaultSource"
        val loader = Utils.getContextOrSparkClassLoader
        // Load every META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file on the classpath and return the registered implementations,
        // which include org.apache.hudi.DefaultSource, org.apache.hudi.Spark3DefaultSource, org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat, etc.
        val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
    
        try {
          // Keep only the implementations whose shortName is "hudi"; only Spark3DefaultSource matches, so it is returned
          serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
            // the provider format did not match any given registered aliases
            case Nil =>
              try {
                Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
                  case Success(dataSource) =>
                    // Found the data source using fully qualified path
                    dataSource
                  case Failure(error) =>
                    if (provider1.startsWith("org.apache.spark.sql.hive.orc")) {
                      throw QueryCompilationErrors.orcNotUsedWithHiveEnabledError()
                    } else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
                      provider1 == "com.databricks.spark.avro" ||
                      provider1 == "org.apache.spark.sql.avro") {
                      throw QueryCompilationErrors.failedToFindAvroDataSourceError(provider1)
                    } else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {
                      throw QueryCompilationErrors.failedToFindKafkaDataSourceError(provider1)
                    } else {
                      throw QueryExecutionErrors.failedToFindDataSourceError(provider1, error)
                    }
                }
              } catch {
                case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
                  // NoClassDefFoundError's class name uses "/" rather than "." for packages
                  val className = e.getMessage.replaceAll("/", ".")
                  if (spark2RemovedClasses.contains(className)) {
                    throw QueryExecutionErrors.removedClassInSpark2Error(className, e)
                  } else {
                    throw e
                  }
              }
            case head :: Nil =>
              // there is exactly one registered alias
              head.getClass
            case sources =>
              // There are multiple registered aliases for the input. If there is single datasource
              // that has "org.apache.spark" package in the prefix, we use it considering it is an
              // internal datasource within Spark.
              val sourceNames = sources.map(_.getClass.getName)
              val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
              if (internalSources.size == 1) {
                logWarning(s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " +
                  s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
                internalSources.head.getClass
              } else {
                throw QueryCompilationErrors.findMultipleDataSourceError(provider1, sourceNames)
              }
          }
        } catch {
          case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] =>
            // NoClassDefFoundError's class name uses "/" rather than "." for packages
            val className = e.getCause.getMessage.replaceAll("/", ".")
            if (spark2RemovedClasses.contains(className)) {
              throw QueryExecutionErrors.incompatibleDataSourceRegisterError(e)
            } else {
              throw e
            }
        }
      }
    

    Let's take a look at Spark3DefaultSource:

    class Spark3DefaultSource extends DefaultSource with DataSourceRegister with TableProvider {
    
      override def shortName(): String = "hudi"
    
      def inferSchema: StructType = new StructType()
    
      override def inferSchema(options: CaseInsensitiveStringMap): StructType = inferSchema
    
      override def getTable(schema: StructType,
                            partitioning: Array[Transform],
                            properties: java.util.Map[String, String]): Table = {
        val options = new CaseInsensitiveStringMap(properties)
        val path = options.get("path")
        if (path == null) throw new HoodieException("'path' cannot be null, missing 'path' from table properties")
    
        HoodieInternalV2Table(SparkSession.active, path)
      }
    }
    

    We can see that its shortName is hudi, while the others differ: DefaultSource registers hudi_v1 and HoodieParquetFileFormat registers Hoodie-Parquet. The Spark 2 counterpart is Spark2DefaultSource:

    class Spark2DefaultSource extends DefaultSource with DataSourceRegister {
      override def shortName(): String = "hudi"
    }
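
    For reference, these short names are discovered through Java's ServiceLoader mechanism: each jar registers its DataSourceRegister implementations in a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file. Here is a small sketch (not from the article's code base; what it prints depends on which Hudi jars are on the classpath) that lists what is actually registered:

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._
    import org.apache.spark.sql.sources.DataSourceRegister

    // With the hudi-spark3 bundle this should include, among others:
    //   org.apache.hudi.Spark3DefaultSource -> hudi
    //   org.apache.hudi.DefaultSource -> hudi_v1
    //   org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat -> Hoodie-Parquet
    ServiceLoader.load(classOf[DataSourceRegister], Thread.currentThread().getContextClassLoader)
      .asScala
      .foreach(d => println(s"${d.getClass.getName} -> ${d.shortName()}"))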
    

    So cls is Spark3DefaultSource:

        cls.newInstance() match {
          case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => None
          case t: TableProvider
              if !useV1Sources.contains(cls.getCanonicalName.toLowerCase(Locale.ROOT)) =>
            Some(t)
          case _ => None
        }
    

    Spark3DefaultSource is a subclass of both DataSourceRegister and TableProvider, so the DataSourceRegister case is tried first; since useV1Sources does not contain hudi, that guard fails and the TableProvider case is tried next, and because useV1Sources does not contain the class name of Spark3DefaultSource either, Some(Spark3DefaultSource) is returned. Back in isV2Provider:

        DataSource.lookupDataSourceV2(provider, conf) match {
          // TODO(SPARK-28396): Currently file source v2 can't work with tables.
          case Some(_: FileDataSourceV2) => false
          case Some(_) => true
          case _ => false
        }
    

    Spark3DefaultSource is not a subclass of FileDataSourceV2, so the match falls into case Some(_) => true, which means hudi is treated as a V2 provider.
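
    To double-check this conclusion, here is a minimal sketch (run it from the Hudi test suite used in this article, or from any code under an org.apache.spark.sql package, since DataSource and SQLConf are Spark-internal classes):

    import org.apache.spark.sql.execution.datasources.DataSource
    import org.apache.spark.sql.internal.SQLConf

    // On the PR-5592 code base this prints something like Some(org.apache.hudi.Spark3DefaultSource@...),
    // which is neither None nor a FileDataSourceV2, so isV2Provider("hudi") evaluates to true.
    // On current master, where Spark3DefaultSource no longer extends TableProvider, it prints None.
    println(DataSource.lookupDataSourceV2("hudi", SQLConf.get))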

    Latest code

    From the above we now know why hudi is a V2 provider, so CTAS ends up calling HoodieCatalog.createHoodieTable instead of CreateHoodieTableAsSelectCommand; this is the key difference from Spark 2. However, the latest code has reverted to calling CreateHoodieTableAsSelectCommand just like Spark 2, bypassing the HoodieCatalog logic, because part of Spark3DefaultSource has been commented out:

    class Spark3DefaultSource extends DefaultSource with DataSourceRegister /* with TableProvider */ {
    
      override def shortName(): String = "hudi"
    
      /*
      def inferSchema: StructType = new StructType()
    
      override def inferSchema(options: CaseInsensitiveStringMap): StructType = inferSchema
    
      override def getTable(schema: StructType,
                            partitioning: Array[Transform],
                            properties: java.util.Map[String, String]): Table = {
        val options = new CaseInsensitiveStringMap(properties)
        val path = options.get("path")
        if (path == null) throw new HoodieException("'path' cannot be null, missing 'path' from table properties")
    
        HoodieInternalV2Table(SparkSession.active, path)
      }
      */
    }
    

    Spark3DefaultSource is no longer a subclass of TableProvider, so isV2Provider returns false and the subsequent logic is the same as in Spark 2.

    Related PR: [HUDI-4178] Addressing performance regressions in Spark DataSourceV2 Integration

    We can print its execution plan:

    == Parsed Logical Plan ==
    'CreateTableAsSelectStatement [h0], [dt], [hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], hudi, false, false
    +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
       +- OneRowRelation
    
    == Analyzed Logical Plan ==
    CreateHoodieTableAsSelectCommand `default`.`h0`, ErrorIfExists
       +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
          +- OneRowRelation
    
    == Optimized Logical Plan ==
    CommandResult Execute CreateHoodieTableAsSelectCommand
       +- CreateHoodieTableAsSelectCommand `default`.`h0`, ErrorIfExists
             +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
                +- OneRowRelation
    
    == Physical Plan ==
    CommandResult <empty>
       +- Execute CreateHoodieTableAsSelectCommand
             +- CreateHoodieTableAsSelectCommand `default`.`h0`, ErrorIfExists
                   +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
                      +- OneRowRelation
    

    We can see that, just like Spark 2, the Hudi logic is now implemented through CreateHoodieTableAsSelectCommand.

  • Original article: https://blog.csdn.net/dkl12/article/details/126095742