• 8. springboot + rabbitmq 消息发布确认机制



    RabbitMQ之生产者发布确认原理章节已经介绍了rabbitmq生产者是如何对消息进行发布确认保证消息不丢失的。本章节继续看下springboot整合rabbitmq后是如何保证消息不丢失的。

    1. 消息需要发布确认的原因

    消息正常是通过生产者生产消息传递到交换机,然后经过交换机路由到消息队列中,最后消费者消费,如下图所示
    在这里插入图片描述
    上述为正常情况,但也有情况会导致消息丢失的情况:第一种情况,交换机重启,当消息经过消费者投递后,恰巧交换机正在重启,会导致生产者投递消息失败,从而导致消息丢失;第二种情况,交换机没问题,消息投递到交换机后,交换机路由到消息队列过程中出了问题,比如routingKey错误导致路由不到队列中。
    针对上述两种导致消息丢失的情况,下面采用消息确认发布机制,分别采取消息正确投递到交换机后回调接口来确认消息正确被投递,消息经交换机正确路由到队列中回调接口来确认消息正确被路由。

    2. 消息发送交换机后回调接口ConfirmCallback ,保证消息在发送交换机处不丢失

    当消息经生产者投递到交换机后,为避免消息丢失,需要回调RabbitTemplate.ConfirmCallback接口,回调接口后,尤其是要对投递失败的消息进行处理或者记录下来保证消息不丢失。该接口不管消息投递到交换机成功或者失败都会进行回调,未避免消息丢失,可以选择在回调接口中只处理或者登记投递失败的消息,达到消息不丢失的目的。

    下面通过案例演示生产者投递消息到交换机后回调ConfirmCallback接口情况。

    在这里插入图片描述
    `

    2.1 在application.properties中添加spring.rabbitmq.publisher-confirm-type=correlated配置,开启回调机制

    spring.rabbitmq.host=192.168.xx.xxx
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123
    
    spring.rabbitmq.publisher-confirm-type=correlated
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.2 声明交换机和队列并进行绑定
    声明confirm_exchange交换机,声明confirm_queue队列,并通过routingKey=confirm绑定交换机和队列。

    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class ConfirmConfig {
        public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
        public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
        public static final String ROUTING_KEY = "confirm";
    
        /*声明交换机*/
        @Bean
        public DirectExchange confirmExchange(){
            return new DirectExchange(CONFIRM_EXCHANGE_NAME);
        }
    
        /*声明队列*/
        @Bean
        public Queue confirmQueue(){
            return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
        }
    
        /*绑定交换机和队列*/
        @Bean
        public Binding exchangeBindingQueue(@Qualifier("confirmExchange") DirectExchange confirmExchange,
                                            @Qualifier("confirmQueue") Queue confirmQueue){
            return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    2.3 实现RabbitTemplate.ConfirmCallback接口
    当消息由生产者发到交换机后会回调该接口中的confirm方法

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;
    
    @Component
    @Slf4j
    public class ExchangeCallback implements RabbitTemplate.ConfirmCallback {
    	/* correlationData 内含消息内容
    	 * ack 交换机接受成功或者失败。 true表示交换机接受消息成功, false表示交换机接受失败
    	 * cause 表示失败原因
    	*/
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("hello world");
            String id = correlationData.getId();
            String message = new String(correlationData.getReturnedMessage().getBody());
            if (ack){
                log.info("交换机收到消息id为{}, 消息内容为{}", id, message);
            }else {
                log.info("交换机未收到消息id为{}, 消息内容为{}, 原因为{}", id, message, cause);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    2.4 创建生产者
    创建生产者生产消息,本案例中生产者发送了2个消息,分别为hello rabbitmq 1hello rabbitmq 2
    在创建生产者时要设置一下需要回调的接口ExchangeCallback ,在设置回调接口时用了java的@PostConstruct注解,该注解作用用来指定bean初始化的顺序,Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法)。也就是说在初始化Producer3对象时,先调了Producer3的默认无参构造函数;然后执行Autowired注解部分,从Spring IOC容器中寻找rabbitTemplate和exchangeCallback对象分别注入到Producer3的rabbitTemplate和exchangeCallback属性中;最后执行PostConstruct注解部分,把exchangeCallback设置到rabbitTemplate中。

    在发送hello rabbitmq 2消息时故意把routingKey写错导致hello rabbitmq 2消息不能从交换机发送到队列中,为下一节做铺垫。

    import com.lzj.config.ConfirmConfig;
    import com.lzj.config.ExchangeCallback;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import javax.annotation.PostConstruct;
    
    @Slf4j
    @Component
    public class Producer3 {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Autowired
        private ExchangeCallback exchangeCallback;
    
        @PostConstruct
        public void init(){
            rabbitTemplate.setConfirmCallback(exchangeCallback);
        }
    
        public void produceMessage(){
            String message = "hello rabbitmq 1";
            CorrelationData correlationData1 = new CorrelationData("1");
            correlationData1.setReturnedMessage(new Message(message.getBytes()));
            rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message, correlationData1);
            message = "hello rabbitmq 2";
            CorrelationData correlationData2 = new CorrelationData("2");
            correlationData2.setReturnedMessage(new Message(message.getBytes()));
            rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, "error_routingkey", message, correlationData2);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    2.5 启动测试

    @SpringBootApplication
    @Slf4j
    public class SpringbootDemo {
        public static void main(String[] args) {
            ConfigurableApplicationContext app = SpringApplication.run(SpringbootDemo.class, args);
            Producer3 producer = app.getBean(Producer3.class);
            producer.produceMessage();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    启动上面测试案例,生产者发送了id为1的hello rabbitmq 1消息和id为2的hello rabbitmq 2,交换机收到消息也回调了ExchangeCallback 接口对2条消息都进行了确认,尤其是对于失败的消息要在此步保存失败的消息,避免消息在交换机这一步丢失。

    2022-08-04 00:43:28.817  INFO 13512 --- [           main] com.lzj.producer.Producer3               : 生产者发送id为1, 消息内容为hello rabbitmq 1
    2022-08-04 00:43:28.823  INFO 13512 --- [           main] com.lzj.producer.Producer3               : 生产者发送id为2, 消息内容为hello rabbitmq 2
    2022-08-04 00:43:28.827  INFO 13512 --- [nectionFactory1] com.lzj.config.ExchangeCallback          : 交换机收到消息id为1, 消息内容为hello rabbitmq 1
    2022-08-04 00:43:28.828  INFO 13512 --- [nectionFactory2] com.lzj.config.ExchangeCallback          : 交换机收到消息id为2, 消息内容为hello rabbitmq 2
    
    • 1
    • 2
    • 3
    • 4

    但上面还有一个问题,虽然回调ExchangeCallback接口,可以保证消息到交换机一步不会丢失,但如果交换机到队列的过程中出现了问题,消息一样会丢失。比如上面生产者把routingKey写错了,就会导致hello rabbitmq 2消息从交换机路由不到队列中。下面创建消费者程序,看消费者消费confirm_queue队列中消息情况

    import com.lzj.config.ConfirmConfig;
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    public class Consumer3 {
    
        @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
        public void consuemrMessage(Message message, Channel channel){
            String msg = new String(message.getBody());
            log.info("消费者消费消息{}", msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    然后重新启动测试代码,输出如下,可以看出消费端只消费了hello rabbitmq 1,而hello rabbitmq 2消息则丢失了。

    2022-08-04 00:45:28.817  INFO 13512 --- [           main] com.lzj.producer.Producer3               : 生产者发送id为1, 消息内容为hello rabbitmq 1
    2022-08-04 00:45:28.823  INFO 13512 --- [           main] com.lzj.producer.Producer3               : 生产者发送id为2, 消息内容为hello rabbitmq 2
    2022-08-04 00:45:28.827  INFO 13512 --- [nectionFactory1] com.lzj.config.ExchangeCallback          : 交换机收到消息id为1, 消息内容为hello rabbitmq 1
    2022-08-04 00:45:28.828  INFO 13512 --- [nectionFactory2] com.lzj.config.ExchangeCallback          : 交换机收到消息id为2, 消息内容为hello rabbitmq 2
    2022-08-04 00:45:28.831  INFO 13512 --- [ntContainer#2-1] com.lzj.consumer.Consumer3   			   : 消费者消费消息hello rabbitmq 1
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3. 消息经交换机路由到队列后回调接口ReturnCallback ,保证消息在发送队列处不丢失

    为解决上一节中,消息由交换机和消息队列中异常,导致消息丢失问题,解决办法就是在添加消息从交换机路由到队列中失败后回调的接口,在回调接口中把失败的消息保存下来就可以避免消息丢失了。

    在回调接口之前还需为RabbitMQ设置Mandatory标志,只有当该标志为true时,消息由交换机到队列失败后才会回调接口;如果该标志设置false时,消息由交换机路由到队列失败后自动丢弃消息,会导致消息丢失,这也是默认设置,所以如需保证消息不丢失,要打开Mandatory标志。

    下面继续进行上面案例,在上面案例的基础上添加ReturnCallback接口实现

    package com.lzj.config;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    public class QueueCallback implements RabbitTemplate.ReturnCallback {
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.info("消息 {} 经交换机 {} 通过routingKey={} 路由到队列失败,失败code为:{}, 失败原因为:{}",
                    new String(message.getBody()), exchange, routingKey, replyCode, replyText);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    然后修改上面的生产者Producer3 ,只需修改2点:第一点,为RabbitTemplate 设置Mandatory标志;第二点,把ReturnCallback的实现加入监听。

    import com.lzj.config.ConfirmConfig;
    import com.lzj.config.ExchangeCallback;
    import com.lzj.config.QueueCallback;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import javax.annotation.PostConstruct;
    
    @Slf4j
    @Component
    public class Producer3 {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Autowired
        private ExchangeCallback exchangeCallback;
    
        @Autowired
        private QueueCallback queueCallback;
    
        @PostConstruct
        public void init(){
            rabbitTemplate.setConfirmCallback(exchangeCallback);
            /**
    		* true:交换机无法将消息进行路由时,会将该消息返回给生产者
    		* false:如果发现消息无法进行路由,则直接丢弃
    		*/
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setReturnCallback(queueCallback);
        }
    
        public void produceMessage(){
            String message = "hello rabbitmq 1";
            CorrelationData correlationData1 = new CorrelationData("1");
            correlationData1.setReturnedMessage(new Message(message.getBytes()));
            rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message, correlationData1);
            log.info("生产者发送id为{}, 消息内容为{}", correlationData1.getId(), message);
            message = "hello rabbitmq 2";
            CorrelationData correlationData2 = new CorrelationData("2");
            correlationData2.setReturnedMessage(new Message(message.getBytes()));
            rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, "error_routingkey", message, correlationData2);
            log.info("生产者发送id为{}, 消息内容为{}", correlationData2.getId(), message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    下面重新启动上面的测试案例,可以得出如下测试结果,生产者生产的hello rabbitmq 1hello rabbitmq 2消息都已发到交换机,也都被交换机确认了,但hello rabbitmq 2被交换机路由到队列时由于routingKey错误导致路由失败,已在ReturnCallback接口回调中被记录下来,最终正确被路由到队列中的消息只有hello rabbitmq 1,从打印日志看也只有hello rabbitmq 1被消费了。

    2022-08-05 00:39:29.670  INFO 6832 --- [           main] com.lzj.producer.Producer3               : 生产者发送id为1, 消息内容为hello rabbitmq 1
    2022-08-05 00:39:29.683  INFO 6832 --- [           main] com.lzj.producer.Producer3               : 生产者发送id为2, 消息内容为hello rabbitmq 2
    2022-08-05 00:39:29.702  INFO 6832 --- [nectionFactory3] com.lzj.config.ExchangeCallback          : 交换机收到消息id为1, 消息内容为hello rabbitmq 1
    2022-08-05 00:39:29.704  INFO 6832 --- [nectionFactory1] com.lzj.config.QueueCallback             : 消息 hello rabbitmq 2 经交换机 confirm_exchange 通过routingKey=error_routingkey 路由到队列失败,失败code为:312, 失败原因为:NO_ROUTE
    2022-08-05 00:39:29.705  INFO 6832 --- [nectionFactory2] com.lzj.config.ExchangeCallback          : 交换机收到消息id为2, 消息内容为hello rabbitmq 2
    2022-08-05 00:39:29.707  INFO 6832 --- [ntContainer#2-1] com.lzj.consumer.Consumer3               : 消费者消费消息hello rabbitmq 1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    结论:从上面2个案例可以看出,通过ConfirmCallback和ReturnCallback接口的回调,保证了消息在交换机和队列处消息不丢失。

  • 相关阅读:
    【音视频笔记】Mediacodec+Muxer生成mp4,浏览器无法播放问题处理
    10 个关于 Promise 和 setTimeout 知识的面试题,通过图解一次说透彻
    深度学习检测小目标常用方法
    前端基础之《Bootstrap(13)—JavaScript插件_标签页、工具提示、弹出框、折叠效果和幻灯片》
    【Linux】指令详解(二)
    python工程打包成可执行文件
    【OpenCV】角点检测、特征点提取(Harris、Shi-Tomas、SIFT、SURF、FAST、ORB)学习笔记
    传输层 TCP连接管理 优化关闭连接时的TIME-WAIT状态
    Qt中一个信号连接多个槽函数后的执行顺序
    一键自动化博客发布工具,用过的人都说好(infoq篇)
  • 原文地址:https://blog.csdn.net/u010502101/article/details/125986244