• 7. TTL 延迟队列


    二八佳人体似酥,腰间仗剑斩愚夫。虽然不见人头落,暗里教君骨髓枯。

    创建两个队列 QA和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X和死信交
    换机 Y,它们的类型都是 direct,创建一个死信队列QD,它们的绑定关系如下:

    image.png

    死信队列

    ttl 配置 TtlConfig

    package com.yjl.amqp.config.ttl;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 列信队列配置
     *
     * @author yuejianli
     * @date 2022-11-23
     */
    @Component
    public class TtlConfig {
        @Value("${rabbit.ttl.queue_a}")
        private String queuea;
        @Value("${rabbit.ttl.queue_b}")
        private String queueb;
    
        @Value("${rabbit.ttl.x_exchange}")
        private String xexchange;
    
    
        @Value("${rabbit.ttl.y_dead_queue_d}")
        private String deadQueued;
    
        @Value("${rabbit.ttl.y_dead_exchange}")
        private String ydeadExchange;
    
    
        @Value("${rabbit.ttl.queue_c}")
        private String queuec;
    
    
        // 定义队列和 交换机,并进行绑定
    
    
        @Bean(value = "direct_exchange")
        DirectExchange directExchange() {
            return new DirectExchange(xexchange);
        }
    
    
        @Bean(value = "direct_queuea")
        public Queue queuea() {
            //return new Queue(queuea);
    
            Map<String, Object> args = new HashMap<>();
    
            args.put("x-dead-letter-exchange", ydeadExchange);
    
            args.put("x-dead-letter-routing-key", "YD");
    
            // 声明 ttl  10s
            args.put("x-message-ttl", 10000);
    
            return QueueBuilder.durable(queuea).withArguments(args).build();
        }
    
    
        //进行绑定
        @Bean
        Binding bindingDirectExchangea(@Qualifier("direct_queuea") Queue queue,
                                       @Qualifier("direct_exchange") DirectExchange directExchange) {
            return BindingBuilder.bind(queue).to(directExchange).with("XA");
        }
    
    
        @Bean(value = "direct_queueb")
        public Queue queueb() {
            // return new Queue(queueb);
    
            Map<String, Object> args = new HashMap<>();
    
            args.put("x-dead-letter-exchange", ydeadExchange);
    
            args.put("x-dead-letter-routing-key", "YD");
    
            // 声明 ttl  40s
            args.put("x-message-ttl", 40000);
    
            return QueueBuilder.durable(queueb).withArguments(args).build();
        }
    
    
        @Bean
        Binding bindingDirectExchangeb(@Qualifier("direct_queueb") Queue queue,
                                       @Qualifier("direct_exchange") DirectExchange directExchange) {
            return BindingBuilder.bind(queue).to(directExchange).with("XB");
        }
    
    
        @Bean(value = "direct_dead_exchange")
        DirectExchange directDeadExchange() {
            return new DirectExchange(ydeadExchange);
        }
    
    
        @Bean(value = "direct_dead_queued")
        public Queue deadQueued() {
            return new Queue(deadQueued);
        }
    
        // 进行绑定
        @Bean
        Binding bindingDeadDirectExchange(@Qualifier("direct_dead_queued") Queue queue,
                                          @Qualifier("direct_dead_exchange") DirectExchange directExchange) {
            return BindingBuilder.bind(queue).to(directExchange).with("YD");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116

    死信消费者 TtlConsumer

    @Component
    @Slf4j
    public class TtlConsumer {
    
        /**
         * 死信队列
         */
        @RabbitListener(queues = {"${rabbit.ttl.y_dead_queue_d}"})
        public void deadQueueListener(String message) {
            log.info("死信队列获取消息:" + message);
            // 执行具体的业务操作
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    生产者 TtlProduer

    @RestController
    @Slf4j
    public class TtlProduer {
    
        @Value("${rabbit.ttl.x_exchange}")
        private String xexchange;
    
    
        @Value("${rabbit.ttl.delayed_exchange}")
        private String deadExchange;
    
        @Value("${rabbit.ttl.delayed_routing_key}")
        private String deadRoutingKey;
    
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/sendMessage/{message}")
        public String sendMessage(@PathVariable("message") String message) {
            log.info("接收到消息:{}", message);
            // 发送到 A 队列
            rabbitTemplate.convertAndSend(xexchange, "XA", "消息来自ttl 10s 的队列消息:" + message);
            //发送到B队列
            rabbitTemplate.convertAndSend(xexchange, "XB", "消息来自ttl 40s 的队列消息:" + message);
            return message;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    验证

    输入网址, 发送消息

    http://localhost:8088/Server/sendMessage/hello,RabbitMQ

    image.png

    死信队列 A 10s 接到消息, 队列B 40s 后接收到消息。

    第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,
    然后被消费掉,这样一个延时队列就打造完成了。
    不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S
    两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然 后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

    延时队列优化

    在这里新增了一个队列QC,绑定关系如下,该队列不设置 TTL 时间

    image.png

    ttl 配置QC 队列 TtlConfig

        // 声明一个队列 C, 没有过期时间的。
    
    
        @Bean(value = "direct_queuec")
        public Queue queuec() {
            // return new Queue(queueb);
    
            Map<String, Object> args = new HashMap<>();
    
            args.put("x-dead-letter-exchange", ydeadExchange);
    
            args.put("x-dead-letter-routing-key", "YD");
    
    
            return QueueBuilder.durable(queuec).withArguments(args).build();
        }
    
    
        @Bean
        Binding bindingDirectExchangec(@Qualifier("direct_queuec") Queue queue,
                                       @Qualifier("direct_exchange") DirectExchange directExchange) {
            return BindingBuilder.bind(queue).to(directExchange).with("XC");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    死信消费者 TtlConsumer 保持不变

    生产者 TtlProducer

    时间由前端传入, 设置时间往 C 队列里面放置。

        @GetMapping("/sendMessage/{message}/{ttl}")
        public String sendMessageWithTtl(@PathVariable("message") String message, @PathVariable("ttl") String ttl) {
            log.info("接收到消息:{}, 发送的时长 是{} ms", message, ttl);
            //
    //        // 发送到 A 队列
    //        rabbitTemplate.convertAndSend(xexchange,"XA","消息来自ttl 10s 的队列消息:"+message);
    //        //发送到B队列
    //        rabbitTemplate.convertAndSend(xexchange,"XB","消息来自ttl 40s 的队列消息:"+message);
    
    
            // 发送给 C 队列
            rabbitTemplate.convertAndSend(xexchange, "XC", message, correlationData -> {
                correlationData.getMessageProperties().setExpiration(ttl);
                return correlationData;
            });
            return message;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    验证

    发送消息,携带着时间:

    http://localhost:8088/Server/sendMessage/sendA5s/5000

    http://localhost:8088/Server/sendMessage/sendB20s/20000

    image.png

    看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消
    息可能并不会按时“死亡“,因为 RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

    如,先执行发送 20s 的,再执行发送 5s的

    http://localhost:8088/Server/sendMessage/sendB20s/20000
    http://localhost:8088/Server/sendMessage/sendA5s/5000

    image.png

    消息消费时有顺序了。

    Rabbitmq 插件实现延迟队列

    上文中提到的问题,确实是一个问题,如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间
    及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题

    在官网上下载 https://www.rabbitmq.com/community-plugins.html
    下载 rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ的插件目录。
    进入 RabbitMQ的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
    • 1

    image.png

    我本地可能是由于版本的原因,一直不成功

    D:\job\RabbitMQ Server\rabbitmq_server-3.10.11\sbin>rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange
    
    • 1

    image.png

    image.png

    虽然有这个 x-delayed-message

    架构优化

    image.png

    在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并
    不会立即投递到目标队列中,而是存储在mnesia(一个分布式数据系统)表中,当达到投递时间时,才 投递到目标队列中

    配置 DelayedQueueConfig

    package com.yjl.amqp.config.ttl;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 延迟队列处理
     *
     * @author yuejianli
     * @date 2022-11-23
     */
    @Component
    public class DelayedQueueConfig {
        @Value("${rabbit.ttl.delayed_queue}")
        private String deadQueued;
    
        @Value("${rabbit.ttl.delayed_exchange}")
        private String deadExchange;
    
        @Value("${rabbit.ttl.delayed_routing_key}")
        private String deadRoutingKey;
    
    
        // 自定义交换机, 定义延迟交换机
        @Bean("delayedExchange")
        public CustomExchange delayedExchange() {
            Map<String, Object> args = new HashMap<>();
            // 自定义交换机的类型
            args.put("x-delayed-type", "direct");
            return new CustomExchange(deadExchange, "x-delayed-message", true, false, args);
        }
    
        @Bean("delayedQueue")
        public Queue delayedQueue() {
            return new Queue(deadQueued);
        }
    
    
        //进行绑定
        @Bean
        Binding bindingDelayedExchange(@Qualifier("delayedQueue") Queue queue,
                                       @Qualifier("delayedExchange") CustomExchange customExchange) {
            return BindingBuilder.bind(queue).to(customExchange).with(deadRoutingKey).noargs();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    延迟消费者扩展 TtlConsumer

        /**
         * 延迟队列
         */
        @RabbitListener(queues = {"${rabbit.ttl.delayed_queue}"})
        public void delayedQueueListener(String message) {
            log.info("延迟队列获取消息:" + message);
            // 执行具体的业务操作
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    生产者 TtlProducer

     @GetMapping("/sendDelayMessage/{message}/{ttl}")
        public String sendDelayMessageWithTtl(@PathVariable("message") String message, @PathVariable("ttl") String ttl) {
            log.info("接收到消息:{}, 发送的时长 是{} ms", message, ttl);
    
            // 发送给 C 队列
            rabbitTemplate.convertAndSend(deadExchange, deadRoutingKey, message, correlationData -> {
                correlationData.getMessageProperties().setExpiration(ttl);
                return correlationData;
            });
            return message;
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    验证

    http://localhost:8088/Server/sendDelayMessage/sendB20s/20000
    http://localhost:8088/Server/sendDelayMessage/sendA5s/5000

    我本地是不行, 有发现问题的,可以告诉我。
    image.png

    想要的效果是这样:

    image.png

    第二个消息被先消费掉了,符合预期

    延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ来实现延时队列可以很好的利用
    RabbitMQ的特性,如:
    消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正 确处理的消息不会被丢弃。
    另外,通过 RabbitMQ集群的特性,可以很好的解决单点故障问题,
    不会因为 单个节点挂掉导致延时队列不可用或者消息丢失。
    当然,延时队列还有很多其它选择,
    比如利用 Java 的DelayQueue,利用 Redis 的 zset,
    利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

  • 相关阅读:
    【计算机网络】http协议
    仿射变换矩阵
    PG学习笔记(PostgreSQL)
    一些洛谷题单里的题目
    mac下mysql 常用命令
    使用personal token将hexo本地文件推到GitHub
    电压放大器的应用范围有哪些
    CTF-Crypto学习记录-第四天 “ “ --- SHA1安全散列算法,实现原理。
    基于php的校园零食网站
    从您输入网站 URL 到其在屏幕上完成加载的整个过程
  • 原文地址:https://blog.csdn.net/yjltx1234csdn/article/details/128170900