• RabbitMQ初步到精通-第六章-RabbitMQ之死信队列


    目录

    第六章-RabbitMQ之死信队列

    1. 死信概念

    2. 死信架构

    3. 死信来源

    3.1 消息 TTL 过期

    3.2 队列达到最大长度(队列满了,无法再添加数据到 mq 中)

    3.3 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false(不再重新入队)

    4. 验证代码

    4.1 TTL

    4.2 超过Length

    4.3 拒绝

    5. 总结


    第六章-RabbitMQ之死信队列

    1. 死信概念

    未了解死信这块的内容的时候,经常听到别人提及 ‘死信’ - What? 信息死了? 按字面意思去看就是一个信息由于某些原因导致无法正常消费,造成了这个信息死亡的结果,造成的原因我们再单说。

    死信- Dead Letter . 由于消息未正常消费,导致了消息的再次转移或被丢弃-成为死信。本文中涉及到的内容,是消息从原正常消费队列中又重定向路由到了另外一个预留的队列-死信队列

    应用场景:我们应用最多的应该是在消息防丢处理中使用,消费者消费的时候,出现了异常,我们可以使用死信队列的机制,将此条信息转移到一个预留的队列中,再有对应的消费者去处理这部分数据内容。-还是模糊-继续往下-

    2. 死信架构

    如上图所示,我们可以看出 黄色的部分内容 为 正常业务的交换机 Exchange,以及正常业务的对列1 Queue, 而绿色的部分为 另外一套 交换机与队列,他们的创建与正常的类型无异,姑且我们称他们为 死信交换机及死信队列2. 那现在我们就有两套交换机与队列

    那最核心的内容是需要将死信交换机与正常业务的队列1做好绑定关系,这样就能实现当正常队列1无法消费或其他原因时,将消息重定向至死信的交换机中去,死信的交换机再将消息路由至队列2中。

    总结一下:生产者生产了一条消息,将消息推送到Broker 中的 正常交换机Exchange,交换机将消息路由至队列1 ,这时由于队列1中的消息未能正常消费,过期了,导致队列1又将消息投递到了死信交换机中,死信交换机再将消息路由到队列2,从而是的消费者2正常消费到了队列2中的死信消息。

    3. 死信来源

    上文也提到了,消息过期了,导致消息未正常消费,那我们在这里总结一下所有死信的来源

    3.1 消息 TTL 过期

    TTL意思是 Time to live , 即消息的存活时间,那时间到期了,消息也就达到了他的生命的尽头成为了死信。那如何设置TTL呢?

    3.1.1 设置消息的TTL

    发送消息的时候设置属性,可以每条消息设置不同的ttl

    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("1000").build();

    3.1.2 设置队列的TTL

    声明消息队列的时候,这个是全局的,所有发到这个队列的消息的过期时间是一样的

    deadLetterParams.put("x-message-ttl", 1000);

    队列设置好TTL,则面板中会提示:

     假如你两种都设置了,以小的ttl为准

    那这两者设置有啥区别呢?

    区别:queue的全局ttl,消息过期立刻就会被删掉;如果是发送消息时设置的ttl,过期之后并不会立刻删掉,这时候消息是否过期是需要投递给消费者的时候判断的。【这个可以做个小实验-投递两条消息,第一条设置过期时间10s,第二条1s, 投递完成,都不进行消费理论第二条应该先过期,但由于第一条没消费,第二条也不会过期】

    原因:queue的全局ttl,队列的有效期都一样,先入队列的队列头部,头部也是最早过期的消息,rabbitmq会有一个定时任务从队列的头部开始扫描是否有过期消息即可。而每条设置不同的ttl,只有遍历整个队列才可以筛选出来过期的消息,这样的效率实在是太低,而且如果消息量大了根本不可行,所以rabbitmq在等到消息投递给消费者的时候判断当前消息是否过期,虽然删除的不及时但是不影响功能。
     

    3.2 队列达到最大长度(队列满了,无法再添加数据到 mq 中)

    队列设置最大长度及存储

    1. Map args = new HashMap();
    2. args.put("x-max-length",10);/∥设置queue的最大长度10
    3. args.put("x-max-length-bytes",1024);//设置最大总字节数1KB
    4. channel.queueDeclare("myqueue", false, false, false, args);

    面板展示:

    当消息超限会被丢弃或转向至死信队列中。

    3.3 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false(不再重新入队)

    消息拒绝方式:这两者的区别是 basic.nack 多一个参数,boolean multiple 支持批量拒绝。其余一致。

    1. channel.basicReject(envelope.getDeliveryTag(), false);
    2. channel.basicNack(envelope.getDeliveryTag(), false, false);

    4. 验证代码

    4.1 TTL

    生产者:

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description 死信队列-
    5. * 当消息在一个队列中变为死信后,它被重新发送到另一个Exchange。
    6. * @createTime 2022/07/27 19:34:00
    7. */
    8. public class DeadLetterTTLProducer {
    9. private static String NORMAL_EXCHANGE_NAME = "normal_exchange_ttl";
    10. //生产者
    11. public static void main(String[] args) throws Exception {
    12. //1、获取connection
    13. Connection connection = RabbitCommonConfig.getConnection();
    14. //2、创建channel
    15. Channel channel = connection.createChannel();
    16. for (int i = 0; i < 5; i++) {
    17. sendMsg(channel);
    18. }
    19. //4、关闭管道和连接
    20. channel.close();
    21. connection.close();
    22. }
    23. private static void sendMsg(Channel channel) throws IOException, InterruptedException {
    24. // 1. 设置消息 TTL 过期时间
    25. // 2. 设置 队列 TTL 过期时间
    26. AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("1000").build();
    27. String message = "info";
    28. channel.basicPublish(NORMAL_EXCHANGE_NAME, "normal-key", properties, message.getBytes());
    29. System.out.println("消息发送完成:" + message);
    30. }
    31. }

    消费者:

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description 正常队列消费者,除了正常的消费者 需要创建 队列、交换机,绑定关系外,
    5. * 还需要创建 死信的队列、交换机、绑定关系。
    6. * 最核心的一点还有,需要将 死信队列的交换机信息做为一个参数,绑定到正常的队列中去。
    7. * @createTime 2022/11/17 16:53:00
    8. */
    9. public class DeadLetterTTLConsumer_Normal {
    10. private static String NORMAL_EXCHANGE_NAME = "normal_exchange_ttl";
    11. private static String NORMAL_QUEUE_NAME = "normal_queue_ttl";
    12. private static String DEAD_EXCHANGE_NAME = "dead_exchange";
    13. private static String DEAD_QUEUE_NAME = "dead-queue";
    14. public static void main(String[] args) throws IOException, TimeoutException {
    15. //1、获取连对象、
    16. Connection connection = RabbitCommonConfig.getConnection();
    17. //2、创建channel
    18. Channel channel = connection.createChannel();
    19. //3. 创建死信队列与交换机及绑定关系
    20. handleQueueAndBinding(channel, DEAD_QUEUE_NAME, null, DEAD_EXCHANGE_NAME, "dead-letter-key");
    21. // 正常队列与死信交换机的绑定关系
    22. Map deadLetterParams = getNormalAndDeadParams();
    23. // 4.声明一个正常队列与交换机及绑定关系
    24. handleQueueAndBinding(channel, NORMAL_QUEUE_NAME, deadLetterParams, NORMAL_EXCHANGE_NAME, "normal-key");
    25. channel.basicQos(1);
    26. //5.开启监听Queue
    27. DefaultConsumer consumer = new DefaultConsumer(channel) {
    28. @Override
    29. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    30. try {
    31. Thread.sleep(1000);
    32. } catch (InterruptedException e) {
    33. e.printStackTrace();
    34. }
    35. System.out.println("Normal消费者接收消息: " + new String(body, "UTF-8"));
    36. channel.basicAck(envelope.getDeliveryTag(), false);
    37. }
    38. };
    39. channel.basicConsume(NORMAL_QUEUE_NAME, false, consumer);
    40. System.out.println("Normal消费者启动接收消息......");
    41. //5、键盘录入,让程序不结束!
    42. System.in.read();
    43. //6、释放资源
    44. channel.close();
    45. connection.close();
    46. }
    47. private static Map getNormalAndDeadParams() {
    48. Map deadLetterParams = new HashMap<>();
    49. deadLetterParams.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
    50. deadLetterParams.put("x-dead-letter-routing-key", "dead-letter-key");
    51. //队列过期时间限制
    52. //deadLetterParams.put("x-message-ttl", 1000);
    53. return deadLetterParams;
    54. }
    55. /**
    56. * 处理队列与绑定关系
    57. *
    58. * @param channel
    59. * @param deadQueueName
    60. * @param o
    61. * @param deadExchangeName
    62. * @param routingKey
    63. * @throws IOException
    64. */
    65. private static void handleQueueAndBinding(Channel channel, String deadQueueName, Map o, String deadExchangeName, String routingKey) throws IOException {
    66. // 声明一个队列
    67. channel.queueDeclare(deadQueueName, false, false, false, o);
    68. // 声明一个交换机
    69. channel.exchangeDeclare(deadExchangeName, BuiltinExchangeType.DIRECT);
    70. // 队列与交换机绑定
    71. channel.queueBind(deadQueueName, deadExchangeName, routingKey);
    72. }
    73. }

    死信消费者:

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description 死信队列消费者
    5. * 与正常消费者一致 监听自己的队列消息即可
    6. * @createTime 2022/11/17 16:54:00
    7. */
    8. public class DeadLetterConsumer_Dead {
    9. private static String DEAD_QUEUE_NAME = "dead-queue";
    10. public static void main(String[] args) throws IOException, TimeoutException {
    11. //1、获取连对象、
    12. Connection connection = RabbitCommonConfig.getConnection();
    13. //2、创建channel
    14. Channel channel = connection.createChannel();
    15. //3.开启监听Queue
    16. DefaultConsumer consumer = new DefaultConsumer(channel) {
    17. @Override
    18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    19. System.out.println("死信消费者接收消息: " + new String(body, "UTF-8"));
    20. }
    21. };
    22. channel.basicConsume(DEAD_QUEUE_NAME, true, consumer);
    23. System.out.println("死信消费者启动等待消费消息:");
    24. //5、键盘录入,让程序不结束!
    25. System.in.read();
    26. //6、释放资源
    27. channel.close();
    28. connection.close();
    29. }
    30. }

    结果:

    1. 生产者:
    2. 消息发送完成:info
    3. 消息发送完成:info
    4. 消息发送完成:info
    5. 消息发送完成:info
    6. 消息发送完成:info
    7. 消费者:
    8. Normal消费者启动接收消息......
    9. Normal消费者接收消息: info
    10. 死信消费者:
    11. 死信消费者启动等待消费消息:
    12. 死信消费者接收消息: info
    13. 死信消费者接收消息: info
    14. 死信消费者接收消息: info
    15. 死信消费者接收消息: info

    4.2 超过Length

    生产者:

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description 普通的生产者
    5. * @createTime 2022/11/17 16:51:00
    6. */
    7. public class DeadLetterLengthProducer {
    8. private static String EXCHANGE_NAME = "normal_exchange";
    9. public static void main(String[] args) throws IOException {
    10. //1、获取连对象、
    11. Connection connection = RabbitCommonConfig.getConnection();
    12. //2、创建channel
    13. Channel channel = connection.createChannel();
    14. //3. 发送消息
    15. for (int i = 1; i <= 10; i++) {
    16. String message = "info" + i;
    17. channel.basicPublish(EXCHANGE_NAME, "normal-key", null, message.getBytes());
    18. System.out.println("生产者已发送消息:" + message);
    19. }
    20. System.out.println("消息发送完成");
    21. }
    22. }

    消费者:

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description 正常队列消费者,除了正常的消费者 需要创建 队列、交换机,绑定关系外,
    5. * 还需要创建 死信的队列、交换机、绑定关系。
    6. * 最核心的一点还有,需要将 死信队列的交换机信息做为一个参数,绑定到正常的队列中去。
    7. * @createTime 2022/11/17 16:53:00
    8. */
    9. public class DeadLetterLengthConsumer_Normal {
    10. private static String NORMAL_EXCHANGE_NAME = "normal_exchange";
    11. private static String NORMAL_QUEUE_NAME = "normal-queue";
    12. private static String DEAD_EXCHANGE_NAME = "dead_exchange";
    13. private static String DEAD_QUEUE_NAME = "dead-queue";
    14. public static void main(String[] args) throws IOException, TimeoutException {
    15. //1、获取连对象、
    16. Connection connection = RabbitCommonConfig.getConnection();
    17. //2、创建channel
    18. Channel channel = connection.createChannel();
    19. //3. 创建死信队列与交换机及绑定关系
    20. handleQueueAndBinding(channel, DEAD_QUEUE_NAME, null, DEAD_EXCHANGE_NAME, "dead-letter-key");
    21. // 正常队列与死信交换机的绑定关系
    22. Map deadLetterParams = getNormalAndDeadParams();
    23. // 4.声明一个正常队列与交换机及绑定关系
    24. handleQueueAndBinding(channel, NORMAL_QUEUE_NAME, deadLetterParams, NORMAL_EXCHANGE_NAME, "normal-key");
    25. channel.basicQos(1);
    26. //5.开启监听Queue
    27. DefaultConsumer consumer = new DefaultConsumer(channel) {
    28. @Override
    29. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    30. try {
    31. Thread.sleep(1000);
    32. } catch (InterruptedException e) {
    33. e.printStackTrace();
    34. }
    35. System.out.println("Normal消费者接收消息: " + new String(body, "UTF-8"));
    36. channel.basicAck(envelope.getDeliveryTag(), false);
    37. }
    38. };
    39. channel.basicConsume(NORMAL_QUEUE_NAME, false, consumer);
    40. System.out.println("Normal消费者启动接收消息......");
    41. //5、键盘录入,让程序不结束!
    42. System.in.read();
    43. //6、释放资源
    44. channel.close();
    45. connection.close();
    46. }
    47. private static Map getNormalAndDeadParams() {
    48. Map deadLetterParams = new HashMap<>();
    49. deadLetterParams.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
    50. deadLetterParams.put("x-dead-letter-routing-key", "dead-letter-key");
    51. deadLetterParams.put("x-max-length", 6);
    52. return deadLetterParams;
    53. }
    54. /**
    55. * 处理队列与绑定关系
    56. *
    57. * @param channel
    58. * @param deadQueueName
    59. * @param o
    60. * @param deadExchangeName
    61. * @param routingKey
    62. * @throws IOException
    63. */
    64. private static void handleQueueAndBinding(Channel channel, String deadQueueName, Map o, String deadExchangeName, String routingKey) throws IOException {
    65. // 声明一个队列
    66. channel.queueDeclare(deadQueueName, false, false, false, o);
    67. // 声明一个交换机
    68. channel.exchangeDeclare(deadExchangeName, BuiltinExchangeType.DIRECT);
    69. // 队列与交换机绑定
    70. channel.queueBind(deadQueueName, deadExchangeName, routingKey);
    71. }
    72. }

    死信消费者:同上

    结果:

    1. 生产者:
    2. 生产者已发送消息:info1
    3. 生产者已发送消息:info2
    4. 生产者已发送消息:info3
    5. 生产者已发送消息:info4
    6. 生产者已发送消息:info5
    7. 生产者已发送消息:info6
    8. 生产者已发送消息:info7
    9. 生产者已发送消息:info8
    10. 生产者已发送消息:info9
    11. 生产者已发送消息:info10
    12. 消息发送完成
    13. 消费者:
    14. Normal消费者启动接收消息......
    15. Normal消费者接收消息: info1
    16. Normal消费者接收消息: info5
    17. Normal消费者接收消息: info6
    18. Normal消费者接收消息: info7
    19. Normal消费者接收消息: info8
    20. Normal消费者接收消息: info9
    21. Normal消费者接收消息: info10
    22. 死信消费者:
    23. 死信消费者启动等待消费消息:
    24. 死信消费者接收消息: info2
    25. 死信消费者接收消息: info3
    26. 死信消费者接收消息: info4

    4.3 拒绝

    生产者:

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description 普通的生产者
    5. * @createTime 2022/11/17 16:51:00
    6. */
    7. public class DeadLetterRejectProducer {
    8. private static String EXCHANGE_NAME = "normal_exchange_reject";
    9. public static void main(String[] args) throws IOException {
    10. //1、获取连对象、
    11. Connection connection = RabbitCommonConfig.getConnection();
    12. //2、创建channel
    13. Channel channel = connection.createChannel();
    14. //3. 发送消息
    15. for (int i = 1; i <= 10; i++) {
    16. String message = "info" + i;
    17. channel.basicPublish(EXCHANGE_NAME, "normal-key", null, message.getBytes());
    18. System.out.println("生产者已发送消息:" + message);
    19. }
    20. System.out.println("消息发送完成");
    21. }
    22. }

    消费者:

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description 正常队列消费者,除了正常的消费者 需要创建 队列、交换机,绑定关系外,
    5. * 还需要创建 死信的队列、交换机、绑定关系。
    6. * 最核心的一点还有,需要将 死信队列的交换机信息做为一个参数,绑定到正常的队列中去。
    7. * @createTime 2022/11/17 16:53:00
    8. */
    9. public class DeadLetterRejectConsumer_Normal {
    10. private static String NORMAL_EXCHANGE_NAME = "normal_exchange_reject";
    11. private static String NORMAL_QUEUE_NAME = "normal_queue_reject";
    12. private static String DEAD_EXCHANGE_NAME = "dead_exchange";
    13. private static String DEAD_QUEUE_NAME = "dead-queue";
    14. public static void main(String[] args) throws IOException, TimeoutException {
    15. //1、获取连对象、
    16. Connection connection = RabbitCommonConfig.getConnection();
    17. //2、创建channel
    18. Channel channel = connection.createChannel();
    19. //3. 创建死信队列与交换机及绑定关系
    20. handleQueueAndBinding(channel, DEAD_QUEUE_NAME, null, DEAD_EXCHANGE_NAME, "dead-letter-key");
    21. // 正常队列与死信交换机的绑定关系
    22. Map deadLetterParams = getNormalAndDeadParams();
    23. // 4.声明一个正常队列与交换机及绑定关系
    24. handleQueueAndBinding(channel, NORMAL_QUEUE_NAME, deadLetterParams, NORMAL_EXCHANGE_NAME, "normal-key");
    25. channel.basicQos(1);
    26. //5.开启监听Queue
    27. DefaultConsumer consumer = new DefaultConsumer(channel) {
    28. @Override
    29. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    30. try {
    31. Thread.sleep(1000);
    32. } catch (InterruptedException e) {
    33. e.printStackTrace();
    34. }
    35. if ("info5".equals(new String(body, "UTF-8"))) {
    36. System.out.println("Normal消费者接收消息:" + new String(body, "UTF-8") + "并且拒绝签收了");
    37. // 禁止重新入队
    38. //channel.basicReject(envelope.getDeliveryTag(), false);
    39. channel.basicNack(envelope.getDeliveryTag(), false, false);
    40. } else {
    41. System.out.println("Normal消费者接收消息:" + new String(body, "UTF-8"));
    42. channel.basicAck(envelope.getDeliveryTag(), false);
    43. }
    44. }
    45. };
    46. channel.basicConsume(NORMAL_QUEUE_NAME, false, consumer);
    47. System.out.println("Normal消费者启动接收消息......");
    48. //5、键盘录入,让程序不结束!
    49. System.in.read();
    50. //6、释放资源
    51. channel.close();
    52. connection.close();
    53. }
    54. private static Map getNormalAndDeadParams() {
    55. Map deadLetterParams = new HashMap<>();
    56. deadLetterParams.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
    57. deadLetterParams.put("x-dead-letter-routing-key", "dead-letter-key");
    58. return deadLetterParams;
    59. }
    60. /**
    61. * 处理队列与绑定关系
    62. *
    63. * @param channel
    64. * @param deadQueueName
    65. * @param o
    66. * @param deadExchangeName
    67. * @param routingKey
    68. * @throws IOException
    69. */
    70. private static void handleQueueAndBinding(Channel channel, String deadQueueName, Map o, String deadExchangeName, String routingKey) throws IOException {
    71. // 声明一个队列
    72. channel.queueDeclare(deadQueueName, false, false, false, o);
    73. // 声明一个交换机
    74. channel.exchangeDeclare(deadExchangeName, BuiltinExchangeType.DIRECT);
    75. // 队列与交换机绑定
    76. channel.queueBind(deadQueueName, deadExchangeName, routingKey);
    77. }
    78. }

    死信消费者:同上

    结果:

    1. 生产者:
    2. 生产者已发送消息:info1
    3. 生产者已发送消息:info2
    4. 生产者已发送消息:info3
    5. 生产者已发送消息:info4
    6. 生产者已发送消息:info5
    7. 生产者已发送消息:info6
    8. 生产者已发送消息:info7
    9. 生产者已发送消息:info8
    10. 生产者已发送消息:info9
    11. 生产者已发送消息:info10
    12. 消息发送完成
    13. 消费者:
    14. Normal消费者启动接收消息......
    15. Normal消费者接收消息:info1
    16. Normal消费者接收消息:info2
    17. Normal消费者接收消息:info3
    18. Normal消费者接收消息:info4
    19. Normal消费者接收消息:info5并且拒绝签收了
    20. Normal消费者接收消息:info6
    21. Normal消费者接收消息:info7
    22. Normal消费者接收消息:info8
    23. Normal消费者接收消息:info9
    24. Normal消费者接收消息:info10
    25. 死信消费者:
    26. 死信消费者启动等待消费消息:
    27. 死信消费者接收消息: info5

    5. 总结

    其实死信队列没什么神秘的内容,只是:

    1. 在原来的基础上增加了一套死信Exchange和死信Queue

    2. 与原来的Queue 和死信Exchange做好了绑定关系。

    绑定关系依靠参数设置在业务正常queue声明的时候进行传入:

    1. Map deadLetterParams = new HashMap<>();
    2. deadLetterParams.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
    3. deadLetterParams.put("x-dead-letter-routing-key", "dead-letter-key");
    4. channel.queueDeclare(deadQueueName, false, false, false, deadLetterParams );

    与业务Queue绑定好关系后,看下面板的体现:

     这里的 DLX - dead-letter-exchange  = 业务中的参数 :"x-dead-letter-exchange" 指向的 死信Exchange

    这里的DLK -dead-letter-routing-key" = 业务中的参数:"x-dead-letter-routing-key" 指向的是 死信的 路由键

  • 相关阅读:
    AlphaGo & Model-Based RL
    GPT与BERT模型
    ARM64汇编0A - thumb模式与IT块
    C语言结课实战项目_贪吃蛇小游戏
    《软件设计师考试》易混淆知识点
    基于JAVA旅游景点推荐系统计算机毕业设计源码+数据库+lw文档+系统+部署
    Spring Cloud Gateway简介
    你安全吗?丨秦淮到底是哪种黑客?你猜对了吗?
    C#表达树
    那些年用Python踩过的坑
  • 原文地址:https://blog.csdn.net/blucastle/article/details/127941119