• SpringBoot死信队列、延迟队列


    1.死信队列

    • 当queue中的消息无法被消费时,消息成为死信,产生条件如下三个:

      1. 消息TTL过期(TTL一般生产者每次发消息都单独指定)
      2. queue满了而无法添加
      3. 消息被拒or否定 且不重新入队basic.reject拒绝 或 basic.nack否定)并且requeue = false不重新入队
    • 失败消息如何转发到(绑定到)死信队列

      1. 需要单独声明:死信交换机 和 死信队列
      2. 需要设置参数来实现消息转发到死信交换机绑定参数,转发到死信队列
        消息拒绝
    • 应用场景举例:

      1. 需要保证消息不丢失(如下单功能)
      2. 延迟队列的一种实现方式(还有一种是基于插件)

    2.延迟队列

    2.1适用场景

    2.1.1轮训和定时任务的缺点

    • 数据量小:可以采用每秒轮训实现上述功能(数据量大时会导致轮训瞬间处理太多)
    • 对时间要求不严格:那么可以每天凌晨跑定时任务来实现(例如生成一些日报)

    但是对于大数据量、时效性强的场景,需要用到延迟队列

    2.1.2延迟队列的优点

    延迟功能有保障(时间要求严格)
    不会像轮训一样一瞬间处理太多

    2.2延迟队列的两种实现

    RabbitMQ本身是不支持延迟队列的,如果不使用插件,就只能改造死信队列————C1消费着直接不存在,消息到期TTL之后一定会进入死信队列进行处理

    延迟队列的实现分为“死信队列实现”和“插件直接实现”,结构图分别为:基于死信队列
    基于插件

    3.死信实现延时队列

    注意:由于队列的先进先出特性,只有当过期的消息到了队列的顶端(队首),才会被真正的丢弃或者进入死信队列。
    如果遇到不同的任务类型需要不同的延时的话,需要为每一种不同延迟时间的消息建立单独的消息队列。

    3.1配置信息

    3.1.1pom

    下面两个

        
          org.springframework.boot
          spring-boot-starter-web
        
    
    	 
          org.springframework.boot
          spring-boot-starter-test
          test
        
    
        
          com.rabbitmq
          amqp-client
          5.8.0
        
    
        
          org.springframework.boot
          spring-boot-starter-amqp
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    3.1.2配置文件

    spring.rabbitmq.host=124...
    #spring.rabbitmq.port=5672 默认
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123
    
    server.port=8080
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3.2架构图

    左边的YD要与右边的YD相同(routingkey保证相同才能正确转发消息)

    3.3配置类声明与绑定

    @Configuration
    public class DeadConfig {
    
      //  1.交换机
      public static final String X_EXCHANGE = "X";
      public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    
      //  2. 队列
      public static final String QUEUE_A = "QA";
      public static final String QUEUE_B = "QB";
      public static final String DEAD_LETTER_QUEUE = "QD";
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3.3.1交换机

      @Bean("xExchange")
      public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
      }
      @Bean("yExchange")
      public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    3.3.2队列

    关于ttl,实际环境尽量不要写在队列声明期间,而应写在生产者send的时候

     @Bean("queueA")//10s过期时间
      public Queue queueA(){//AMQP包下的Queue
        //绑定参数————转发到死信交换机
    /*  原始写法
        HashMap arg = new HashMap<>();
        arg.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        arg.put("x-dead-letter-routing-key","YD");
        arg.put("x-message-ttl",10000);
        return QueueBuilder.
            durable(QUEUE_A).
            withArguments(arg).
            build();
     *///SpringBoot写法
        return QueueBuilder.
            durable(QUEUE_A).
            deadLetterExchange(Y_DEAD_LETTER_EXCHANGE).
            deadLetterRoutingKey("YD").
            ttl(10000).
            build();
      }
    
      @Bean("queueB")//40s过期时间
      public Queue queueB(){
        return QueueBuilder.
            durable(QUEUE_B).
            deadLetterExchange(Y_DEAD_LETTER_EXCHANGE).
            deadLetterRoutingKey("YD").
            ttl(40000).
            build();
      }
    
      @Bean("queueD")//死信队列
      public Queue queueD(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    3.2.3绑定

      @Bean //参数:QA  xExchange
      public Binding QAbindX(@Qualifier("queueA") Queue QA, @Qualifier("xExchange") DirectExchange X){
        return BindingBuilder.bind(QA).to(X).with("XA");
      }
    
      @Bean
      public Binding QBbindX(@Qualifier("queueB") Queue QB, @Qualifier("xExchange") DirectExchange X){
        return BindingBuilder.bind(QB).to(X).with("XB");
      }
    
      @Bean
      public Binding QDbindY(@Qualifier("queueD") Queue QD, @Qualifier("yExchange") DirectExchange Y){
        return BindingBuilder.bind(QD).to(Y).with("YB");
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在这里插入图片描述

    3.4生产者

    @RestController
    @RequestMapping("ttl")
    public class ProducerController {
      @Autowired
      RabbitTemplate rabbitTemplate;
    
      @GetMapping("send")
      public void sendA(String msg){
        System.out.println("生产者发送消息:" + new Date().toString());
        rabbitTemplate.convertAndSend("X","XA","10s延迟");
        rabbitTemplate.convertAndSend("X","XB","40s延迟");
    
      }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    3.5消费者监听

    消费者其实只需要用@RabbitListener进行监听即可,而不是做成一个接口

    @Service
    //@Component也行
    public class ConsumerService {
    
      @RabbitListener(queues = "QD")//监听QD队列消息(QD是死信队列)
      public void receiveD(Message message , Channel channel) {//导入AMQP包
        System.out.println("收到死信队列消息:" + new String(message.getBody()) + new Date().toString());
      }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    4.插件实现延迟队列

    具体百度RabbitMQ延迟队列插件,官网下载,当安装好后可以在web界面看到多了一种交换机————由此可知,插件是通过交换机实现延迟的

    4.1配置交换机

    • 因为基于插件的延迟队列只有交换机不同,所以只需要配置交换机

    • 而新的交换机在Spring中没有对应的API,所以需要用自定义交换机CustomExchange

      @Bean
      public Queue delayQueue(){
        return new Queue("delayQueue");
      }
      
      @Bean
      public CustomExchange delayExchange(){
      //只能用原生写法,没有Spring提供的API
        HashMap arg = new HashMap<>();
        arg.put("x-delayed-type","direct");
        return new CustomExchange("dExchange","x-delayed-message",true,false,arg);
      }
      
      @Bean
      public Binding delayBinding(@Qualifier("delayQueue") Queue QC,
          @Qualifier("delayExchange") CustomExchange cExchange){
        return BindingBuilder.
            bind(QC).
            to(cExchange).
            with("delayRoutingKey").
            noargs();
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22

    4.2生产者和消费者

      @GetMapping("delay")
      public void delaySent(String msg){
        System.out.println("生产者发送消息:" + new Date().toString());
        rabbitTemplate.convertAndSend("dExchange","delayRoutingKey",msg,m -> {
          m.getMessageProperties().setDelay(5000);//5秒延迟
          return m;
        });
      }
    
      @RabbitListener(queues = "delayQueue")
      public void delayListenner(Message message , Channel channel){
        System.out.println("收到死信队列消息:" + new String(message.getBody()) + new Date().toString());
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    5.补充

    延迟队列的选择还有:Java中的DelayQueue、Quartz;Redis中的zset;Kafka的时间轮。但RabbitMQ的方案最全面

  • 相关阅读:
    数据结构与算法基础(王卓)(2)
    Linux下根目录都包含什么? 每个文件什么作用?
    C语言实现扫雷小游戏(更新中)
    selenium的常见方法及使用
    铜九铁十快来上分:vite+ts+vue3搭建你的组件库
    Mybatis之if标签判断boolean值
    在ios设备上运行Unity Profiler
    Java实训:学生信息管理系统
    Commvault+XSKY 推出基于 Object Lock 的防勒索病毒联合方案
    Gif格式图片怎么制作?超简单的方法分享
  • 原文地址:https://blog.csdn.net/m0_56079407/article/details/125858748