• # 技术栈知识点巩固——消息队列


    技术栈知识点巩固——消息队列

    MQ 的优点

    • 异步处理:相比于传统的串行、并行方式,提高了系统吞吐量
    • 应用解耦:系统间通过消息通信,不用关心其它系统的处理
    • 流量削锋:可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。
    • 日志处理:解决大量日志传输。

    MQ 的缺点

    • 系统可用性降低
    • 系统复杂度提高:加入了消息队列,要多考虑很多方面的问题
    • 一致性问题:复杂度提升

    RabbitMq工作模式

    • 简单模式(Simple):消息队列中的消息被消费后,消息就拿不到了。
      在这里插入图片描述

    • 工作模式(Work):多个消费者消费同一个队列中的消息,队列采用轮询的方式将消息平均发送给消费者。谁先拿到谁先消费。
      在这里插入图片描述

    • 发布订阅模式Publish/Subscirbe):每个消费者监听自己的队列,生产者将消息由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。

    在这里插入图片描述

    • 路由模式Routing):每个消费者监听自己的队列,并且设置routingkey生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列;
      在这里插入图片描述

    • topic 主题模式(路由模式的一种),消息产生者产生消息,把消息交给交换机,交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

    在这里插入图片描述


    RabbitMq有可能发生重复消费,如何避免,做到幂等

    • 每条消息有唯一的编号,消费完之后存到消费日志表中,下次拿到消息后,先去日志表中查找一下是否有这条记录,如果有那么就不消费了,直接丢弃或者进行更新操作。

    集群消费和广播消费

    集群消费

    • 当使用集群消费模式时,消息队列RocketMQ认为任意一条消息只需要被集群内的任意一个消费者处理即可。使用与消费之集群部署,每条消息只处理一次。

    在这里插入图片描述

    特点
    • 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
    • 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上。

    广播消费

    • 当使用广播消费模式时,消息队列RocketMQ会将每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。

    在这里插入图片描述

    特点
    • 不支持顺序消息。
    • 不支持重置消费位点。
    • 不支持线下联调分组消息。
    • 每条消息都需要被相同订阅逻辑的多台机器处理。
    • 消费进度在客户端维护,出现重复消费的概率稍大于集群模式。
    • 广播模式下,消息队列保证每条消息至少被每台客户端消费一次,但是并不会重投消费失败的消息,因此业务方需要关注消费失败的情况。
    • 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
    • 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
    • 广播模式下服务端不维护消费进度,所以消息队列控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

    消息队列有哪些使用场景

    • 异步处理:异步发送消息、发送短信
    • 应用解耦:基于线程的异步处理,能确保用户体验,但是极端情况下可能会出现异常,影响系统的稳定性,而同步调用很多时候无法保证理想的性能,那么我们就可以用MQ来进行处理。
    • 日志处理
    • 流量削锋:面对大量的请求,可以使用消息队列进行流量削锋

    消息中间件如何解决消息丢失问题

    在这里插入图片描述

    生产端开启事务机制

    • RabbitMQ 提供的事务功能,生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit

    • 事务机制会对吞吐量有一定的影响,开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。

    • 事务机制和 cnofirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的

    消费端消息丢失

    • RabbitMQ 提供的 ack 机制
    • 通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

    Rabbit消息持久化

    • 开启Rabbitmq消息持久化
    • 消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。
    • 将队列设置为持久化
    • 发送消息的时候将消息deliveryMode 设置为 2

    Rabbitmq 有几种广播类型

    • direct(默认模式):发送方把消息发送给订阅方,如果有多个订阅者,默认采取轮询的方式进行消息发送
    dequeOneConsumeOne received message: hello world
    dequeOneConsumeTwo received message: hello world
    dequeOneConsumeOne received message: hello world
    dequeOneConsumeTwo received message: hello world
    dequeOneConsumeOne received message: hello world
    dequeOneConsumeTwo received message: hello world
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • headers:与direct类似,但是性能差,基本不用
    • fanout:分发模式,将消息发送给所有订阅者
     : dequeOne consume one received message: 我是交换机发出的消息
    dequeTwo consume two message: (Body:'我是交换机发出的消息' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=ExchangeOne, receivedRoutingKey=, deliveryTag=10, consumerTag=amq.ctag-H6qjuEJTupM-ihNzwi-dnA, consumerQueue=TestDequeueTwo])
    
    • 1
    • 2
    • topic:匹配订阅模式,使用正则匹配到消息队列,能匹配到的都能接收到

    RabbitMQ 有哪些重要组件

    • ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用。

    • Channel(信道):消息推送使用的通道。

    • Exchange(交换器):用于接受、分配消息。

    • Queue(队列):用于存储生产者的消息。

    • RoutingKey(路由键):用于把生成者的数据分配到交换器上。

    • BindingKey(绑定键):用于把交换器的消息绑定到队列上。

    • Virtual host:用于消息隔离(类似Redis 16db这种概念),最上层的消息路由,一个包含若干ExchangeQueue,同一个里面Exchange

    Mq 消息确认机制

    生产者确认机制

    • 生产者使用的机制,用来确认消息是否被成功消费。
    消费者接受确认 ConfirmCallback
    • yml
    rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        # 发布消息成功到交换器后会触发回调方法
        publisher-confirm-type: correlated
        # 消息成功确认
        publisher-confirms: true
        # 指定消息确认模式为手动确认
        template:
          mandatory: true # 手动签收机制
        # 消息失败确认
        publisher-returns: true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • publisher-confirm-typeNONE 值是禁用发布确认模式,是默认值,CORRELATED 值是发布消息成功到交换器后会触发回调方法。SIMPLE值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirmswaitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker;
    • RabbitTemplate只允许设置一个callback方法,你可以将RabbitTemplatebean设为单例然后设置回调,在controller、service都设置为 protype
    // 设置 bena 为多例
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    
    • 1
    • 2
    • 注册 noSingleRabbitTemplate bean
    /**
         * 注册多例 bean
         *
         * @param connectionFactory connectionFactory
         * @return RabbitTemplate
         */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate noSingleRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMandatory(true);
        return template;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • test.java
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    @Service
    public class RabbitMqServiceImpl implements RabbitMqService {
    
        @Resource(name = "noSingleRabbitTemplate")
        private RabbitTemplate rabbitTemplate;
    
        @Override
        public String testOne() {
            RabbitMqProduceConfirm confirm = new RabbitMqProduceConfirm();
            RabbitMqProduceReturnBack callback = new RabbitMqProduceReturnBack();
            rabbitTemplate.convertAndSend(RabbitMqConst.DEQUE_ONE, "hello world");
            // 生产者拿到没有到达消费者的消息
            rabbitTemplate.setReturnsCallback(callback);
            // 生产者得到消费者拿到消息的确认
            rabbitTemplate.setConfirmCallback(confirm);
            return null;
        }
    }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • RabbitMqProduceConfirm.java
    public class RabbitMqProduceConfirm implements RabbitTemplate.ConfirmCallback {
    
        private static final Logger logger = LoggerFactory.getLogger(RabbitMqProduceConfirm.class);
    
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                logger.info("ACK :{} ", JSON.toJSONString(correlationData));
            } else {
                logger.info("ACK error");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    将不可达目的地消息返回给生产者 ReturnsCallback
    • yml
    # 消息失败确认
    publisher-returns: true
    
    • 1
    • 2
    • RabbitMqProduceReturnBack.java
    public class RabbitMqProduceReturnBack implements RabbitTemplate.ReturnsCallback{
    
        private static final Logger logger = LoggerFactory.getLogger(RabbitMqProduceReturnBack.class);
    
        @Override
        public void returnedMessage(ReturnedMessage returned) {
            logger.info("Confirmed RabbitMQ");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    消费者确认

    自动确认
    • RabbitMQ消费者默认为自动确认,不会管消费者是否成功消费/处理了消息
    根据情况确认
    • 如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认
    • 当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且 requeue = false(不重新入队列)
    • 当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会被确认
    • 其他的异常,则消息会被拒绝,且 requeue = true(如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的)。可以通过 setDefaultRequeueRejected(默认是true)去设置
    手动确认
    • 消费者收到消息后,手动对消息进行处理,完成消费
    • Basic.Ack :用于确认当前消息
    • Basic.Nack :用于否定当前消息
    • Basic.Reject :用于拒绝当前消息

    RabbitMQ broker

    • 如果要使用RabbitMQ,必须先要安装一个RabbitMQ服务。这个服务就是Broker,中文叫做代理,因为MQ服务器帮我们对消息做了存储和转发。一般情况下为了保证服务的高可用,需要多个Broker
    • 客户端和服务端之间消息传递中介

    RabbitMQ 如何确保消息接收方消费了消息

    • 发送方进行Confirm确认和消息回退设置
    • 消费方进行消息接收确认

    RabbitMQ 消息基于什么传输

    • RabbitMQ是基于信道Channel的方式来传输数据,排除了使用TCP链接来进行数据的传输,因为TCP链接创建和销毁对于系统性能的开销比较大,且并发能力受系统资源的限制,这样很容易造成rabbitMQ的性能瓶颈。
    • 消费者链接RabbitMQ其实就是一个TCP链接,一旦链接创建成功之后,就会基于链接创建Channel,每个线程把持一个Channel,Channel复用TCP链接,减少了系统创建和销毁链接的消耗,提高了性能
  • 相关阅读:
    HarmonyOS 网络请求工具库封装,直接无脑用!!!
    卡顿分析与布局优化
    Mysql:sql去重的几种方式(大数据hive也可参考)
    Hadoop 集群搭建
    Qt基础教程:数据类型与容器
    贝赛尔曲线
    CentOS7 编译安装最新的Linux Kernel 6.0 rc3
    web端动效 PAG
    主动调度是如何发生的
    随手查_python
  • 原文地址:https://blog.csdn.net/qq_37248504/article/details/126412980