

为什么引入cloud Stream?解决的痛点是什么?
- 市面上存在着多种消息中间件技术:ActiveMQ,RabbitMQ,RocketMQ,Kafka,那么每多出来一种新的技术,就要付出响应的学习成本,消息中间件技术的多样导致开发者的学习成本很大。
- 不同的系统中会用到不同的消息中间件,那么当需要系统进行整合时,或者系统进行切换时,由于用的是不同的中间件技术,该怎么整合切换。存在多种MQ的情况时,如何进行切换、维护和开发?具体的实现,需要的成本很大。
- 那么有没有一种新的技术,让我们不再关注具体的MQ的细节,我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换。
- 引出了
Spring Cloud Stream。屏蔽消息中间件底层的细节差异,让我只需要操作一个Cloud Stream,就可以操作底层下面各种各样不同的MQ。达到我们以更小的代价实现切换,维护,开发。
官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。
inputs 或者 outputs 来与 Spring Cloud Stream 中 binder对象 交互。binding(绑定) ,而 Spring Cloud Stream 的 binder对象 负责与消息中间件交互。Spring Integration来连接消息代理中间件以实现消息事件驱动。发布-订阅、消费组、分区的三个核心概念。目前仅支持RabbitMQ、Kafka。
一句话:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
标准MQ

生产者/消费者之间靠消息媒介传递信息内容
➢ Message
消息必须走特定的通道
➢ 消息通道MessageChannel
消息通道里的消息如何被消费呢,谁负责收发处理
➢ 消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅
为什么用Cloud Stream?
比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区。

这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候Spring Cloud Stream给我们提供了一种解耦合的方式。
Stream凭什么可以统一底层差异?
Binder
Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(RabbitMQ切换为Kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT:
INPUT对应于消费者(消费者从Stream接收消息)OUTPUT对应于生产者(生产者从Stream发布消息)Stream中的消息通信方式遵循了发布-订阅模式
Topic主题进行广播



RabbitMQ环境已经OK。
工程中新建三个子模块:
cloud-stream-rabbitmq-provider8801,作为生产者进行发消息模块cloud-stream-rabbitmq-consumer8802,作为消费者进行接收消息模块cloud-stream-rabbitmq-consumer8803,作为消费者进行接收消息模块建Module
cloud-stream-rabbitmq-provider8801
改POM
<dependency>
<groupId>org.springframework.cloudgroupId>
<artifactId>spring-cloud-starter-stream-rabbitartifactId>
dependency>

写YML
这里是output

server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红不要管)
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: send-8801.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
主启动

业务类
➢ 发送消息接口

➢ 发送消息接口实现类
@EnableBinding:生产者对应Source.class,可参见上面图示。
这里MessageChannel的实例名必须是output,要不然无法启动。

➢ Controller

测试
➢ 启动7001eureka
➢ 启动rabbitmq
➢ 启动8801

➢ 访问 http://localhost:8801/sendMessage


建Module
cloud-stream-rabbitmq-consumer8802
改POM
<dependency>
<groupId>org.springframework.cloudgroupId>
<artifactId>spring-cloud-starter-stream-rabbitartifactId>
dependency>
写YML
这里是input

server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红不要管)
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: receive-8802.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
主启动

业务类
@EnableBinding:消费者对应Sink.class,可参见上面图示。

测试
8801发送消息,8802接收消息
http://localhost:8801/sendMessage
➢ 8801

➢ 8802

依照8802,clone出来一份运行8803
启动
➢ RabbitMQ
➢ 7001:服务注册
➢ 8801:消息生产
➢ 8802:消息消费
➢ 8803:消息消费
运行后有两个问题
➢ 有重复消费问题
➢ 消息持久化问题
目前是8802/8803同时都收到了,存在重复消费问题
如何解决?
生产实际案例
比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。
比如8801下一个订单,但是被两个服务获取消费,会多扣一次款。
这时我们就可以使用Stream中的消息分组来解决。

注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。
避免重复消费,需要对流水号做自定义的group分组配置:

原理
group: atguiguA、atguiguB
8802修改YML

8803修改YML

我们自己配置

分布式微服务应用为了实现高可用和负载均衡,实际上都会部署多个实例,本例阳哥启动了两个消费微服务(8802/8803)
多数情况,生产者发送消息给某个具体微服务时只希望被消费一次,按照上面我们启动两个应用的例子,虽然它们同属一个应用,
但是这个消息出现了被重复消费两次的情况。为了解决这个问题,在Spring Cloud Stream中提供了消费组的概念。
结论
➢ 还是重复消费
怎么解决?
8802/8803实现轮询分组,每次只有一个消费者,8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费。
group: atguiguA
8802修改YML

8803修改YML

结论
➢ 发送方发了两条消息

➢ 8802只收到了一条消息

➢ 8802也只收到了一条消息

➢ 同一个组的多个微服务实例,每次只会有一个拿到
通过上述,解决了重复消费问题,再看看持久化
停止8802/8803并去除掉8802的分组group: atguiguA
➢ 8803的分组group: atguiguA没有去掉
8801先发送4条消息到rabbitmq
先启动8802,无分组属性配置,后台没有打出来消息,消息丢失!!

再启动8803,有分组属性配置,后台打出来了MQ上的消息,重新获得并消费。

group分组属性在解决 消息重复消费问题 和 消息持久化 避免消息丢失问题 是一个非常重要的属性。