• FlinkSql中的聚合查询


            在 SQL 中,一个很常见的功能就是对某一列的多条数据做一个合并统计,得到一个或多个结果值;比如求和、最大最小值、平均值等等,这种操作叫作聚合(Aggregation)查询。Flink 中的 SQL 是流处理与标准 SQL 结合的产物,所以聚合查询也可以分成两种:流处理中特有的聚合(主要指窗口聚合),以及 SQL 原生的聚合查询方式(分组聚合和开窗聚合)。

    分组聚合

            就是通过 GROUP BY 子句来指定分组的键(key),从而对数据按照某个字段做一个分组统计。SQL 中的分组聚合可以对应 DataStream API 中 keyBy 之后的聚合转换,它们都是按照某个 key 对数据进行了划分,各自维护状态来进行聚合统计的。在流处理中,分组聚合同样是一个持续查询,而且是一个更新查询,得到的是一个动态表;每当流中有一个新的数据到来时,都会导致结果表的更新操作。因此,想要将结果表转换成流或输出到外部系统,必须采用撤回流(retract stream)或更新插入流(upsert stream)的编码方式;如果在代码中直接转换成 DataStream 打印输出,需要调用 toChangelogStream()。另外,在持续查询的过程中,由于用于分组的 key 可能会不断增加,因此计算结果所需要维护的状态也会持续增长,为了性能需要设置TTL。

    使用场景:可以按照用户名进行分组,统计每个用户点击 url 的次数
    SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user

    开窗聚合

            开窗函数的聚合与之前两种聚合有本质的不同:分组聚合、窗口 TVF聚合都是“多对一”的关系,将数据分组之后每组只会得到一个聚合结果;而开窗函数是对每行都要做一次开窗聚合,因此聚合之后表中的行数不会有任何减少,是一个“多对多”的关系Flink SQL 中的开窗函数也是通过 OVER 子句来实现的,这里 OVER 关键字前面是一个聚合函数,它会应用在后面 OVER 定义的窗口上。在 OVER子句中主要有以下几个部分:

    • PARTITION BY(可选):用来指定分区的键(key),类似于 GROUP BY 的分组,这部分是可选的;
    • ORDER BY:明确地指出数据基于那个字段排序(目前只支持按照时间属性的升序排列,所以这里 ORDER BY 后面的字段必须是定义好的时间属性
    • 开窗范围:要扩展多少行来做聚合。这个范围是由 BETWEEN <下界> AND <上界> 来定义的,也就是“从下界到上界”的范围。目前支持的上界只能是 CURRENT ROW,也就是定义一个“从之前某一行到当前行”的范围。

    开窗范围

            开窗选择的范围可以基于时间,也可以基于数据的数量。所以开窗范围还应该在两种模式之间做出选择:范围间隔(RANGE intervals)和行间隔(ROW intervals)

    范围间隔

            范围间隔以 RANGE 为前缀,就是基于 ORDER BY 指定的时间字段去选取一个范围,一般就是当前行时间戳之前的一段时间。例如开窗范围选择当前行之前 1 小时的数据:

    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW

    行间隔

            行间隔以 ROWS 为前缀,就是直接确定要选多少行,由当前行出发向前选取就可以了。例如开窗范围选择当前行之前的 5 行数据(最终聚合会包括当前行,所以一共 6 条数据):       

    ROWS BETWEEN 5 PRECEDING AND CURRENT ROW

    使用场景:统计每个用户截止当前事件事件一小时内的uv

    1. SELECT user, ts,
    2. COUNT(url) OVER (
    3. PARTITION BY user
    4. ORDER BY ts
    5. RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    6. ) AS cnt
    7. FROM EventTable

    WINDOW 子句

            在 SQL 中,也可以用 WINDOW 子句来在 SELECT 外部单独定义一个 OVER 窗口,来实现窗口的复用

    1. SELECT user, ts,
    2. COUNT(url) OVER w AS cnt,
    3. MAX(CHAR_LENGTH(url)) OVER w AS max_url
    4. FROM EventTable
    5. WINDOW w AS (
    6. PARTITION BY user
    7. ORDER BY ts
    8. ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)

    窗口聚合

      在流处理中,往往需要将无限数据流划分成有界数据集,这就是所谓的“窗口”在 Flink 的 Table API 和 SQL 中,窗口的计算是通过“窗口聚合”(window aggregation)来实现的。与分组聚合类似,窗口聚合也需要调用 SUM()、MAX()、MIN()、COUNT()一类的聚合函数,通过 GROUP BY 子句来指定分组的字段。只不过窗口聚合时,需要将窗口信息作为分组 key 的一部分定义出来。在1.13 版本开始使用了“窗口表值函数”(Windowing TVF),窗口本身返回的是就是一个表,所以窗口会出现在 FROM里面,GROUP BY 后面的则是窗口新增的字段 window_start 和 window_end。

    窗口的类型可参照博客FlinkSql中的窗口_大大大大肉包的博客-CSDN博客

    使用场景:每小时统计一次当天截至现在的pv(使用了统计周期为 1 天、累积间隔为 1小时的累积窗口)
    1. SELECT
    2. user,
    3. window_end AS endT,
    4. COUNT(url) AS cnt
    5. FROM TABLE(
    6. CUMULATE( TABLE EventTable, // 定义累积窗口
    7. DESCRIPTOR(ts),
    8. INTERVAL '1' HOUR,
    9. INTERVAL '1' DAY))
    10. GROUP BY user, window_start, window_end

    聚合实例

    统计每小时内有最多访问行为的用户,取前两名,相当于是一个每小时活跃用户的查询(窗口topN)

    1. SELECT *
    2. FROM
    3. (
    4. SELECT *,
    5. ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY cnt desc ) AS row_num
    6. FROM
    7. (
    8. --开一个滚动窗口
    9. SELECT window_start, window_end, user, COUNT(url) as cnt
    10. FROM TABLE (TUMBLE( TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR ))
    11. GROUP BY window_start, window_end, user
    12. ) a
    13. ) a
    14. WHERE row_num <= 2
    15. ;
  • 相关阅读:
    kafka学习笔记04(小滴课堂)
    【Typescript重点】接口的使用
    代码还原之 函数
    Win11蓝牙无法连接怎么办?可以试试这个方法。
    测试面试总结
    leetcode20-有效的括号
    深入剖析Linux线程特定数据
    ST 2.0 霍尔FOC 的相关难点总结
    HTML 中创建 WebSocket服务与接收webSocket发送内容
    【DL with Pytorch】第 2 章 : 神经网络的构建块
  • 原文地址:https://blog.csdn.net/qq_42456324/article/details/128120846