• RabbitMQ消费者确认消息入门演示


     源码dmeo在文章末尾获取👇🏻

    1. SpringAMQP则允许配置三种确认模式 

    1. manual:手动ack,需要在业务代码结束后,调用api发送ack。
    2. auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack

    3.  none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

    首先声明队列交换机 

    1. @Configuration
    2. public class CommonConfig {
    3. @Bean
    4. public DirectExchange simpleDirect() {
    5. return new DirectExchange("simple.direct", true, false);
    6. }
    7. @Bean
    8. public Queue simpleQueue() {
    9. return QueueBuilder.durable("simple.queue").build();
    10. }
    11. }

    创建一个会报错的消息接收方 ,模拟消费者报错

    1. @RabbitListener(queues = "simple.queue")
    2. public void listenSimpleQueue(String msg){
    3. log.info("消费者接收到simple.queue的消息:【" + msg + "】");
    4. //该代码会报错异常, 数值溢出
    5. System.out.println(1 / 0);
    6. System.out.println("消费者处理消息成功!");
    7. }

     配置好上述这些代码, 我们就可以启动消费者服务器了, 然后去mq客户端查看创建的队列和交换机

    1.2 none

    在我们配置成none的时候, 很明显此消息投递后立即被删除

     1.3 aoto(推荐使用)

    当换成aoto模式的时候, 代码出现异常消息, mq会进行重新投递, 但是重新投递会一直无限重试

    为了解决这种无限重试的问题, spring提供了retry(重试)机制, 使用这个机制我们就可以在消费者报错抛异常的时候, 利用本地的重试来解决这个问题, 如下所示. 

     现在我们配置了失败等待的时间为1秒, 等待时间的倍数为2 , 重试次数最大为3 , 这样一来我们就会收到三次重试的信息
    第一次和第二次的时间相差一秒(失败时间(1秒) x 失败倍数1 = 1秒), 
    第二次和第三次的时间相差两秒(失败时间(1秒) x 失败倍数2 = 2秒), 

    最后在到达最大重试次数就会停止重试

    但是这种方法的弊端就是当重试次数达到最大耗尽后, 消息回直接被丢弃

     这里我们采用一种解决方案就是RepublishMessageRecoverer失败之后将消息投递到一个指定的队列当中
     

    1. 创建RepublishMessageRecoverer 交换机和队列

    1. import org.springframework.amqp.core.*;
    2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    3. import org.springframework.amqp.rabbit.retry.MessageRecoverer;
    4. import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
    5. import org.springframework.context.annotation.Bean;
    6. import org.springframework.context.annotation.Configuration;
    7. @Configuration
    8. public class ErrorMessageConfig {
    9. @Bean
    10. public DirectExchange errorMessageExchange(){
    11. return new DirectExchange("error.direct");
    12. }
    13. @Bean
    14. public Queue errorQueue(){
    15. return new Queue("error.queue", true);
    16. }
    17. @Bean
    18. public Binding errorMessageBinding(){
    19. return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("e");
    20. }
    21. //失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
    22. @Bean
    23. public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    24. return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "e");
    25. }
    26. }

     创建成功后启动服务器, 在rabbitmq客户端查看创建的队列和交换机

     并向simple.queue队列发送消息

     这时重试的次数已经达到最大, MQ就转发到失败的队列当中去了

    Republishing failed message to exchange 'error.direct' with routing key e

     我们再查看失败的消息error.queue的信息, 这里直接展示了我们控制台中栈错误信息, 其中错误原因就是 代码1/0的问题

     

     链接:https://pan.baidu.com/s/1il41ywFnYM4_q3MU9GN_MQ 
    提取码:heng

  • 相关阅读:
    x86 CPU架构
    hive shell中有许多日志信息的解决办法
    【项目】三子棋小游戏(C语言)
    去中心化身份DID(研究)
    @Transactional注解为何会失效
    KMP算法next数组
    YOLO系列目标检测算法-Scaled-YOLOv4
    RabbitMQ 延时消息实现方式
    van-uploader上传图片报错Invalid handler for event “load“(在uniapp编译)
    unity 之参数类型之引用类型
  • 原文地址:https://blog.csdn.net/qq_45481709/article/details/126111285