• 使用TCP方式拉取Canal数据


    1 Canal对接Kafka联调

    1.1 配置修改

    canal.properties

    修改 zk:

    canal.zkServers = 10.51.50.219:2181
    
    • 1
    instance.properties

    开启配置项

    canal.mq.dynamicTopic 是 Canal 的 MQ 动态 Topic 配置项:

    • test_javaedge_01 是kafka 的 topic
    • test_db.users 要监控的数据库、表
    • test_db.users 表发生变化时,Canal 将会把变化的数据推送到名为 test_javaedge_01:test_db.users 的 MQ Topic 中。
    canal.mq.dynamicTopic=test_javaedge_01:test_db\\.users
    
    • 1

    开启一个消费者

    [root@javaedge-kafka-dev bin]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_javaedge_01
    
    • 1

    datagrip 新增数据:

    消费到该数据:

    2 使用TCP方式拉取Canal数据

    现在 serverMode 改回tcp。重启

    javaedge@JavaEdgedeMac-mini deployer % jps
    71002 CanalLauncher
    javaedge@JavaEdgedeMac-mini deployer %
    
    • 1
    • 2
    • 3

    canal 同步程序

    package com.javaedge.canal;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    import com.google.common.base.CaseFormat;
    
    import java.net.InetSocketAddress;
    import java.util.HashMap;
    import java.util.List;
    
    public class CanalClientApp {
        public static void main(String[] args) throws Exception {
    
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress("localhost", 11111),
                    "example",
                    null, null);
    
            while (true) {
                connector.connect();
                connector.subscribe("test_db.users");
                Message message = connector.get(100);
                List<CanalEntry.Entry> entries = message.getEntries();
                if (entries.size()>0) {
                    for (CanalEntry.Entry entry : entries) {
                        String tableName = entry.getHeader().getTableName();
    
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                        CanalEntry.EventType eventType = rowChange.getEventType();
    
                        if (eventType == CanalEntry.EventType.INSERT) {
                            for (CanalEntry.RowData rowData : rowDatasList) {
                                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                                HashMap<Object, Object> map = new HashMap<>();
                                for (CanalEntry.Column column : afterColumnsList) {
                                    String key = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName());
                                    map.put(key, column.getValue());
                                }
                                System.out.println("tableName=" + tableName + "  map=" + JSON.toJSONString(map));
                            }
                        }
                    }
    
                }
    
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    运行程序。操作 user 数据表,新增一行数据:

    程序输出:

    显然,后续不管你想把数据同步到哪儿去,都完全自由!

    数据链路

    MySQL -》canal server(tcp)-》canal client-》kafka。

  • 相关阅读:
    SpringBoot连接MySql主从配置 读写分离
    通过js操作元素样式属性
    快速认识 WebAssembly
    C primer plus学习笔记 —— 3、字符的IO(输入/输出)
    如何管理 X.509 数字证书
    安徽工业大学计算机考研资料汇总
    Qt QObject Cannot create children for a parent that is in a different thread
    Mac硬件设备系统环境的升级/更新 macOS
    AMD GPU 内核驱动分析(三)-dma-fence 同步工作模型
    后悔没有早点遇到你——我亲爱的无货源
  • 原文地址:https://blog.csdn.net/qq_33589510/article/details/132818078