• RabbitMQ——死信队列


    RabbitMQ——死信队列

    死信队列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一种重要特性,用于处理无法被消费的消息,防止消息丢失。

    死信的来源

    消息队列中,当消息满足一定条件而无法被正常消费时,这些消息会被发送到死信队列。满足条件的情况包括但不限于:

    • 消息被拒绝(basic.rejectbasic.nack)且不重新入队(requeue 参数为 false)。
    • 消息过期(TTL,Time-To-Live)。
    • 队列长度超过限制,无法再添加数据到mq中。

    生产者

    package com.weipch.rabbitmq.dlq;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.GetResponse;
    import com.weipch.util.RabbitMqUtils;
    
    /**
     * @Author 方唐镜
     * @Create 2024-03-03 14:08
     * @Description
     */
    public class Produce {
    
    
    	private static final String NORMAL_EXCHANGE = "normal_exchange";
    
    
    	public static void main(String[] args) throws Exception {
    		Channel channel = RabbitMqUtils.getChannel();
            //模拟消息过期 10s
    		//AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
    		for (int i = 0; i < 10; i++) {
    			String message = "hello world" + i;
    			channel.basicPublish(NORMAL_EXCHANGE, "normal-routing-key", null, message.getBytes());
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    消费者

    正常队列:

    package com.weipch.rabbitmq.dlq;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.weipch.util.RabbitMqUtils;
    
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @Author 方唐镜
     * @Create 2024-03-03 13:50
     * @Description
     */
    public class Consumer01 {
    
    
    	private static final String NORMAL_EXCHANGE = "normal_exchange";
    	private static final String DEAD_EXCHANGE = "dead_exchange";
    
    	private static final String NORMAL_QUEUE = "normal_queue";
    	private static final String DEAD_QUEUE = "dead_queue";
    
    
    	public static void main(String[] args) throws Exception {
    		Channel channel = RabbitMqUtils.getChannel();
    		//声明死信交换机和队列
    		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
    		channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
    		//绑定
    		channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead-routing-key");
    
    
    		//声明普通交换机和队列
    		channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
    		//在正常队列中设置死信参数 指定死信交换机和死信路由键
    		Map<String, Object> map = new HashMap<>();
    		map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
    		map.put("x-dead-letter-routing-key", "dead-routing-key");
    		//最大长度
    		//map.put("x-max-length", 6);
    		channel.queueDeclare(NORMAL_QUEUE, false, false, false, map);
    		channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal-routing-key");
    		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    			String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
    			if (message.contains("5")){
    				System.out.println("Consumer01接收消息:" + message + ",此消息被拒绝");
    				//拒绝消息并把消息丢入死信队列
    				channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
    			}else {
    				System.out.println("Consumer01接收消息:" + message);
    				channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    			}
    		};
    		channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (consumerTag, e) -> {});
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    死信队列:

    package com.weipch.rabbitmq.dlq;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.weipch.util.RabbitMqUtils;
    
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @Author 方唐镜
     * @Create 2024-03-03 13:50
     * @Description
     */
    public class Consumer02 {
    	
    	private static final String DEAD_QUEUE = "dead_queue";
    
    	public static void main(String[] args) throws Exception {
    		Channel channel = RabbitMqUtils.getChannel();
    		channel.basicConsume(DEAD_QUEUE, true,
    			(consumerTag, delivery) -> System.out.println("Consumer02:" + new String(delivery.getBody(), StandardCharsets.UTF_8)),
    			(consumerTag, e) -> {});
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    生产者发送消息到正常队列,而消费者负责消费正常队列的消息。当消息被消费者拒绝并不再重新投递时,消息会被发送到死信队列。

  • 相关阅读:
    Linux笔记之diff工具软件P4merge的使用
    Visual Studio Code 配置C、C++ 文件debug调试环境
    prompt提示词:AI英语词典,让AI教你学英语,通过AI实现一个网易有道英语词典
    并发编程day06
    Springboot毕业设计毕设作品,心理评测系统设计与实现
    C++中cin输入空格
    101道算法JavaScript描述【二叉树】9
    递归算法学习——有效的数独,解数独
    【Linux常用命令4】系统状态监测命令---2
    LP光纤模式计算器
  • 原文地址:https://blog.csdn.net/wpc2018/article/details/136596515