• SparkSQL实现原理-DataSet缓存的实现


    SparkSQL实现原理-DataSet缓存的实现

    ​ 类似RDD,在SparkSQL中也可以把Dataset缓存起来,这样就不需要再计算已经缓存的数据集了。不过SparkSQL的缓存机制和RDD的缓存机制不同,本文分析SparkSQL的Dataset缓存的实现原理。

    Dataset缓存的使用

    ​ 通过一个简单的例子来查看Dataset的缓存的效果。

    scala> var data = spark.range(10).cache
    data: org.apache.spark.sql.Dataset[Long] = [id: bigint]
    
    scala> data.explain()
    == Physical Plan ==
    *(1) InMemoryTableScan [id#17L]
       +- InMemoryRelation [id#17L], StorageLevel(disk, memory, deserialized, 1 replicas)
             +- *(1) Range (0, 10, step=1, splits=16)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    ​ 通过查询计划可以看出,当执行cache时,会生成一个InMemoryRelation的执行计划。该逻辑计划会使用CachedRDDBuilder来缓存Dataset的数据。

    另外,从以上的逻辑计划还可以看到:默认的缓存的存储级别是memory_and_disk,复制因子是1,是以非序列化的形式进行缓存。

    缓存的实现接口

    ​ 在SparkSQL中缓存的方式,可以有两种方式来缓存一个Dataset:(1)使用cache函数;(2)使用persist函数。这两个函数的设计也和RDD的这两个函数的设计类似,cache函数无法设置缓存级别(使用默认级别);persist函数可以设置缓存级别。

    (1)cache函数的实现

    ​ cache函数的实现代码如下:

     def cache(): this.type = persist()
    
    • 1

    (2)persist函数的实现

    ​ persist函数的实现代码如下:

    def persist(): this.type = {
        sparkSession.sharedState.cacheManager.cacheQuery(this)
        this
      }
    
    • 1
    • 2
    • 3
    • 4

    ​ 还有一个可以设置存储级别的定义:

    def persist(newLevel: StorageLevel): this.type = {
      sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
      this
    }
    
    • 1
    • 2
    • 3
    • 4

    ​ 要注意,若persist函数设置了存储级别,则数据也会以这个存储级别进行缓存。

    ​ 从以上实现代码可以看出,Dataset缓存的实现都是通过CacheManager来实现的。下面分析CacheManager的实现原理。

    CacheManager的实现

    ​ CacheManager是基于内存的结构化查询的缓存,该结构化查询是基于逻辑计划来进行的。它通过SparkSession#SharedState来进行跨Session共享。可以在SparkSession中查看该对象的情况:

    scala> spark.sharedState.cacheManager
    res14: org.apache.spark.sql.execution.CacheManager = org.apache.spark.sql.execution.CacheManager@1e40bfbe
    
    • 1
    • 2

    ​ 它通过InMemoryRelation中的字节缓冲区缓存数据,这是自动生成的逻辑计划,会返回与最初缓存的查询相同的结果。最终它使用一个list[CachedData]来管理需要缓存的逻辑计划和缓存的对应关系。

    cacheQuery函数

    ​ 最终Dataset的缓存的实现是在cacheQuery函数中完成的。该函数会缓存由给定Dataset逻辑计划产生的数据,默认的缓存级别是:MEMORY_AND_DISK。该函数的实现代码如下:

    // 缓存由给定Dataset的逻辑计划产生的数据。
    // 与`RDD.cache()`不同,默认存储级别为 `MEMORY_AND_DISK`,因为重新计算数据集内存中的列代价很大。
    def cacheQuery(
        query: Dataset[_],
        tableName: Option[String] = None,
        storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {  // 添加写锁
      // 获取计算Dataset的逻辑计划
      val planToCache = query.logicalPlan
      // 查看是否数据已经缓存了
      if (lookupCachedData(planToCache).nonEmpty) { // 已经缓存过了
        logWarning("Asked to cache already cached data.")
      } else {
        // 数据没有缓存
        val sparkSession = query.sparkSession
        // 创建一个InMemoryRelation对象。默认ds的缓存是要压缩的
        val inMemoryRelation = InMemoryRelation(
          sparkSession.sessionState.conf.useCompression,
          sparkSession.sessionState.conf.columnBatchSize, storageLevel,
          sparkSession.sessionState.executePlan(planToCache).executedPlan, // 物理计划
          tableName,
          planToCache)
        // 把CachedData对象保存到CacheManager的链表中。
        cachedData.add(CachedData(planToCache, inMemoryRelation))
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    CachedData类

    该类是逻辑计划和InMemoryRelation对象的对应关系。通过该对应关系来判断某个计划的数据是否已经被缓存过了。

    case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
    
    • 1
    cachedData变量

    cachedData是一个CachedData的链表,定义如下:

    @transient
    private val cachedData = new java.util.LinkedList[CachedData]
    
    • 1
    • 2

    通过以上代码可以看出:

    (1)我们知道Dataset逻辑计划的执行是在调用action操作后才进行的,所以Dataset的缓存操作是懒加载的。也就是说当执行Dataset的cache或persist函数时,并没有去计算Dataset或缓存数据,而是创建了一个InMemoryRelation对象,它实际上是一个逻辑计划。最后的数据计算是在InMemoryTableScanExec对象中完成的,这是InMemoryRelation的物理计划,通过该物理计划来生成具体的执行代码。

    (2)判断是否某个Dataset是否已经缓存,是根据逻辑计划(也就是查看cachedData中是否存在相同的CachedData对象),而不是根据某个具体的Dataset名称。

    小结

    ​ 本文介绍了SparkSQL中Dataset缓存的具体实现原理。可见,对于Dataset的缓存只是生成了一个逻辑计划,当执行action算子实际计算数据才会计算并缓存数据。

  • 相关阅读:
    零基础学习React(Html)
    条件渲染(v-if、v-show)、列表渲染(v-for)、列表中key的原理和作用、列表过滤(filter)、列表排序(sort)
    阶段六-Day03-MyBatis1
    在家呆了两天,今天聊下分布式压测
    【Flutter】Flutter 数据存储 Hive 的简要使用说明
    Perl文件锁机制:守护你的数据安全
    如何看待为了省小钱而花费时间
    【Java第十八期】:#用Java模拟实现一个单向不带头不循环的链表
    人物重识别(ReID):AaP-ReID: Improved Attention-Aware Person Re-identification
    计算机基础(一):面向CPU编程
  • 原文地址:https://blog.csdn.net/zg_hover/article/details/126654460