• 【java_wxid项目】【第十四章】【Spring Cloud Stream集成】


    主项目链接:https://gitee.com/java_wxid/java_wxid
    项目架构及博文总结:

    项目模块:
    前期规划,实现部分

    java_wxid   
    ├── demo                                                            // 演示模块
    │     └── 模块名称:apache-mybatis-demo模块                            //Apache Mybatis集成(已实现并有博文总结)
    │     └── 模块名称:apache-shardingsphere-demo模块                     //Apache ShardingSphere集成(已实现并有博文总结)
    │     └── 模块名称:design-demo模块                                    //设计模式实战落地(已实现并有博文总结)
    │     └── 模块名称:elasticsearch-demo模块                             //ElasticSearch集成(已实现并有博文总结)
    │     └── 模块名称:mongodb-demo模块                                   //MongoDB集成(已实现并有博文总结)
    │     └── 模块名称:redis-demo模块                                     //Redis集成(已实现并有博文总结)
    │     └── 模块名称:spring-boot-demo模块                               //Spring Boot快速构建应用(已实现并有博文总结)
    │     └── 模块名称:spring-cloud-alibaba-nacos-demo模块                //Spring Cloud Alibaba Nacos集成(已实现并有博文总结)
    │     └── 模块名称:spring-cloud-alibaba-seata-demo模块                //Spring Cloud Alibaba Seata集成(已实现并有博文总结)
    │     └── 模块名称:spring-cloud-alibaba-sentinel-demo模块             //Spring Cloud Alibaba Sentinel集成(已实现并有博文总结)
    │     └── 模块名称:spring-cloud-gateway-demo模块                      //Spring Cloud Gateway集成(已实现并有博文总结)
    │     └── 模块名称:spring-cloud-hystrix-demo模块                      //Spring Cloud Hystrix集成(已实现并有博文总结)
    │     └── 模块名称:spring-cloud-open-feign-demo模块                   //Spring Cloud Open Feign集成(已实现并有博文总结)
    │     └── 模块名称:spring-cloud-ribbon-demo模块                       //Spring Cloud Ribbon集成(已实现并有博文总结)
    │     └── 模块名称:spring-cloud-security-oauth2-demo模块              //Spring Cloud Security Oauth2集成(已实现并有博文总结)
    │     └── 模块名称:spring-cloud-security-oauth2-sso-client-demo模块   //Spring Cloud Security Oauth2集成(已实现并有博文总结)
    │     └── 模块名称:spring-cloud-skywalking-demo模块                   //Spring Cloud Skywalking集成(已实现并有博文总结)
    │     └── 模块名称:spring-cloud-stream-demo模块                       //Spring Cloud Stream集成(已实现并有博文总结)
    │     └── 模块名称:swagger-demo模块                                   //springfox-swagger2集成(已实现并有博文总结)
    │     └── 模块名称:xxl-job模块                                        //xxl-job集成(已实现并有博文总结)
    │     └── 模块名称:apache-spark-demo模块                              //Apache Spark集成
    │     └── 模块名称:etl-hdfs-hive-hbase-demo模块                       //ETL、HDFS、Hive、Hbase集成
    │     └── 模块名称:ddd-mode-demo模块                                  //DDD领域设计
    │     └── 模块名称:netty-demo模块                                     //Netty集成
    │     └── 模块名称:vue-demo模块                                       //前端vue集成
    ├── document                                                        // 文档
    │     └── JavaKnowledgeDocument                                     //java知识点
    │           └── java基础知识点.md                     
    │           └── mq知识点.md
    │           └── mysql知识点.md
    │           └── redis知识点.md
    │           └── springcould知识点.md
    │           └── spring知识点.md
    │     └── FounderDocument                                           //创始人
    │           └── 创始人.md
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    系列文章:快速集成各种微服务相关的技术,帮助大家可以快速集成到自己的项目中,节约开发时间。
    提示:系列文章还未全部完成,后续的文章,会慢慢补充进去的。

    创建elasticsearch-demo项目

    项目代码:https://gitee.com/java_wxid/java_wxid/tree/master/demo/spring-cloud-stream-demo
    项目结构如下(示例):
    在这里插入图片描述

    修改pom.xml

    代码如下(示例):

    
    
        4.0.0
        com.example
        spring-cloud-stream-demo
        0.0.1-SNAPSHOT
        spring-cloud-stream-demo
        Demo project for Spring Boot
    
        
            UTF-8
            UTF-8
            1.8
            
            2.3.12.RELEASE
            Hoxton.SR12
            2.2.7.RELEASE
        
        
        
            
                
                    org.springframework.boot
                    spring-boot-starter-parent
                    ${spring.boot.version}
                    pom
                    import
                
                
                    org.springframework.cloud
                    spring-cloud-dependencies
                    ${spring.cloud.version}
                    pom
                    import
                
                
                    com.alibaba.cloud
                    spring-cloud-alibaba-dependencies
                    ${spring.cloud.alibaba.version}
                    pom
                    import
                
            
        
        
            
                org.springframework.boot
                spring-boot-starter
            
    
            
                org.springframework.boot
                spring-boot-starter-test
                test
            
    
            
                org.springframework.cloud
                spring-cloud-stream
            
    
            
                org.springframework.cloud
                spring-cloud-stream
            
    
            
            
                com.alibaba.cloud
                spring-cloud-starter-stream-rocketmq
                2.0.1.RELEASE
            
            
            
                org.springframework.cloud
                spring-cloud-starter-stream-kafka
                2.0.1.RELEASE
            
            
                org.apache.kafka
                kafka-streams
            
    
    
            
            
                org.springframework.cloud
                spring-cloud-starter-stream-rabbit
                2.0.1.RELEASE
            
    
            
                org.springframework.boot
                spring-boot-starter-web
                2.3.3.RELEASE
            
        
    
        
            
                
                    org.springframework.boot
                    spring-boot-maven-plugin
                
            
        
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114

    修改SpringCloudStreamDemoApplication

    代码如下(示例):

    package com.example.springcloudstreamdemo;
    
    import com.example.springcloudstreamdemo.passage.*;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    
    @EnableBinding(value = {StreamSource.class, StreamSink.class})
    @SpringBootApplication
    //Spring boot的CommandLineRunner接口主要用于实现在应用初始化后,去执行一段代码块逻辑,这段初始化代码在整个应用生命周期内只会执行一次
    public class SpringCloudStreamDemoApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringCloudStreamDemoApplication.class, args);
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    创建application.yml

    代码如下(示例):

    server:
      port: 8097
    spring:
      rabbitmq:
    #    host: 106.14.132.94
    #    port: 5672
    #    username: admin
    #    password: java@2022
    #    virtual-host: /
        host: 192.168.160.128
        port: 5672
        username: admin
        password: admin
        virtual-host: /
      application:
        name: spring-cloud-stream-demo
      autoconfigure:
        # 使用multiple RabbitMQ binders 时需要排除RabbitAutoConfiguration
        exclude:
          - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
      cloud:
        stream:
          default-binder: rocketmq #选择默认绑定器
          rocketmq:
            binder:
              name-server: 192.168.160.128:9876
          kafka:
            binder:
              brokers: 106.14.132.94:9092
              # 自动创建Topic
              auto-create-topics: true
          binders: #可以绑定多个消息中间件
            rabbit: #表示定义的名称,用于binding整合 名字可以自定义  在此处配置要绑定的rabbitmq的服务信息
              type: rabbit # 消息组件类型
            rocketmq: #表示定义的名称,用于binding整合 名字可以自定义 在此处配置要绑定的rocket的服务信息
              type: rocketmq
            kafka:
              type: kafka
          bindings: # 服务的整合处理
            rabbitmqOutput:  # 这个名字是一个binding的名称(自定义)
              destination: rabbitmq-destination # 通道,如果用的是RabbitMQ就是交换机名称
              content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
              default-binder: rabbit # 如果没设定,就使用default-binder默认的
              producer:
                routing-key-expression: headers.routingKey   # 发送端路由key
                delayed-exchange: true    # 开启延时队列
              # 指定了消息分区的数量。
              partitionCount: 2
              # 指定分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键;
              partition-key-expression: headers.id1
            rabbitmqInput:  # 这个名字是一个binding的名称(自定义)
              destination: rabbitmq-destination # 通道,如果用的是RabbitMQ就是交换机名称
              content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
              default-binder: rabbit # 设置要绑定的消息服务的具体设置
              group: my-rabbitmq-group  # 分组名称,在rabbit当中其实就是交换机绑定的队列名称
              consumer:
                concurrency: 2 # 初始/最少/空闲时 消费者数量,默认1
                max-attempts: 3 #重试次数
                partitioned: true #通过该参数开启消费者分区功能;
            kafkaOutput: # 通道名称
              destination: kafka-destination # 消息的主题名 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的
              content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json)
              default-binder: kafka # 如果没设定,就使用default-binder默认的
              # 指定了消息分区的数量。
              partitionCount: 2
              # 指定分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键;
              partition-key-expression: headers.id2
            kafkaInput:
              destination: kafka-destination # 消息的主题名 消息发往的目的地,对应topic
              content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
              default-binder: kafka # 设置要绑定的消息服务的具体设置
              group: my-kafka-group  # 分组名称,在kafka当中其实就是交换机绑定的队列名称
              consumer:
                concurrency: 2 # 初始/最少/空闲时 消费者数量,默认1
                max-attempts: 3 #重试次数
                partitioned: true #通过该参数开启消费者分区功能;
            rocketmqOutput: # 通道名称
              destination: rocket-destination # 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的
              content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json)
              default-binder: rocketmq # 如果没设定,就使用default-binder默认的
              # 指定了消息分区的数量。
              partitionCount: 2
              # 指定分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键;
              partition-key-expression: headers.id3
            rocketmqInput:
              destination: rocket-destination # 消息发往的目的地,对应topic
              content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
              default-binder: rocketmq # 设置要绑定的消息服务的具体设置
              group: my-rocketmq-group  # 分组名称,在rocket当中其实就是交换机绑定的队列名称
              consumer:
                concurrency: 2 # 初始/最少/空闲时 消费者数量,默认1
                max-attempts: 3 #重试次数
                partitioned: true #通过该参数开启消费者分区功能;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93

    创建KafkaConsumer

    代码如下(示例):

    package com.example.springcloudstreamdemo.consumer;
    
    import com.example.springcloudstreamdemo.passage.StreamSink;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Component;
    
    /**
     * @author zhiwei Liao
     * @Description
     * @Date create in 2022/9/12 0012 21:03
     */
    @Component
    @EnableBinding(StreamSink.class)
    public class KafkaConsumer {
    
        private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    
        @StreamListener(target = StreamSink.KAFKAINPUT)
        public void consume(String message) {
            logger.info("Kafka recieved a string message : " + message);
        }
    
        @StreamListener(target = StreamSink.KAFKAINPUT, condition = "headers['type']=='kafka'")
        public void handle(String message) {
            logger.info("Kafka recieved a complex message: {}",  message);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    创建RabbitmqConsumer

    代码如下(示例):

    package com.example.springcloudstreamdemo.consumer;
    
    import com.example.springcloudstreamdemo.passage.StreamSink;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Component;
    
    /**
     * @author zhiwei Liao
     * @Description
     * @Date create in 2022/9/12 0012 21:03
     */
    @Component
    @EnableBinding(StreamSink.class)
    public class RabbitmqConsumer {
    
        private static final Logger logger = LoggerFactory.getLogger(RabbitmqConsumer.class);
    
        @StreamListener(target = StreamSink.RABBITMQINPUT)
        public void consume(String message) {
            logger.info("rabbitmq recieved a string message : " + message);
        }
    
        @StreamListener(target = StreamSink.RABBITMQINPUT, condition = "headers['type']=='rabbitmq'")
        public void handle(String message) {
            logger.info("rabbitmq recieved a complex message : {}",  message);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    创建RocketmqConsumer

    代码如下(示例):

    package com.example.springcloudstreamdemo.consumer;
    
    import com.example.springcloudstreamdemo.passage.StreamSink;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Component;
    
    /**
     * @author zhiwei Liao
     * @Description
     * @Date create in 2022/9/12 0012 21:03
     */
    @Component
    @EnableBinding(StreamSink.class)
    public class RocketmqConsumer {
    
        private static final Logger logger = LoggerFactory.getLogger(RocketmqConsumer.class);
    
        @StreamListener(target = StreamSink.ROCKETMQINPUT)
        public void consume(String message) {
            logger.info("Rocketmq recieved a string message : " + message);
        }
    
        @StreamListener(target = StreamSink.ROCKETMQINPUT, condition = "headers['type']=='rocketmq'")
        public void handle(String message) {
            logger.info("Rocketmq recieved a complex message : {}", message);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    创建KafkaController

    代码如下(示例):

    package com.example.springcloudstreamdemo.controller;
    
    import com.example.springcloudstreamdemo.producer.StraamProducer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @author zhiwei Liao
     * @Description
     * @Date create in 2022/9/12 0012 21:03
     */
    @RestController
    @RequestMapping("/kafka")
    public class KafkaController {
    
        @Autowired
        private StraamProducer straamProducer;
    
        /**
         * 发送消息
         * @param payload
         */
        @GetMapping("/sendSucceed")
        public void sendSucceed(String payload) {
            straamProducer.kafkaSendMessage(payload);
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    创建RabbitmqController

    代码如下(示例):

    package com.example.springcloudstreamdemo.controller;
    
    import com.example.springcloudstreamdemo.producer.StraamProducer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @author zhiwei Liao
     * @Description
     * @Date create in 2022/9/12 0012 21:03
     */
    @RestController
    @RequestMapping("/rabbitmq")
    public class RabbitmqController {
    
        @Autowired
        private StraamProducer straamProducer;
    
        /**
         * 发送消息
         * @param payload
         */
        @GetMapping("/sendSucceed")
        public boolean sendSucceed(String payload) {
            return straamProducer.rabbitmqSendMessage(payload);
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    创建RocketmqController

    代码如下(示例):

    package com.example.springcloudstreamdemo.controller;
    
    import com.example.springcloudstreamdemo.passage.StreamSink;
    import com.example.springcloudstreamdemo.producer.StraamProducer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    
    /**
     * @author zhiwei Liao
     * @Description
     * @Date create in 2022/9/12 0012 21:03
     */
    @EnableBinding(StreamSink.class)
    @RestController
    @RequestMapping("/rocketmq")
    public class RocketmqController {
    
        @Resource
        private StraamProducer straamProducer;
    
        @GetMapping("/sendSucceed")
        public boolean sendSucceed(@RequestParam(value="payload") String payload) {
            return straamProducer.rocketmqSendMessage(payload);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    创建StreamSink

    代码如下(示例):

    package com.example.springcloudstreamdemo.passage;
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.SubscribableChannel;
    
    /**
     * @author zhiwei Liao
     * @Description
     * @Date create in 2022/9/12 0012 21:03
     */
    public interface StreamSink {
    
    
        String KAFKAINPUT = "kafkaInput";
    
        @Input(KAFKAINPUT)
        SubscribableChannel kafkaInput();
    
        String ROCKETMQINPUT = "rocketmqInput";
    
        @Input(ROCKETMQINPUT)
        SubscribableChannel rocketmqInput();
    
        String RABBITMQINPUT = "rabbitmqInput";
    
        @Input(RABBITMQINPUT)
        SubscribableChannel rabbitmqInput();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    创建StreamSource

    代码如下(示例):

    package com.example.springcloudstreamdemo.passage;
    
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    
    /**
     * @author zhiwei Liao
     * @Description
     * @Date create in 2022/9/12 0012 21:03
     */
    public interface StreamSource {
    
        String ROCKETMQOUTPUT = "rocketmqOutput";
    
        @Output(ROCKETMQOUTPUT)
        MessageChannel rocketmqOutput();
    
        String RABBITMQOUTPUT = "rabbitmqOutput";
    
        @Output(RABBITMQOUTPUT)
        MessageChannel rabbitmqOutput();
    
        String KAFKAOUTPUT = "kafkaOutput";
    
        @Output(KAFKAOUTPUT)
        MessageChannel kafkaOutput();
    
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    创建StreamProducer

    代码如下(示例):

    package com.example.springcloudstreamdemo.producer;
    
    import com.example.springcloudstreamdemo.passage.StreamSource;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    
    /**
     * @author zhiweiLiao
     * @Description kafka的生产者
     * @Date create in 2022/9/13 0013 14:39
     */
    @EnableBinding({StreamSource.class})
    public class StreamProducer {
    
        @Autowired
        private StreamSource streamSource;
    
        @Autowired
        private StreamBridge streamBridge;
    
        public boolean rocketmqSendMessage(Object payload) {
            Message message = MessageBuilder.withPayload(payload)
                    .setHeader("type", "rocketmq")
                    .setHeader("x-delay", 5000)
                    .build();
    //        return streamBridge.send("rocketmqOutput",message);
            return streamSource.rocketmqOutput().send(message);
        }
    
        public boolean rabbitmqSendMessage(Object payload) {
            Message message = MessageBuilder.withPayload(payload)
                    .setHeader("type", "rabbitmq")
                    .setHeader("x-delay", 5000)
                    .build();
            return streamSource.rabbitmqOutput().send(message);
        }
    
        public boolean kafkaSendMessage(Object payload) {
            Message message = MessageBuilder.withPayload(payload)
                    .setHeader("type", "kafka")
                    .setHeader("x-delay", 5000)
                    .build();
            return streamSource.kafkaOutput().send(message);
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    创建RabbitMQUtil

    代码如下(示例):

    package com.example.springcloudstreamdemo.util;
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class RabbitMQUtil {
        private static ConnectionFactory factory;
        //程序一加载就会启动
        static {
            //1.创建连接工厂对象
            factory=new ConnectionFactory();
            //设置连接对象的参数
            factory.setHost("106.14.132.94");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("java@2022");
            factory.setVirtualHost("/");
        }
        public static Connection getconnection(){
            Connection connection=null;
            try{
                connection=factory.newConnection();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return connection;
        }
        public static void main(String[] args) {
            System.out.println(getconnection());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    校验是否正常工作

    使用rocketmq发消息

    如下图(示例):

    在这里插入图片描述调用接口发消息,如下图(示例):
    在这里插入图片描述控制台打印,如下图(示例):
    在这里插入图片描述

  • 相关阅读:
    C++之STL-String
    ES6 入门教程 6 正则的扩展 6.13 d 修饰符:正则匹配索引 & 6.14 String.prototype.matchAll()
    了解一下拉绳位移编码器还有哪些功能特点?
    npm ERR! path /Users/apple/.npm/_cacache/index-v5/11/77/cf18d9ab54d565b57fb3
    技术干货|你需要知道的 10 种常见机器学习算法
    springboot项目配置多数据库连接
    TS的类型编程
    合成孔径雷达干涉测量InSAR数据处理、地形三维重建、形变信息提取、监测等应用
    【Linux系统】第二篇、权限管理篇
    Lens5 指南:专为Kubernetes人员设计的IDE
  • 原文地址:https://blog.csdn.net/java_wxid/article/details/127145677