| 问题 | 答案 |
| 如何保证mysql改动后,立即同步到Redis | canal |

https://github.com/alibaba/canal/wiki
https://github.com/alibaba/canal/wiki
基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
- 数据库镜像
- 数据库实时备份
- 多级索引 (卖家和买家各自分库索引)
- search build
- 业务cache刷新
- 价格变化等重要业务消息
| 官网 | https://github.com/alibaba/canal/releases/tag/canal-1.1.6 https://github.com/alibaba/canal/releases/tag/canal-1.1.6 |
| 百度网盘 | 链接:https://pan.baidu.com/s/1Hs7JieAZA_q4lmvIdJZgFw?pwd=aqi2 提取码:aqi2 |

- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
- # 查看mysql 版本
- SELECT VERSION();
-
- # 查看当前主机的二进制日志
- SHOW MASTER status;
-
- # 查看binlog 开启状态
-
- SHOW VARIABLES LIKE 'log_bin'


- # 在mysqld中加入一下内容
- [mysqld]
- log-bin=mysql-bin #开启 binlog
- binlog-format=ROW #选择 ROW 模式
- server_id=1 #配置MySQL replaction需要定义,不要和canal的 slaveId重复

重启Mysql
- DROP USER IF EXISTS 'canal'@'%';
- CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
-
- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
-
- FLUSH PRIVILEGES;
-
- SELECT * FROM mysql.`user`


tar -zxvf canal.deployer-1.1.6.tar.gz



如果出现如下错误
Caused by: java.io.IOException: caching_sha2_password Auth failed
com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:260)
- # 修改加密方式
- select host,user,plugin from mysql.user ;
- ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
-
- CREATE TABLE `t_user` (
- `id` int NOT NULL AUTO_INCREMENT,
- `userName` varchar(255) NOT NULL,
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
- package com.mco.utils;
-
- import cn.hutool.core.util.RandomUtil;
- import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import redis.clients.jedis.GeoCoordinate;
- import redis.clients.jedis.Jedis;
- import redis.clients.jedis.JedisPool;
- import redis.clients.jedis.JedisPoolConfig;
- import redis.clients.jedis.args.GeoUnit;
-
- import java.time.Duration;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Objects;
-
- /**
- * @author :liao.wei
- * @date :2023/9/18 21:15
- * @package : com.mco.utils
- */
- public class RedisUtils {
- private static Logger logger = LoggerFactory.getLogger(JedisPoolUtil.class);
-
- public static final String REDIS_IP_ADDR = "120.77.64.190";
- public static final String REDIS_PWD = "111111";
-
- public static JedisPool jedisPool;
-
- static {
- JedisPoolConfig poolConfig = new JedisPoolConfig();
- poolConfig.setMaxIdle(8);
- poolConfig.setMinIdle(2);
- poolConfig.setMaxWait(Duration.ofSeconds(30000));
- jedisPool = new JedisPool(poolConfig, REDIS_IP_ADDR, 6379, 10000, REDIS_PWD);
- }
-
- public static Jedis getJedis() throws Exception {
- if (null != jedisPool) {
- return jedisPool.getResource();
- }
- throw new Exception("Jedispool is not ok");
- }
- }
- public class RedisCanalClient {
- public static final Integer _60SECONDS = 60;
- public static final String CANAL_IP_ADDR = "192.168.1.11";
-
- private static void redisInsert(List
columns) - {
- JSONObject jsonObject = new JSONObject();
- for (Column column : columns)
- {
- System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
- jsonObject.put(column.getName(),column.getValue());
- }
- if(columns.size() > 0)
- {
- try(Jedis jedis = RedisUtils.getJedis())
- {
- jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }
-
-
- private static void redisDelete(List
columns) - {
- JSONObject jsonObject = new JSONObject();
- for (Column column : columns)
- {
- jsonObject.put(column.getName(),column.getValue());
- }
- if(columns.size() > 0)
- {
- try(Jedis jedis = RedisUtils.getJedis())
- {
- jedis.del(columns.get(0).getValue());
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }
-
- private static void redisUpdate(List
columns) - {
- JSONObject jsonObject = new JSONObject();
- for (Column column : columns)
- {
- System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
- jsonObject.put(column.getName(),column.getValue());
- }
- if(columns.size() > 0)
- {
- try(Jedis jedis = RedisUtils.getJedis())
- {
- jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
- System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }
-
- public static void printEntry(List
entrys) - {
- for (Entry entry : entrys) {
- if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
- continue;
- }
-
- RowChange rowChage = null;
- try {
- //获取变更的row数据
- rowChage = RowChange.parseFrom(entry.getStoreValue());
- } catch (Exception e) {
- throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
- }
- //获取变动类型
- EventType eventType = rowChage.getEventType();
- System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
- entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
- entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
-
- for (RowData rowData : rowChage.getRowDatasList()) {
- if (eventType == EventType.INSERT) {
- redisInsert(rowData.getAfterColumnsList());
- } else if (eventType == EventType.DELETE) {
- redisDelete(rowData.getBeforeColumnsList());
- } else {//EventType.UPDATE
- redisUpdate(rowData.getAfterColumnsList());
- }
- }
- }
- }
-
-
- public static void main(String[] args)
- {
- System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");
-
- //=================================
- // 创建链接canal服务端
- CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(CANAL_IP_ADDR,
- 11111), "example", "", ""); // 这里用户名和密码如果在这写了,会覆盖canal配置文件的账号密码,如果不填从配置文件中读
- int batchSize = 1000;
- //空闲空转计数器
- int emptyCount = 0;
- System.out.println("---------------------canal init OK,开始监听mysql变化------");
- try {
- connector.connect();
- //connector.subscribe(".*\\..*");
- connector.subscribe("test.t_user"); // 设置监听哪个表
- connector.rollback();
- int totalEmptyCount = 10 * _60SECONDS;
- while (emptyCount < totalEmptyCount) {
- System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
- Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
- long batchId = message.getId();
- int size = message.getEntries().size();
- if (batchId == -1 || size == 0) {
- emptyCount++;
- try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
- } else {
- //计数器重新置零
- emptyCount = 0;
- printEntry(message.getEntries());
- }
- connector.ack(batchId); // 提交确认
- // connector.rollback(batchId); // 处理失败, 回滚数据
- }
- System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
- } finally {
- connector.disconnect();
- }
- }
- }
说明:
CANAL_IP_ADDR:canal 服务部署ipInetSocketAddress: 端口可从canal.log 中查看

查看Canal客户端监听
查看Redis数据
-
-
UTF-8 -
1.8 -
1.8 -
4.12 -
1.2.17 -
1.16.18 -
5.1.47 -
1.1.16 -
4.1.5 -
1.3.0 -
-
-
-
redis.clients -
jedis -
4.3.1 -
-
-
-
com.alibaba.otter -
canal.client -
1.1.0 -
-
-
-
org.springframework.boot -
spring-boot-starter-web -
-
-
org.springframework.boot -
spring-boot-starter-actuator -
-
-
-
org.springframework.boot -
spring-boot-starter-data-redis -
-
-
org.apache.commons -
commons-pool2 -
-
-
-
org.springframework.boot -
spring-boot-starter-aop -
-
-
org.aspectj -
aspectjweaver -
-
-
-
mysql -
mysql-connector-java -
5.1.47 -
-
-
-
com.alibaba -
druid-spring-boot-starter -
1.1.10 -
-
-
com.alibaba -
druid -
${druid.version} -
-
-
-
org.mybatis.spring.boot -
mybatis-spring-boot-starter -
${mybatis.spring.boot.version} -
-
-
-
cn.hutool -
hutool-all -
5.2.3 -
-
-
junit -
junit -
${junit.version} -
-
-
org.springframework.boot -
spring-boot-starter-test -
test -
-
-
log4j -
log4j -
${log4j.version} -
-
-
org.projectlombok -
lombok -
${lombok.version} -
true -
-
-
-
javax.persistence -
persistence-api -
1.0.2 -
-
-
-
tk.mybatis -
mapper -
${mapper.version} -
-
-
org.springframework.boot -
spring-boot-autoconfigure -
-
-
org.apache.commons -
commons-pool2 -
2.11.1 -
-
-
com.baomidou -
mybatis-plus-boot-starter -
3.4.1 -
-

源码地址
https://gitee.com/UniQue006/redis_example.git
- 🌹 以上分享 Redis 数据一致性 canal应用,如有问题请指教。
-
- 🌹🌹 如你对技术也感兴趣,欢迎交流。
-
- 🌹🌹🌹 如有需要,请👍点赞💖收藏🐱🏍分享