docker inspect rabbitmq
找到Mounts下面Name:rabbitmq_plugin的Source即为插件路径

使用 cd 进入到该目录
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
docker exec -it rabbitmq /bin/bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
rabbitmq-plugins list
docker restart rabbitmq
- @Test
- public void test01(){
- //创建消息后置处理器对象
- MessagePostProcessor postProcessor = message -> {
- //设置消息过期时间/毫秒
- message.getMessageProperties().setHeader("x-delay","10000");
- return message;
- };
-
- rabbitTemplate.convertAndSend("exchange.test.delay",
- "routing.key.test.delay",
- "hello delay message by plug" + new SimpleDateFormat("HH:mm:ss").format(new Date()),
- postProcessor);
- }
- @RabbitListener(queues={"queue.test.delay"})
- public void lisenter03(String data, Message message, Channel channel) throws IOException {
- log.info("接收到 : " + data);
- log.info("当前时间 : " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- }
- spring:
- rabbitmq:
- host: localhost
- port: 5672
- username: admin
- password: 123456
- virtual-host: /
- publisher-confirm-type: correlated #交换机确认
- publisher-returns: true #队列确认
- @Component
- @Slf4j
- public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- //构造后执行
- @PostConstruct
- public void InitRabbit(){
- rabbitTemplate.setReturnsCallback(this);
- rabbitTemplate.setConfirmCallback(this);
- }
-
- //发送到交换机执行的回调函数
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- log.info("confirm 回调 data : " + correlationData);
- log.info("confirm 回调 ack : " + ack);
- log.info("confirm 回调 cause : " + cause);
- }
-
- //发送到队列失败才执行的回调函数
- //使用延时队列时成功也会执行
- @Override
- public void returnedMessage(ReturnedMessage returnedMessage) {
- log.info("returnedMessage 回调 消息 : " + returnedMessage.getMessage().getBody());
- log.info("returnedMessage 回调 状态码 : " + returnedMessage.getReplyCode());
- log.info("returnedMessage 回调 描述 : " + returnedMessage.getReplyText());
- log.info("returnedMessage 回调 交换机 : " + returnedMessage.getExchange());
- log.info("returnedMessage 回调 路由键 : " + returnedMessage.getRoutingKey());
- }
- }
可以通过配置消息的过期时间和死信队列,消费者监听死信队列,同样可以实现延时消息的效果