• RabbitMQ发送方确认机制


    1、前言

    RabbitMQ消息首先发送到交换机,然后通过路由键【routingKey】和【bindingKey】比较从而将消息发送到对应的队列【queue】上。在这个过程有两个地方消息可能会丢失:

    1. 消息发送到交换机的过程。
    2. 消息从交换机发送到队列的过程。

    而RabbitMQ提供了类似于回调函数的机制来告诉发送方消息是否发送成功。这里针对上述的两种情况,RabbitMQ也是给出了以下的应对策略:

    • publisher-confirm:消息到达交换机时会触发。
    • publisher-return:到达交换机但是没有路由到队列,会返回ack以及失败原因。

    2、publisher-confirm

    在SpringBoot项目的properties文件中加上

    spring.rabbitmq.publisher-confirm-type=correlated

    该配置有三个值:

    1. none:是禁用发布确认模式,是默认值
    2. correlated:是发布消息成功到交换器后会触发回调方法
    3. simple:有两种效果,第一种和correlated值一样会触发回调方法;第二种在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker。

    RabbitMQ的配置类实现ConfirmCallback

    1. /**
    2. * @author LoneWalker
    3. * @date 2023/4/8
    4. * @description
    5. */
    6. @Slf4j
    7. @Configuration
    8. public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback {
    9. @Bean
    10. public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
    11. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    12. rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
    13. //设置给rabbitTemplate
    14. rabbitTemplate.setConfirmCallback(this);
    15. return rabbitTemplate;
    16. }
    17. @Bean
    18. public MessageConverter jackson2JsonMessageConverter() {
    19. return new Jackson2JsonMessageConverter();
    20. }
    21. @Bean
    22. public DirectExchange getExchange(){
    23. return new DirectExchange("directExchange",false,false);
    24. }
    25. @Bean
    26. public Queue getQueue(){
    27. return new Queue("publisher.addUser",true,false,false);
    28. }
    29. @Bean
    30. public Binding getBinding(DirectExchange exchange,Queue queue){
    31. return BindingBuilder.bind(queue).to(exchange).with("publisher.addUser");
    32. }
    33. /**
    34. * 消息成功到达交换机会触发
    35. * @param correlationData
    36. * @param ack
    37. * @param cause
    38. */
    39. @Override
    40. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    41. if (ack) {
    42. log.info("交换机收到消息成功:" + correlationData.getId());
    43. }else {
    44. log.error("交换机收到消息失败:" + correlationData.getId() + "原因:" + cause);
    45. }
    46. }
    47. }

    而需要这个correlationData是因为确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。所以我们改写一下发送消息的方法:

    1. @RequiredArgsConstructor
    2. @Service
    3. public class PublisherServiceImpl implements PublisherService{
    4. private final RabbitTemplate rabbitTemplate;
    5. @Override
    6. public void addUser(User user) {
    7. CorrelationData correlationData = new CorrelationData();
    8. correlationData.setId(UUID.randomUUID().toString());
    9. rabbitTemplate.convertAndSend("directExchange","publisher.addUser",user,correlationData);
    10. }
    11. }

    然后发送消息:

    再模拟一下失败的情况——把交换机名称改成错的:

    温馨提示:测试完把交换机名称改回去。

    3、publisher-return

    在SpringBoot项目的properties文件中添加:

    1. spring.rabbitmq.publisher-returns=true
    2. ###消息在没有被队列接收时是否强行退回还是直接丢弃
    3. spring.rabbitmq.template.mandatory=true

    RabbitMQ的配置类再实现ReturnsCallback

    1. @Slf4j
    2. @Configuration
    3. public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    4. @Bean
    5. public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
    6. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    7. rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
    8. //设置给rabbitTemplate
    9. rabbitTemplate.setConfirmCallback(this);
    10. rabbitTemplate.setReturnsCallback(this);
    11. rabbitTemplate.setMandatory(true);
    12. return rabbitTemplate;
    13. }
    14. @Bean
    15. public MessageConverter jackson2JsonMessageConverter() {
    16. return new Jackson2JsonMessageConverter();
    17. }
    18. @Bean
    19. public DirectExchange getExchange(){
    20. return new DirectExchange("directExchange",false,false);
    21. }
    22. @Bean
    23. public Queue getQueue(){
    24. return new Queue("publisher.addUser",true,false,false);
    25. }
    26. @Bean
    27. public Binding getBinding(DirectExchange exchange,Queue queue){
    28. return BindingBuilder.bind(queue).to(exchange).with("publisher.addUser");
    29. }
    30. /**
    31. * 消息成功到达交换机会触发
    32. * @param correlationData
    33. * @param ack
    34. * @param cause
    35. */
    36. @Override
    37. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    38. if (ack) {
    39. log.info("交换机收到消息成功:" + correlationData.getId());
    40. }else {
    41. log.error("交换机收到消息失败:" + correlationData.getId() + "原因:" + cause);
    42. }
    43. }
    44. /**
    45. * 消息未成功到达队列会触发
    46. * @param returnedMessage
    47. */
    48. @Override
    49. public void returnedMessage(ReturnedMessage returnedMessage) {
    50. log.error("{}--消息未成功到达队列",returnedMessage.getMessage().getMessageProperties().getMessageId());
    51. }
    52. }

    把路由键改为错误的值:

    正常来说消息到达交换机就一定可以到达队列,到不了队列基本上就是代码写错了。

  • 相关阅读:
    【Java 进阶篇】数据库介绍与MySQL详细介绍
    484-红黑树
    Vue2023 面试归纳及复习
    vue防抖函数封装与使用
    PolarDB-X 全局 Binlog 解读之性能篇(上)
    重温历史:Palm OS经典游戏于发布20年后公开源代码
    【Python音视频技术】用moviepy实现图文成片功能
    儿童医疗保健生物识别技术市场现状及未来发展趋势分析
    tf.ones_initializer
    学长讲解 - FPGA与单片机的区别 【毕设基础概念】
  • 原文地址:https://blog.csdn.net/QingXu1234/article/details/130860096