“事务”:解决消息生产者和消费者的数据一致性问题。
举例:使用消息队列来异步清理购物车。创建订单时执行了 2 个步骤:

问题:订单数据与购物车数据不一致:
清理购物车的操作:只要成功执行购物车清理后再提交消费确认即可,如果失败,由于没有提交消费确认,消息队列会自动重试。
关键点集中在订单系统,创建订单和发送消息这两个步骤要么都操作成功,要么都失败。
事务消息适用于需要异步更新数据,对数据实时性要求不太高的场景。
Kafka 和 RocketMQ 都提供了事务相关功能。

**问题:**第四步提交事务消息时失败了怎么办?
Kafka :直接抛出异常,让用户自行处理。
我们可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿。
增加了事务反查的机制来解决事务消息提交失败的问题。
当Broker 没有收到提交或者回滚的请求,Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。
反查本地事务的实现,不依赖消息的发送方,即订单服务的某个实例节点上的任何数据。即使发送事务消息的那个订单服务节点宕机了,RocketMQ 可以通过其他订单服务的节点来执行反查,确保事务的完整性。
实现:
业务代码需要实现一个反查本地事务状态的接口,告知RocketMQ 本地事务是成功还是失败。
本例中只要根据消息中的订单 ID,在订单库中查询这个订单是否存在即可,然后返回成功或者失败。
整体流程:
原理:
在 Producer 端给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。
序号不连续,那就是丢消息了。可以通过缺失的序号来确定丢失的是哪条消息,进一步排查原因。
实现:
在 Producer 发消息之前的拦截器中将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连性。
分布式系统中存在的问题:
消息在 Producer 创建出来,经过网络传输发送到 Broker;
采用请求确认机制,来保证消息的可靠传递:
消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
通过配置 Broker 参数来避免因为宕机丢消息。
Consumer 从 Broker 上拉取消息,经过网络传输发送到;
确认机制:执行完所有消费业务逻辑之后,再发送消费确认。
消息传递失败时,发送方会执行重试,重试的过程中就有可能会产生重复的消息。如果没有对重复消息进行处理,就有可能会导致系统的数据出现错误。
MQTT 协议给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:
大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和Kafka 都是这样。也就是说,消息队列很难保证消息不重复。
需要我们的代码能接受“消息是可能会重复的”这一现状,通过一些方法来消除重复消息对业务的影响。
在消费端,让我们消费消息的操作具备幂等性。
幂等(Idempotence) 本来是一个数学上的概念:
如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性。
At least once + 幂等消费 = Exactly once。
使用唯一约束字段,实现只能插入一条记录的操作,达到多次重复操作无法插入,解决消息重复性。
例如把转账单 ID 和账户 ID 这两个字段联合起来创建一个唯一约束,这样对于相同的转账单 ID 和账户 ID,表里至多只能存在一条记录。
给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。
通用的方法是,给数据增加一个版本号属性,更新前,比较当前数据的版本号是否和消息中的版本号一致,不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。
在执行数据更新操作之前,先检查一下是否执行过这个更新操作。
实现方法:
每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
注意:
分布式系统中难以维护全局唯一的ID并且多消费实例容易出现数据错误等问题,可通过加锁和事务等方式解决。