• 「SpringCloud」10 Stream消息驱动


    SpringCloud—Stream消息驱动

    笔记整理自【尚硅谷】周阳SpringCloud框架开发教程

    stream

    image-20220801123922304

    1. 概述

    为什么引入cloud Stream?解决的痛点是什么?

    • 市面上存在着多种消息中间件技术:ActiveMQ,RabbitMQ,RocketMQ,Kafka,那么每多出来一种新的技术,就要付出响应的学习成本,消息中间件技术的多样导致开发者的学习成本很大。
    • 不同的系统中会用到不同的消息中间件,那么当需要系统进行整合时,或者系统进行切换时,由于用的是不同的中间件技术,该怎么整合切换。存在多种MQ的情况时,如何进行切换、维护和开发?具体的实现,需要的成本很大。
    • 那么有没有一种新的技术,让我们不再关注具体的MQ的细节,我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换。
    • 引出了 Spring Cloud Stream屏蔽消息中间件底层的细节差异,让我只需要操作一个Cloud Stream,就可以操作底层下面各种各样不同的MQ。达到我们以更小的代价实现切换,维护,开发。

    Ⅰ. 什么是 Spring Cloud Stream

    Spring Cloud Stream 官网

    官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。

    • 应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中 binder对象 交互。
    • 通过我们配置来 binding(绑定) ,而 Spring Cloud Stream 的 binder对象 负责与消息中间件交互。
    • 所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
    • 通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
    • Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅消费组分区的三个核心概念。
    • 绑定器对象:Binder Implementations,就是靠它屏蔽了我们底层的MQ的差异。

    目前仅支持RabbitMQ、Kafka。

    一句话:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。

    Ⅱ. 设计思想

    标准MQ

    image-20220908172337544

    • 生产者/消费者之间靠消息媒介传递信息内容

      ➢ Message

    • 消息必须走特定的通道

      ➢ 消息通道MessageChannel

    • 消息通道里的消息如何被消费呢,谁负责收发处理

      ➢ 消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅

    为什么用Cloud Stream?

    比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区。

    image-20220908181028938

    这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候Spring Cloud Stream给我们提供了一种解耦合的方式。

    Stream凭什么可以统一底层差异?

    Binder

    • 在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。
    • 通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
    • 通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

    Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(RabbitMQ切换为Kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

    image-20220908181405588

    通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

    Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUTOUTPUT

    • INPUT对应于消费者(消费者从Stream接收消息)
    • OUTPUT对应于生产者(生产者从Stream发布消息)

    Stream中的消息通信方式遵循了发布-订阅模式

    Topic主题进行广播

    • 在RabbitMQ就是Exchange
    • 在Kakfa中就是Topic

    Ⅲ. Spring Cloud Stream标准流程套路

    image-20220908195252214

    image-20220908182426518

    • Binder:很方便的连接中间件,屏蔽差异(用于连接中间件与生产/消费者)
    • Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
    • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出(output),接受消息就是输入(input)(简单的理解为输出/输入)

    Ⅳ. 编码API和常用注解

    image-20220908195138871

    2. 案例说明

    RabbitMQ环境已经OK。

    工程中新建三个子模块:

    • cloud-stream-rabbitmq-provider8801,作为生产者进行发消息模块
    • cloud-stream-rabbitmq-consumer8802,作为消费者进行接收消息模块
    • cloud-stream-rabbitmq-consumer8803,作为消费者进行接收消息模块

    3. 消息驱动之生产者

    • 建Module

      cloud-stream-rabbitmq-provider8801

    • 改POM

      <dependency>
      	<groupId>org.springframework.cloudgroupId>
      	<artifactId>spring-cloud-starter-stream-rabbitartifactId>
      dependency>
      
      • 1
      • 2
      • 3
      • 4

      image-20220908200159592

    • 写YML

      这里是output

      image-20220908211237196

      server:
        port: 8801
      
      spring:
        application:
          name: cloud-stream-provider
        cloud:
          stream:
            binders: # 在此处配置要绑定的rabbitmq的服务信息;
              defaultRabbit: # 表示定义的名称,用于于binding整合
                type: rabbit # 消息组件类型
                environment: # 设置rabbitmq的相关的环境配置
                  spring:
                    rabbitmq:
                      host: localhost
                      port: 5672
                      username: guest
                      password: guest
            bindings: # 服务的整合处理
              output: # 这个名字是一个通道的名称
                destination: studyExchange # 表示要使用的Exchange名称定义
                content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
                binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红不要管)
      
      eureka:
        client: # 客户端进行Eureka注册的配置
          service-url:
            defaultZone: http://localhost:7001/eureka
        instance:
          lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
          lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
          instance-id: send-8801.com  # 在信息列表时显示主机名称
          prefer-ip-address: true     # 访问的路径变为IP地址
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
    • 主启动

      image-20220908200419567

    • 业务类

      ➢ 发送消息接口

      image-20220908202036041

      ➢ 发送消息接口实现类

      @EnableBinding:生产者对应Source.class,可参见上面图示。

      这里MessageChannel的实例名必须是output,要不然无法启动。

      image-20220908202016347

      ➢ Controller

      image-20220908202118176

    • 测试

      ➢ 启动7001eureka

      ➢ 启动rabbitmq

      ➢ 启动8801

      image-20220908210447105

      ➢ 访问 http://localhost:8801/sendMessage

      image-20220908205706634

      image-20220908205841768

    4. 消息驱动之消费者

    • 建Module

      cloud-stream-rabbitmq-consumer8802

    • 改POM

      <dependency>
      	<groupId>org.springframework.cloudgroupId>
      	<artifactId>spring-cloud-starter-stream-rabbitartifactId>
      dependency>
      
      • 1
      • 2
      • 3
      • 4
    • 写YML

      这里是input

      image-20220909111716530

      server:
        port: 8802
      
      spring:
        application:
          name: cloud-stream-consumer
        cloud:
          stream:
            binders: # 在此处配置要绑定的rabbitmq的服务信息;
              defaultRabbit: # 表示定义的名称,用于于binding整合
                type: rabbit # 消息组件类型
                environment: # 设置rabbitmq的相关的环境配置
                  spring:
                    rabbitmq:
                      host: localhost
                      port: 5672
                      username: guest
                      password: guest
            bindings: # 服务的整合处理
              input: # 这个名字是一个通道的名称
                destination: studyExchange # 表示要使用的Exchange名称定义
                content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
                binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红不要管)
      
      eureka:
        client: # 客户端进行Eureka注册的配置
          service-url:
            defaultZone: http://localhost:7001/eureka
        instance:
          lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
          lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
          instance-id: receive-8802.com  # 在信息列表时显示主机名称
          prefer-ip-address: true     # 访问的路径变为IP地址
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
    • 主启动

      image-20220908210620727

    • 业务类

      @EnableBinding:消费者对应Sink.class,可参见上面图示。

      image-20220908211824514

    • 测试

      8801发送消息,8802接收消息

      http://localhost:8801/sendMessage

      ➢ 8801

      image-20220908212205628

      ➢ 8802

      image-20220908212219407

    5. 分组消费与持久化

    Ⅰ. 前置工作

    • 依照8802,clone出来一份运行8803

    • 启动

      ➢ RabbitMQ

      ➢ 7001:服务注册

      ➢ 8801:消息生产

      ➢ 8802:消息消费

      ➢ 8803:消息消费

    • 运行后有两个问题

      ➢ 有重复消费问题

      消息持久化问题

    Ⅱ. 重复消费问题

    目前是8802/8803同时都收到了,存在重复消费问题

    如何解决?

    • 分组和持久化属性group(重要)

    生产实际案例

    比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。

    比如8801下一个订单,但是被两个服务获取消费,会多扣一次款。

    这时我们就可以使用Stream中的消息分组来解决

    image-20220908222050013

    注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。

    • 不同组是可以全面消费的(重复消费)
    • 同一组内会发生竞争关系,只有其中一个可以消费。

    避免重复消费,需要对流水号做自定义的group分组配置:

    image-20220908222607324

    Ⅲ. 分组

    原理

    • 微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。
    • 不同的组是可以全面消费的(重复消费),同一个组内会发生竞争关系,只有其中一个可以消费。
    1️⃣ 8802/8803都变成不同组,group两个不同
    • group: atguiguA、atguiguB

    • 8802修改YML

      image-20220909111854896

    • 8803修改YML

      image-20220909111931474

    • 我们自己配置

      image-20220909112121291

      分布式微服务应用为了实现高可用和负载均衡,实际上都会部署多个实例,本例阳哥启动了两个消费微服务(8802/8803)

      多数情况,生产者发送消息给某个具体微服务时只希望被消费一次,按照上面我们启动两个应用的例子,虽然它们同属一个应用,
      但是这个消息出现了被重复消费两次的情况。为了解决这个问题,在Spring Cloud Stream中提供了消费组的概念。

    • 结论

      还是重复消费

    怎么解决?

    8802/8803实现轮询分组,每次只有一个消费者,8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费。

    2️⃣ 8802/8803都变成相同组,group两个相同
    • group: atguiguA

    • 8802修改YML

      image-20220909111854896

    • 8803修改YML

    • 结论

      ➢ 发送方发了两条消息

      image-20220909112353695

      ➢ 8802只收到了一条消息

      image-20220909112425994

      ➢ 8802也只收到了一条消息

      image-20220909112451238

      同一个组的多个微服务实例,每次只会有一个拿到

    Ⅳ. 持久化

    • 通过上述,解决了重复消费问题,再看看持久化

    • 停止8802/8803并去除掉8802的分组group: atguiguA

      ➢ 8803的分组group: atguiguA没有去掉

    • 8801先发送4条消息到rabbitmq

    • 先启动8802,无分组属性配置,后台没有打出来消息,消息丢失!!

      image-20220909112709456

    • 再启动8803,有分组属性配置,后台打出来了MQ上的消息,重新获得并消费。

      image-20220909112717532

    group分组属性在解决 消息重复消费问题 和 消息持久化 避免消息丢失问题 是一个非常重要的属性。

  • 相关阅读:
    Juniper交换机收集日志
    Linux下将驱动编译进内核
    九洲
    高速公路堵车动力学
    Vue3.0第一天
    GBase 8c系统表-pg_amproc
    使用 K8spacket 和 Grafana 对 Kubernetes 的 TCP 数据包流量可视化
    选择和循环结构的机器级表示
    2022年全球市场BVM袋阀面罩总体规模、主要生产商、主要地区、产品和应用细分研究报告
    理解RNN循环神经网络
  • 原文地址:https://blog.csdn.net/weixin_53407527/article/details/126780759