• RabbitMQ手动ACK与死信队列


    为了保证消息从队列可靠的达到消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。

    默认情况下RabbitMQ在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时RabbitMQ自己又没有这条消息了。所以在实际项目中会使用手动Ack。

    1、手动应答

    1. Channel.basicAck (用于肯定确认):RabbitMQ 已知道该消息成功被处理,可以将其丢弃了。
    2. Channel.basicNack (用于否定确认)
    3. Channel.basicReject (用于否定确认):与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了。

    消费者端的配置,相关属性值改为自己的:

    1. server.port=8082
    2. #rabbitmq服务器ip
    3. spring.rabbitmq.host=localhost
    4. #rabbitmq的端口
    5. spring.rabbitmq.port=5672
    6. #用户名
    7. spring.rabbitmq.username=lonewalker
    8. #密码
    9. spring.rabbitmq.password=XX
    10. #配置虚拟机
    11. spring.rabbitmq.virtual-host=demo
    12. #设置消费端手动 ack none不确认 auto自动确认 manual手动确认
    13. spring.rabbitmq.listener.simple.acknowledge-mode=manual

    修改消费代码:请勿复制使用,会卡死

    1. package com.example.consumer.service;
    2. import com.alibaba.fastjson.JSONObject;
    3. import com.example.consumer.entity.User;
    4. import com.rabbitmq.client.Channel;
    5. import lombok.extern.slf4j.Slf4j;
    6. import org.springframework.amqp.core.Message;
    7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    8. import org.springframework.stereotype.Service;
    9. import java.io.IOException;
    10. /**
    11. * @description:
    12. * @author: LoneWalker
    13. * @create: 2022-04-04
    14. **/
    15. @Service
    16. @Slf4j
    17. public class ConsumerService {
    18. @RabbitListener(queues ="publisher.addUser")
    19. public void addUser(String userStr,Channel channel,Message message){
    20. long deliveryTag = message.getMessageProperties().getDeliveryTag();
    21. try {
    22. log.info("我一直在重试");
    23. int a = 1/0;
    24. User user = JSONObject.parseObject(userStr,User.class);
    25. log.info(user.toString());
    26. //手动ack 第二个参数为false是表示仅仅确认当前消息 true表示确认之前所有的消息
    27. channel.basicAck(deliveryTag,false);
    28. } catch (Exception e) {
    29. //手动nack 告诉rabbitmq该消息消费失败 第三个参数:如果被拒绝的消息应该被重新请求,而不是被丢弃或变成死信,则为true
    30. try {
    31. channel.basicNack(deliveryTag,false,true);
    32. } catch (IOException ex) {
    33. ex.printStackTrace();
    34. }
    35. }
    36. }
    37. }

    先启动发布者发送消息,查看控制台:有一条消息待消费·

    启动消费端,因为代码中有除0,所以会报错,这里就会出现一条unacked消息:

    因为设置的是将消息重新请求,所以它会陷入死循环

    防止出现这种情况,可以将basicNack最后一个参数改为false,让消息进去死信队列

    2、什么是死信队列

    说简单点就是备胎队列,而死信的来源有以下几种:

    1. 消息被否定确认,使用 channel.basicNackchannel.basicReject ,并且此时requeue 属性被设置为false
    2. 消息在队列的存活时间超过设置的TTL时间。
    3. 消息队列的消息数量已经超过最大队列长度。

    “死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

    3、配置死信队列

    一般会为每个重要的业务队列配置一个死信队列。可以分为以下步骤:

    1. 配置业务队列,绑定到业务交换机上
    2. 为业务队列配置死信交换机和路由key
    3. 为死信交换机配置死信队列

    从控制台将之前的交换机都删除,然后修改代码。

    首先看一下发布者的配置代码:

    1. package com.example.publisher.config;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.core.*;
    4. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    5. import org.springframework.amqp.rabbit.connection.CorrelationData;
    6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    7. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    8. import org.springframework.amqp.support.converter.MessageConverter;
    9. import org.springframework.context.annotation.Bean;
    10. import org.springframework.context.annotation.Configuration;
    11. import java.util.HashMap;
    12. import java.util.Map;
    13. /**
    14. * @author LoneWalker
    15. * @date 2023/4/8
    16. * @description
    17. */
    18. @Slf4j
    19. @Configuration
    20. public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    21. @Bean
    22. public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
    23. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    24. rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
    25. //设置给rabbitTemplate
    26. rabbitTemplate.setConfirmCallback(this);
    27. rabbitTemplate.setReturnsCallback(this);
    28. rabbitTemplate.setMandatory(true);
    29. return rabbitTemplate;
    30. }
    31. @Bean
    32. public MessageConverter jackson2JsonMessageConverter() {
    33. return new Jackson2JsonMessageConverter();
    34. }
    35. /************ 正常配置 ******************/
    36. /**
    37. * 正常交换机,开启持久化
    38. */
    39. @Bean
    40. DirectExchange normalExchange() {
    41. return new DirectExchange("normalExchange", true, false);
    42. }
    43. @Bean
    44. public Queue normalQueue() {
    45. // durable: 是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    46. // exclusive: 默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。
    47. // autoDelete: 是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    48. Map args = deadQueueArgs();
    49. // 队列设置最大长度
    50. args.put("x-max-length", 5);
    51. return new Queue("normalQueue", true, false, false, args);
    52. }
    53. @Bean
    54. public Queue ttlQueue() {
    55. Map args = deadQueueArgs();
    56. // 队列设置消息过期时间 60 秒
    57. args.put("x-message-ttl", 60 * 1000);
    58. return new Queue("ttlQueue", true, false, false, args);
    59. }
    60. @Bean
    61. Binding normalRouteBinding() {
    62. return BindingBuilder.bind(normalQueue())
    63. .to(normalExchange())
    64. .with("normalRouting");
    65. }
    66. @Bean
    67. Binding ttlRouteBinding() {
    68. return BindingBuilder.bind(ttlQueue())
    69. .to(normalExchange())
    70. .with("ttlRouting");
    71. }
    72. /**************** 死信配置 *****************/
    73. /**
    74. * 死信交换机
    75. */
    76. @Bean
    77. DirectExchange deadExchange() {
    78. return new DirectExchange("deadExchange", true, false);
    79. }
    80. /**
    81. * 死信队列
    82. */
    83. @Bean
    84. public Queue deadQueue() {
    85. return new Queue("deadQueue", true, false, false);
    86. }
    87. @Bean
    88. Binding deadRouteBinding() {
    89. return BindingBuilder.bind(deadQueue())
    90. .to(deadExchange())
    91. .with("deadRouting");
    92. }
    93. /**
    94. * 转发到 死信队列,配置参数
    95. */
    96. private Map deadQueueArgs() {
    97. Map map = new HashMap<>();
    98. // 绑定该队列到死信交换机
    99. map.put("x-dead-letter-exchange", "deadExchange");
    100. map.put("x-dead-letter-routing-key", "deadRouting");
    101. return map;
    102. }
    103. /**
    104. * 消息成功到达交换机会触发
    105. * @param correlationData
    106. * @param ack
    107. * @param cause
    108. */
    109. @Override
    110. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    111. if (ack) {
    112. log.info("交换机收到消息成功:" + correlationData.getId());
    113. }else {
    114. log.error("交换机收到消息失败:" + correlationData.getId() + "原因:" + cause);
    115. }
    116. }
    117. /**
    118. * 消息未成功到达队列会触发
    119. * @param returnedMessage
    120. */
    121. @Override
    122. public void returnedMessage(ReturnedMessage returnedMessage) {
    123. log.error("{}--消息未成功到达队列",returnedMessage.getMessage().getMessageProperties().getMessageId());
    124. }
    125. }

    properties

    1. server.port=8081
    2. #rabbitmq服务ip
    3. spring.rabbitmq.host=localhost
    4. #rabbitmq端口号
    5. spring.rabbitmq.port=5672
    6. #用户名
    7. spring.rabbitmq.username=用户名改为自己的
    8. #密码
    9. spring.rabbitmq.password=密码改为自己的
    10. #虚拟机
    11. spring.rabbitmq.virtual-host=demo
    12. spring.rabbitmq.publisher-confirm-type=correlated
    13. spring.rabbitmq.publisher-returns=true
    14. spring.rabbitmq.template.mandatory=true

    发送消息:

    1. @RequiredArgsConstructor
    2. @Service
    3. public class PublisherServiceImpl implements PublisherService{
    4. private final RabbitTemplate rabbitTemplate;
    5. @Override
    6. public void addUser(User user) {
    7. CorrelationData correlationData = new CorrelationData();
    8. correlationData.setId(UUID.randomUUID().toString());
    9. rabbitTemplate.convertAndSend("normalExchange","normalRouting",user,correlationData);
    10. }
    11. }

    4、模拟场景

    4.1消息处理异常

    文章开篇说到的消息手动ack,一旦出现异常会陷入死循环,那么不把消息放回原队列,而是放入死信队列,然后抛异常由人工处理:

    1. package com.example.consumer.service;
    2. import com.alibaba.fastjson.JSONObject;
    3. import com.example.consumer.entity.User;
    4. import com.rabbitmq.client.Channel;
    5. import lombok.extern.slf4j.Slf4j;
    6. import org.springframework.amqp.core.Message;
    7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    8. import org.springframework.stereotype.Service;
    9. import java.io.IOException;
    10. /**
    11. * @description:
    12. * @author: LoneWalker
    13. * @create: 2022-04-04
    14. **/
    15. @Service
    16. @Slf4j
    17. public class ConsumerService {
    18. @RabbitListener(queues ="normalQueue")
    19. public void addUser(String userStr,Channel channel,Message message){
    20. long deliveryTag = message.getMessageProperties().getDeliveryTag();
    21. try {
    22. int a = 1/0;
    23. User user = JSONObject.parseObject(userStr,User.class);
    24. log.info(user.toString());
    25. //手动ack 第二个参数为false是表示仅仅确认当前消息 true表示确认之前所有的消息
    26. channel.basicAck(deliveryTag,false);
    27. } catch (Exception e) {
    28. //手动nack 告诉rabbitmq该消息消费失败 第三个参数:如果被拒绝的消息应该被重新请求,而不是被丢弃或变成死信,则为true
    29. try {
    30. channel.basicNack(deliveryTag,false,false);
    31. } catch (IOException ex) {
    32. throw new RuntimeException("消息处理失败");
    33. }
    34. }
    35. }
    36. }

    注意basicNack的第三个参数,设置为false后就不会重新请求。

    4.2队列达到最大长度

    配置上面的代码已经有过了:

    测试的话我们发6条消息,加上4.1测试产生的死信,预期死信队列中应该会有两条:

    4.3消息TTL过期

    过期时间TTL表示可以对消息设置预期的时间,超过这个时间就删除或者放入死信队列。修改routingKey为ttlRouting。上述代码中配置过期时间为60s

    死信队列中的消息处理和正常的队列没什么区别,就不赘述了。

  • 相关阅读:
    c++ 条件变量使用详解 wait_for wait_unitl 虚假唤醒
    Springboot辅助功能(内嵌tomcat服务器)
    Postman接口测试之POST、GET请求方法
    驱动开发:内核监控进程与线程回调
    前端面试宝典React篇01 如何拿下大厂前端面试
    数据库如何储存和管理数据的?
    左程云老师算法课笔记(一)
    java日志框架总结(六、logback日志框架 扩展)
    01 pyechars 特性、版本、安装介绍
    kotlin用ping命令判断网络是否是通的
  • 原文地址:https://blog.csdn.net/QingXu1234/article/details/130904397