• Flink CDC 实现Postgres到MySQL流式加工传输案例


    案例:实现文章的访问量统计

    使用Flink Postgres CDC 进行数据输入,在 Flink SQL CLI 中进行逻辑加工,整个过程使用 SQL ,无需代码。将结果通过JDBC方式输出到MySQL。

    1 数据库表准备

    postgres数据库环境配置参考文章:Flink PostgreSQL CDC配置和常见问题

    -- postgresql
    -- 文章记录表
    CREATE TABLE t_article (
      id SERIAL NOT NULL PRIMARY KEY,
      title VARCHAR(100) NOT NULL,
      contents VARCHAR(500) NOT NULL,
      author VARCHAR(20) NOT NULL,
      create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
    );
    ALTER SEQUENCE public.t_article_id_seq RESTART WITH 1001;
    ALTER TABLE public.t_article REPLICA IDENTITY FULL;
    -- 初始化数据
    INSERT INTO t_article (title,contents,author)
    VALUES ('华尔街日报回怼马斯克','知情人士揭露马斯克和谷歌联合创始人谢尔盖·布林之间长达20年的友谊突...','每日邮报'),
           ('跨省异地就医新规出台','国家医保局、财政部26日发布《关于进一步做好基本医疗保险跨省异地就医直接结算工作的通知》','新华社客户端'),
           ('扎实推动共同富裕','让人民过上好日子,是“国之大者”。深刻践行以人民为中心的发展思想,既要尽力而为也要量力而行,我们在高质量发展中促进共同富裕的脚步更加笃定。','中国经济网');
    
    -- 文章访问记录表       
    CREATE TABLE t_article_visit  (
    	id SERIAL NOT NULL PRIMARY KEY,
      article_id int4 NOT NULL,
      visitor VARCHAR(100) NOT NULL,
      visitor_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
    ); 
    ALTER TABLE public.t_article REPLICA IDENTITY FULL; 
    -- 初始化数据
    INSERT INTO  "t_article_visit"( "article_id", "visitor") VALUES (1001, 'zhangsan');
    INSERT INTO  "t_article_visit"( "article_id", "visitor") VALUES (1001, 'lisi');
    
    -- mysql
    CREATE TABLE f_article_visit_count (
      article_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      article_title VARCHAR(255)  NOT NULL,
      visitor_count bigint  DEFAULT 0
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    2 安装flink以及依赖jar

    下面以本地模式安装,提前安装好 Java 8 或者 Java 11

    flink版本使用 1.13.6 ,下载后并解压。

    $ tar -xzf flink-1.13.6-bin-scala_2.11.tgz
    $ cd flink-1.13.6
    
    • 1
    • 2

    目录结构:
    在这里插入图片描述

    启动与停止命令

    $ ./bin/start-cluster.sh
    $ ./bin/stop-cluster.sh
    
    • 1
    • 2

    下载cdc依赖jar

    下载地址
    在这里插入图片描述
    本案例使用 postgres作为数据输入,下载 flink-sql-connector-postgres-cdc-2.2.1.jar 即可

    下载JDBC Connectors

    下载地址
    注意:也需要额外下载相关数据库的驱动。

    将下载好的依赖包放置到 /lib 目录下

    • flink-sql-connector-postgres-cdc-2.2.1.jar
    • flink-connector-jdbc_2.11-1.13.6.jar
    • mysql-connector-java-8.0.28.jar

    3.使用flink sql 客户端编写加工逻辑

    启动sql客户端

    ./bin/sql-client.sh 
    
    • 1

    flink sql-client启动界面

    创建Postgres CDC连接和Mysql JDBC连接

    -- 设置检查点
     SET execution.checkpointing.interval = 3s;
     CREATE TABLE t_article (
       	id INT,
    	title STRING,
    	contents STRING,
    	author STRING,
    	create_time TIMESTAMP(0),
       PRIMARY KEY (id) NOT ENFORCED
     ) WITH (
       'connector' = 'postgres-cdc',
       'hostname' = '10.10.1.74',
       'port' = '5432',
       'username' = 'postgres',
       'password' = '123456',
       'database-name' = 'test',
       'schema-name' = 'public',
       'table-name' = 't_article',
       'decoding.plugin.name' = 'pgoutput',
       'debezium.slot.name' = 'flink_t_article'
     );
      CREATE TABLE t_article_visit (
    	id INT,
       	article_id INT,
    	visitor STRING,
    	visitor_time TIMESTAMP(0),
        PRIMARY KEY (id) NOT ENFORCED
     ) WITH (
       'connector' = 'postgres-cdc',
       'hostname' = '10.10.1.74',
       'port' = '5432',
       'username' = 'postgres',
       'password' = '123456',
       'database-name' = 'test',
       'schema-name' = 'public',
       'table-name' = 't_article_visit',
       'decoding.plugin.name' = 'pgoutput',
       'debezium.slot.name' = 'flink_t_article_visit'
     );
     CREATE TABLE f_article_visit_count (
       	article_id INT,
    	article_title STRING,
    	visitor_count  BIGINT,
        PRIMARY KEY (article_id) NOT ENFORCED
     ) WITH ('connector' = 'jdbc',
        'url' = 'jdbc:mysql://10.10.1.72:3306/test',
        'username' = 'root',
        'password' = '123456',
        'table-name' = 'f_article_visit_count'
     );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    在客户端查看创建的逻辑表
    查看flink表

    执行加工任务

    INSERT INTO f_article_visit_count SELECT
    art.id article_id,
    art.title article_title,
    count( artv.id ) visitor_count 
    FROM t_article art
    INNER JOIN t_article_visit artv ON art.id = artv.article_id 
    GROUP BY art.id,art.title;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    控制台查看任务

    flink 控制台页面

    查看数据

    任务发布后,表f_article_visit_count 的初始数据
    初始加工数据
    通过向postgres库中表t_article_visit插入记录,观察mysql库中表f_article_visit_count数据实时变化

    INSERT INTO  "t_article_visit"( "article_id", "visitor") VALUES (1001, 'wangwu');
    INSERT INTO  "t_article_visit"( "article_id", "visitor") VALUES (1002, 'libai');
    
    • 1
    • 2

    在这里插入图片描述

  • 相关阅读:
    Nacos注册中心细节分析
    洛谷--欢乐的跳
    【bug】uniapp的image组件渲染gif图,只有第一次点击的时候有动效,需要每次点击都有gif效果,已解决
    核心内参: TDR原理及常见问题
    Java (JVM) 内存模型
    【Pytorch基础知识】数据的归一化和反归一化
    基于Python的MNIST解析图片文件和标签文件实验报告
    回归预测 | MATLAB实现RUN-XGBoost龙格库塔优化极限梯度提升树多输入回归预测
    【C++进阶之路】C++11(上)
    CC0是什么,为什么它会改变NFT市场?
  • 原文地址:https://blog.csdn.net/xiweiller/article/details/126031013