• RabbitMQ延迟队列


    目录

    💌 介绍

    💒 使用场景

    🏳‍🌈 模拟案例

    📕 准备工作

    🏴 写法一(死信队列TTL)

     RabbitMQ配置文件

     生产者

    消费者

    测试

    🏴 写法二 (死信队列TTL)

     RabbitMQ配置文件

    生产者

    消费者

    测试

    🚩 写法三 (插件版本-推荐)

    插件安装

    RabbitMQ配置文件

    生产者

    消费者

    测试

    👍 延迟队列方法推荐 


    💌 介绍

    顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。

    💒 使用场景

    • 预支付订单创建成功后,30分钟后还没有支付,自动取消订单,修改订单状态
    • 用户注册成功后,如果3天没有登录则进行短信提醒
    • 优惠券过期前发送短信进行提醒
    • ....

    以上场景都可以用延时队列来完成


    🏳‍🌈 模拟案例

    需求:生产者发布消息,10秒、60秒后消费者拿到消息进行消费

    📕 准备工作

    导入RabbitMQ依赖

    1. org.springframework.boot
    2. spring-boot-starter-amqp

     配置RabbitMQ连接相关信息

    1. #MySQL
    2. spring:
    3. rabbitmq:
    4. host: 127.0.0.1
    5. port: 5672
    6. username: xxxx
    7. password: xxx
    8. server:
    9. port: 8087

    🏴 写法一(死信队列TTL)

    生产者生产消息——>到交换机分发给对应的队列(A10秒过期,B60秒过期)——>过期后到死信交换机——>消费者进行消费(执行顺序如下图)

     RabbitMQ配置文件

    1. import org.springframework.amqp.core.*;
    2. import org.springframework.beans.factory.annotation.Qualifier;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.Configuration;
    5. import java.util.HashMap;
    6. /**
    7. * @author 小影
    8. * @create: 2022/8/18 10:26
    9. * @describe:mq配置 如示例图配置:2交换机、4队列、4路由key
    10. */
    11. @Configuration
    12. public class RabbitMQConfiguration {
    13. // 延迟交换机
    14. public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
    15. // 延迟队列
    16. public static final String DELAY_QUEUE_NAME_A = "delay.queue.a";
    17. public static final String DELAY_QUEUE_NAME_B = "delay.queue.b";
    18. // 延迟队列路由key
    19. public static final String DELAY_QUEUE_ROUTING_KEY_A = "delay.routingKey.a";
    20. public static final String DELAY_QUEUE_ROUTING_KEY_B = "delay.routingKey.b";
    21. // 死信交换机
    22. public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
    23. // 死信队列
    24. public static final String DEAD_LETTER_QUEUE_NAME_A = "dead.letter.queue.a";
    25. public static final String DEAD_LETTER_QUEUE_NAME_B = "dead.letter.queue.b";
    26. // 私信队列路由key
    27. public static final String DEAD_LETTER_ROUTING_KEY_A = "dead.letter.delay_10s.routingkey.a";
    28. public static final String DEAD_LETTER_ROUTING_KEY_B = "dead.letter.delay_60s.routingkey.b";
    29. // 声明延迟交换机
    30. @Bean("delayExchange")
    31. public DirectExchange delayExchange() {
    32. return new DirectExchange(DELAY_EXCHANGE_NAME);
    33. }
    34. // 声明死信交换机
    35. @Bean("deadLetterExchange")
    36. public DirectExchange deadLetterExchange() {
    37. return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
    38. }
    39. // 声明延迟队列A,延迟10s,并且绑定到对应的死信交换机
    40. @Bean("delayQueueA")
    41. public Queue delayQueueA() {
    42. HashMap args = new HashMap<>();
    43. // 声明队列绑定的死信交换机
    44. args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
    45. // 声明队列的属性路由key
    46. args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_A);
    47. // 声明队列的消息TTL存活时间
    48. args.put("x-message-ttl", 10000);
    49. return QueueBuilder.durable(DELAY_QUEUE_NAME_A).withArguments(args).build();
    50. }
    51. // 声明延迟队列B,延迟60s,并且绑定到对应的死信交换机
    52. @Bean("delayQueueB")
    53. public Queue delayQueueB() {
    54. HashMap args = new HashMap<>();
    55. // 声明队列绑定的死信交换机
    56. args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
    57. // 声明队列的属性路由key
    58. args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_B);
    59. // 声明队列的消息TTL存活时间
    60. args.put("x-message-ttl", 60000);
    61. return QueueBuilder.durable(DELAY_QUEUE_NAME_B).withArguments(args).build();
    62. }
    63. // 声明死信队列A,用于接收延迟10S的消息
    64. @Bean("deadLetterQueueA")
    65. public Queue deadLetterQueueA() {
    66. return new Queue(DEAD_LETTER_QUEUE_NAME_A);
    67. }
    68. // 声明死信队列B,用于接收延迟60S的消息
    69. @Bean("deadLetterQueueB")
    70. public Queue deadLetterQueueB() {
    71. return new Queue(DEAD_LETTER_QUEUE_NAME_B);
    72. }
    73. // 设置延迟队列A的绑定关系
    74. @Bean
    75. public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
    76. @Qualifier("delayExchange") DirectExchange exchange) {
    77. return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY_A);
    78. }
    79. // 设置延迟队列B的绑定关系
    80. @Bean
    81. public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
    82. @Qualifier("delayExchange") DirectExchange exchange) {
    83. return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY_B);
    84. }
    85. // 设置死信队列A的绑定关系
    86. @Bean
    87. public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
    88. @Qualifier("deadLetterExchange") DirectExchange exchange) {
    89. return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY_A);
    90. }
    91. // 设置死信队列B的绑定关系
    92. @Bean
    93. public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
    94. @Qualifier("deadLetterExchange") DirectExchange exchange) {
    95. return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY_B);
    96. }
    97. }

    此配置文件的代码关系图如下

     生产者

    1. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;
    2. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY_A;
    3. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY_B;
    4. /**
    5. * @author 小影
    6. * @create: 2022/8/18 11:13
    7. * @describe:延迟消息生产者
    8. */
    9. @Component
    10. public class DelayMessageProducer {
    11. @Resource
    12. private RabbitTemplate rabbitTemplate;
    13. public void send(String message,int type) {
    14. switch (type){
    15. case 1: // 10s的消息
    16. // param:队列名称、路由key、消息
    17. rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY_A, message);
    18. break;
    19. case 2:// 60s的消息
    20. rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY_B, message);
    21. break;
    22. }
    23. }
    24. }

    消费者

    1. import com.rabbitmq.client.Channel;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.core.Message;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.stereotype.Component;
    6. import java.time.LocalDateTime;
    7. import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME_A;
    8. import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME_B;
    9. /**
    10. * @author 小影
    11. * @create: 2022/8/18 11:19
    12. * @describe:死信消费者
    13. */
    14. @Slf4j
    15. @Component
    16. public class DeadLetterQueueConsumer {
    17. /**
    18. * 监听私信队列A
    19. * @param message
    20. * @param channel 作手动回执、确认
    21. */
    22. @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME_A)
    23. public void receiveA(Message message, Channel channel) {
    24. String msg = new String(message.getBody());
    25. log.info("当前时间:{},死信队列A收到消息:{}", LocalDateTime.now(),msg);
    26. }
    27. /**
    28. * 监听私信队列B
    29. * @param message
    30. * @param channel 作手动回执、确认
    31. */
    32. @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME_B)
    33. public void receiveB(Message message, Channel channel) {
    34. String msg = new String(message.getBody());
    35. log.info("当前时间:{},死信队列B收到消息:{}", LocalDateTime.now(),msg);
    36. }
    37. }

    测试

    1. @Slf4j
    2. @RestController
    3. @RequestMapping("rabbitmq")
    4. public class RabbitMqController {
    5. @Resource
    6. private DelayMessageProducer producer;
    7. @GetMapping("send")
    8. public void send(String message, Integer type) {
    9. log.info("当前时间:{},消息:{},延迟类型:{}", LocalDateTime.now(), message, Objects.requireNonNull(type));
    10. producer.send(message, type);
    11. }
    12. }

    分别请求

    http://localhost:8089/rabbitmq/send?message=我是10秒&type=1

    http://localhost:8089/rabbitmq/send?message=我是60秒&type=2

    如果出现异常:Channel shutdown: channel error; protocol method:#method(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'delay.exchange' in vhost '/': received ''x-delayed-message'' but current is 'direct', class-id=40, method-id=10

    可能是mq已经存在交换机了先去删掉

    弊端:后期要扩展其他不同延时的时间,就需要增加延时的配置,非常麻烦


    🏴 写法二 (死信队列TTL)

    生产者生产消息(并设置过期时间)——>到交换机分发给延迟队列——>过期后到死信交换机——>消费者进行消费(执行顺序如下图)

     RabbitMQ配置文件

    1. import org.springframework.amqp.core.*;
    2. import org.springframework.beans.factory.annotation.Qualifier;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.Configuration;
    5. import java.util.HashMap;
    6. /**
    7. * @author 小影
    8. * @create: 2022/8/18 10:26
    9. * @describe:mq配置 如示例图配置:2交换机、2队列、2路由key
    10. */
    11. @Configuration
    12. public class RabbitMQConfiguration {
    13. // 延迟交换机
    14. public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
    15. // 延迟队列
    16. public static final String DELAY_QUEUE_NAME = "delay.queue";
    17. // 延迟队列路由key
    18. public static final String DELAY_QUEUE_ROUTING_KEY = "delay.routingKey";
    19. // 死信交换机
    20. public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
    21. // 死信队列
    22. public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue";
    23. // 私信队列路由key
    24. public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.routingkey";
    25. // 声明延迟交换机
    26. @Bean("delayExchange")
    27. public DirectExchange delayExchange() {
    28. return new DirectExchange(DELAY_EXCHANGE_NAME);
    29. }
    30. // 声明死信交换机
    31. @Bean("deadLetterExchange")
    32. public DirectExchange deadLetterExchange() {
    33. return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
    34. }
    35. // 声明延迟队列,不设置存活时间,并且绑定到对应的死信交换机
    36. @Bean("delayQueue")
    37. public Queue delayQueue() {
    38. HashMap args = new HashMap<>();
    39. // 声明队列绑定的死信交换机
    40. args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
    41. // 声明队列的属性路由key
    42. args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
    43. return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(args).build();
    44. }
    45. // 声明死信队列
    46. @Bean("deadLetterQueue")
    47. public Queue deadLetterQueue() {
    48. return new Queue(DEAD_LETTER_QUEUE_NAME);
    49. }
    50. // 设置延迟队列的绑定关系
    51. @Bean
    52. public Binding delayBinding(@Qualifier("delayQueue") Queue queue,
    53. @Qualifier("delayExchange") DirectExchange exchange) {
    54. return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY);
    55. }
    56. // 设置死信队列的绑定关系
    57. @Bean
    58. public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue,
    59. @Qualifier("deadLetterExchange") DirectExchange exchange) {
    60. return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY);
    61. }
    62. }

    生产者

    1. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;
    2. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY;
    3. /**
    4. * @author 小影
    5. * @create: 2022/8/18 11:13
    6. * @describe:延迟消息生产者
    7. */
    8. @Component
    9. public class DelayMessageProducer {
    10. @Resource
    11. private RabbitTemplate rabbitTemplate;
    12. /**
    13. *
    14. * @param message 消息
    15. * @param delayTime 存活时间
    16. */
    17. public void send(String message,String delayTime) {
    18. // param:延迟交换机,路由KEY,存活时间
    19. rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY, message, msg -> {
    20. msg.getMessageProperties().setExpiration(delayTime);
    21. return msg;
    22. });
    23. }
    24. }

    消费者

    1. import com.rabbitmq.client.Channel;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.core.Message;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.stereotype.Component;
    6. import java.time.LocalDateTime;
    7. import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME;
    8. /**
    9. * @author 小影
    10. * @create: 2022/8/18 11:19
    11. * @describe:死信消费者
    12. */
    13. @Slf4j
    14. @Component
    15. public class DeadLetterQueueConsumer {
    16. /**
    17. * 监听私信队列A
    18. * @param message
    19. * @param channel 作手动回执、确认
    20. */
    21. @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME)
    22. public void receiveA(Message message, Channel channel) {
    23. String msg = new String(message.getBody());
    24. log.info("当前时间:{},死信队列收到消息:{}", LocalDateTime.now(),msg);
    25. }
    26. }

    测试

    1. @Slf4j
    2. @RestController
    3. @RequestMapping("rabbitmq")
    4. public class RabbitMqController {
    5. @Resource
    6. private DelayMessageProducer producer;
    7. @GetMapping("send")
    8. public void send(String message, String delayTime) {
    9. log.info("当前时间:{},消息:{},存活时间:{}", LocalDateTime.now(), message, delayTime);
    10. producer.send(message, delayTime);
    11. }
    12. }

    分别请求

    http://localhost:8089/rabbitmq/send?message=我是60秒&delayTime=60000

    http://localhost:8089/rabbitmq/send?message=我是10秒&delayTime=10000

    弊端:由于是先进先出的,如果60秒进去了,10秒在进去,10秒结束了,他要等60秒结束,60秒出来10秒才能出来


    🚩 写法三 (插件版本-推荐)

    安装插件后会生成新的Exchange类型 x-delayed-message ,该类型消息支持延迟投递机制,接收消息后并未立即将消息投递至目标队列,而是存储在mnesia(一个分布式数据库)中,随后检测消息延迟时间,如达到投递时间讲其通过 x-delayed-type 类型标记的交换机投至目标队列。 

    插件安装

    1.进入mq官网社区插件:Community Plugins — RabbitMQ

    2.找到rabbitmq_delayed_message_exchange

     选择对应版本的ez文件下载

     Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

     

     注:我的MQ是通过yum安装的

     1.在系统中查看安装的rabbitmq

    rpm -qa |grep rabbitmq

     2.查询mq的安装的相关文件目录

    rpm -ql rabbitmq-server-3.10.7-1.el8.noarch

     翻到最下面发现mnesia的安装目录; mnesia=分布式数据库,看看就好

     然后把我们下载的ez安装包解压放到 /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.7/plugins 里面

    3.重启RabbitMQ服务

    systemctl restart rabbitmq-server.service

    4.重启插件

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

     


    RabbitMQ配置文件

    1. /**
    2. * @author 小影
    3. * @create: 2022/8/18 10:26
    4. * @describe:mq配置 如示例图配置:1交换机、1队列、1路由key
    5. */
    6. @Configuration
    7. public class RabbitMQConfiguration {
    8. // 延迟交换机
    9. public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
    10. // 延迟队列
    11. public static final String DELAY_QUEUE_NAME = "delay.queue";
    12. // 延迟队列路由key
    13. public static final String DELAY_QUEUE_ROUTING_KEY = "delay.routingKey";
    14. // 声明延迟交换机
    15. @Bean("delayExchange")
    16. public CustomExchange delayExchange() {
    17. HashMap args = new HashMap<>();
    18. args.put("x-delayed-type", "direct");
    19. return new CustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false,args);
    20. }
    21. // 声明延迟队列
    22. @Bean("delayQueue")
    23. public Queue delayQueue() {
    24. return new Queue(DELAY_QUEUE_NAME);
    25. }
    26. // 设置延迟队列的绑定关系
    27. @Bean
    28. public Binding delayBinding(@Qualifier("delayQueue") Queue queue,
    29. @Qualifier("delayExchange") CustomExchange exchange) {
    30. return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY).noargs();
    31. }
    32. }

    生产者

    1. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;
    2. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY;
    3. /**
    4. * @author 小影
    5. * @create: 2022/8/18 11:13
    6. * @describe:延迟消息生产者
    7. */
    8. @Component
    9. public class DelayMessageProducer {
    10. @Resource
    11. private RabbitTemplate rabbitTemplate;
    12. /**
    13. *
    14. * @param message 消息
    15. * @param delayTime 存活时间
    16. */
    17. public void send(String message,Integer delayTime) {
    18. // param:延迟交换机,路由KEY,存活时间
    19. rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY, message, msg -> {
    20. msg.getMessageProperties().setDelay(delayTime);
    21. return msg;
    22. });
    23. }
    24. }

    消费者

    1. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_NAME;
    2. /**
    3. * @author 小影
    4. * @create: 2022/8/18 11:19
    5. * @describe:消费者
    6. */
    7. @Slf4j
    8. @Component
    9. public class DeadLetterQueueConsumer {
    10. /*
    11. * 监听私信队列
    12. * @param message
    13. * @param channel 作手动回执、确认
    14. */
    15. @RabbitListener(queues = DELAY_QUEUE_NAME)
    16. public void receiveA(Message message, Channel channel) {
    17. String msg = new String(message.getBody());
    18. log.info("当前时间:{},延迟队列收到消息:{}", LocalDateTime.now(),msg);
    19. }
    20. }

    测试

    1. @Slf4j
    2. @RestController
    3. @RequestMapping("rabbitmq")
    4. public class RabbitMqController {
    5. @Resource
    6. private DelayMessageProducer producer;
    7. @GetMapping("send")
    8. public void send(String message, Integer delayTime) {
    9. log.info("当前时间:{},消息:{},存活时间:{}", LocalDateTime.now(), message, delayTime);
    10. producer.send(message, delayTime);
    11. }
    12. }

    启动项目查看rabbitmq的可视化界面

    如下图此时生成的交换机是x-delayed-message类型的

     分别发送:

    http://localhost:8089/rabbitmq/send?message=我是60秒&delayTime=60000

    http://localhost:8089/rabbitmq/send?message=我是10秒&delayTime=10000

     结局并不是60秒先被消费,完成了我们的意愿。

    原理:

    1. 交换机里面有个数据库,生产者生产信息把这个信息放入数据库中
    2. 交换机里面的插件就会一直监听这个时间
    3. 时间到了把对应数据取出来,放入队列,让消费者进行消费

    👍 延迟队列方法推荐 

     这是小编在开发学习使用和总结,  这中间或许也存在着不足,希望可以得到大家的理解和建议。如有侵权联系小编!

  • 相关阅读:
    php安装ldap扩展模块
    算法笔记-第十章-动态规划2
    API网关功能一览
    Java基础 --- 创建线程
    选好冒烟测试用例,为进入QA的制品包把好第一道关
    #机器学习--高等数学基础--第三章:微分中值定理与导数的应用
    不熟练的模版集合
    Java登录管理功能的自我理解(尚庭公寓)
    蜜雪冰城涨价怒赞无数 雪王张红超卷出一条阳道
    快速LLaMA:面向大型语言模型的查询感知推理加速 论文摘要翻译与评论
  • 原文地址:https://blog.csdn.net/weixin_46522803/article/details/126521534