canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
基于日志增量订阅和消费的业务包括
【1】数据库镜像
【2】数据库实时备份
【3】索引构建和实时维护(拆分异构索引、倒排索引等)
【4】业务 cache 刷新
【5】带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

【1】MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
【2】MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
【3】MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
【1】canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
【2】MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
【3】canal 解析 binary log 对象(原始为 byte 流)

Canal主要用途是对MySQL数据库增量日志进行解析,提供增量数据的订阅和消费,简单说就是可以对MySQL的增量数据进行实时同步,支持同步到MySQL、Elasticsearch、HBase等数据存储中去。
具体的部署操作步骤参见:
https://github.com/alibaba/canal/wiki/QuickStart
这边使用的是Docker进行容器化部署,从DockerHub上找到了canal-server的容器镜像,使用版本是V1.1.5版本。
canal/canal-server:v1.1.5

需要对外的端口是11111,上面映射到主机的端口是30000。
数据卷配置的方式使用的是主机映射卷,需要映射两个配置文件.

其中两个挂载对应instance.properties配置文件和canal.properties配置文件。
instance.properties配置文件
instance.properties配置文件,该配置文件主要是配置监听的MySQL实例的相关属性。

position info主要配置了连接的MySQL地址,binlog日志名称,日志位置,时间戳。中间两个信息可以通过在MySQL中通过
show master status 命令查看

时间戳可以以当前时间为准,精确到毫秒级别。

canal.instance.XXX 指定了连接到对应数据库的账号密码等信息,该信息对应的就是官方文档的下面这一步操作。

canal.instance.filter.regex 表示表名称的过滤规则,即希望当前数据库实例那些表可以被监听到。这个可以在配置文件中配置,也可以在客户端代码中配置,这里还是要求在配置文件中构建,后面的连接方式没有使用客户端连接。
举例:全库全表:.\…
指定某个库全表:test…* ,匹配库名test下所有表
单表:test.user ;匹配库名test下user表
多规则组合使用: test…*,test2.user1,匹配test库下所有表以及匹配test2库下usee1表
修改java程序下connector.subscribe配置的过滤正则
全库全表
connector.subscribe(“.\…”)
指定库全表
connector.subscribe(“test\…")
单表
connector.subscribe(“test.user”)
多规则组合使用
connector.subscribe("test\…,test2.user1,test3.user2”)
canal.properties配置文件主要是描述canal服务的属性,开放的端口,连接的MQ,admin配置(该组件本次未使用)
配置消息队列
在刚开始是直接通过客户端连接到Canal服务器消费信息的,但是这样会存在一些问题,客户端因为重启或者其他原因不可用时会导致数据丢失,考虑到数据一致和完整性,增加了RabbitMQ作为消息传递中间件。
配置方式,修改confg/canal.properties添加MQ的配置,canal支持kafka、RocketMQ、RabbitMQ。因为项目中使用RabbitMQ比较多,也就选择了RabbitMQ。
配置方式参考如下,需要提前创建exchange、queue,并根据MQ的配置填写host/username/password信息。

配置完成后重启容器,并在数据库执行几条DML语句,观察MQ是否有消息进来。

可以在获取一条信息查看明细

类似的监听数据库变更的组件有MaxWell、canal等,因为之前有使用过canal更熟悉一些所以也就选择了这个组件。

canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑
Java版本的参考链接
https://github.com/alibaba/canal/wiki/ClientExample
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
下面提供一个简单的SpringBoot示例
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CannalClient implements InitializingBean {
private final static int BATCH_SIZE = 1000;
@Override
public void afterPropertiesSet() throws Exception {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.208.31", 11111), "example", "canal", "canal");
try {
//打开连接
connector.connect();
//订阅数据库表,全部表
connector.subscribe(".*\\..*");
//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(BATCH_SIZE);
//获取批量ID
long batchId = message.getId();
//获取批量的数量
int size = message.getEntries().size();
//如果没有数据
if (batchId == -1 || size == 0) {
try {
//线程休眠2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//如果有数据,处理数据
printEntry(message.getEntries());
}
//进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
/**
* 打印canal server解析binlog获得的实体类信息
*/
private static void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
//开启/关闭事务的实体类型,跳过
continue;
}
//RowChange对象,包含了一行数据变化的所有特征
//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
CanalEntry.RowChange rowChage;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
//获取操作类型:insert/update/delete类型
CanalEntry.EventType eventType = rowChage.getEventType();
//打印Header信息
System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
//判断是否是DDL语句
if (rowChage.getIsDdl()) {
System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
}
//获取RowChange对象里的每一行数据,打印出来
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
//如果是删除语句
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
//如果是新增语句
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
//如果是更新的语句
} else {
//变更前的数据
System.out.println("------->; before");
printColumn(rowData.getBeforeColumnsList());
//变更后的数据
System.out.println("------->; after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
下面简单展示下监听到消息队列的消息内容,这里以一个测试表为例,展示下新增、删除、更新时接收到数据内容
CREATE TABLE `test` (
`id` int(11) NOT NULL,
`name` varchar(255) DEFAULT NULL,
`phone` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
{
"data": [{
"id": "1",
"name": "测试",
"phone": "12345678"
}],
"database": "db",
"es": 1658225090000,
"id": 7326,
"isDdl": false,
"mysqlType": {
"id": "int",
"name": "varchar(255)",
"phone": "varchar(255)"
},
"old": null,
"pkNames": ["id"],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"phone": 12
},
"table": "test",
"ts": 1658225090766,
"type": "INSERT"
}
{
"data": [{
"id": "1",
"name": "测试更新",
"phone": "12345678"
}],
"database": "db",
"es": 1658225185000,
"id": 7333,
"isDdl": false,
"mysqlType": {
"id": "int",
"name": "varchar(255)",
"phone": "varchar(255)"
},
"old": [{
"name": "测试"
}],
"pkNames": ["id"],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"phone": 12
},
"table": "test",
"ts": 1658225185768,
"type": "UPDATE"
}
{
"data": [{
"id": "1",
"name": "测试更新",
"phone": "12345678"
}],
"database": "db",
"es": 1658225230000,
"id": 7337,
"isDdl": false,
"mysqlType": {
"id": "int",
"name": "varchar(255)",
"phone": "varchar(255)"
},
"old": null,
"pkNames": ["id"],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"phone": 12
},
"table": "test",
"ts": 1658225230416,
"type": "DELETE"
}
考虑到数据的完整性,当前测试快速插入一万条数据进行测试,看最终的数据监听情况。
CREATE DEFINER=`root`@`%` PROCEDURE `batchInsertUserTest`()
BEGIN
declare i int;
set i=1;
while i<=10000 do
INSERT INTO -- INSERT测试语句
set i=i+1;
end while;
END
执行存储过程
call batchInsertUserTest();
测试结果,经过测试观察可以看到数据处理正常,当前的表是canal监听到数据并写入的业务数据表

ALTER TABLE `tableName`
ADD COLUMN `temp` varchar(255) NULL AFTER `status`;

经过测试发现当前版本是可以监听到DDL的变更的,注意这个问题在老版本中是存在一些问题的,可能出现本地缓存的字段列数和当前列数不一致的情况,在创建一个新表的时候,观察日志是可以看到他会将表结构放到本地缓存中的。
如果使用canal记录监听信息,需要注意MySQL变更日志数据量是很大的,要根据具体的情况进行筛选和丢弃。在刚开始时,我并没有注意到这个问题,导致几天时间内就积攒了几百万的数据,而且其中还有一些json的大文本字段,结果就导致数据备份程序因为空间不够出错。
1:可以适当的进行筛选和表的过滤,只记录需要的数据
2:如果表只做查询操作,可以适当的对一些大文本字段进行压缩,或者对表空间进行压缩,减少占用空间
考虑到业务侧存在因异常事务回滚的情况,所以进行了下面的手动事务开启和回滚的操作,经过测试事务回滚并不会触发。
START TRANSACTION;
INSERT INTO ...
ROLLBACK ;
通过事物添加记录,如果出现异常回滚,MYSQL binlog不会记录删除记录。
canal1.1.5 mysql批量更新或批量插入,canal生成的消息只有一条
insert into exam.canal_test(name) values (‘232323232’),(‘ttttttt’),(‘ooooooo’);
针对上面的情况,需要在写代码的时候注意使用集合进行处理,避免出现问题。
在浏览GitHub的issues时发现有部分用户反映canal有概率丢失数据,这个我在实际的使用中未发现,但是通过下面描述可能是存在低概率的数据丢失问题,这个在使用过程中需要注意,对于核心的业务处理需要做好补偿操作。
