• flink sql实战案例


    目录

    一、背景

    二、流程

    三、案例

    1.flink sql读取 Kafka 并写入 MySQL

    source

    sink

    insert

    2.flinksql读kafka写入kudu

    source

    sink

    insert

    四、注意点

    1.断点续传

    2.实时采集

    3.回溯问题


    一、背景

    使用flink sql实时同步一下数据

    二、流程

    总的来说就三步

    source-->>sink->>insert

    三、案例

    1.flink sql读取 Kafka 并写入 MySQL

    source

    1. CREATE TABLE source_table (
    2. user_id VARCHAR,
    3. item_id VARCHAR,
    4. category_id VARCHAR,
    5. behavior VARCHAR,
    6. ts TIMESTAMP
    7. ) WITH (
    8. 'connector.type' = 'kafka', -- 使用 kafka connector
    9. 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本
    10. 'connector.topic' = 'user_behavior', -- kafka topic
    11. 'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取
    12. 'connector.properties.0.key' = 'zookeeper.connect', -- 连接信息
    13. 'connector.properties.0.value' = 'localhost:2181',
    14. 'connector.properties.1.key' = 'bootstrap.servers',
    15. 'connector.properties.1.value' = 'localhost:9092',
    16. 'update-mode' = 'append',
    17. 'format.type' = 'json', -- 数据源格式为 json
    18. 'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
    19. )

    sink

    1. CREATE TABLE sink_table (
    2. dt VARCHAR,
    3. pv BIGINT,
    4. uv BIGINT
    5. ) WITH (
    6. 'connector.type' = 'jdbc', -- 使用 jdbc connector
    7. 'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url
    8. 'connector.table' = 'pvuv_sink', -- 表名
    9. 'connector.username' = 'username', -- 用户名
    10. 'connector.password' = 'password', -- 密码
    11. 'connector.write.flush.max-rows' = '1' -- 默认5000条,为了演示改为1条
    12. )

    insert

    1. INSERT INTO sink_table
    2. SELECT
    3. DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') as dt,
    4. COUNT(*) as pv,
    5. COUNT(DISTINCT user_id) as uv
    6. FROM source_table
    7. GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')

    2.flinksql读kafka写入kudu

    source

    1. -- kafka source
    2. drop table if exists source_table;
    3. CREATE TABLE source_table (
    4. user_id VARCHAR
    5. ,item_id VARCHAR
    6. ,category_id VARCHAR
    7. ,behavior INT
    8. ,ts TIMESTAMP(3)
    9. ,process_time as proctime()
    10. , WATERMARK FOR ts AS ts
    11. ) WITH (
    12. 'connector' = 'kafka'
    13. ,'topic' = 'user_behavior'
    14. ,'properties.bootstrap.servers' = 'venn:9092'
    15. ,'properties.group.id' = 'source_table'
    16. ,'scan.startup.mode' = 'group-offsets'
    17. ,'format' = 'json'
    18. );

    sink

    1. -- kafka sink
    2. drop table if exists sink_table;
    3. CREATE TABLE sink_table (
    4. user_id STRING
    5. ,item_id STRING
    6. ,category_id STRING
    7. ,ts TIMESTAMP(3)
    8. ) WITH (
    9. 'connector.type' = 'kudu'
    10. ,'kudu.masters' = 'venn:7051,venn:7151,venn:7251'
    11. ,'kudu.table' = 'source_table'
    12. ,'kudu.hash-columns' = 'user_id'
    13. ,'kudu.primary-key-columns' = 'user_id'
    14. ,'kudu.max-buffer-size' = '5000'
    15. ,'kudu.flush-interval' = '1000'
    16. );

    insert

    1. -- insert
    2. insert into sink_table
    3. select user_id, item_id, category_id,ts
    4. from source_table;

    四、注意点

    1.断点续传

    断点续传是指数据同步任务在运行过程中因各种原因导致任务失败,不需要重头同步数据,只需要从上次失败的位置继续同步即可,类似于下载文件时因网络原因失败,不需要重新下载文件,只需要继续下载就行,可以大大节省时间和计算资源。

    默认关闭,开启的话调参 isRestore:true

     

    2.实时采集

    根据数据源的数据是否实时变化可以把数据同步分为离线数据同步和实时数据同步,上面介绍的断点续传就是离线数据同步里的功能,实时采集其实就是实时数据同步,当数据源里的数据发生了增删改操作,同步任务监听到这些变化,将变化的数据实时同步到目标数据源。除了数据实时变化外,实时采集和离线数据同步的另一个区别是:实时采集任务是不会停止的,任务会一直监听数据源是否有变化。

    3.回溯问题

    例如mysql是事务型数据库,会update,最新的消息发过去,得回撤更新前的消息,update-和update+两条消息,数据都在state里。

    简单举个例子,统计男女数量,一开始mysql里是男,然后mysql更新为女了,这时候你接收的kafka,消息都会过来,state里一开始存着男,然后把男回撤,女进来,就要删除男新增女,state一般在rocksdb里,可以设置table.exec.state.ttl 窗口时间。

    1. 相关参数
    2. val tEnv: TableEnvironment = ...
    3. val configuration = tEnv.getConfig().getConfiguration()
    4. configuration.setString("table.exec.mini-batch.enabled", "true") // 启用
    5. configuration.setString("table.exec.mini-batch.allow-latency", "5 s") // 缓存超时时长
    6. configuration.setString("table.exec.mini-batch.size", "5000") // 缓存大小

     

     

    ps:因为本人这方面不是很专业,还在学习的阶段,有问题的话大家可以多多指教哈~

  • 相关阅读:
    初识Cpp之 四、数据类型
    【VS2017】MIDL : CreateFile() error : 2
    【python基础3】
    到github上去学别人怎么写代码
    真实sql注入以及小xss--BurpSuite联动sqlmap篇
    ubuntu18.04安装gtsam
    林沛满-TCP之在途字节数
    async/await 贴脸输出,这次你总该明白了
    【Rust日报】2022-05-28 Neon:AWS Aurora Postgres 的无服务器开源替代方案。
    电容笔做的比较好的品牌有哪些?高性价比电容笔测评
  • 原文地址:https://blog.csdn.net/chimchim66/article/details/126879494