• FlinkSQL-UDF自定义数据源


    昨天遇到了一个比较好玩的需求,要测试Flink-iceberg的版本问题
    同时不能改动线上flink和iceberg的版本

    平台已经提供了,在iceberg专属的FlinkSQL端是可以勾选iceberg版本的
    但是自定义数据源插入iceberg一般用的是jar包,如何不通过jar直接通过SQL生成自定义数据源

    阿没错!那就是我们万能的UDF啦!UDF写一个connector,类似data-gen,想一想都很兴奋有木有,然后我发现,好像是没有这样的接口诶
    但是没有关系!函数可以实现万物,本身计算机大部分都是函数做得,如果你觉得函数做不了,那只是因为自己实现不了这样的函数

    在官方网站中我找到了灵感,一列对多列输出!
    好了就是你了,用datagen控制生成速率,输出一个无效值,然后通过接收无效值来生成我自己的行,好一个偷天换日呀
     

    那么直接用Java来实现flink UDF,接受一行,然后输出自己任意想模拟的数据

    1. package udf2;
    2. import org.apache.flink.table.annotation.DataTypeHint;
    3. import org.apache.flink.table.annotation.FunctionHint;
    4. import org.apache.flink.table.functions.TableFunction;
    5. import org.apache.flink.types.Row;
    6. import java.util.Random;
    7. @FunctionHint(output = @DataTypeHint("ROW"))
    8. public class generateRowUdtf extends TableFunction {
    9. public void eval(String a) {
    10. Random random = new Random();
    11. long id = Math.abs(random.nextLong()) % 20000;
    12. String data1 = Math.abs(random.nextInt()) % 2000000 + "";
    13. int data2 = Math.abs(random.nextInt()) % 2000;
    14. double data4 = Math.abs(Math.random() * 2000 + 1);
    15. collect(Row.of(id,data1,data2,data4));
    16. }
    17. }

    然后UDF上传,写下如下SQL,对这个表进行连接操作,SQL可以在本地进行测试~直接print即可看到效果,非常滴神奇~ 这样子我们的自定义生成器就做好啦(全是api,你做了个锤子)

    1. CREATE TEMPORARY table test_insert(
    2. id2 String
    3. )WITH(
    4. 'connector' = 'datagen',
    5. 'rows-per-second'='100',
    6. 'fields.id2.kind'='random',
    7. 'fields.id2.length'='8'
    8. );
    9. CREATE TEMPORARY SYSTEM FUNCTION generateRowUdtf AS 'udf2.generateRowUdtf';
    10. insert into xxx
    11. SELECT T.id, T.data1, T.data2, T.data4
    12. FROM test_insert AS S
    13. left join lateral table(generateRowUdtf(id2)) AS T(id,data1,data2,data4) on true;

  • 相关阅读:
    前端异常监控平台之Sentry落地
    C Primer Plus(6) 中文版 第10章 数组和指针 10.6 保护数组中的数据
    百亿数据分库分表核心流程详解
    c++ openssl实现https
    h5修改钉钉双标题栏问题
    用Excel制作甘特图跟踪项目进度(附绘制教程)
    【OAuth2】十一、认识SpringAuthorization Server
    弘辽科技:淘宝9月什么时候有活动?99大促有哪些活动?
    智能密集型仓储货架自动化立体库|四向穿梭式货架对于仓库空间面积上有什么要求?
    史上最全的Python包管理工具:Anaconda教程
  • 原文地址:https://blog.csdn.net/zxc132465258/article/details/125912194