• Flink-JDBC SQL Connector报错: java.lang.Integer cannot be cast to java.lang.Long


    序号作者版本时间备注
    1HamaWhite1.0.02022-11-22增加文档

    一、基础信息

    1.1 组件版本

    • Flink:    1.13.0
    • JDK:  1.8
    • Mysql:  8.0.15
    • com.ververica:flink-connector-mysql-cdc:2.0.0
    • org.apache.flink:flink-connector-jdbc_2.11:1.13.0

    1.2 建表语句

    1.2.1 Mysql中的建表语句

    1. -- 新建表demo
    2. CREATE TABLE demo (
    3. sid int(6) ,
    4. name varchar(255)
    5. );
    6. -- 插入两条测试数据
    7. insert into demo values(1,'hamawhite'),(2,'song.bs');

    1.2.2 FlinkSQL中的建表语句

    Mysql中字段sid定义的是int(6)类型,在FlinkSQL中定义的sid字段是BIGINT类型。

    1. -- 新建JDBC SQL表, 字段sid定义为BIGINT类型
    2. DROP TABLE IF EXISTS flink_demo_jdbc_bigint;
    3. CREATE TABLE flink_demo_jdbc_bigint (
    4. sid BIGINT,
    5. name STRING
    6. )WITH
    7. (
    8. 'connector'='jdbc',
    9. 'url'='jdbc:mysql://192.168.90.150:3306/demo',
    10. 'table-name'='demo',
    11. 'username'='root',
    12. 'password'='xxx'
    13. );

    二、测试操作

    运行SQL:

    select * from flink_demo_jdbc_bigint limit 10

    会遇到如下错误: 

    1. java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
    2. at org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154)
    3. at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$7(RowData.java:249)
    4. at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
    5. at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
    6. at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
    7. at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
    8. at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
    9. at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    10. at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    11. at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    12. at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    13. at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
    14. at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
    15. at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
    16. at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    17. at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    18. at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

    flink_demo_jdbc_bigint表中只有sid字段是BIGINT类型,因此上面错误是针对sid字段的。

    三、原因分析

    3.1 分析GenericRowData类

     查看Flink源码GenericRowData类的getLong(int pos) 如下:

    根据错误提示是因为Integer转Long出的错,但是看上述代码this.fields[pos]应该返回的是Integer类型,用基本数据类型long强转应该是可以的。

    3.2 测试Java基本类型转换

    3.2.1 测试代码1

    例如下面的测试代码,能正常运行得出结果:

     3.2.2 测试代码2

    但是flink中的(long) this.fields[pos]却出错,继续下面的测试代码:

    至此,复现错误。

    3.2.3 代码分析

    对比上面两小节的代码,差异点如下:

    1. 小节3.2.1的代码输入变量input在编译时,已经知道是Integer类型。

    2. 小节3.2.2的代码输入变量input是根据key从JSONObject获取的value,在编译阶段其实不知道value的类型。

    经过上述分析,开始怀疑编译后的源码有差异

    Test1.java编译后的代码如下:

     Test2.java编译后的代码如下:

    可以看到Test2.java中把 long result = (long) input 经过编译后是long result = (Long)input,编译的时候把(long)改为(Long)。而Test1.class中还是跟源码一样,仍然是 long result = (long)input。

    3.3 查看Flink编译后的代码

    查看Flink中GenericRowData类的getLong(int pos) 编译后的代码。

     跟3.2节的测试结果一样,在编译阶段也是把(long)编译为(Long),这样读出来Integer是没法转换为Long的,两个都继承自Number,但两者间是没有关系的。

    四、修改方法

    4.1 修改业务代码

    由于在Mysql中sid字段是int(6)类型的,那么在定义Flink表的时候就用Integer,如下SQL:

    1. -- 新建JDBC SQL表, 字段sid定义为INT类型
    2. DROP TABLE IF EXISTS flink_demo_jdbc_int;
    3. CREATE TABLE flink_demo_jdbc_int (
    4. sid INT,
    5. name STRING
    6. )WITH
    7. (
    8. 'connector'='jdbc',
    9. 'url'='jdbc:mysql://192.168.90.150:3306/demo',
    10. 'table-name'='demo',
    11. 'username'='root',
    12. 'password'='xxx'
    13. );
    14. -- 测试SQL
    15. select * from flink_demo_jdbc_int limit 10

    上面代码是能正确运行的。

    4.2 修改Flink源码中的GenericRowData类

    推荐修改Flink的GenericRowData类,把(long)改为(Long)避免给人造成迷惑。修改后的代码如下:

    1. @Override
    2. public long getLong(int pos) {
    3. return (Long) this.fields[pos];
    4. }

    五、Mysql-CDC vs JDBC Connector

    5.1 测试SQL 

    上面测试的是jdbc connector,再测试下 mysql-cdc connector。测试SQL:

    1. -- 新建Mysql CDC表, 字段sid定义为INT类型
    2. DROP TABLE IF EXISTS flink_demo_cdc_int;
    3. CREATE TABLE flink_demo_cdc_int (
    4. sid INT,
    5. name STRING
    6. )WITH
    7. (
    8. 'connector'='mysql-cdc',
    9. 'hostname'='192.168.90.150',
    10. 'port'='3306',
    11. 'username'='root',
    12. 'password'='xxx',
    13. 'database-name'='demo',
    14. 'table-name'='demo',
    15. 'scan.startup.mode'='initial',
    16. 'scan.incremental.snapshot.enabled'='false'
    17. );
    18. -- 新建Mysql CDC表, 字段sid定义为BIGINT类型
    19. DROP TABLE IF EXISTS flink_demo_cdc_bigint;
    20. CREATE TABLE flink_demo_cdc_bigint (
    21. sid BIGINT,
    22. name STRING
    23. )WITH
    24. (
    25. 'connector'='mysql-cdc',
    26. 'hostname'='192.168.90.150',
    27. 'port'='3306',
    28. 'username'='root',
    29. 'password'='xxx',
    30. 'database-name'='demo',
    31. 'table-name'='demo',
    32. 'scan.startup.mode'='initial',
    33. 'scan.incremental.snapshot.enabled'='false'
    34. );
    35. -- 测试SQL
    36. select * from flink_demo_cdc_int limit 10;
    37. select * from flink_demo_cdc_bigint limit 10;

    上面不管sid是定义的INT,还是BIGINT类型,都是能正常读取数据的。

    因此,jdbc connector和mysql-cdc connector是有差异的。

    5.2 分析结果

    在的Flink的GenericRowData.java的void setField(int pos, Object value) 方法中增加断点,开始调试。

    5.2.1 JDBC Connector的结果

    在org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter的RowData toInternal(ResultSet resultSet)的方法中只是把ResultSet中的类型setField进去,则sid塞进去的还是Integer类型。

    那读取的时候执行的是(Long) this.fields[pos],就会遇到Integer转Long的错误。

    5.2.2 Mysql CDC Connector的结果

    在com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema的RowData createRowConverter(RowType rowType)的方法中会把读出来的Integer类型根据FlinkSQL中定义的类型进行转换。

    针对sid INT转换后还是Integer类型,sid BIGINT读取出来的则是BIGINT类型。

    可以,由于mysql-cdc connector在读取数据塞进GenericRowData已经按照定义进行了类型转换,那么读取的时候则不会出现问题的。即针对mysql-cdc connector,mysql表中定义的是int(6)类型,在Flink SQL中不管定义的是INT还是BIGINT类型,则都能是正常读取数据的。

  • 相关阅读:
    Linux:程序地址空间/虚拟地址等相关概念理解
    强化学习和近似动态规划的区别与联系是什么,他们俩是一回事吗
    Python BeautifulSoup 库使用教程
    wireshark不同颜色报文含义(报文颜色)
    麒麟KYLINOS通过命令行配置kysec的防火墙
    习题1.24
    {大厂漏洞 } OA产品存在SQL注入
    FastAdmin 列表多选后批量操作数据
    Dubbo链路追踪——生成全局ID(traceId)
    R语言使用attributes函数移除变量的所有(全部)旧有属性信息、使用attr函数移除变量中单个旧有属性信息
  • 原文地址:https://blog.csdn.net/xin_jmail/article/details/127977073