• SpringCloud-Stream


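    I. Introduction

    (1) Provides a unified interface for working with message queue middleware from different vendors, which lowers the learning cost
    (2) Producers and consumers talk to the message queue only through the binder: producers send through the output channel, consumers receive through the input channel
    (3) Core concepts: publish-subscribe, consumer groups, partitioning
    (4) Uses the topic model (with RabbitMQ, a destination maps to a topic exchange)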
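
    To make the publish-subscribe and consumer-group concepts concrete, here is a minimal in-memory sketch in plain Java. It is not the Spring Cloud Stream API: the class name, the consumer names, and the round-robin choice inside a group are all assumptions made for illustration. It only shows the delivery rule: every group gets a copy of each message, and inside a group exactly one member receives it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory illustration only; hypothetical names, not the Spring Cloud Stream API. */
public class ConsumerGroupSketch {

    // group name -> consumers registered in that group
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    // group name -> number of messages delivered so far (drives round-robin)
    private final Map<String, Integer> counters = new LinkedHashMap<>();

    public void subscribe(String group, String consumer) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(consumer);
        counters.putIfAbsent(group, 0);
    }

    /** Returns the consumers that receive this message: exactly one per group. */
    public List<String> publish(String payload) {
        List<String> receivers = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
            List<String> members = entry.getValue();
            int delivered = counters.get(entry.getKey());
            // Assumed policy: rotate through the members of the group
            receivers.add(members.get(delivered % members.size()));
            counters.put(entry.getKey(), delivered + 1);
        }
        return receivers;
    }

    public static void main(String[] args) {
        ConsumerGroupSketch exchange = new ConsumerGroupSketch();
        exchange.subscribe("groupA", "consumer8802");
        exchange.subscribe("groupA", "consumer8803");
        exchange.subscribe("groupB", "consumer8804");
        System.out.println(exchange.publish("hello")); // [consumer8802, consumer8804]
        System.out.println(exchange.publish("hello")); // [consumer8803, consumer8804]
    }
}
```

    Two consumers in groupA take turns, while the single consumer in groupB sees every message.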

    II. Project setup

    (1) Producer
    a. Write pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>demo20220821</artifactId>
            <groupId>com.wsh.springcloud</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>cloud-stream-rabbitmq-provider8801</artifactId>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
            <dependency>
                <groupId>com.wsh.springcloud</groupId>
                <artifactId>cloud-api-common</artifactId>
                <version>1.0-SNAPSHOT</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
    </project>
    

    b. Write application.yml

    server:
      port: 8801
    
    spring:
      application:
        name: cloud-stream-rabbit-provider
      cloud:
        stream:
          binders:
            defaultRabbit:
              type: rabbit
              environment:
                spring:
                  rabbitmq:
                    host: 192.168.0.166
                    port: 5672
                    username: guest
                    password: guest
          bindings:
            output:
              destination: testExchange
              content-type: application/json
              binder: defaultRabbit
    
    
    eureka:
      client:
        # set to true for a client
        register-with-eureka: true
        # set to true for a client
        fetch-registry: true
        service-url:
          # defaultZone: http://localhost:7001/eureka
          defaultZone: http://eureka1.com:7001/eureka,http://eureka2.com:7002/eureka
      instance:
        instance-id: cloudStreamRabbitProvider8801
        prefer-ip-address: true
    
    management:
      endpoints:
        web:
          exposure:
            include: "*"
    

    c. Write the main class

    package com.wsh.springcloud;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
    
    /**
     * @ClassName StreamRabbitMqProvider8801
     * @Description: Entry point of the message producer service
     * @Author wshaha
     * @Date 2023/10/15
     * @Version V1.0
     **/
    @SpringBootApplication
    @EnableEurekaClient
    public class StreamRabbitMqProvider8801 {
        public static void main(String[] args) {
            SpringApplication.run(StreamRabbitMqProvider8801.class, args);
        }
    }
    
    

    d. Write the interface and its implementation

    package com.wsh.springcloud.service;
    
    public interface IMessageProvider {
    
        public String send();
    }
    
    
    package com.wsh.springcloud.service;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;
    
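    /**
     * @ClassName MessageProviderImpl
     * @Description: Sends messages through the "output" channel declared by Source
     * @Author wshaha
     * @Date 2023/10/15
     * @Version V1.0
     **/
    // Note: @EnableBinding is deprecated as of Spring Cloud Stream 3.1 and removed in 4.0
    @EnableBinding(Source.class)
    public class MessageProviderImpl implements IMessageProvider {

        // Source.OUTPUT is the channel name "output", matching bindings.output in application.yml
        @Autowired
        @Qualifier(Source.OUTPUT)
        private MessageChannel messageChannel;

        @Override
        public String send() {
            String payload = "hello";
            // Wrap the payload in a Message and hand it to the binder
            messageChannel.send(MessageBuilder.withPayload(payload).build());
            return payload;
        }
    }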
    
    

    e. Write the Controller

    package com.wsh.springcloud.controller;
    
    import com.wsh.springcloud.service.IMessageProvider;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @ClassName MessageController
     * @Description: REST endpoint that triggers sending a message
     * @Author wshaha
     * @Date 2023/10/15
     * @Version V1.0
     **/
    @RestController
    public class MessageController {
    
        @Autowired
        private IMessageProvider messageProvider;
    
        @GetMapping("/sendMessage")
        public void sendMessage(){
            messageProvider.send();
        }
    }
    
    

    (2) Consumer
    a. Write pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>demo20220821</artifactId>
            <groupId>com.wsh.springcloud</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
            <dependency>
                <groupId>com.wsh.springcloud</groupId>
                <artifactId>cloud-api-common</artifactId>
                <version>1.0-SNAPSHOT</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    </project>
    

    b. Write application.yml

    server:
      port: 8802
    
    spring:
      application:
        name: cloud-stream-rabbit-consumer
      cloud:
        stream:
          binders:
            defaultRabbit:
              type: rabbit
              environment:
                spring:
                  rabbitmq:
                    host: 192.168.0.166
                    port: 5672
                    username: guest
                    password: guest
          bindings:
            input:
              destination: testExchange
              content-type: application/json
              binder: defaultRabbit
    
    
    eureka:
      client:
        # set to true for a client
        register-with-eureka: true
        # set to true for a client
        fetch-registry: true
        service-url:
          # defaultZone: http://localhost:7001/eureka
          defaultZone: http://eureka1.com:7001/eureka,http://eureka2.com:7002/eureka
      instance:
        instance-id: cloudStreamRabbitConsumer8802
        prefer-ip-address: true
    
    management:
      endpoints:
        web:
          exposure:
            include: "*"
    

    c. Write the main class

    package com.wsh.springcloud;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
    
    /**
     * @ClassName StreamRabbitMqConsumer8802
     * @Description: Entry point of the message consumer service
     * @Author wshaha
     * @Date 2023/10/15
     * @Version V1.0
     **/
    @SpringBootApplication
    @EnableEurekaClient
    public class StreamRabbitMqConsumer8802 {
        public static void main(String[] args) {
            SpringApplication.run(StreamRabbitMqConsumer8802.class, args);
        }
    }
    
    

    d. Write the Controller

    package com.wsh.springcloud.controller;
    
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.messaging.Message;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @ClassName ConsumerController
     * @Description: Receives messages from the "input" channel declared by Sink
     * @Author wshaha
     * @Date 2023/10/15
     * @Version V1.0
     **/
    @RestController
    @EnableBinding(Sink.class)
    public class ConsumerController {

        // Invoked for every message that arrives on the "input" binding
        @StreamListener(Sink.INPUT)
        public void receiveMessage(Message<String> message){
            System.out.println(message.getPayload());
        }
    }
    
    

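    (3) Run
    With RabbitMQ and Eureka running, start both services and send a GET request to /sendMessage on the producer (port 8801); the consumer's console prints hello.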

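    III. Avoiding duplicate message consumption

    (1) Consumers bound to the same exchange but in different groups each receive a copy of every message
    (2) Solution: within one group only one consumer receives each message, so put the consumers that must not duplicate work into the same group
    (3) Configuration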
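
    The group is set per binding through the `group` property of `spring.cloud.stream.bindings.<binding>`. A minimal sketch of the consumer's binding follows; the group name groupA is a hypothetical value chosen for illustration, and every consumer instance that should share the work needs the same value.

```yaml
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: testExchange
          content-type: application/json
          binder: defaultRabbit
          # hypothetical group name; use the same value on every instance that should share messages
          group: groupA
```

    With the RabbitMQ binder, instances in the same group consume from one shared queue, which under these assumptions would typically be named testExchange.groupA.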

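    IV. Message persistence

    (1) Defining a group also makes messages persistent. Reason: without a group, the service's queue is auto-delete, so it is removed when the service stops and messages sent afterwards are never received. With a group, the queue survives the shutdown and holds those messages until the service is back.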
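
    The difference can be sketched with a small in-memory model in plain Java. This is an illustration under stated assumptions, not broker or binder code: the class and method names are hypothetical, and a queue that "does not exist" is modelled as null.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** In-memory illustration only; hypothetical names, not RabbitMQ or Spring code. */
public class QueueLifetimeSketch {

    private final boolean autoDelete;
    // null means the queue does not currently exist on the (simulated) broker
    private Queue<String> queue;

    public QueueLifetimeSketch(boolean autoDelete) {
        this.autoDelete = autoDelete;
    }

    /** The consumer declares its queue on startup if it is missing. */
    public void consumerStarts() {
        if (queue == null) {
            queue = new ArrayDeque<>();
        }
    }

    /** An auto-delete queue disappears together with its last consumer. */
    public void consumerStops() {
        if (autoDelete) {
            queue = null;
        }
    }

    /** A message routed to a missing queue is dropped. */
    public void publish(String message) {
        if (queue != null) {
            queue.add(message);
        }
    }

    /** Number of messages waiting for the consumer. */
    public int pending() {
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        QueueLifetimeSketch anonymous = new QueueLifetimeSketch(true);  // no group
        QueueLifetimeSketch grouped = new QueueLifetimeSketch(false);   // group defined
        for (QueueLifetimeSketch q : new QueueLifetimeSketch[] {anonymous, grouped}) {
            q.consumerStarts();
            q.consumerStops();
            q.publish("hello"); // sent while the consumer is down
            q.consumerStarts();
        }
        System.out.println(anonymous.pending()); // 0
        System.out.println(grouped.pending());   // 1
    }
}
```

    The message sent during the downtime is lost for the consumer without a group and is still waiting for the consumer with one.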

  • Original article: https://blog.csdn.net/qq_25243147/article/details/133840953