• RabbitMq 高级特性及整合SpringBoot学习总结


    SpringBoot整合RabbitMq

    引入依赖:
    生产者:
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-testartifactId>
    dependency>
    
    消费者:
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    配置文件:
    spring:
      rabbitmq:
        host: 127.0.0.1 # ip
        port: 5672
        username: zhangsan
        password: 123456
        virtual-host: /zhangsan
        # 开启confirms回调 P -> Exchange
    	spring.rabbitmq.publisher-confirms=true
    	# 开启returnedMessage回调 Exchange -> Queue
    	spring.rabbitmq.publisher-returns=true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    生产者配置类:
    @Configuration
    public class RabbitMqConfig {
    
        //定义交换机的名字
        public static final String  EXCHANGE_NAME = "boot_topic_exchange";
        //定义队列的名字
        public static final String QUEUE_NAME = "boot_queue";
    
        //1、声明交换机
        @Bean("bootExchange")
        public Exchange bootExchange(){
            return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
        }
    
        //2、声明队列
        @Bean("bootQueue")
        public Queue bootQueue(){
            return QueueBuilder.durable(QUEUE_NAME).build();
        }
        
        //3、队列与交换机进行绑定
        @Bean
        public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    消费者监听队列:
    @Component
    public class RabbitMQListener {
    
          //定义方法进行信息的监听   RabbitListener中的参数用于表示监听的是哪一个队列
          @RabbitListener(queues = "boot_queue")
          public void ListenerQueue(Message message){
              System.out.println("message:"+message.getBody());
          }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    生产者测试类:
    
    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class ProducerTest {
    
         //注入 RabbitTemplate
        @Autowired
         private RabbitTemplate  rabbitTemplate;
    
         @Test
         public void send(){
             rabbitTemplate.convertAndSend("boot_topic_exchange","boot.haha","boot mq...");
         }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    消息的可靠投递

    1、保障消息发送成功
    • 使用事务的手段,性能低。
    • 使用异步的confirm确认机制,确认消息到达exchange中
    2、保障消息存储成功
    • 在消息投递前对消息信息进行持久化(队列、交换机、消息),可以使用redis
    • 搭建broker的集群,备份存储队列、交换机、消息的信息
    • 监听死信消息(死信:队列满,过期,拒收)
    3、保证消费者接收消息成功
    • 使用手动确认消息的方式,消费之后修改持久化消息的状态

    消息生产者保证可靠性投递

    在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景,RabbitMQ 提供了两种方式用来控制消息的投递可靠性模式。
    rabbitmq 整个消息投递的路径为:
    producer--->rabbitmq broker--->exchange--->queue--->consumer

    • confirm 确认模式
      消息从 producer —> exchange 失败,则会返回一个 confirmCallback 。
    • return 退回模式
      消息从 exchange —> queue 失败,则会返回一个 returnCallback 。

    可以将利用这两个 callback 控制消息的可靠性投递

    代码实现

    配置文件:
    
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           https://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    
        
        <context:property-placeholder location="classpath:rabbitmq.properties"/>
    
        
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                                   port="${rabbitmq.port}"
                                   username="${rabbitmq.username}"
                                   password="${rabbitmq.password}"
                                   virtual-host="${rabbitmq.virtual-host}"
                                   publisher-confirms="true"
                                   publisher-returns="true"
        />
    
        
        <rabbit:admin connection-factory="connectionFactory"/>
    
        
        <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
    
    
    
        
        
        <rabbit:queue id="test_queue_confirm" name="test_queue_confirm">rabbit:queue>
    
        <rabbit:direct-exchange name="test_exchange_confirm">
            <rabbit:bindings>
                <rabbit:binding queue="test_queue_confirm" key="confirm">rabbit:binding>
            rabbit:bindings>
        rabbit:direct-exchange>
    
    
        
    
        
        
         <rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
             
             <rabbit:queue-arguments>
                 
                 <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer">entry>
             rabbit:queue-arguments>
    
         rabbit:queue>
    
        <rabbit:topic-exchange name="test_exchange_ttl" >
             <rabbit:bindings>
                 <rabbit:binding pattern="ttl.#" queue="test_queue_ttl">rabbit:binding>
             rabbit:bindings>
         rabbit:topic-exchange>
    
        
    
    
        
        
    
        
    
        <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
            
            <rabbit:queue-arguments>
                
                <entry key="x-dead-letter-exchange" value="exchange_dlx" />
    
                
                <entry key="x-dead-letter-routing-key" value="dlx.hehe" />
    
                
                <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
                
                <entry key="x-max-length" value="10" value-type="java.lang.Integer" />
            rabbit:queue-arguments>
        rabbit:queue>
    
        <rabbit:topic-exchange name="test_exchange_dlx">
            <rabbit:bindings>
                <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx">rabbit:binding>
            rabbit:bindings>
        rabbit:topic-exchange>
    
    
        
    
        <rabbit:queue name="queue_dlx" id="queue_dlx">rabbit:queue>
        <rabbit:topic-exchange name="exchange_dlx">
            <rabbit:bindings>
                <rabbit:binding pattern="dlx.#" queue="queue_dlx">rabbit:binding>
            rabbit:bindings>
        rabbit:topic-exchange>
        
    
    
    
    
    
        
    
        
        
        <rabbit:queue id="order_queue" name="order_queue">
            
            <rabbit:queue-arguments>
                <entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
                <entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
                <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
            rabbit:queue-arguments>
        rabbit:queue>
    
        <rabbit:topic-exchange name="order_exchange">
            <rabbit:bindings>
                <rabbit:binding pattern="order.#" queue="order_queue">rabbit:binding>
            rabbit:bindings>
        rabbit:topic-exchange>
    
        
        <rabbit:queue id="order_queue_dlx" name="order_queue_dlx">rabbit:queue>
    
        <rabbit:topic-exchange name="order_exchange_dlx">
            <rabbit:bindings>
                <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx">rabbit:binding>
            rabbit:bindings>
        rabbit:topic-exchange>
        
    beans>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    测试类
    Confirm 模式:
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
    public class ProducerTest {
    
        @Autowired
        private RabbitTemplate  rabbitTemplate;
        
        @Test
        public void testConfirm() {
    
             //定义回调
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                /**
                 *
                 * @param correlationData 相关配置信息
                 * @param ack   exchange交换机 是否成功收到了消息。true 成功,false代表失败
                 * @param cause 失败原因
                 */
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    //System.out.println("confirm方法被执行了...."+correlationData.getId());
    
                     //ack 为  true表示 消息已经到达交换机
                    if (ack) {
                        //接收成功
                        System.out.println("接收成功消息" + cause);
                    } else {
                        //接收失败
                        System.out.println("接收失败消息" + cause);
                        //做一些处理,让消息再次发送。
                    }
                }
            });
            //进行消息发送
            for (int i = 0; i < 5; i++) {
                rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message Confirm...");
            }
            //进行睡眠操作
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    return模式:
        @Test
        public void testReturn() {
    
            //设置交换机处理失败消息的模式   为true的时候,消息达到不了 队列时,会将消息重新返回给生产者
            rabbitTemplate.setMandatory(true);
    
            //定义回调
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                /**
                 *
                 * @param message   消息对象
                 * @param replyCode 错误码
                 * @param replyText 错误信息
                 * @param exchange  交换机
                 * @param routingKey 路由键
                 */
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    System.out.println("return 执行了....");
    
                    System.out.println("message:"+message);
                    System.out.println("replyCode:"+replyCode);
                    System.out.println("replyText:"+replyText);
                    System.out.println("exchange:"+exchange);
                    System.out.println("routingKey:"+routingKey);
    
                    //处理
                }
            });
            //进行消息发送
            rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message return...");
    
            //进行睡眠操作
            try {
                Thread.sleep(5000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    批量发送消息,让消费者每次拉去指定的数量:
         @Test
         public void  testQos(){
             for (int i = 0; i < 10; i++) {
                 // 发送消息
                 rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
             }
    
         }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    TTL:

    TTL 全称 Time To Live(存活时间/过期时间),当消息到达存活时间后,还没有被消费,会被自动清除,RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

    设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。

    <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
    
    • 1

    设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。

    如果两者都进行了设置,以时间短的为准。

        @Test
        public void testTtl() {
    
            for (int i = 0; i < 10; i++) {
                Message message = MessageBuilder
                        .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
                        //消息单独过期时间
                        .setExpiration("5000")
                        .build();
    
                // 发送消息
                rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.zhangsan", message);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    死信队列:

    死信交换机和死信队列和普通的没有区别,当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列。
    在这里插入图片描述

    消息成为死信的三种情况:
    • 队列消息长度到达限制;
    • 消费者拒接消费消息,并且不重回队列;
    • 原队列存在消息过期设置,消息到达超时时间未被消费;
        @Test
        public void testDlx(){
        
        	Message message = MessageBuilder
                        .withBody("死信消息测试".getBytes(StandardCharsets.UTF_8))
                        //消息单独过期时间
                        .setExpiration("5000")
                        .build();
    			
            //测试过期时间,死信消息
            rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hehe",message);
            //测试长度限制后,消息死信
           for (int i = 0; i < 20; i++) {
                rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hehe",message);
            }
            //测试消息拒收
            rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.zhangsan",message);
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    延迟队列:

    延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
    例如:下单后,30分钟未支付,取消订单,回滚库存。
    使用TTL+死信队列组合可以实现延迟队列的效果。
    在这里插入图片描述

    配置文件设置:

    1、声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
    2、声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
    3、正常队列绑定死信交换机
    设置两个参数:
    x-dead-letter-exchange:死信交换机名称
    x-dead-letter-routing-key:发送给死信交换机的routingkey

        @Test
        public  void testDelay() throws InterruptedException {
        	Message message = MessageBuilder
                        .withBody("订单信息:id=1,time=2020年12月...".getBytes(StandardCharsets.UTF_8))
                        //消息单独过期时间
                        .setExpiration("5000")
                        .build();
    		
            //发送订单消息。 将来是在订单系统中,下单成功后,发送消息
            rabbitTemplate.convertAndSend("order_exchange","order.msg",message);
    
            //打印倒计时10秒
            for (int i = 10; i > 0 ; i--) {
                System.out.println(i+"...");
                Thread.sleep(1000);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    消息消费者保证可靠性投递

    Consumer Ack
    ack指Acknowledge确认,表示消费端收到消息后的确认方式,有三种确认方式:

    • 自动确认:acknowledge="none"
      当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。
    • 手动确认:acknowledge="manual"
      需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
    • 根据异常情况确认:acknowledge="auto"
    代码实现
    配置文件:
    
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           https://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
        
        <context:property-placeholder location="classpath:rabbitmq.properties"/>
    
        
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                                   port="${rabbitmq.port}"
                                   username="${rabbitmq.username}"
                                   password="${rabbitmq.password}"
                                   virtual-host="${rabbitmq.virtual-host}"/>
        <context:component-scan base-package="com.zhangsan.listener" />
    
        
    
        
        <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="5">
            <rabbit:listener ref="ackListener" queue-names="test_queue_confirm">rabbit:listener>
            
    
            
            <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx">rabbit:listener>
    
            
            <rabbit:listener ref="orderListener" queue-names="order_queue_dlx">rabbit:listener>
        rabbit:listener-container>
    beans>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    测试类:

    ACK测试:

    @Component
    public class AckListener implements ChannelAwareMessageListener {
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            //获取消息的id
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
            //获取消息
            System.out.println("message:"+new String(message.getBody()));
            //进行业务处理
            System.out.println("=====进行业务处理====");
            //模拟出现异常
            int  i = 5/0;
            //进行消息签收
            channel.basicAck(deliveryTag, true);
            } catch (Exception e) {
                //出现异常,拒绝签收
                //第二个参数:是否应用于多消息
                //第三个参数:requeue:重回队列。
                //如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
                channel.basicNack(deliveryTag, true, true);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    消费端限流:

    首先消费端的确认模式一定为手动确认:acknowledge=“manual”
    配置 prefetch属性设置消费端一次拉取多少消息:prefetch=“5”

    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="5" >
    
    • 1

    RabbitMQ 保障消息幂等

    在编程中一个幂等操作的特点是:任意多次执行所产生的结果与一次执行的产生的结果相同,在MQ中由于网络故障、客户端延迟消费、MQ自动重试过程中可能会导致消息的重复消费的问题。我们要保证一次或多次请求同一个资源,对资源本身产生的影响和第一次执行的影响相同。

    例如:消费者在消费完一条消息后,向RabbitMQ 发送一个ACK 确认,但是此时网络断开或者其他原因导致RabbitMQ 没有收到这个ACK,那么RabbitMQ 并不会讲该条消息删除,而是重回队列,当客户端重新建立到连接后,消费者还是会再次收到该条消息,这就造成了消息的重复消费。

    解决方案:
    1、乐观锁机制:
    • 根据version版本,也就是在操作库存前先获取当前商品的version版本号,然后操作的时候带上此version号
    • 第一次操作库存时,得到version为1,调用库存服务把version+1,version变成了2。
    • 但返回给订单服务出现了问题没有收到ACK,订单服务又会一次发起调用库存服务,此时订单服务传递的version还是1
    • 再执行更新时,就不会执行了,因为此时version = 2,where条件不成立,这样就保证了不管调用几次,只会真正的处理一次
    2、Redis:
    • 在消费者修改库存之前,先将消息的 id 执行setnx放到 Redis 中
    • 如果执行成功就表示没有修改过这条库存,可以进行消费
    • 如果执行失败表示库存已经被修改了,直接ACK
    3、唯一ID + 指纹码机制 利用数据库主键去重:
    • 唯一ID就是业务表的唯一的主键,如商品ID
    • 生产者发送消息时生成指纹码,可以用时间戳+业务编号的方式
    • 消费者每次重试时都带着同样的唯一ID + 指纹码发送
    • 消费者获取到消息后先根据唯一ID + 指纹码去查询db是否存在该消息
    • 如果不存在,则正常消费,消费完毕后把唯一ID + 指纹码写入db
    • 如果存在,则证明消息被消费过,直接ACK
    • 这种方法的好处是实现简单,坏处是高并发下有数据写入的性能瓶颈。可以根据ID进行分库分表进行算法路由,实现分压分流机制
  • 相关阅读:
    【深度学习实验】线性模型(五):使用Pytorch实现线性模型:基于鸢尾花数据集,对模型进行评估(使用随机梯度下降优化器)
    【Vue】模板语法,事件处理器及综合案例、自定义组件、组件通信
    单例模式(Singleton)
    c++后端开发书籍推荐
    找实习之从0开始的后端学习日记【9.17】
    电脑系统重装后如何开启Win11实时辅助字幕
    完整调试 security oauth2 swagger2的过程
    6-5,web3浏览器链接区块链(react+区块链实战)
    基于 Ubuntu 20.04 系统 部署 NetBox
    深入剖析:正则表达式的奥秘
  • 原文地址:https://blog.csdn.net/weixin_45161367/article/details/126688255