• rabbitMQ学习-延迟队列


    延迟队列

    延迟队列,对列内部是有序的,最重要的特性就是体现在他的延时属性上,延时队列中的元素时希望在指定时间到了之后或者之前取出和处理,简单的来说,延迟队列就是用来存放需要在指定时间被处理的元素的队列。

    延迟队列的使用场景

    1. 订单在十分钟内未支付则自动取消
    2. 新创建的店铺,如果在十天内没有上传商品,则自动发送消息提醒
    3. 用户注册成功后,如果没有在三天内登录则发送短信提醒消息
    4. 用户发起退款,如果在三天内没有得到处理则通知相关运营人员。
    5. 预定会员后,需要在预定的时间点前十分钟通知各个与会员人员参加会议。

    延迟队列整合SpringBoot

    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0modelVersion>
        <groupId>com.xhgroupId>
        <artifactId>SpringBoot_RabbitMQartifactId>
        <version>0.0.1-SNAPSHOTversion>
        <name>SpringBoot_RabbitMQname>
        <description>SpringBoot_RabbitMQdescription>
    
        <properties>
            <java.version>1.8java.version>
            <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8project.reporting.outputEncoding>
            <spring-boot.version>2.3.7.RELEASEspring-boot.version>
        properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starterartifactId>
            dependency>
    
            
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-amqpartifactId>
                <version>2.1.8.RELEASEversion>
            dependency>
    
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-webartifactId>
            dependency>
    
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
                <scope>testscope>
            dependency>
    
            <dependency>
                <groupId>com.alibabagroupId>
                <artifactId>fastjsonartifactId>
                <version>1.2.83version>
            dependency>
            <dependency>
                <groupId>org.projectlombokgroupId>
                <artifactId>lombokartifactId>
                <version>1.18.24version>
            dependency>
    
            
            <dependency>
                <groupId>io.springfoxgroupId>
                <artifactId>springfox-swagger2artifactId>
                <version>2.9.2version>
            dependency>
            <dependency>
                <groupId>io.springfoxgroupId>
                <artifactId>springfox-swagger-uiartifactId>
                <version>2.9.2version>
            dependency>
    
            
            <dependency>
                <groupId>org.springframework.amqpgroupId>
                <artifactId>spring-rabbit-testartifactId>
                <scope>testscope>
            dependency>
    
        dependencies>
    
        <dependencyManagement>
    
            <dependencies>
                <dependency>
                    <groupId>org.springframework.bootgroupId>
                    <artifactId>spring-boot-dependenciesartifactId>
                    <version>${spring-boot.version}version>
                    <type>pomtype>
                    <scope>importscope>
                dependency>
            dependencies>
        dependencyManagement>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.pluginsgroupId>
                    <artifactId>maven-compiler-pluginartifactId>
                    <version>3.8.1version>
                    <configuration>
                        <source>1.8source>
                        <target>1.8target>
                        <encoding>UTF-8encoding>
                    configuration>
                plugin>
                <plugin>
                    <groupId>org.springframework.bootgroupId>
                    <artifactId>spring-boot-maven-pluginartifactId>
                    <version>2.3.7.RELEASEversion>
                    <configuration>
                        <mainClass>com.xh.springBoot.SpringBootRabbitMqApplicationmainClass>
                    configuration>
                    <executions>
                        <execution>
                            <id>repackageid>
                            <goals>
                                <goal>repackagegoal>
                            goals>
                        execution>
                    executions>
                plugin>
            plugins>
        build>
    
    project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    @Configuration
    @EnableSwagger2
    public class SwaggerConfig {
        @Bean
        public Docket webApiConfig(){
            return new Docket(DocumentationType.SWAGGER_2)
                    .groupName("webApi")
                    .apiInfo(webApiInfo())
                    .select()
                    .build();
        }
        private ApiInfo webApiInfo(){
            return  new ApiInfoBuilder()
                    .title("rabbitMQ接口文档")
                    .description("描述Rabbit微服务接口定义")
                    .version("1.0")
                    .contact(new Contact("我喜欢","127.0.0.1","2844****670@qq.com"))
                    .build();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    队列TTL
    创建两个队列QA和QB,两者队列TTL分被是10s,40s,然后在创建一个交换机X和死信交换机Y,他们的类型都是direct,创建一个死信队列QD,他们的绑定关系如下:
    在这里插入图片描述
    代码文件

    /*
    TTL 队列  配置文件类代码
     */
    @Configuration
    public class TtlQueueConfig {
        //普通的交换机名称
        public static final String X_EXCHANGE = "X";
        //死信交换机的名称
        public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
        //普通队列的名称
        public static final String A_QUEUE = "QA";
        public static final String B_QUEUE = "QB";
        //死信队列的名称
        public static final String Y_DEAD_LETTER_QUEUE = "QD";
    
        //声明XCHANGE
        @Bean("xExchange")  //起别名
        public DirectExchange xExchange(){
            return new DirectExchange(X_EXCHANGE);
        }
        //声明yExchange 别名
        @Bean("yExchange")
        public DirectExchange yExchange(){
            return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
        }
    
        //声明队列
        @Bean("queueA")
        public Queue queueA(){
            Map<String,Object> map  =new HashMap<>();
            //设置死信交换机
            map.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
            //设置死信RoutingKey
            map.put("x-dead-letter-routing-key","YD");
            //设置TTL单位是ms  什么时候消息成为死信,10秒钟后
            map.put("x-message-ttl",10000);
    
            return QueueBuilder.durable(A_QUEUE).withArguments(map).build();
        }
    
        //声明队列
        @Bean("queueB")
        public Queue queueB(){
            Map<String,Object> map  =new HashMap<>();
            //设置死信交换机
            map.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
            //设置死信RoutingKey
            map.put("x-dead-letter-routing-key","YD");
            //设置TTL单位是ms  什么时候消息成为死信,40秒钟后
            map.put("x-message-ttl",40000);
    
            return QueueBuilder.durable(B_QUEUE).withArguments(map).build();
        }
    
        //死信队列
        @Bean("queueD")
        public  Queue QueueD(){
            return QueueBuilder.durable(Y_DEAD_LETTER_QUEUE).build();
        }
    
        //绑定
        @Bean
        public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                      @Qualifier("xExchange") DirectExchange xExchange){
            return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    
        }
        //绑定
        @Bean
        public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                      @Qualifier("xExchange") DirectExchange xExchange){
            return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    
        }
        //绑定
        @Bean
        public Binding queueDBindingX(@Qualifier("queueD") Queue queueD,
                                      @Qualifier("yExchange") DirectExchange yExchange){
            return BindingBuilder.bind(queueD).to(yExchange).with("YD");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    /*
    发送延迟消息
    http://地址/ttl/sendMessage/子非吾喵
     */
    @Slf4j //打印日志
    @RestController
    @RequestMapping("/ttl")
    public class SellMessageController {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        //开始发送消息
        @GetMapping("/sendMessage/{message}")
        //注意@PathVariable 可以自定义传值到url中,前提示变量名必须和url中的一样
        public void sendMessage(@PathVariable String message){
            //后边的语句参数会替换{},这由程序员控制
            log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);
            rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10秒的消息队列:" + message);
            rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40秒的消息队列:" + message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    http://127.0.0.1:8080/ttl/sendMessage/hello1

    http://127.0.0.1:8080/ttl/sendMessage/hello2
    在这里插入图片描述
    上述存在一个不足之处,就是我们每次使用的时候,都需要增加一个新的时间需求,就需要新增一个队列,这里只是用了10s和40s两个时间选项,但万一我要一个小时呢,难不成还又要添加,这样太麻烦了。

    延迟队列优化

    在这里插入图片描述
    使用QC来设置时间,用户自定义时间。

    具体代码如下

    TtlQueueConfig.java中添加

    public static final String C_QUEUE = "QC";
    
        //声明QC
        @Bean("queueC")
        public Queue queueC(){
            Map<String,Object> map = new HashMap<>(3);
            //设置死信交换机
            map.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
            //设置死信RoutingKey
            map.put("x-dead-letter-routing-key","YD");
            //TTL设置时长  ms (这里不写,又用户自己定义)
            return QueueBuilder.durable(C_QUEUE).withArguments(map).build();
    
        }
        //绑定
        @Bean
        public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
                                      @Qualifier("xExchange") DirectExchange xExchange){
            return BindingBuilder.bind(queueC).to(xExchange).with("XC");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    SellMessageController.java

    //开始发送消息 消息 TTL
        @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
        public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
            log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列QC:{}",
                    new Date().toString(),ttlTime,message);
            rabbitTemplate.convertAndSend("X","XC",message,msg -> {
                //发送消息的时候,延迟时长
                msg.getMessageProperties().setExpiration(ttlTime);
                return msg;
            } );
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    注意我在写这个代码的时候,刚刚开始是敲错了一个字母,导致运行时生成的队列,然后调错的时候,需要生成的队列删除,然后重新创建,可以解决问题。。
    在这里插入图片描述

    http://127.0.0.1:8080/ttl/sendExpirationMsg/hello1/2000
    http://127.0.0.1:8080/ttl/sendExpirationMsg/hello2/20000

    死信在做延迟的一个巨大缺陷,消息不会按时“死亡”,由于RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延迟时长很短,则第二个消息并不会优先得到执行。

    这里出现一个问题,当出现两条消息的时候,我们发现,他居然是按照谁先来的,哪怕你时间短,但是你比另一个队列慢一步,你就只能老老实实排队了。

    在这里插入图片描述

    RabbitMQ插件实现延迟队列

    这个就是解决上边的问题的,我们使用插件解决。

    在官网上下载: https://www/rabbitmq.com/community-plugins.html,下载

    rabbitmq_delayed_message_exchange-3.8.0

    拷贝到

    /usr/lib/rabbitmq/lib/rabbitmq_server-版本号/plugs

    安装指令

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
    • 1

    然后在linux系统下解压

    然后安装rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    之后重启

    systemctl restart rabbitmq-server(这个是你安装时候的名字)
    
    • 1

    在这里插入图片描述
    这个时候不在是队列,而是交换机了。

    没插件的时候

    在这里插入图片描述
    基于插件的
    在这里插入图片描述

    解决办法:

    https://blog.csdn.net/DZP_dream/article/details/118391439
    docker search rabbitmq
    安装拉取容器:

    docker run -dit --name rabbitmq -e RABBITMQ_DEFAULT_USER=guest -e
    启动rabbitmq
    RABBITMQ_DEFAULT_PASS=guest -p 15672:15672 -p 5672:5672 rabbitmq:management

    https://www.rabbitmq.com/community-plugins.html 下载插件

    负责插件到容器中
    docker cp rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins

    [root@local rabbitmq]# docker exec -it rabbitmq /bin/bash                                     
    root@3bb56f68570b:/# rabbitmq-plugins list
    
    • 1
    • 2

    启动插件

    root@3bb56f68570b:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
    • 1

    设置延迟队列配置类
    DelayedConfig

    /*
    延迟队列插件
     */
    @Configuration //实例化
    public class DelayedQueueConfig {
    
        //队列 //
        public static  final  String DELAYED_QUEUE_NAME = "delayed.queue";
        //交换机
        public static final  String DELAYED_EXCHANGE_NAME = "delayed.exchange";
        //routingKey
        public  static  final  String DELAYED_ROUTING_KEY = "delayed.routing";
    
        @Bean
        public Queue delayedQueue(){
            return  new Queue(DELAYED_QUEUE_NAME);
        }
    
        //声明交换机  基于插件的交换机就是这么定义的
        @Bean
        //自定义交换机 由于是不存在rabbit里边的交换机,也就是我们使用的延迟队列插件
        public CustomExchange delayedExchange(){
            Map<String,Object> map = new HashMap<>();
            map.put("x-delayed-type","direct");
            /*
            1.交换机名称
            2.交换机类型
            3.是否需要持久化
            4.是否需要自动删除
            5.其他参数
             */
            return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",
                    true,false,map);
        }
    
        //绑定
        @Bean
        public Binding delayedQueueBingdingDelayedExchange(
                @Qualifier("delayedQueue") Queue delayedQueue,
                @Qualifier("delayedExchange") CustomExchange delayedExchange
        ){
            //绑定 将队列和交换机直接进行绑定
            return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    生产者代码

     /*
         * @Title  发送消息  基于插件的 消息以及 延迟的时间
         * @Description 延迟队列的插件
         * @author  罗小黑
         * @date 2022/11/6 11:40
         */
        @GetMapping("/sendDelayMsg/{message}/{delayTime}")
        public void sendDelayedMsg(@PathVariable String message,@PathVariable Integer delayedTime){
            log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayedTime:{}",
                    new Date().toString(),delayedTime,message);
            rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
                    DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg -> {
                //发送消息的时候,延迟时长  单位ms
                msg.getMessageProperties().setDelay(delayedTime);
                return msg;
            } );
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    消费者代码

    
    /*
    延迟队列消费者
     */
    @Component
    @Slf4j
    public class DelayedQueueConsumer {
            /*
            基于插件的延迟队列
             */
    
        //监听消息
        @RabbitListener(queues= DelayedQueueConfig.DELAYED_QUEUE_NAME)
        public void receiveDelayQueue(Message message){
            String msg = new String(message.getBody());
            log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    http://127.0.0.1:8080/ttl/sendDelayMsg/come on bady1/20000
    http://127.0.0.1:8080/ttl/sendDelayMsg/come on bady1/2000
    在这里插入图片描述
    第二个消息被先消费掉了,符合预期。

    总结:

    延迟队列在需要延时的处理的场景下非常有用,使用rabbitmq来实现延迟队列可以很好的利用rabbitmq的特性,如消息的可靠发送,消息可靠投递,死信队列来保证消息至少被消费一次已经未被正常处理的消息不会被丢弃,另外,通过rabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延迟队列不可用或者信息丢失

    当然,延迟队列还有很多其他选择,比如利用java的delayQueue,利用Redis的zset,利用Quartz或者利用kafka的时间轮,这些方式各有各的特点,看需要的适用的场景。

  • 相关阅读:
    VirtualBox+Vagrant安装虚拟机
    [附源码]java毕业设计网上招聘系统
    【21天Python进阶学习挑战赛】[day8]操作MySQL和SqlServer
    【Gradio-Windows-Linux】解决share=True无法创建共享链接,缺少frpc_windows_amd64_v0.2
    汇编语言快速入门(非常详细)
    html语音播报功能问题
    Unity --- 网格链接与动态障碍物
    再看const成员函数
    独立站引流新玩法
    7.1.3 Selenium的用法2
  • 原文地址:https://blog.csdn.net/qq_45922256/article/details/127797386