• 使用canal订阅mysql的binlog,springboot使用canal订阅mysql的binlog


    写在前面

    本文用到了docker,安装docker请移步:centos7安装docker-简单而详细无坑

    canal开源地址:https://github.com/alibaba/canal

    canal原理:
    canal 模拟 MySQL slave 的交互协议,伪装自己为MySQL slave,向 MySQL master 发送dump 协议
    MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
    canal 解析 binary log 对象(原始为 byte 流)

    一、初始化mysql

    1、安装mysql

    docker安装mysql-简单无坑

    # 拉取mysql最新版本
    docker pull mysql
    # 启动mysql,root/root
    docker run -p 3306:3306 --name mysql \
    -e MYSQL_ROOT_PASSWORD=root \
    -d mysql
    
    # 设置容器自启动:
    sudo docker update 9dff25e9d363 --restart=always
    # 进入容器
    docker exec -it 9dff25e9d363 /bin/bash
    # 安装vi
    apt-get update && apt-get install -y vim
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    2、创建数据库

    在这里创建了一个名为【mytest】的数据库。

    3、修改mysql配置文件

    修改my.cnf(默认在/etc/mysql目录),添加以下两行

    [mysqld]
    server_id=1
    #开启二进制
    log-bin=mysql-bin
    binlog-format=ROW
    #指定库,缩小监控的范围。
    binlog-do-db=mytest
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    改完记得重启mysql。(/mysql/data目录下会出现mysql-bin.000001)

    docker restart mysql
    
    • 1

    4、创建一个同步用户

    在mysql中创建一个专门用来同步的用户

    create user canal@'%' IDENTIFIED by 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    
    • 1
    • 2
    • 3

    5、查看主库的状态

    在这里插入图片描述

    二、初始化canal

    1、使用docker启动canal

    # 下载canal
    docker pull canal/canal-server:v1.1.5
    
    # 启动canal
    docker run -p 11111:11111 --name canal -d canal/canal-server:v1.1.5
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2、查看mysql地址

    # 这里是172.17.0.2
    docker inspect mysql
    
    • 1
    • 2

    3、修改canal配置文件

    修改canal.properties配置文件(/home/admin/canal-server/conf目录)(不需要改)

    # 默认端口 11111
    # 默认输出model为tcp, mysql就使用tcp
    # tcp, kafka, RocketMQ
    #canal.serverMode = tcp
    
    #################################################
    ######### destinations ############# 
    #################################################
    # canal可以有多个instance,每个实例有独立的配置文件,默认只 有一个example实例。
    # 如果需要处理多个mysql数据的话,可以复制出多个example,对其重新命名,
    # 命令和配置文件中指定的名称一致。然后修改canal.properties 中的 canal.destinations
    # canal.destinations=实例 1,实例 2,实例 3
    #canal.destinations = example
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    修改instance.properties配置文件(/home/admin/canal-server/conf/example目录)(只需要修改mysql地址即可)

    # 不能和mysql master重复
    canal.instance.mysql.slaveId=2
    # 使用mysql的虚拟ip和端口 需要改
    canal.instance.master.address=172.17.0.2:3306
    # 使用已创建的canal用户 默认就有
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    canal.instance.connectionCharset = UTF-8
    # canal.instance.defaultDatabaseName =test
    
    # 表示匹配所有的库所有的表 默认就是
    canal.instance.filter.regex =.*\\..*
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    4、重启canal

    docker restart canal
    
    • 1

    当canal监听到binlog发生变化,会通知canal客户端。

    三、java使用canal客户端

    1、依赖

    maven:

    <dependency>
        <groupId>com.alibaba.ottergroupId>
        <artifactId>canal.clientartifactId>
        <version>1.1.2version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    gradle:

    implementation 'com.alibaba.otter:canal.client:1.1.2'
    
    • 1

    2、核心代码

    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    import com.google.protobuf.ByteString;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    
    public class CanalClient {
    
        public static void main(String[] args) throws Exception{
    
            //1.获取 canal 连接对象
            CanalConnector canalConnector =
                    CanalConnectors.newSingleConnector(new
                            InetSocketAddress("canal所在服务器IP", 11111), "example", "", "");
    
            System.out.println("canal启动并开始监听数据 ...... ");
            while (true){
                canalConnector.connect();
                //订阅表
                canalConnector.subscribe("shop001.*");
                //获取数据
                Message message = canalConnector.get(100);
                //解析message
                List<CanalEntry.Entry> entries = message.getEntries();
                if(entries.size() <=0){
                    System.out.println("未检测到数据");
                    Thread.sleep(1000);
                }
                for(CanalEntry.Entry entry : entries){
                    //1、获取表名
                    String tableName = entry.getHeader().getTableName();
                    //2、获取类型
                    CanalEntry.EntryType entryType = entry.getEntryType();
                    //3、获取序列化后的数据
                    ByteString storeValue = entry.getStoreValue();
    
                    //判断是否rowdata类型数据
                    if(CanalEntry.EntryType.ROWDATA.equals(entryType)){
                        //对第三步中的数据进行解析
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        //获取当前事件的操作类型
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        //获取数据集
                        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                        //便利数据
                        for(CanalEntry.RowData rowData : rowDatasList){
                            //数据变更之前的内容
                            JSONObject beforeData = new JSONObject();
                            List<CanalEntry.Column> beforeColumnsList = rowData.getAfterColumnsList();
                            for(CanalEntry.Column column : beforeColumnsList){
                                beforeData.put(column.getName(),column.getValue());
                            }
                            //数据变更之后的内容
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            JSONObject afterData = new JSONObject();
                            for(CanalEntry.Column column : afterColumnsList){
                                afterData.put(column.getName(),column.getValue());
                            }
                            System.out.println("Table :" + tableName +
                                    ",eventType :" + eventType +
                                    ",beforeData :" + beforeData +
                                    ",afterData : " + afterData);
                        }
                    }else {
                        System.out.println("当前操作类型为:" + entryType);
                    }
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    import com.google.protobuf.InvalidProtocolBufferException;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    
    public class CanalStarter {
    
    
        public static void main(String[] args) {
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress("192.168.56.10", 11111),
                    "example",
                    "",
                    ""
            );
    
            try {
                // 连接
                connector.connect();
                // 订阅所有数据库
                connector.subscribe(".*\\..*");
                // 定位到上次消费的位置
                connector.rollback();
    
                while (true) {
                    // 我拿到消息之后,不用去返回回馈,拿100条
                    Message msg = connector.getWithoutAck(100);
    
                    // 没拿到消息,重新拿
                    if(msg == null || msg.getId() < 0 || msg.getEntries().size() == 0) {
                        System.out.println("nothing consumed");
                        Thread.sleep(1000);
                        continue;
                    }
    
                    // 解析信息
                    printEntry(msg.getEntries());
                    // ack
                    connector.ack(msg.getId());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            } finally {
                // 连接关闭
                connector.disconnect();
            }
        }
    
        private static void printEntry(List<CanalEntry.Entry> entries) throws InvalidProtocolBufferException {
            for (CanalEntry.Entry entry : entries) {
                // 事务开启和事务结束不去管
                if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                        entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }
                System.out.println("*********** message event");
                System.out.println("table name:" + entry.getHeader().getTableName());
                System.out.println("entry type:" + entry.getEntryType());
                // 拿到消息
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    System.out.println("event type : " + rowChange.getEventType());
                    System.out.println("*********** before change");
                    printRowData(rowData.getBeforeColumnsList());
                    System.out.println("*********** after change");
                    printRowData(rowData.getAfterColumnsList());
                }
            }
        }
    
        private static void printRowData(List<CanalEntry.Column> columns) {
            if (columns == null && columns.size() == 0) {
                return;
            }
            for (CanalEntry.Column column : columns) {
                // 真正每一行的内容
                System.out.println(column.getName() + ":" + column.getValue());
                System.out.println(column.getIndex());
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    四、springboot使用canal客户端(亲测该方式并不是很友好。。)

    1、引入依赖

    <dependency>
        <groupId>top.javatoolgroupId>
        <artifactId>canal-spring-boot-starterartifactId>
        <version>1.2.1-RELEASEversion>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2、编写配置

    canal:
      destination: example # canal的集群名字,要与安装canal时设置的名称一致
      server: 192.168.56.10:11111 # canal服务地址
    
    • 1
    • 2
    • 3

    3、监听类

    import org.springframework.stereotype.Component;
    import top.javatool.canal.client.annotation.CanalTable;
    import top.javatool.canal.client.handler.EntryHandler;
    
    @CanalTable("tb_item") //监听的表
    @Component
    public class ItemHandler implements EntryHandler<Testb> {
    
    
        @Override
        public void insert(Testb item) {
            System.out.println("insert");
            System.out.println(item);
        }
    
        @Override
        public void update(Testb before, Testb after) {
            System.out.println("update");
            System.out.println(before);
            System.out.println(after);
    
        }
    
        @Override
        public void delete(Testb item) {
            System.out.println("delete");
            System.out.println(item);
    
        }
    }
    
    class Testb{
        private Integer id;
        private String name;
    
        public Integer getId() {
            return id;
        }
    
        public void setId(Integer id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        @Override
        public String toString() {
            return "Testb{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
  • 相关阅读:
    深入探索 Django Rest Framework
    通过训练NLP制作一个自己的简易输入法
    MySQL数据库结合项目实战SQL优化总结
    OneFlow源码解析:Eager模式下Tensor的存储管理
    【python】Django——连接mysql数据库
    1012 The Best Rank
    王争 | 设计模式之美 - 职责链模式
    基于javaweb房屋租赁管理系统的设计与实现
    【教程】 iOS混淆加固原理篇
    【学习日记2023.5.23】 之 Redis入门未入坑
  • 原文地址:https://blog.csdn.net/A_art_xiang/article/details/127322068