• FlinkSQL自定义UDAF使用的三种方式


    1.UDAF定义

    Aggregate functions(聚合函数)将多行的标量值映射到新的标量值(多进一出),聚合函数用到了累加器,下图是聚合过程:

    Aggregate functions聚合函数实现的核心步骤如下。

    (1)继承AggregateFunction

    (2)必须覆盖createAccumulator和getValue

    (3)提供accumulate方法

    (4)retract⽅法在OVER windows上才是必须的

    (5)merge有界聚合以及会话窗⼝和滑动窗口聚合都需要(对性能优化也有好处)

    2.数据集格式

    学生学科考试成绩数据集如下所示:

    1,"zhangsan","Chinese",90

    1,"zhangsan","Math",74

    1,"zhangsan","English",100

    2,"lisi","Chinese",86

    2,"lisi","Math",99

    2,"lisi","English",92

    第一列表示学生ID,第二列表示学生姓名,第三列表示学科,第四列表示成绩。

    3.自定义UDAF

    FlinkSQL自定义UDAF函数对学生考试成绩进行聚合操作的具体代码如下所示。

    public class FlinkAggFunction {

    public static void main(String[] args) {

    //1.获取stream的执行环境

    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

    senv.setParallelism(1);

    //2.创建表执行环境

    StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);

    //3.数据源

    Table table = tEnv.fromValues(

    DataTypes.ROW(

    DataTypes.FIELD("id",DataTypes.DECIMAL(10,2)),

    DataTypes.FIELD("name",DataTypes.STRING()),

    DataTypes.FIELD("course",DataTypes.STRING()),

    DataTypes.FIELD("score",DataTypes.DOUBLE())

    ),

    row(1,"zhangsan","Chinese",90),

    row(1,"zhangsan","Math",74),

    row(1,"zhangsan","English",100),

    row(2,"lisi","Chinese",86),

    row(2,"lisi","Math",99),

    row(2,"lisi","English",92)

    ).select($("id"),$("name"),$("course"),$("score"));

    tEnv.createTemporaryView("student",table);

    //4.1调用方式1 table api(未注册函数)

    tEnv.from("student")

    .groupBy($("course"))

    .select($("course"),call(AvgFunction.class,$("score").as("avg_score")))

    .execute().print();

    //4.2调用方式2table api(注册函数)

    tEnv.createTemporarySystemFunction("AvgFunction",AvgFunction.class);

    tEnv.from("student")

    .groupBy($("course"))

    .select($("course"),call("AvgFunction",$("score").as("avg_score")))

    .execute().print();

    //4.3调用方式3 sql(注册函数)

    tEnv.sqlQuery("select course,AvgFunction(score) as avg_score from student group by course")

    .execute().print();

    }

    //可变累加器的数据结构

    @Data

    @NoArgsConstructor

    @AllArgsConstructor

    public static class AvgAccumulator{

    public double sum = 0.0;

    public int count = 0;

    }

    //自定义UDAF

    public static class AvgFunction extends AggregateFunction<Double,AvgAccumulator>{

    //获取累加器的值

    @Override

    public Double getValue(AvgAccumulator avgAccumulator) {

    if(avgAccumulator.count==0){

    return null;

    }else {

    return avgAccumulator.sum/avgAccumulator.count;

    }

    }

    //初始化累加器

    @Override

    public AvgAccumulator createAccumulator() {

    return new AvgAccumulator();

    }

    //迭代累加

    public void accumulate(AvgAccumulator acc,Double score){

    acc.setSum(acc.sum+score);

    acc.setCount(acc.count+1);

    }

    }

    }

    4.运行结果

    FlinkSQL自定义UDAF函数之后,使用注册的AvgFunction函数对学生考试成绩聚合之后的效果如下所示。

  • 相关阅读:
    可视化看板有那么多应用场景,该如何快速搭建?可视化工具该如何选择?
    整个表单设置disable禁用,单独某个功能设置不禁用
    索尼 toio™ 应用创意开发征文 | 如何用Python控制Q宝进行机器人擂台赛
    我的大一.
    RS232电平和TTL电平有什么不同
    【正点原子Linux连载】第二十二章 AP3216C 摘自【正点原子】I.MX6U嵌入式Qt开发指南V1.0.2
    从小码农到大厂Offer收割机
    SpringMVC
    本地数据库IndexedDB - 学员管理系统之列表管理(二)
    深入解析MySQL死锁:原因、检测与解决方案
  • 原文地址:https://blog.csdn.net/dajiangtai007/article/details/125500080