• Apache Paimon: Ingesting Data with Postgres CDC


    a. Dependency Preparation

    Download the CDC connector bundle and place it under FLINK_HOME/lib before submitting the job:

    flink-connector-postgres-cdc-*.jar
    

    b. Synchronizing Tables

    Using PostgresSyncTableAction in a Flink DataStream job, or invoking it directly through flink run, you can synchronize one or more PostgreSQL tables into a single Paimon table.

    <FLINK_HOME>/bin/flink run \
        /path/to/paimon-flink-action-0.7.0-incubating.jar \
        postgres_sync_table \
        --warehouse <warehouse-path> \
        --database <database-name> \
        --table <table-name> \
        [--partition_keys <partition-keys>] \
        [--primary_keys <primary-keys>] \
        [--type_mapping <option1,option2...>] \
        [--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
        [--metadata_column <metadata-column>] \
        [--postgres_conf <postgres-cdc-source-conf> [--postgres_conf <postgres-cdc-source-conf> ...]] \
        [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
        [--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
    

    The configuration options are as follows:

    Configuration       Description
    --warehouse         The path to the Paimon warehouse.
    --database          The database name in the Paimon catalog.
    --table             The Paimon table name.
    --partition_keys    The partition keys for the Paimon table. If there are multiple partition keys, join them with commas, for example "dt,hh,mm".
    --primary_keys      The primary keys for the Paimon table. If there are multiple primary keys, join them with commas, for example "buyer_id,seller_id".
    --type_mapping      Specifies how to map PostgreSQL data types to Paimon types. Supported option: "to-string" maps all PostgreSQL types to STRING.
    --computed_column   The definitions of computed columns. The argument field comes from a PostgreSQL table field name. See the Paimon documentation for a complete list of supported expressions.
    --metadata_column   Specifies which metadata columns to include in the output schema of the connector. Metadata columns provide additional information about the source data, for example: --metadata_column table_name,database_name,schema_name,op_ts. See the connector documentation for a complete list of available metadata.
    --postgres_conf     The configuration for the Flink CDC Postgres source. Each entry is specified in the format "key=value". hostname, username, password, database-name, schema-name, table-name and slot.name are required; the others are optional. See the connector documentation for a complete list of configurations.
    --catalog_conf      The configuration for the Paimon catalog. Each entry is specified in the format "key=value". See the Paimon documentation for a complete list of catalog configurations.
    --table_conf        The configuration for the Paimon table sink. Each entry is specified in the format "key=value". See the Paimon documentation for a complete list of table configurations.

    If the specified Paimon table does not exist, it will be created automatically, and its schema will be derived from all of the specified PostgreSQL tables.

    If the Paimon table already exists, its schema will be compared against the schemas of all the specified PostgreSQL tables.
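    Note: the Postgres CDC source reads changes via PostgreSQL logical decoding, so the source server must have it enabled before the job can start. The usual prerequisite (confirm the exact settings for your PostgreSQL version and deployment) is:

    ```ini
    # postgresql.conf -- requires a server restart to take effect
    wal_level = logical
    ```

    Each job also consumes its own replication slot, which is why slot.name is a required --postgres_conf entry.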

    Example 1: Synchronizing tables into one Paimon table

    <FLINK_HOME>/bin/flink run \
        /path/to/paimon-flink-action-0.7.0-incubating.jar \
        postgres_sync_table \
        --warehouse hdfs:///path/to/warehouse \
        --database test_db \
        --table test_table \
        --partition_keys pt \
        --primary_keys pt,uid \
        --computed_column '_year=year(age)' \
        --postgres_conf hostname=127.0.0.1 \
        --postgres_conf username=root \
        --postgres_conf password=123456 \
        --postgres_conf database-name='source_db' \
        --postgres_conf schema-name='public' \
        --postgres_conf table-name='source_table1|source_table2' \
        --postgres_conf slot.name='paimon_cdc' \
        --catalog_conf metastore=hive \
        --catalog_conf uri=thrift://hive-metastore:9083 \
        --table_conf bucket=4 \
        --table_conf changelog-producer=input \
        --table_conf sink.parallelism=4
    

    As the example shows, the table-name entry in postgres_conf supports regular expressions, so one job can monitor multiple tables that match the pattern. The schemas of all matched tables will be merged into a single Paimon table schema.
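    The connector matches each table name against the whole pattern, much like an anchored regex. As a quick local sketch (plain grep as a stand-in, not Paimon code), the pattern from Example 1 selects:

    ```shell
    # grep -Ex requires the whole line to match the pattern, mirroring
    # the connector's full-name matching for table-name.
    printf '%s\n' source_table1 source_table2 source_table3 \
      | grep -Ex 'source_table1|source_table2'
    # prints source_table1 and source_table2; source_table3 is excluded
    ```
    
    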

    Example 2: Synchronizing sharded tables into one Paimon table

    Set "schema-name" with a regular expression to capture multiple schemas.

    A typical scenario: the table "source_table" is split across the schemas "source_schema1", "source_schema2", ..., and the data of all the "source_table" copies is synchronized into a single Paimon table.

    <FLINK_HOME>/bin/flink run \
        /path/to/paimon-flink-action-0.7.0-incubating.jar \
        postgres_sync_table \
        --warehouse hdfs:///path/to/warehouse \
        --database test_db \
        --table test_table \
        --partition_keys pt \
        --primary_keys pt,uid \
        --computed_column '_year=year(age)' \
        --postgres_conf hostname=127.0.0.1 \
        --postgres_conf username=root \
        --postgres_conf password=123456 \
        --postgres_conf database-name='source_db' \
        --postgres_conf schema-name='source_schema.+' \
        --postgres_conf table-name='source_table' \
        --postgres_conf slot.name='paimon_cdc' \
        --catalog_conf metastore=hive \
        --catalog_conf uri=thrift://hive-metastore:9083 \
        --table_conf bucket=4 \
        --table_conf changelog-producer=input \
        --table_conf sink.parallelism=4
    
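    The schema-name pattern works the same way; locally, "source_schema.+" selects (again using grep as an illustrative stand-in for the connector's full-name matching):

    ```shell
    # Schemas whose full name matches source_schema.+ are captured.
    printf '%s\n' source_schema1 source_schema2 public \
      | grep -Ex 'source_schema.+'
    # prints source_schema1 and source_schema2; public is excluded
    ```
    
    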
  • Original article: https://blog.csdn.net/m0_50186249/article/details/136730815