• (高阶)Redis 7 第13讲 数据双写一致性 canal篇


     面试题

    问题:如何保证 MySQL 改动后,立即同步到 Redis?
    答案:canal

     

    简介

    https://github.com/alibaba/canal/wiki

     基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 

     业务

    1. 数据库镜像
    2. 数据库实时备份
    3. 多级索引 (卖家和买家各自分库索引)
    4. search build
    5. 业务cache刷新
    6. 价格变化等重要业务消息

    下载

    官网:https://github.com/alibaba/canal/releases/tag/canal-1.1.6
    百度网盘链接:https://pan.baidu.com/s/1Hs7JieAZA_q4lmvIdJZgFw?pwd=aqi2 
    提取码:aqi2 

     Mysql 主从复制原理

    Canal原理

    1. canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
    2. MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
    3. canal 解析 binary log 对象(原始为 byte 流)

     案例

    mysql 环境 

    1. # 查看mysql 版本
    2. SELECT VERSION();
    3. # 查看当前主机的二进制日志
    4. SHOW MASTER status;
    5. # 查看binlog 开启状态
    6. SHOW VARIABLES LIKE 'log_bin'

     

     my.ini 配置

    1. # 在 [mysqld] 中加入以下内容
    2. [mysqld]
    3. log-bin=mysql-bin #开启 binlog
    4. binlog-format=ROW #选择 ROW 模式
    5. server_id=1 #配置 MySQL replication 需要定义,不要和 canal 的 slaveId 重复

     

     

     重启Mysql

     

    创建canal用户并授权

    1. DROP USER IF EXISTS 'canal'@'%';
    2. CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
    3. GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    4. FLUSH PRIVILEGES;
    5. SELECT * FROM mysql.`user`

    canal安装配置

    上传安装包

    解压安装包

    tar -zxvf canal.deployer-1.1.6.tar.gz
    

     配置文件地址

    修改配置

    启动canal

     

    启动成功

     如果出现如下错误

    Caused by: java.io.IOException: caching_sha2_password Auth failed
    com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:260)

     

    1. # 修改加密方式
    2. select host,user,plugin from mysql.user ;
    3. ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';

    创建测试表

    1. CREATE TABLE `t_user` (
    2. `id` int NOT NULL AUTO_INCREMENT,
    3. `userName` varchar(255) NOT NULL,
    4. PRIMARY KEY (`id`)
    5. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

     创建Redis工具类

    1. package com.mco.utils;
    2. import cn.hutool.core.util.RandomUtil;
    3. import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    4. import org.slf4j.Logger;
    5. import org.slf4j.LoggerFactory;
    6. import redis.clients.jedis.GeoCoordinate;
    7. import redis.clients.jedis.Jedis;
    8. import redis.clients.jedis.JedisPool;
    9. import redis.clients.jedis.JedisPoolConfig;
    10. import redis.clients.jedis.args.GeoUnit;
    11. import java.time.Duration;
    12. import java.util.HashMap;
    13. import java.util.Map;
    14. import java.util.Objects;
    15. /**
    16. * @author :liao.wei
    17. * @date :2023/9/18 21:15
    18. * @package : com.mco.utils
    19. */
    20. public class RedisUtils {
    21. private static Logger logger = LoggerFactory.getLogger(JedisPoolUtil.class);
    22. public static final String REDIS_IP_ADDR = "120.77.64.190";
    23. public static final String REDIS_PWD = "111111";
    24. public static JedisPool jedisPool;
    25. static {
    26. JedisPoolConfig poolConfig = new JedisPoolConfig();
    27. poolConfig.setMaxIdle(8);
    28. poolConfig.setMinIdle(2);
    29. poolConfig.setMaxWait(Duration.ofSeconds(30000));
    30. jedisPool = new JedisPool(poolConfig, REDIS_IP_ADDR, 6379, 10000, REDIS_PWD);
    31. }
    32. public static Jedis getJedis() throws Exception {
    33. if (null != jedisPool) {
    34. return jedisPool.getResource();
    35. }
    36. throw new Exception("Jedispool is not ok");
    37. }
    38. }

    Canal 业务类

    1. public class RedisCanalClient {
    2. public static final Integer _60SECONDS = 60;
    3. public static final String CANAL_IP_ADDR = "192.168.1.11";
    4. private static void redisInsert(List columns)
    5. {
    6. JSONObject jsonObject = new JSONObject();
    7. for (Column column : columns)
    8. {
    9. System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
    10. jsonObject.put(column.getName(),column.getValue());
    11. }
    12. if(columns.size() > 0)
    13. {
    14. try(Jedis jedis = RedisUtils.getJedis())
    15. {
    16. jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
    17. }catch (Exception e){
    18. e.printStackTrace();
    19. }
    20. }
    21. }
    22. private static void redisDelete(List columns)
    23. {
    24. JSONObject jsonObject = new JSONObject();
    25. for (Column column : columns)
    26. {
    27. jsonObject.put(column.getName(),column.getValue());
    28. }
    29. if(columns.size() > 0)
    30. {
    31. try(Jedis jedis = RedisUtils.getJedis())
    32. {
    33. jedis.del(columns.get(0).getValue());
    34. }catch (Exception e){
    35. e.printStackTrace();
    36. }
    37. }
    38. }
    39. private static void redisUpdate(List columns)
    40. {
    41. JSONObject jsonObject = new JSONObject();
    42. for (Column column : columns)
    43. {
    44. System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
    45. jsonObject.put(column.getName(),column.getValue());
    46. }
    47. if(columns.size() > 0)
    48. {
    49. try(Jedis jedis = RedisUtils.getJedis())
    50. {
    51. jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
    52. System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
    53. }catch (Exception e){
    54. e.printStackTrace();
    55. }
    56. }
    57. }
    58. public static void printEntry(List entrys)
    59. {
    60. for (Entry entry : entrys) {
    61. if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
    62. continue;
    63. }
    64. RowChange rowChage = null;
    65. try {
    66. //获取变更的row数据
    67. rowChage = RowChange.parseFrom(entry.getStoreValue());
    68. } catch (Exception e) {
    69. throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
    70. }
    71. //获取变动类型
    72. EventType eventType = rowChage.getEventType();
    73. System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
    74. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
    75. entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
    76. for (RowData rowData : rowChage.getRowDatasList()) {
    77. if (eventType == EventType.INSERT) {
    78. redisInsert(rowData.getAfterColumnsList());
    79. } else if (eventType == EventType.DELETE) {
    80. redisDelete(rowData.getBeforeColumnsList());
    81. } else {//EventType.UPDATE
    82. redisUpdate(rowData.getAfterColumnsList());
    83. }
    84. }
    85. }
    86. }
    87. public static void main(String[] args)
    88. {
    89. System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");
    90. //=================================
    91. // 创建链接canal服务端
    92. CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(CANAL_IP_ADDR,
    93. 11111), "example", "", ""); // 这里用户名和密码如果在这写了,会覆盖canal配置文件的账号密码,如果不填从配置文件中读
    94. int batchSize = 1000;
    95. //空闲空转计数器
    96. int emptyCount = 0;
    97. System.out.println("---------------------canal init OK,开始监听mysql变化------");
    98. try {
    99. connector.connect();
    100. //connector.subscribe(".*\\..*");
    101. connector.subscribe("test.t_user"); // 设置监听哪个表
    102. connector.rollback();
    103. int totalEmptyCount = 10 * _60SECONDS;
    104. while (emptyCount < totalEmptyCount) {
    105. System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
    106. Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
    107. long batchId = message.getId();
    108. int size = message.getEntries().size();
    109. if (batchId == -1 || size == 0) {
    110. emptyCount++;
    111. try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
    112. } else {
    113. //计数器重新置零
    114. emptyCount = 0;
    115. printEntry(message.getEntries());
    116. }
    117. connector.ack(batchId); // 提交确认
    118. // connector.rollback(batchId); // 处理失败, 回滚数据
    119. }
    120. System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
    121. } finally {
    122. connector.disconnect();
    123. }
    124. }
    125. }

     说明:

            

    CANAL_IP_ADDR:canal 服务部署ip
    

    InetSocketAddress: 端口可从canal.log 中查看

    启动main方法

    数据库中新增一条数据

     查看Canal客户端监听

     查看Redis数据

    POM引入

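    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <junit.version>4.12</junit.version>
        <log4j.version>1.2.17</log4j.version>
        <lombok.version>1.16.18</lombok.version>
        <mysql.version>5.1.47</mysql.version>
        <druid.version>1.1.16</druid.version>
        <mapper.version>4.1.5</mapper.version>
        <mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>4.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>${druid.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>${mybatis.spring.boot.version}</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.2.3</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>javax.persistence</groupId>
            <artifactId>persistence-api</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>tk.mybatis</groupId>
            <artifactId>mapper</artifactId>
            <version>${mapper.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.1</version>
        </dependency>
    </dependencies>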

    connector.subscribe过滤规则

    源码地址:https://gitee.com/UniQue006/redis_example.git

    1. 🌹 以上分享 Redis 数据一致性 canal应用,如有问题请指教。
    2. 🌹🌹 如你对技术也感兴趣,欢迎交流。
    3. 🌹🌹🌹 如有需要,请👍点赞💖收藏🐱‍🏍分享
  • 相关阅读:
    【QT】QtCreator卸载与安装(非正常状态)
    正则表达式实战:最新豆瓣top250爬虫超详细教程
    Go语言实践模式 - 函数选项模式(Functional Options Pattern)
    [autojs]用户界面GUI编程
    一,安卓aosp源码编译环境搭建
    技术与安全的交织
    常见漏洞修复方案
    SpringMVC
    竞赛 基于深度学习的视频多目标跟踪实现
    redis
  • 原文地址:https://blog.csdn.net/qq_32662595/article/details/132993624