• RabbitMQ延时队列


    一、RabbitMQ下载并使用插件

    1、查看RabbitMQ插件的文件路径

    docker inspect rabbitmq

    找到Mounts下面Name:rabbitmq_plugin的Source即为插件路径

    使用 cd 进入到该目录

    2、下载插件

    wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez

    3、使用插件

    1、进入到容器内部

    docker exec -it rabbitmq /bin/bash

    2、启动插件

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    3、查看插件是否使用成功

    rabbitmq-plugins list

    4、重启容器

    docker restart rabbitmq

    二、使用延时队列

    1、生产者

    1. @Test
    2. public void test01(){
    3. //创建消息后置处理器对象
    4. MessagePostProcessor postProcessor = message -> {
    5. //设置消息过期时间/毫秒
    6. message.getMessageProperties().setHeader("x-delay","10000");
    7. return message;
    8. };
    9. rabbitTemplate.convertAndSend("exchange.test.delay",
    10. "routing.key.test.delay",
    11. "hello delay message by plug" + new SimpleDateFormat("HH:mm:ss").format(new Date()),
    12. postProcessor);
    13. }

    2、消费者

    1. @RabbitListener(queues={"queue.test.delay"})
    2. public void lisenter03(String data, Message message, Channel channel) throws IOException {
    3. log.info("接收到 : " + data);
    4. log.info("当前时间 : " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
    5. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    6. }

    三、其他(消息可靠性-生产者确认)

    1、配置application.yml

    1. spring:
    2. rabbitmq:
    3. host: localhost
    4. port: 5672
    5. username: admin
    6. password: 123456
    7. virtual-host: /
    8. publisher-confirm-type: correlated #交换机确认
    9. publisher-returns: true #队列确认

    2、编写配置类

    1. @Component
    2. @Slf4j
    3. public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
    4. @Autowired
    5. private RabbitTemplate rabbitTemplate;
    6. //构造后执行
    7. @PostConstruct
    8. public void InitRabbit(){
    9. rabbitTemplate.setReturnsCallback(this);
    10. rabbitTemplate.setConfirmCallback(this);
    11. }
    12. //发送到交换机执行的回调函数
    13. @Override
    14. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    15. log.info("confirm 回调 data : " + correlationData);
    16. log.info("confirm 回调 ack : " + ack);
    17. log.info("confirm 回调 cause : " + cause);
    18. }
    19. //发送到队列失败才执行的回调函数
    20. //使用延时队列时成功也会执行
    21. @Override
    22. public void returnedMessage(ReturnedMessage returnedMessage) {
    23. log.info("returnedMessage 回调 消息 : " + returnedMessage.getMessage().getBody());
    24. log.info("returnedMessage 回调 状态码 : " + returnedMessage.getReplyCode());
    25. log.info("returnedMessage 回调 描述 : " + returnedMessage.getReplyText());
    26. log.info("returnedMessage 回调 交换机 : " + returnedMessage.getExchange());
    27. log.info("returnedMessage 回调 路由键 : " + returnedMessage.getRoutingKey());
    28. }
    29. }

    四、延时队列的其他实现思路

    可以通过配置消息的过期时间和死信队列,消费者监听死信队列,同样可以实现延时消息的效果

  • 相关阅读:
    GitHub 桌面版 v3.0 新特性「GitHub 热点速览」
    Migrations
    【算法-哈希表4】 三数之和(去重版)
    GPT-3后的下一步:大型语言模型的未来方向
    接口测试是什么?怎样做接口测试?(测试人必看,讲的超详细)
    422-计算机网络(7-13)
    ai软件基础教程自学网,怎么快速学会ai软件
    web前端大一实训 HTML+CSS+JavaScript王者荣耀(60页) web课程设计网页规划与设计 HTML期末大作业 HTML网页设计结课作业
    [含lw+源码等]SSM房屋租赁系统|房屋出租|房产中介[包运行成功]
    mysql组合索引详解
  • 原文地址:https://blog.csdn.net/qh1112/article/details/139378124