• flinkcdc监控sqlserver,数据库的表中该字段是空值,而cdc中该字段的值是'N,不一致


    关注 码龄 粉丝数 原力等级 -- 被采纳 被点赞 采纳率 laijunlin_data 2024-08-06 15:45 采纳率: 58.3% 浏览 10 首页/ 大数据 / flinkcdc监控sqlserver,数据库的表中该字段是空值,而cdc中该字段的值是'N,不一致 flinksqlserverjava 版本 1.flinkcdc com.ververica flink-connector-sqlserver-cdc 3.0.0 sqlserver的版本: 2019遇到的问题 sqlserver查询BOSS_UNDER_PROCESSING的值是空值,但是flinkcdc获得的数据变成 "BOSS_UNDER_PROCESSING": "'N",该字段在sqlserver的设置是:BOSS_UNDER_PROCESSING nchar(1) DEFAULT N'N' NULL, { "before": null, "after": { "ORIGREC": 338, "APPRDATE": null, "APPRDISP": null, "APPROVEDBY": null, "APPRSTS": null, "AUTOAPPRVTEST": null, "AUTORELEASE": null, "BATCHNO": null, "CASENUMBER": null, "CHARGENO": null, "COCNO": null, "COLLDATE": null, "COMMENTS": null, "CREATEDBY": null, "DATEPRODAV": null, "DATERECV": null, "DEPT": "Changzhou", "DESIREDTAT": null, "DISPSTS": null, "DRAWDATE": null, "DRAWNBY": null, "DUEDATE": null, "EXTERNAL_ID": null, "FLDSTS": "Draft", "FLOWNAME": null, "FOLDERFLAG": null, "FOLDERNO": "37B10AAE-F", "FSTEPCODE": null, "GUID": null, "INVENTORYID": null, "INVESTIGATIONCODE": null, "INVOICENUMBER": null, "INVSTIGLAYOUTCODE": null, "LABDUEDATE": null, "LOGDATE": null, "MANAGER": null, "MATCODE": null, "NOOFRETAINS": null, "NOTES": null, "ORIGSTS": "N", "PREPRUNFOLDER": null, "PRODGROUP": null, "PROGRAMCODE": null, "PROJECTNO": null, "RASCLIENTID": null, "RASNO": null, "RASPROJECTNO": null, "RECEIVEDBY": null, "RESAMPNO": null, "RUNFOLDER": null, "SP_CODE": null, "SPECNO": null, "STABNO": null, "STUDYTYPE": null, "SUBMITTER": null, "TAGCOMMENTS": null, "WFSAMPLETYPE": null, "WORKFLOWCODE": null, "BATCHID": null, "SUBMITTINGORG": null, "FOLDERNAME": null, "METADATA_GUID": null, "EMFOLDERNO": null, "BILL_RASCLIENTID": null, "BILL_RASPROJECTNO": null, "DISP_STUDYNO": null, "SAMPLE_POINT_O": null, "ORIGREC_ARC": null, "P_BATCH_O": null, "PRELOG_DT": null, "SCHED_OCCURRENCE_O": null, "F_AREA_NAME": null, "F_PLANT": null, "F_PROCESS_SAMPLE_TYPE": null, "VESSEL_O": null, "F_SPECIAL_INSTRUCTIONS": null, "NEED_PRELIMINARY": "N", "REP_LANG": null, "SHIP_TO_CLIENTID": null, "SUPPLIER_CLIENTID": null, "MFG_CLIENTID": null, "BUYER_CLIENTID": null, "AGENT_CLIENTID": null, "RETURN_ADDRESSNO": null, "PO_NUMBER": null, "ORDER_TEMPLATE_CODE": null, "SERVICE_LEVEL": null, "SAMPLE_PRODUCTION_STAGE": null, "SALES_REP_ID": null, "BOSS_HEADER_ID": null, "BOSS_LINE_ID": null, "BOSS_ORDER_STATUS": null, "BOSS_LINE_STATUS": null, "SOURCE": "STARLIMS", "ORGID": null, "COUNTRY_ORIGIN": null, "COUNTRY_DESTINATION": null, "SHIPTO_ADDRESSNO": null, "BILLTO_ADDRESSNO": null, "BOSS_TRANSACTION_ID": null, "BOSS_TRANSACTION_NO": null, "BOSS_TRANSACTION_DATE": null, "BUYER_ADDRESSNO": null, "BOSS_BATCH_SOURCE": null, "MATTYPE": null, "RET_ADDRESS_OPT": null, "APPLICANT_ADDRESSNO": null, "SUPP_ADDRESSNO": null, "MFG_ADDRESSNO": null, "AGENT_ADDRESSNO": null, "RET_ADDRESS_INSTRC": null, "FOLDERPRICE": null, "PRICELISTID": null, "BILL_PRICELISTID": null, "BUYER_RASPROJECTNO": null, "BUYER_PRICELISTID": null, "AGENT_RASPROJECTNO": null, "AGENT_PRICELISTID": null, "BOSS_ORDER_NUMBER": null, "INDUSTRY_SEGMENT": null, "TEST_DESCRIPTION": null, "TEST_METHOD": null, "METHOD_YEAR": null, "REFERENCE_ID": null, "SAMPLE_DESCRIPTION": null, "ON_HOLD": "N", "SGS_BILLING_NOTES": null, "SALES_AFFILIATE_ID": null, "ASSIGNED_TO": null, "SERVICE_LEVEL_ID": null, "CLIENTNOTES": null, "EXTERNAL_PARENT_ID": null, "EXTERNAL_CREATEDBY": null, "UNCERTAINTY_ID": null, "CREATE_BOSS_ORDER": "Y", "TEMPLATE_ID": null, "STA_LANGID": null, "FIRST_REPORTNO": null, "DUEDATE_MODE": "Forward", "DUEDATE_OVERRIDE": "N", "CONFORMANCE_STATEMENT": null, "SGS_WITNESS": null, "LAB_NOTES": null, "REPORT_NOTES": null, "SPLIT_LANGID": null, "SPLIT_COMMENTS": null, "SPLIT_TYPE": null, "SPLIT_LANGNAME": null, "QRCODE_REPORT": null, "SEND_REPORT_FORMAT": "\u0027PDF", "SELF_REFERENCE": "\u0027N", "COMMITTEDBY": null, "FLD_RPT_STS": null, "NEED_FINAL_REPORT": null, "BOSS_LAST_REQ_STATUS": null, ** "BOSS_UNDER_PROCESSING": "\u0027N",** "OUTSOURCEFROM": null, "LAST_UPDATED": null, "RPT_STS_UPTDATE": null, "SGS_CLIENT_INFO_UPDATED": null, "LAST_ACTIVATION_DATE": null, "BOSS_LAST_REQ_DATE": null, "BOSS_LAST_REQ_ERROR": null, "PRICE_REVIEW_STATUS": "\u0027N/A", "SGS_PO_DATE": null, "SGS_INVOICE_TMP": null, "SGS_PEST_EVAL_STATUS": null, "SGS_CREATED_FROM": null, "RPT_SAMP_TYPE": "\u0027All", "SGSMARTTRFNO": null, "CLIENT_IF_INT": null }, "source": { "version": "1.9.7.Final", "connector": "sqlserver", "name": "sqlserver_transaction_log_source", "ts_ms": 1722929207790, "snapshot": "last", "db": "loya-demo", "sequence": null, "schema": "dbo", "table": "FOLDERS", "change_lsn": null, "commit_lsn": "00000052:000022d0:0002", "event_serial_no": null }, "op": "r", "ts_ms": 1722929208600, "transaction": null } #问题相关的代码 Properties mysqlProperties = new Properties(); mysqlProperties.setProperty("converters", "dateConverters"); mysqlProperties.setProperty("dateConverters.type", "com.sgs.bigdata.utils.MySqlDateTimeConverter"); Map config = new HashMap(); config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name()); JsonDebeziumDeserializationSchema jdd = new JsonDebeziumDeserializationSchema(false, config); // 数据表 SourceFunction sourceFunction = SqlServerSource.builder() .hostname(hostname) .port(port) .database(database) // monitor sqlserver database .tableList(tableName) // monitor products table .username(username) .password(password) .startupOptions(StartupOptions.initial()) .deserializer(jdd) .debeziumProperties(mysqlProperties) .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource dataStreamSource = env.addSource(sourceFunction); dataStreamSource.print("<<<"); 不知道怎么解决? 展开全部 收起 写回答 好问题 0 提建议 追加酬金 关注问题 微信扫一扫 点击复制链接 分享 邀请回答 编辑 收藏 删除 结题 收藏 举报 追加酬金 (90%的用户在追加酬金后获得了解决方案) 当前问题酬金 ¥ 0 (可追加 ¥500) ¥ 15¥ 20¥ 50¥ 100¥ 200 支付方式 扫码支付 二维码出错 点击刷新 支付金额 15 元 提供问题酬金的用户不参与问题酬金结算和分配 支付即为同意 《付费问题酬金结算规则》 结题 再想想 删除 再等等

    1条回答 默认 最新

    • 一直免费一直爽 2024-08-11 18:42
      关注

      以下回复参考:皆我百晓生券券喵儿等免费微信小程序作答:

      要解决这个问题,首先确保您已经正确配置了Flink Connector SQL Server CDC插件,并且数据库连接信息也正确。然后,您可以尝试以下步骤:

      1. 检查数据一致性:首先确认数据库中的BOSS_UNDER_PROCESSING字段确实为空值,因为CDC会根据这个值更新流中的数据。

      2. 查看字段类型:确保BOSS_UNDER_PROCESSING字段在数据库端确实是nchar(1)类型的。如果不是,则需要调整字段类型以匹配CDC的要求。

      3. 修改CDC配置

        • 在您的Flink项目中,找到与flink-connector-sqlserver-cdc依赖项关联的flink-connector-sqlserver-cdc.properties文件。
        • 确保配置中的columnMapping部分包含BOSS_UNDER_PROCESSING字段以及相应的转换器。例如:
          columnMapping = {BOSS_UNDER_PROCESSING=VARCHAR}
          
        • columnMapping中的字段名称替换为实际的字段名,确保它与数据库中的字段名称相同。
      4. 验证数据一致性

        • 使用print()方法打印数据流中的数据,以检查是否按照预期执行。
        • 检查BOSS_UNDER_PROCESSING字段的值是否已成功从数据库读取并写入流中。
      5. 处理缺失数据

        • 如果BOSS_UNDER_PROCESSING字段在数据库中确实存在但未被读取,可能需要手动填充缺失值。这一步取决于您如何处理缺失值策略。

      通过以上步骤,应该可以解决BOSS_UNDER_PROCESSING字段在CDC中的错误值。如果您遇到任何其他问题,或者有更具体的上下文细节,请提供更多信息以便进行进一步的帮助。

      评论 编辑记录
      编辑
      预览

      报告相同问题?

    • 相关阅读:
      2023年中国研究生数学建模竞赛D题解题思路
      Unity之MVC思想(通过普通方法和使用MVC思想完成同一个小案例:掌握MVC简单框架)
      Redis数据库的部署及常用命令
      亿图脑图MindMaster(Pro)
      Java大量文件处理场景堆外内存溢出(线上JVM排障之八)
      k8s - pod卷的使用 - pod镜像的升级与回滚 - 探针
      SpringMVC--HttpMessageConverter
      项目架构:husky + lint-staged + eslint - git提交前自动检查代码
      Java获取今天、本周、本月、本季度、上月、上一年的时间范围
      IC验证 --- synopsys router介绍
    • 原文地址:https://ask.csdn.net/questions/8135467