• JDBC SQL Server Source Connector: 一览与实践


    file

    在快速发展的数据驱动业务环境中,确保数据在各个系统间高效、准确地同步至关重要。为了进一步的数据处理和分析,经常需要将这些数据同步到其他数据处理系统。Apache SeaTunnel 提供了一个强大而灵活的数据集成框架,使得从 SQL Server 到其他系统的数据同步变得简单且高效。

    本文档将指导您如何配置 Apache SeaTunnel,使用 JDBC SQL Server Source Connector 来实现数据的有效同步。

    JDBC SQL Server Source Connector

    支持 SQL Server 版本

    • 服务器:2008(或更高版本,仅供信息参考)

    支持以下引擎

    Spark
    Flink
    Seatunnel Zeta

    主要特点

    支持查询 SQL 并能够实现投影效果。

    描述

    通过 JDBC 读取外部数据源数据。

    支持的数据源信息

    数据源支持的版本驱动URLMaven
    SQL Server支持版本 >= 2008com.microsoft.sqlserver.jdbc.SQLServerDriverjdbc:sqlserver://localhost:1433下载

    数据库依赖

    请下载与 'Maven' 对应的支持列表,并将其复制到 'SEATNUNNEL_HOME/plugins/jdbc/lib/' 工作目录

<p>例如 SQL Server 数据源:cp mssql-jdbc-xxx.jar SEATNUNNEL_HOME/plugins/jdbc/lib/

    数据类型映射

    SQL Server 数据类型Seatunnel 数据类型
    BITBOOLEAN
    TINYINT
    SMALLINT
    SHORT
    INTEGERINT
    BIGINTLONG
    DECIMAL
    NUMERIC
    MONEY
    SMALLMONEY
    DECIMAL((指定列的指定列大小)+1,
    (获取指定列的小数点右边的数字的数量。)))
    REALFLOAT
    FLOATDOUBLE
    CHAR
    NCHAR
    VARCHAR
    NTEXT
    NVARCHAR
    TEXT
    STRING
    DATELOCAL_DATE
    TIMELOCAL_TIME
    DATETIME
    DATETIME2
    SMALLDATETIME
    DATETIMEOFFSET
    LOCAL_DATE_TIME
    TIMESTAMP
    BINARY
    VARBINARY
    IMAGE
    UNKNOWN
    尚不支持

    源选项

    名称类型必需默认值描述
    url字符串-JDBC 连接的 URL。例如:jdbc:sqlserver://127.0.0.1:1434;database=TestDB
    driver字符串-用于连接到远程数据源的 JDBC 类名,如果使用 SQL Server,则值为 com.microsoft.sqlserver.jdbc.SQLServerDriver
    user字符串-连接实例的用户名
    password字符串-连接实例的密码
    query字符串-查询语句
    connection_check_timeout_sec整数30等待用于验证连接的数据库操作完成的秒数
    partition_column字符串-并行处理的分区列,仅支持数值类型。
    partition_lower_bound长整数-用于扫描的 partition_column 最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。
    partition_upper_bound长整数-用于扫描的 partition_column 最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。
    partition_num整数作业并行度分区计数的数量,仅支持正整数。默认值为作业并行度。
    fetch_size整数0对返回大量对象的查询,您可以配置查询中使用的行抓取大小,以减少满足选择条件所需的数据库命中次数,从而提高性能。
    零表示使用 JDBC 默认值。
    common-options-源插件的常见参数,请参阅 源常用选项 以获取详细信息。

    提示

    如果未设置 partition_column,则将以单一并发运行;如果设置了 partition_column,则将根据任务的并发度进行并行执行。

    任务示例

    简单:

    简单的单一任务以读取数据表

    # 定义运行时环境
    env {
      # 您可以在此处设置 Flink 配置
      execution.parallelism = 1
      job.mode = "BATCH"
    }
    source{
        Jdbc {
            driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
            url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
            user = SA
            password = "Y.sa123456"
            query = "select * from full_types_jdbc"
        }
    }

    transform {
        # 如果您想要获取有关如何配置 seatunnel 和查看变换插件的完整列表的更多信息,
        # 请转到 [seatunnel.apache.org/docs/transform-v2/sql](https://seatunnel.apache.org/docs/transform-v2/sql)
    }

    sink {
        Console {}
    }

    • 1

    并行:

    使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

    env {
      # 您可以在此处设置 Flink 配置
      execution.parallelism = 10
      job.mode = "BATCH"
    }

    source {
        Jdbc {
            driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
            url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
            user = SA
            password = "Y.sa123456"
            # 根据需要定义查询逻辑
            query = "select * from full_types_jdbc"
            # 并行分片读取字段
            partition_column = "id"
            # 片段数量
            partition_num = 10
        }
    }

    transform {
        # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
        # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
    }

    sink {
        Console {}
    }
    • 1

    并行:

    使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

    env {
      # 您可以在此处设置 Flink 配置
      execution.parallelism = 10
      job.mode = "BATCH"
    }

    source {
        Jdbc {
            driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
            url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
            user = SA
            password = "Y.sa123456"
            # 根据需要定义查询逻辑
            query = "select * from full_types_jdbc"
            # 并行分片读取字段
            partition_column = "id"
            # 片段数量
            partition_num = 10
        }
    }

    transform {
        # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
        # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
    }

    sink {
        Console {}
    }
    • 1

    并行:

    使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

    env {
      # 您可以在此处设置 Flink 配置
      execution.parallelism = 10
      job.mode = "BATCH"
    }

    source {
        Jdbc {
            driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
            url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
            user = SA
            password = "Y.sa123456"
            # 根据需要定义查询逻辑
            query = "select * from full_types_jdbc"
            # 并行分片读取字段
            partition_column = "id"
            # 片段数量
            partition_num = 10
        }
    }

    transform {
        # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
        # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
    }

    sink {
        Console {}
    }
    • 1

    并行:

    使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

    env {
      # 您可以在此处设置 Flink 配置
      execution.parallelism = 10
      job.mode = "BATCH"
    }

    source {
        Jdbc {
            driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
            url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
            user = SA
            password = "Y.sa123456"
            # 根据需要定义查询逻辑
            query = "select * from full_types_jdbc"
            # 并行分片读取字段
            partition_column = "id"
            # 片段数量
            partition_num = 10
        }
    }

    transform {
        # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
        # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
    }

    sink {
        Console {}
    }
    • 1

    分段并行读取示例:

    这是一个快速并行读取数据的分片示例

    env {
      # 您可以在此处设置引擎配置
      execution.parallelism = 10
    }

    source {
      # 这是一个示例源插件,仅用于测试和展示源插件的功能
      Jdbc {
        driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
        user = SA
        password = "Y.sa123456"
        query = "select * from column_type_test.dbo.full_types_jdbc"
        # 并行分片读取字段
        partition_column = "id"
        # 片段数量
        partition_num = 10
      }
      # 如果您想要获取有关如何配置 Seatunnel 和查看源插件的完整列表的更多信息,
      # 请转到 https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
    }

    transform {
      # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
      # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
    }

    sink {
      Console {}
      # 如果您想要获取有关如何配置 Seatunnel 和查看接收插件的完整列表的更多信息,
      # 请转到 https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
    }
    • 1
    > 本文由 [白鲸开源科技](http://www.whaleops.com) 提供发布支持!
  • 相关阅读:
    使用并查集生成一个迷宫
    uni-app入门:自定义组件实现父子组件参数传递
    Vue 的最大的优势是什么?
    spark-submit源码解析
    Python异步框架大战:FastAPI、Sanic、Tornado VS Go 的 Gin
    图论基础 —— 概述
    卷积神经网络包括卷积和,卷积神经网络中的卷积
    模型运行过程中占内存的中间变量
    极客日报:美团APP被曝连续24小时定位;清华校友吴旻当选IEEE SPS首位华裔女主席;VS Code 1.61发布
    PyQt基础练习1
  • 原文地址:https://blog.csdn.net/weixin_54625990/article/details/134247092