CarbonData是一种新型的Apache Hadoop本地文件格式,使用先进的列式存储、索引、压缩和编码技术,以提高计算效率,有助于加速超过PB数量级的数据查询,可用于更快的交互查询。同时,CarbonData也是一种将数据源与Spark集成的高性能分析引擎。

使用CarbonData的目的是对大数据即席查询提供超快速响应。从根本上说,CarbonData是一个OLAP引擎,采用类似于RDBMS中的表来存储数据。用户可将大量(10TB以上)的数据导入以CarbonData格式创建的表中,CarbonData将以压缩的多维索引列格式自动组织和存储数据。数据被加载到CarbonData后,就可以执行即席查询,CarbonData将对数据查询提供秒级响应。
CarbonData将数据源集成到Spark生态系统,用户可使用Spark SQL执行数据查询和分析。也可以使用Spark提供的第三方工具JDBCServer连接到Spark SQL。
CarbonData作为Spark内部数据源运行,不需要额外启动集群节点中的其他进程,CarbonData Engine在Spark Executor进程之中运行。CarbonData结构如下图所示:

存储在CarbonData Table中的数据被分成若干个CarbonData数据文件,每一次数据查询时,CarbonData Engine模块负责执行数据集的读取、过滤等实际任务。CarbonData Engine作为Spark Executor进程的一部分运行,负责处理数据文件块的一个子集。
Table数据集数据存储在HDFS中。同一Spark集群内的节点可以作为HDFS的数据节点。
为了解决日益增长的数据量给driver带来的压力与出现的各种问题,现引入单独的索引缓存服务器,将索引从Carbon查询的Spark应用侧剥离。所有的索引内容全部由索引缓存服务器管理,Spark应用通过RPC方式获取需要的索引数据。这样,释放了大量的业务侧的内存,使得业务不会受集群规模影响而性能或者功能出现问题。
出于管理和信息收集的需要,企业内部会存储海量数据,包括数目众多的各种数据库、数据仓库等,此时会面临以下困境:数据源种类繁多,数据集结构化混合,相关数据存放分散等,这就导致了跨源复杂查询因传输效率低,耗时长。
当前开源Spark在跨源查询时,只能对简单的filter进行下推,因此造成大量不必要的数据传输,影响SQL引擎性能。针对下推能力进行增强,当前对aggregate、复杂projection、复杂predicate均可以下推到数据源,尽量减少不必要数据的传输,提升查询性能。
目前仅支持JDBC数据源的查询下推,支持的下推模块有aggregate、projection、predicate、aggregate over inner join、aggregate over union all等。为应对不同应用场景的特殊需求,对所有下推模块设计开关功能,用户可以自行配置是否应用上述查询下推的增强。
| 模块 | 增强前 | 增强后 |
|---|---|---|
| aggregate | 不支持aggregate下推 | 1. 支持的聚合函数为:sum, avg, max, min, count 例如:select count(*) from table。 2. 支持聚合函数内部表达式 例如:select sum(a+b) from table 3. 支持聚合函数运算, 例如: select avg(a) + max(b) from table 4. 支持having下推 例如: select sum(a) from table where a>0 group by b having sum(a)>10 5. 支持部分函数下推 支持对abs()、month()、length()等数学、时间、字符串函数进行下推。并且,除了以上内置函数,用户还可以通过SET命令新增数据源支持的函数。 例如: select sum(abs(a)) from table 6. 支持aggregate之后的limit、order by下推(由于Oracle不支持limit,所以Oracle中limit、order by不会下推) 例如: select sum(a) from table where a>0 group by b order by sum(a) limit 5 |
| projection | 仅支持简单projection下推,例如:select a, b from table | 1. 支持复杂表达式下推。 例如:select (a+b)*c from table 2. 支持部分函数下推,详细参见表下方的说明。 例如:select length(a)+abs(b) from table 3. 支持projection之后的limit、order by下推。例如:select a, b+c from table order by a limit 3 |
| predicate | 仅支持运算符左边为列名右边为值的简单filter,例如 select * from table where a>0 or b in (“aaa”, “bbb”) | 1. 支持复杂表达数下推 例如:select * from table where a+b>c*d or a/c in (1, 2, 3) 2. 支持部分函数下推,详细参见表下方的说明。 例如:select * from table where length(a)>5 |
| aggregate over inner join | 需要将两个表中相关的数据全部加载到Spark,先进行join操作,再进行aggregate操作 | 支持以下几种: 1. 支持的聚合函数为:sum, avg, max, min,count 2.所有aggregate只能来自同一个表,group by可以来自一个表或者两个表,只支持inner join。 不支持的情形有: 1.不支持aggregate同时来自join左表和右表的下推。2.不支持aggregate内包含运算,如:sum(a+b)。3.不支持aggregate运算,如:sum(a)+min(b)。 |
| aggregate over union all | 需要将两个表中相关的数据全部加载到Spark,先进行union操作,再进行aggregate操作 | 支持情况: 支持的聚合函数为:sum, avg, max, min,count 不支持的情况: 1. 不支持aggregate内包含运算,如:sum(a+b)。 2.不支持aggregate运算,如:sum(a)+min(b)。 |