类似RDD,在SparkSQL中也可以把Dataset缓存起来,这样就不需要再计算已经缓存的数据集了。不过SparkSQL的缓存机制和RDD的缓存机制不同,本文分析SparkSQL的Dataset缓存的实现原理。
通过一个简单的例子来查看Dataset的缓存的效果。
scala> var data = spark.range(10).cache
data: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> data.explain()
== Physical Plan ==
*(1) InMemoryTableScan [id#17L]
+- InMemoryRelation [id#17L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Range (0, 10, step=1, splits=16)
通过查询计划可以看出,当执行cache时,会生成一个InMemoryRelation的执行计划。该逻辑计划会使用CachedRDDBuilder来缓存Dataset的数据。
另外,从以上的逻辑计划还可以看到:默认的缓存的存储级别是memory_and_disk,复制因子是1,是以非序列化的形式进行缓存。
在SparkSQL中缓存的方式,可以有两种方式来缓存一个Dataset:(1)使用cache函数;(2)使用persist函数。这两个函数的设计也和RDD的这两个函数的设计类似,cache函数无法设置缓存级别(使用默认级别);persist函数可以设置缓存级别。
(1)cache函数的实现
cache函数的实现代码如下:
def cache(): this.type = persist()
(2)persist函数的实现
persist函数的实现代码如下:
def persist(): this.type = {
sparkSession.sharedState.cacheManager.cacheQuery(this)
this
}
还有一个可以设置存储级别的定义:
def persist(newLevel: StorageLevel): this.type = {
sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
this
}
要注意,若persist函数设置了存储级别,则数据也会以这个存储级别进行缓存。
从以上实现代码可以看出,Dataset缓存的实现都是通过CacheManager来实现的。下面分析CacheManager的实现原理。
CacheManager是基于内存的结构化查询的缓存,该结构化查询是基于逻辑计划来进行的。它通过SparkSession#SharedState来进行跨Session共享。可以在SparkSession中查看该对象的情况:
scala> spark.sharedState.cacheManager
res14: org.apache.spark.sql.execution.CacheManager = org.apache.spark.sql.execution.CacheManager@1e40bfbe
它通过InMemoryRelation中的字节缓冲区缓存数据,这是自动生成的逻辑计划,会返回与最初缓存的查询相同的结果。最终它使用一个list[CachedData]来管理需要缓存的逻辑计划和缓存的对应关系。
最终Dataset的缓存的实现是在cacheQuery函数中完成的。该函数会缓存由给定Dataset逻辑计划产生的数据,默认的缓存级别是:MEMORY_AND_DISK。该函数的实现代码如下:
// 缓存由给定Dataset的逻辑计划产生的数据。
// 与`RDD.cache()`不同,默认存储级别为 `MEMORY_AND_DISK`,因为重新计算数据集内存中的列代价很大。
def cacheQuery(
query: Dataset[_],
tableName: Option[String] = None,
storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock { // 添加写锁
// 获取计算Dataset的逻辑计划
val planToCache = query.logicalPlan
// 查看是否数据已经缓存了
if (lookupCachedData(planToCache).nonEmpty) { // 已经缓存过了
logWarning("Asked to cache already cached data.")
} else {
// 数据没有缓存
val sparkSession = query.sparkSession
// 创建一个InMemoryRelation对象。默认ds的缓存是要压缩的
val inMemoryRelation = InMemoryRelation(
sparkSession.sessionState.conf.useCompression,
sparkSession.sessionState.conf.columnBatchSize, storageLevel,
sparkSession.sessionState.executePlan(planToCache).executedPlan, // 物理计划
tableName,
planToCache)
// 把CachedData对象保存到CacheManager的链表中。
cachedData.add(CachedData(planToCache, inMemoryRelation))
}
}
该类是逻辑计划和InMemoryRelation对象的对应关系。通过该对应关系来判断某个计划的数据是否已经被缓存过了。
case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
cachedData是一个CachedData的链表,定义如下:
@transient
private val cachedData = new java.util.LinkedList[CachedData]
通过以上代码可以看出:
(1)我们知道Dataset逻辑计划的执行是在调用action操作后才进行的,所以Dataset的缓存操作是懒加载的。也就是说当执行Dataset的cache或persist函数时,并没有去计算Dataset或缓存数据,而是创建了一个InMemoryRelation对象,它实际上是一个逻辑计划。最后的数据计算是在InMemoryTableScanExec对象中完成的,这是InMemoryRelation的物理计划,通过该物理计划来生成具体的执行代码。
(2)判断是否某个Dataset是否已经缓存,是根据逻辑计划(也就是查看cachedData中是否存在相同的CachedData对象),而不是根据某个具体的Dataset名称。
本文介绍了SparkSQL中Dataset缓存的具体实现原理。可见,对于Dataset的缓存只是生成了一个逻辑计划,当执行action算子实际计算数据才会计算并缓存数据。