• Rabbitmq- 消费者ack机制与发布者消息确认


    Rabbitmq消息确认机制

    https://blog.csdn.net/yorsola/article/details/108436276
    官网:https://www.rabbitmq.com/confirms.html

    使用消息代理(如 RabbitMQ)的系统按照定义是分布式的。由于发送的协议方法(消息)不能保证到达对等点或被对等点成功处理,因此发布者和消费者都需要一种传递和处理确认的机制。RabbitMQ 支持的几个消息传递协议提供了这样的特性.

    (消费者)交付确认
    当 RabbitMQ 向消费者传递消息时,它需要知道何时认为消息发送成功。哪种逻辑是最佳的取决于系统。
    因此,这主要是一个申请决定。在 AMQP 0-9-1 中,它是在使用 basic.consume 方法注册消费者或使用basic.get方法按需获取消息时生成的。
    
    交付标识符:交付标签
    在我们继续讨论其他主题之前,重要的是解释如何识别交付(并且确认表明它们各自的交付)。当注册消费者(订阅)时,RabbitMQ 将使用basic.deliver 方法传递(推送)消息。该方法带有一个传递标签,它唯一地标识一个通道上的传递。因此,交付标签的范围是每个渠道。
    交付标签是单调增长的正整数,并由客户端库呈现。确认交付的客户端库方法将交付标签作为参数。
    由于交付标签的范围是每个渠道,因此必须在接收交付的同一渠道上确认交付。在不同的通道上确认将导致“未知传递标签”协议异常并关闭通道。
    
    消费者确认模式和数据安全注意事项
    当节点将消息传递给消费者时,它必须决定是否应将消息视为由消费者处理(或至少接收)。由于多个事物(客户端连接、消费者应用程序等)可能会失败,因此此决定是数据安全问题。消息传递协议通常提供一种确认机制,允许消费者确认传递到他们连接的节点。是否使用该机制在消费者订阅时决定。
    根据使用的确认模式,RabbitMQ 可以认为消息在发送后立即成功传递(写入 TCP 套接字)或在接收到显式(“手动”)客户端确认时。手动发送的确认可以是肯定的或否定的,并使用以下协议方法之一:
    basic.ack用于肯定确认
    basic.nack用于否定确认(注意:这是RabbitMQ 对 AMQP 0-9-1 的扩展)
    basic.reject用于否定确认,但与basic.nack相比有一个限制
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    这里的几个消息确认具体分析下:

    肯定确认只是指示 RabbitMQ 将消息记录为已传递并且可以被丢弃。
    带有basic.reject的否定确认具有相同的效果。区别主要在于语义上:肯定的确认假定消息已成功处理,而否定的对应则表明未处理但仍应删除传递。
    
    在自动确认模式下,消息在发送后立即被视为成功传递。
    这种模式以更高的吞吐量(只要消费者能够跟上)以降低交付和消费者处理的安全性为代价。这种模式通常被称为“即发即弃”。
    与手动确认模型不同的是,如果消费者的 TCP 连接或通道在发送成功之前关闭,服务器发送的消息将丢失。因此,自动消息确认应该被认为是不安全 的,并且不适合所有工作负载。
    
    使用自动确认模式时需要考虑的另一件事是消费者过载。
    手动确认模式通常与有界通道预取一起使用,这限制了通道上未完成(“进行中”)交付的数量。然而,对于自动确认,根据定义没有这样的限制。
    因此,消费者可能会对交付速度感到不知所措,可能会在内存中累积积压并耗尽堆或让他们的进程被操作系统终止。
    一些客户端库将应用 TCP 背压(停止从套接字读取,直到未处理交付的积压下降超过某个限制)。因此,自动确认模式仅推荐给能够以稳定的速度高效处理交付的消费者。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    Params:
    deliveryTag – the tag from the received AMQP.Basic.GetOk or AMQP.Basic.Deliver 
    multiple – true to acknowledge all messages up to and including the supplied delivery tag; false to acknowledge just the supplied delivery tag.
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    
    • 1
    • 2
    • 3
    • 4

    example:
    一次仅交付一条数据

    // this example assumes an existing channel instance
    
    boolean autoAck = false;
    channel.basicConsume(queueName, autoAck, "a-consumer-tag",
         new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag,
                                        Envelope envelope,
                                        AMQP.BasicProperties properties,
                                        byte[] body)
                 throws IOException
             {
                 long deliveryTag = envelope.getDeliveryTag();
                 // positively acknowledge a single delivery, the message will
                 // be discarded
                 channel.basicAck(deliveryTag, false);
             }
         });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    一次交付多条
    可以批量手动确认以减少网络流量。这是通过将确认方法的多个字段(见上文)设置为true来完成的。请注意,basic.reject历史上没有该字段,这就是RabbitMQ 引入basic.nack作为协议扩展的原因。
    当multiple字段设置为true时,RabbitMQ 将确认所有未完成的交付标签,包括确认中指定的标签。与其他与确认相关的所有内容一样,这是每个通道的范围。例如,假设通道Ch上有未确认的传递标签 5、6、7 和 8 ,当一个确认帧到达该通道时,delivery_tag设置为8 ,multiple设置为true,从 5 到 8 的所有标签都将被确认. 如果multiple设置为false,则交付 5、6 和 7 仍将未被确认。

    // this example assumes an existing channel instance
    
    boolean autoAck = false;
    channel.basicConsume(queueName, autoAck, "a-consumer-tag",
         new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag,
                                        Envelope envelope,
                                        AMQP.BasicProperties properties,
                                        byte[] body)
                 throws IOException
             {
                 long deliveryTag = envelope.getDeliveryTag();
                 // positively acknowledge all deliveries up to
                 // this delivery tag
                 channel.basicAck(deliveryTag, true);
             }
         });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    交付的否定确认和重新排队

    有时消费者无法立即处理交付,但其他实例可能能够。在这种情况下,可能需要重新排队并让另一个消费者接收和处理它。basic.reject和basic.nack是用于此目的的两种协议方法。

    这些方法通常用于否定确认交付。这样的交付可以被经纪人丢弃或死信或重新排队。此行为由requeue字段控制。当该字段设置为true时,代理将使用指定的交付标签重新排队交付(或多个交付,稍后将解释)。或者,当此字段设置为false时,如果已配置,消息将被路由到死信交换,否则将被丢弃。

    Params:
    deliveryTag – the tag from the received AMQP.Basic.GetOk or AMQP.Basic.Deliver
    multiple – true to reject all messages up to and including the supplied delivery tag; false to reject just the supplied delivery tag.
    requeue – true if the rejected message(s) should be requeued rather than discarded/dead-lettered
    void basicNack(long deliveryTag, boolean multiple, boolean requeue)
                throws IOException;
    
    Params:
    deliveryTag – the tag from the received AMQP.Basic.GetOk or AMQP.Basic.Deliver
    requeue – true if the rejected message should be requeued rather than discarded/dead-lettered
    void basicReject(long deliveryTag, boolean requeue) throws IOException;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    当一条消息被重新排队时,如果可能的话,它将被放置到它在队列中的原始位置。如果不是(由于多个消费者共享队列时其他消费者的并发传递和确认),则消息将重新排队到更接近队列头的位置。

    重新排队的消息可能会立即准备好重新传递,具体取决于它们在队列中的位置以及具有活动消费者的通道使用的预取值。这意味着,如果所有消费者由于临时条件而无法处理交付而重新排队,他们将创建重新排队/重新交付循环。就网络带宽和 CPU 资源而言,此类循环的成本可能很高。消费者实现可以跟踪重新传递的数量并永久拒绝消息(丢弃它们)或在延迟后安排重新排队。

    springboot配置:
    默认情况下消息消费者是自动 ack 消息的,以下为 yml 配置

    spring:
      rabbitmq:
        listener:
          simple:
            acknowledge-mode: manual
    
    • 1
    • 2
    • 3
    • 4
    • 5

    或者在 RabbitListenerContainerFactory 中开启手动 ack

    @Bean
    public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //开启手动 ack
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);             
        return factory;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    消息接收

       @RabbitListener(bindings = {
                @QueueBinding(
                        value = @Queue(value = "${rabbitmq.queue.routing.beijing}", durable = "true",autoDelete="false"),
                        exchange = @Exchange(
                                value = "${rabbitmq.exchange.routing}",
                                durable = "true",
                                type = ExchangeTypes.TOPIC),
                        key = "china.#")})
        public void receive(@Payload String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
            System.out.println("路由监听接受到发送者发送的信息:" + msg);
            // 确认消息
            channel.basicAck(deliveryTag, false);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    rabbitmq配置confirm确认和returnback消息回调

      /**
         * 配置rabbitmqTemplate
         *
         * 推送mq有四种情况:
         *      消息推送到 MQ,但是在 MQ 里找不到交换机
         *      消息推送到 MQ,找到交换机了,当时没有找到队列
         *      消息推送到 MQ,交换机和队列都没找到
         *      消息成功推送
         *
         */
        @Bean
        @ConditionalOnBean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    
            // 设置消息从生产者发送至 rabbitmq broker 成功的回调 (保证信息到达 broker)
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                // ack=true:消息成功发送到Exchange
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    System.out.println("ConfirmCallback:     " + "相关数据:" + correlationData);
                    System.out.println("ConfirmCallback:     " + "确认是否到达交换机:" + ack);
                    System.out.println("ConfirmCallback:     " + "原因:" + cause);
                }
            });
            // 设置信息从交换机发送至 queue 失败的回调
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    System.out.println("ReturnCallback:     " + "消息:" + message);
                    System.out.println("ReturnCallback:     " + "回应码:" + replyCode);
                    System.out.println("ReturnCallback:     " + "回应信息:" + replyText);
                    System.out.println("ReturnCallback:     " + "交换机:" + exchange);
                    System.out.println("ReturnCallback:     " + "路由键:" + routingKey);
                }
            });
            // 为 true 时,消息通过交换器无法匹配到队列时会返回给生产者,为 false 时,匹配不到会直接丢弃
            rabbitTemplate.setMandatory(true);
            // 设置发送时的转换
            // rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            return rabbitTemplate;
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    具体使用可以结合项目,完成对消息完整性以及顺序性的保证

    消息的可靠投递

    在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

    confirm 确认模式
    return  退回模式
    
    • 1
    • 2
    rabbitmq 整个消息投递的路径为:
    producer--->rabbitmq broker--->exchange--->queue--->consumer
    消息从 producer 到 exchange 则会返回一个 confirmCallback 。
    消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。
    我们将利用这两个 callback 控制消息的可靠性投递
    
    • 1
    • 2
    • 3
    • 4
    • 5

    消息的可靠性总结:
    生产者:

    1:设置ConnectionFactory的publisher-confirms="true" 开启 确认模式。可以通过代码自己配置,也可以在配置文件配置(高版本的取消了使用publisher-confirms="true" ,取而代之的是 publisher-confirm-type) publisher-confirm-type有三个参数
    NONE 禁用发布确认模式,是默认值
    CORRELATED 发布消息成功到交换器后会触发回调方法,与CorrelationData一起使用可以将确认与发送的消息关联起来。
    SIMPLE(在作用域操作中使用RabbitTemplate# waitforconfirmed()(或waitForConfirmsOrDie()))) 效果一和CORRELATED值一样会触发回调方法,效果二:在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
    
    2:使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
    
    3:设置ConnectionFactory的publisher-returns="true" 开启 退回模式。当消息从交换机传递到队列时,无法找到对应队列时return 回调
    
    4:使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    consumer

    在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
    和生产者一样 设置手动确认,通过手动确认保证消息被本地消费或本地存储落地
    
    如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息,false表示只确认当前消息
    
    如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    Publisher Confirms 发送者消息确认

    RabbitMQ的消息可靠性是非常高的,但是他以往的机制都是保证消息发送到了MQ之后,可以推送到消费者消费,不会丢失消息。
    但是发送者发送消息是否成功是没有保证的。我们可以回顾下,发送者发送消息的基础API:Producer.basicPublish方法是没有返回值的,也就是说,一次发送消息是否成功,应用是不知道的,这在业务上就容易造成消息丢失。而这个模块就是通过给发送者提供一些确认机制,来保证这个消息发送的过程是成功的。

    如果了解了这个机制就会发现,这个消息确认机制就是跟RocketMQ的事务消息机制差不多的。而对于这个机制,RocketMQ的支持明显更优雅。

    ​ 发送者确认模式默认是不开启的,所以如果需要开启发送者确认模式,需要手动在channel中进行声明。

    channel.confirmSelect();
    
    • 1

    在官网的示例中,重点解释了三种策略:
    ​ 1、发布单条消息

    ​ 即发布一条消息就确认一条消息。核心代码:

    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String body = String.valueOf(i);
        channel.basicPublish("", queue, null, body.getBytes());
        channel.waitForConfirmsOrDie(5_000);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    ​ channel.waitForConfirmsOrDie(5_000);这个方法就会在channel端等待RabbitMQ给出一个响应,用来表明这个消息已经正确发送到了RabbitMQ服务端。但是要注意,这个方法会同步阻塞channel,在等待确认期间,channel将不能再继续发送消息,也就是说会明显降低集群的发送速度即吞吐量。

    官方说明了,其实channel底层是异步工作的,会将channel阻塞住,然后异步等待服务端发送一个确认消息,才解除阻塞。但是我们在使用时,可以把他当作一个同步工具来看待。

    然后如果到了超时时间,还没有收到服务端的确认机制,那就会抛出异常。然后通常处理这个异常的方式是记录错误日志或者尝试重发消息,但是尝试重发时一定要注意不要使程序陷入死循环。

    ​ 2、发送批量消息

    ​ 之前单条确认的机制会对系统的吞吐量造成很大的影响,所以稍微中和一点的方式就是发送一批消息后,再一起确认。

    ​ 核心代码:

    int batchSize = 100;
    int outstandingMessageCount = 0;
    
    long start = System.nanoTime();
    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String body = String.valueOf(i);
        ch.basicPublish("", queue, null, body.getBytes());
        outstandingMessageCount++;
    
        if (outstandingMessageCount == batchSize) {
            ch.waitForConfirmsOrDie(5_000);
            outstandingMessageCount = 0;
        }
    }
    
    if (outstandingMessageCount > 0) {
        ch.waitForConfirmsOrDie(5_000);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    ​ 这种方式可以稍微缓解下发送者确认模式对吞吐量的影响。但是也有个固有的问题就是,当确认出现异常时,发送者只能知道是这一批消息出问题了, 而无法确认具体是哪一条消息出了问题。所以接下来就需要增加一个机制能够具体对每一条发送出错的消息进行处理。

    ​ 3、异步确认消息

    ​ 实现的方式也比较简单,Producer在channel中注册监听器来对消息进行确认。核心代码就是一个:

    channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2);
    
    
    • 1
    • 2

    ​ 按说监听只要注册一个就可以了,那为什么这里要注册两个呢?如果对照下RocketMQ的事务消息机制,这就很容易理解了。发送者在发送完消息后,就会执行第一个监听器callback1,然后等服务端发过来的反馈后,再执行第二个监听器callback2。

    ​ 然后关于这个ConfirmCallback,这是个监听器接口,里面只有一个方法: void handle(long sequenceNumber, boolean multiple) throws IOException; 这方法中的两个参数,

    sequenceNumer:这个是一个唯一的序列号,代表一个唯一的消息。在RabbitMQ中,他的消息体只是一个二进制数组,并不像RocketMQ一样有一个封装的对象,
    所以默认消息是没有序列号的。而RabbitMQ提供了一个方法int sequenceNumber = channel.getNextPublishSeqNo());来生成一个全局递增的序列号。然后应用程序需要自己来将这个序列号与消息对应起来。没错!是的!需要客户端自己去做对应!
    
    multiple:这个是一个Boolean型的参数。如果是false,就表示这一次只确认了当前一条消息。
    如果是true,就表示RabbitMQ这一次确认了一批消息,在sequenceNumber之前的所有消息都已经确认完成了。
    
    • 1
    • 2
    • 3
    • 4
    • 5

    对比下RocketMQ的事务消息机制,有没有觉得很熟悉,但是又很别扭?当然,考虑到这个对于RabbitMQ来说还是个新鲜玩意,所以有理由相信这个机制在未来会越来越完善。

  • 相关阅读:
    Kubernetes kube-scheduler调度器
    自己写一个定时备份 mysql 的备份工具
    odoo 开发入门教程系列-添加修饰
    一、BurpSuite基本界面学习
    跨平台编程开发工具Xojo 2023 Release mac中文版功能介绍
    flutter升级AS和gradle后编译出错(No signature of method: build_gbqp6.android())错误
    ROS2——分布式通信(十二)
    Java并发基石—CAS原理实战
    FFmpeg 命令:从入门到精通 | ffmpeg filter(过滤器 / 滤镜)
    LeetCode刷题第7周小结
  • 原文地址:https://blog.csdn.net/huanglu0314/article/details/127773385