RabbitMQ消息首先发送到交换机,然后通过路由键【routingKey】和【bindingKey】比较从而将消息发送到对应的队列【queue】上。在这个过程有两个地方消息可能会丢失:
而RabbitMQ提供了类似于回调函数的机制来告诉发送方消息是否发送成功。这里针对上述的两种情况,RabbitMQ也是给出了以下的应对策略:
在SpringBoot项目的properties文件中加上
spring.rabbitmq.publisher-confirm-type=correlated
该配置有三个值:
RabbitMQ的配置类实现ConfirmCallback
- /**
- * @author LoneWalker
- * @date 2023/4/8
- * @description
- */
- @Slf4j
- @Configuration
- public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback {
-
- @Bean
- public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
- //设置给rabbitTemplate
- rabbitTemplate.setConfirmCallback(this);
- return rabbitTemplate;
- }
-
- @Bean
- public MessageConverter jackson2JsonMessageConverter() {
- return new Jackson2JsonMessageConverter();
- }
-
- @Bean
- public DirectExchange getExchange(){
- return new DirectExchange("directExchange",false,false);
- }
-
- @Bean
- public Queue getQueue(){
- return new Queue("publisher.addUser",true,false,false);
- }
-
- @Bean
- public Binding getBinding(DirectExchange exchange,Queue queue){
- return BindingBuilder.bind(queue).to(exchange).with("publisher.addUser");
- }
-
- /**
- * 消息成功到达交换机会触发
- * @param correlationData
- * @param ack
- * @param cause
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- if (ack) {
- log.info("交换机收到消息成功:" + correlationData.getId());
- }else {
- log.error("交换机收到消息失败:" + correlationData.getId() + "原因:" + cause);
- }
- }
- }
而需要这个correlationData是因为确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。所以我们改写一下发送消息的方法:
- @RequiredArgsConstructor
- @Service
- public class PublisherServiceImpl implements PublisherService{
-
- private final RabbitTemplate rabbitTemplate;
-
- @Override
- public void addUser(User user) {
-
- CorrelationData correlationData = new CorrelationData();
- correlationData.setId(UUID.randomUUID().toString());
-
- rabbitTemplate.convertAndSend("directExchange","publisher.addUser",user,correlationData);
- }
- }
然后发送消息:

再模拟一下失败的情况——把交换机名称改成错的:

温馨提示:测试完把交换机名称改回去。
在SpringBoot项目的properties文件中添加:
- spring.rabbitmq.publisher-returns=true
- ###消息在没有被队列接收时是否强行退回还是直接丢弃
- spring.rabbitmq.template.mandatory=true
RabbitMQ的配置类再实现ReturnsCallback
- @Slf4j
- @Configuration
- public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
-
- @Bean
- public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
- //设置给rabbitTemplate
- rabbitTemplate.setConfirmCallback(this);
- rabbitTemplate.setReturnsCallback(this);
- rabbitTemplate.setMandatory(true);
- return rabbitTemplate;
- }
-
- @Bean
- public MessageConverter jackson2JsonMessageConverter() {
- return new Jackson2JsonMessageConverter();
- }
-
- @Bean
- public DirectExchange getExchange(){
- return new DirectExchange("directExchange",false,false);
- }
-
- @Bean
- public Queue getQueue(){
- return new Queue("publisher.addUser",true,false,false);
- }
-
- @Bean
- public Binding getBinding(DirectExchange exchange,Queue queue){
- return BindingBuilder.bind(queue).to(exchange).with("publisher.addUser");
- }
-
- /**
- * 消息成功到达交换机会触发
- * @param correlationData
- * @param ack
- * @param cause
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- if (ack) {
- log.info("交换机收到消息成功:" + correlationData.getId());
- }else {
- log.error("交换机收到消息失败:" + correlationData.getId() + "原因:" + cause);
- }
- }
-
- /**
- * 消息未成功到达队列会触发
- * @param returnedMessage
- */
- @Override
- public void returnedMessage(ReturnedMessage returnedMessage) {
- log.error("{}--消息未成功到达队列",returnedMessage.getMessage().getMessageProperties().getMessageId());
- }
- }
把路由键改为错误的值:

正常来说消息到达交换机就一定可以到达队列,到不了队列基本上就是代码写错了。