• RabbitMQ消费者的可靠性


    目录

    一、消费者确认

    二、失败重试机制

    2.1、失败处理策略

    三、业务幂等性

    3.1、唯一消息ID

     3.2、业务判断

    3.3、兜底方案


    一、消费者确认

    RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

    • ack:成功处理消息,RabbitMQ从队列中删除该消息
    • nack:消息处理失败,RabbitMQ需要再次投递消息
    • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

    一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

    由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

    • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
    • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活
    • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
      • 如果是业务异常,会自动返回nack
      • 如果是消息处理或校验异常,自动返回reject;

     通过下面的配置可以修改SpringAMQP的ACK处理方式:

    1. spring:
    2. rabbitmq:
    3. host: 192.168.200.129 # 你的虚拟机IP
    4. port: 5672 # 端口
    5. virtual-host: / # 虚拟主机
    6. username: admin # 用户名
    7. password: 123 # 密码
    8. listener:
    9. simple:
    10. prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
    11. acknowledge-mode: none

    当使用none模式时

    生产者发送一条消息

     消费者接受消息时抛异常

     

    测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。

    把确认机制修改为auto

    1. spring:
    2. rabbitmq:
    3. host: 192.168.200.129 # 你的虚拟机IP
    4. port: 5672 # 端口
    5. virtual-host: / # 虚拟主机
    6. username: admin # 用户名
    7. password: 123 # 密码
    8. listener:
    9. simple:
    10. prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
    11. acknowledge-mode: auto

     再次发送消息

     

    在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):

    当我们把配置改为auto时,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。

    二、失败重试机制

    当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。 极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力。

    为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

    修改consumer服务的application.yml文件,添加内容

    1. spring:
    2. rabbitmq:
    3. host: 192.168.200.129 # 你的虚拟机IP
    4. port: 5672 # 端口
    5. virtual-host: / # 虚拟主机
    6. username: admin # 用户名
    7. password: 123 # 密码
    8. listener:
    9. simple:
    10. prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
    11. acknowledge-mode: auto #消息确认
    12. retry:
    13. enabled: true # 开启消费者失败重试
    14. initial-interval: 1000ms # 初识的失败等待时长为1
    15. multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
    16. max-attempts: 3 # 最大重试次数
    17. stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

    重启consumer服务,重复之前的测试。可以发现:

    • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次

    结论:

    • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
    • 重试达到最大次数后,Spring会返回reject,消息会被丢弃
    2.1、失败处理策略

    在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了

    因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

    • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
    • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
    • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

    在消费者里创建一个异常消息配置类

    1. @Configuration
    2. @Slf4j
    3. //当配置文件中spring.rabbitmq.listener.simple.retry.enabled 属性为ture时配置类才生效
    4. @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
    5. public class ErrorMessageConfig {
    6. //消息处理失败交换机
    7. @Bean
    8. public DirectExchange errorMessageExchange(){
    9. return new DirectExchange("error.direct");
    10. }
    11. //消息处理失败队列
    12. @Bean
    13. public Queue errorQueue(){
    14. return new Queue("error.queue", true);
    15. }
    16. //绑定关系
    17. @Bean
    18. public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    19. return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    20. }
    21. //定义一个RepublishMessageRecoverer,关联队列和交换机
    22. @Bean
    23. public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    24. log.error("加载RepublishMessageRecoverer");
    25. return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    26. }
    27. }

     当接收消息出现异常时,会创建error.queue队列

     可以查看到异常信息

    三、业务幂等性

    何为幂等性? 幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。 在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

    • 根据id删除数据
    • 查询数据
    • 新增数据

    但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

    • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
    • 退款业务。重复退款对商家而言会有经济损失。

    所以,我们要尽可能避免业务被重复执行。 然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

    • 页面卡顿时频繁刷新导致表单重复提交
    • 服务间调用的重试
    • MQ消息的重复投递

    我们在用户支付成功后会发送MQ消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。 举例:

    1. 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
    2. 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
    3. 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
    4. 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。

    因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:

    • 唯一消息ID
    • 业务状态判断
    3.1、唯一消息ID
    1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
    2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
    3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

    SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。 以Jackson的消息转换器为例 

    在生产者和消费者的启动类里加一个配置

    1. @Bean
    2. public MessageConverter messageConverter(){
    3. // 1.定义消息转换器
    4. Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    5. // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    6. jjmc.setCreateMessageIds(true);
    7. return jjmc;
    8. }

     测试生产者发送消息会产生一个id

     3.2、业务判断

    业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。 例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

    相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。

    以支付修改订单的业务为例,我们需要修改OrderServiceImpl中的markOrderPaySuccess方法:

    1. @Override
    2. public void markOrderPaySuccess(Long orderId) {
    3. // 1.查询订单
    4. Order old = getById(orderId);
    5. // 2.判断订单状态
    6. if (old == null || old.getStatus() != 1) {
    7. // 订单不存在或者订单状态不是1,放弃处理
    8. return;
    9. }
    10. // 3.尝试更新订单
    11. Order order = new Order();
    12. order.setId(orderId);
    13. order.setStatus(2);
    14. order.setPayTime(LocalDateTime.now());
    15. updateById(order);
    16. }

    上述代码逻辑上符合了幂等判断的需求,但是由于判断和更新是两步动作,因此在极小概率下可能存在线程安全问题。

    1. @Override
    2. public void markOrderPaySuccess(Long orderId) {
    3. // UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
    4. lambdaUpdate()
    5. .set(Order::getStatus, 2)
    6. .set(Order::getPayTime, LocalDateTime.now())
    7. .eq(Order::getId, orderId)
    8. .eq(Order::getStatus, 1)
    9. .update();
    10. }

    注意看,上述代码等同于这样的SQL语句:

    UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
    

    我们在where条件中除了判断id以外,还加上了status必须为1的条件。如果条件不符(说明订单已支付),则SQL匹配不到数据,根本不会执行。

    3.3、兜底方案

    我们可以在交易服务设置定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。
     

     

  • 相关阅读:
    【图像融合】差异的高斯:一种简单有效的通用图像融合方法[用于融合红外和可见光图像、多焦点图像、多模态医学图像和多曝光图像](Matlab代码实现)
    实战演练,使用硬raid卡制作“raid”阵列
    vue学习之textarea输入英文时换行
    使用 ElasticSearch 作为知识库,存储向量及相似性搜索
    pytorch-fastrcnn识别王者荣耀敌方英雄血条
    计算机毕业设计之java+ssm爱家房屋租赁信息管理系统
    100道JVM面试题大全最新版2023版
    栈和队列实现的思路和代码
    ONNX推理流程
    GO语言-包的使用
  • 原文地址:https://blog.csdn.net/qi341500/article/details/134090203