• 消息队列:原理与应用


    1 简介(是什么?)

    1.1 基本定义

    在这里插入图片描述
    消息队列(Message Queue,MQ)是一种进程间通信或同一进程的不同线程间的通信方式。被广泛应用为分布式服务框架的消息中间件。
    从数据结构上说,我认为他的本质就是 将封装好的消息体,依次存放到队列这种先入先出数据结构中去。

    Message可以是一个类、 并保留全局唯一的编号 message Id。
    Message ID:消息的全局唯一标识,由消息队列RocketMQ系统自动生成,唯一标识某条消息。

    1.2 生产者-消费者模型

    接下来这种数据结构的收发方式,选用 生产者消费者模型:由生产者发布消息队列至消息服务器,再由消费者订阅消息。

    在这里插入图片描述
    生产者(Producer)业务的发起方,负责生产消息发布给Broker。
    消费者(Consumer)业务的处理方,负责从Broker订阅消息并进行业务逻辑处理。
    消息服务器(Broker)MQ的服务器。包括接收 Producer 发过来的消息、处理 Consumer 的消费消息请求、消息的持久化存储、以及服务端过滤功能等。

    注意消息服务器可以是,分布式,或多节点的集群,且每个节点里可能不止一个队列。

    在这里插入图片描述
    当然除了消息服务器外,生产者、消费者和消息本身也可以拥有集群的概念。
    我们可以对这三者进行分组,形成主题的概念,并进一步细化出二级标签,实现特定的集群收发特定消息的功能。

    在这里插入图片描述
    主题(topic)一级消息类型,不同生产者向特定的topic发送消息,再由MQ分发至特定的订阅者,实现消息的传递。
    标签(tag)二级消息类型,用来进一步区分某个Topic下的消息子类。
    集群(group)一组生产者或消费者,这组生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。

    到这一步,就构造了消息队列服务器的雏形。

    1.3 生产者-消费者模型

    回过头来分析一下刚才构建的模型,任何一项技术都有他的利弊。
    抛去额外的维护成本不说,这个模型的弊端在于,过渡依赖于消息中间件,一旦中间件宕机了,整个消息体系就瓦解了。
    此外,在设计时需要考虑更多因素。生产过程中难免会出现生产者,消费者或中间件服务器不可用的情况,随之带来的问题就是消息重复、消息堆积等等。
    所以实际运用的时候呢,往往会给出一些补偿措施。

    在这里插入图片描述
    弊端:

    1. 消息的收发依赖于中间件,且中间件的稳定运行需要维护成本。
    2. 提高开发复杂度。需要考虑消息的处理,包括消息幂等性(重复消费问题)、消息中间件的持久化和稳定性、可靠性等。

    PS:这里说一下持久化的问题。
    简单来说就是将数据存入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存,重启后数据能够从磁盘中读取恢复。MQ会将你的持久化消息写入磁盘上的持久化日志文件,等消息被消费之后,RabbitMQ会把这条消息标识为等待垃圾回收。
    缺点:性能低,写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量。

    1.4 重复消费问题 - 消息幂等性

    这里可以讲一下其中的一种非常常见的问题及其补偿措施,就是重复消费的问题:
    重复消费:生产者多发、消费者多次消费等。

    生产者多发、消费者多次消费,都会造成重复消费的问题。
    解决这种问题的常用办法,就是保证操作的幂等性:
    幂等操作(Idempotent Operation):执行任意多次幂等操作所产生的影响均与一次执行的效果相同。
    幂等操作有一个特点,甚至还有公式:
    f ( x ) = f ( f ( x ) ) . f(x) = f(f(x)). f(x)=f(f(x)).
    举个例子,数据库脚本insert前都会先delete,这么一组数据库操作无论执行多少次结果都是一样的。
    重复消费的问题也可以用这个思想去解决。

    实现:

    1. 消息中间件端根据 Message Id 去重。
    2. 消费端: 数据库:新增/修改。 组件如redis进行自身去重。

    1.5 主流MQ对比

    目前主流的MQ有以下几种:
    在这里插入图片描述
    在流量和大数据的时代,ActiveMQRabbitMQ这两者因为吞吐量以及GitHub的社区活跃度的原因,在各大互联网公司基本上销声匿迹了,越来越多的公司开始青睐于后两者。其中RocketMQ是阿里开源的,这和同样是阿里开源的rpc框架-dubbo设计风格比较类似。Kafka则更多应用在大数据业务场景中。

    2 应用场景(为什么?)

    知道了概念之后,再介绍下什么场景适合使用MQ。

    2.1 业务解耦

    第一种就是需要用到业务解耦的场景,怎么个解耦法呢?
    这里以某系统的信息同步功能为例:

    现有基于接口调用的同步方法:
    业务逻辑:用户在页面录入信息,再通过同步的方式将录入的信息推送至各接收方做后续业务。
    弊端
    1.接口繁杂,难以维护。
    2.同步接收方需要保持在线状态。

    在这里插入图片描述

    此系统的问题在于:

    1. 对于每个同步接收方来说,都应该为系统提供一个调用接口,后续渠道新增、同步调整也会新增接口,长期以往会导致接口错综复杂,难以维护。
    2. 需要接收方服务随时保持在线监听状态。一旦服务挂了,就比如测试时经常会遇到的问题,同步时对方在打板,同步异常,no provider avalable)。1个渠道还好说,往往是4个渠道串行工作,这么一来出错概率一下子乘以4倍了。

    根据该系统的后期的整改规划,可以考虑运用基于消息队列的同步方法:

    1. 仅发布基本信息数据+少量渠道特有数据至通用消息队列。
    2. 同步接收方按需订阅。

    为了保证数据源的唯一性,该系统只发布必要的基本信息数据+少量渠道特有数据至通用消息队列,然后渠道端无论是新增的还是存量的,只要按需订阅。当然这个过程可以借助topic、tag,实现消息的分组
    这样的话,后期渠道端的增删几乎不会导致同步系统的修改,实现了业务的解耦。
    另外,即使同步时渠道端服务器宕机,没关系,我们可以等。消息的缓存机制也可以避免直接推过去所造成的同步异常。
    在这里插入图片描述

    PS:同步失败了怎么办?
    1.在MQ控制台查找死信队列。
    2.将整个同步过程放在一个分布式事务里面。

    2.1 异步执行

    消息队列也适用于需要异步处理的场景。

    在这里插入图片描述
    这里以我以前接触过的一个智能外呼系统为例:智能外呼:客服中心以电话的方式,主动发起的对客户的呼叫问答活动。广泛应用在产品营销、贷款催缴、投资理财等方面。
    说白了就是机器人给您打电话,不断问问题,然后将您的问题转成文字存储在数据库里的过程。

    在这里插入图片描述
    业务逻辑:该系统的上游是外呼请求的发起方,下游是外呼动作的执行机构。语音识别,自然语言处理的模块集成在这里面。

    由于打电话是个耗时的过程,整个系统异步实现,具体来说有2步:

    1. 当有外呼请求发起时,中间件解析上游发来的请求报文,推送至执行机构并且即时回复响应报文,实现异步第一步。
    2. 中间件轮询下游处理结果(如性别的声纹检验),封装结果返回,异步第二步,实现闭环。 注意打电话是个耗时的操作,在这个过程中,如果用传统的基于请求/响应的同步通讯方式,在上游发起请求后,监听过程中生产者线程会一直阻塞。如果这条流水线上有其他业务处理,会造成时间和资源的浪费。

    但如果使用如果使用异步消息处理,立即返回消息发送成功或失败的回调方法,就能实现生产者线程不阻塞,从而达到异步执行的效果。

    3 Demo(怎么用?)

    在最后的展示模块,我想以最近接触的一个小项目为例,讲解一下MQ的一个真实用例。

    在这里插入图片描述
    先简单介绍一下这个项目:
    业务逻辑:本项目中,我们采用rocketMQ消息队列。消息体是某种信号,包括信号等级、信号规则、信号内容等,由A部门推送过来。B部门即我们的系统,后端对消息进行一系列操作,最后将表的内容到前端页面上。后续还会对风险信号进行excel统计,日终跑批发邮件。

    RocketMQ-Console这个工具可以看到MQ服务器里的所有消息,以及消费记录。

    在这里插入图片描述

    1. message菜单即消息体。时效为3天 具体内容:包括信号详情,messageId等。
    2. producer菜单即消费者。可以看到它是一个多节点、多队列的结构(且每个节点包含一个%Retry%死信队列)。

    消费者最核心的代码,包括消费者端的注解:
    @MessageConsumer、@MessageListener注解(hades-mq):参数封装在这里面(本地起服务时为了避免本地消费造成的日志平台内容缺失,这里一般会把注解注释掉),内容包括主题、集群、支持多线程生产/消费多个队列,遇到异常时重复消息等。

    4 资源

    相关资源的链接在这里,有兴趣大家可以一起去探索更好的设计模式。

    《RocketMQ实战与原理解析》.pdf

    RocketMQ - GitHub

  • 相关阅读:
    75、SpringBoot 整合 MyBatis------使用 Mapper 作为 Dao 组件
    Redis03-过期策略和淘汰策略
    4.2 实现注册与登录模块
    分治法求解问题
    【Excel导出】(亲测可用)使用实现Hutool工具类将list对象数组导出的简单实现
    antdesignvue数字输入框限制只能输入整数
    2022-08-26 第六小组 瞒春 学习笔记
    Pytorch中张量的高级选择操作
    Golang字符串处理
    C++指针的使用
  • 原文地址:https://blog.csdn.net/BenJamin_Blue/article/details/125946812