• SpringBoot 集成 RabbitMq 实现五种常用消息模型


    前言:本次主要实现 SpringBoot 集成 RabbitMq 模拟 RabbitMq 的五种常用消息模型

    项目背景
    一个简单的 Demo ,生产者与消费者都在同一个项目,这里只是为了简单实现功能,所以就不区分生产者和消费者创建两个项目了,而且只是为了实现五种消息模型,快速上手,关于 RabbitMq 的相关特性这里也不多做赘述,有兴趣可以自行了解。

    项目结构
    在这里插入图片描述

    一、项目基础

    1、配置文件

    server:
      port: 7008
    spring:
      application:
        name: rabbitmq-demo
      #配置rabbitMq 服务器
      rabbitmq:
        host: 192.168.202.128
        port: 5672
        username: guest
        password: guest
        virtual-host: /
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    2、pom 文件引入依赖

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    二、RabbitMq 五种消息模式

    1、 HelloWorld 简单队列模式

    一对一生产消费,一个生产者,一个消费者

    1.1、创建配置文件,配置队列

    @Configuration
    public class RabbitMqConfig {
    
        // 简单队列模式 hello world 一个生产者一个消费者
        public static final String HELLO_WORLD_QUEUE = "hello_world";
    
    
        /**
         * ********************************** helloWorld 简单队列模式 **********************************
         */
        @Bean
        public Queue helloQueue() {
            return new Queue(HELLO_WORLD_QUEUE);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    1.2、生产者

        private final RabbitTemplate rabbitTemplate;
    
    
        @RequestMapping("/helloSend")
        public void helloSend() {
            String message = "helloWorld Message";
            log.info("生产者发送消息 :{}", message);
            rabbitTemplate.convertAndSend(RabbitMqConfig.HELLO_WORLD_QUEUE, message);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    1.3、消费者

    @Slf4j
    @Component
    public class RabbitMqConsumer {
    
        /**
         * helloWorld 模式
         *
         * @param message 消息内容
         */
        @RabbitListener(queues = RabbitMqConfig.HELLO_WORLD_QUEUE)
        public void helloWorldConsumer(String message) {
            log.info("消费者接收到消息:{}", message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    1.4、测试

    在这里插入图片描述

    2、Work 工作队列模式

    一个生产者对应多个消费者,比如生产者生产大量数据,我们可以用多个消费者去进行消费,每个消费者之间不会重复消费

    2.1、创建配置文件,配置队列

    @Configuration
    public class RabbitMqConfig {
    
        // 工作队列模式 一个生产者多个消费者
        public static final String WORK_QUEUE = "work";
    
    
        /**
         * ********************************** work 工作队列模式 **********************************
         */
        @Bean
        public Queue workQueue() {
            return new Queue(WORK_QUEUE);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    2.2、生产者

        private final RabbitTemplate rabbitTemplate;
    
    
        @RequestMapping("/workSend")
        public void workSend() {
            for (int i = 1; i <= 10; i++) {
                String message = "work Message" + i;
                log.info("工作队列模式生产者发送消息 :{}", message);
                rabbitTemplate.convertAndSend(RabbitMqConfig.WORK_QUEUE, message);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    2.3、消费者

    @Slf4j
    @Component
    public class RabbitMqConsumer {
    
        /**
         * work 模式
         *
         * @param message 消息内容
         */
        @RabbitListener(queues = RabbitMqConfig.WORK_QUEUE)
        public void workConsumer1(String message) {
            log.info("工作队列模式消费者1接收到消息:{}", message);
        }
    
        /**
         * work 模式
         *
         * @param message 消息内容
         */
        @RabbitListener(queues = RabbitMqConfig.WORK_QUEUE)
        public void workConsumer2(String message) {
            log.info("工作队列模式消费者2接收到消息:{}", message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    2.4、测试

    在这里插入图片描述

    3、Public/Subscribe 发布订阅模式

    发布订阅模式,一个生产者,可以有多个消费者,例如,有一个上游公共数据,下游用户谁需要就可以自行订阅,进行消费,使用较多的一个消费模式。

    一条消息可以被多个消费者消费

    队列需要与交换机绑定

    3.1、创建配置文件,配置队列和交换机

    @Configuration
    public class RabbitMqConfig {
    
    /**
         * ********************************** publish/subscribe 发布订阅模式 **********************************
         */
    
        // publish/subscribe 发布订阅模式 队列
        public static final String FANOUT_QUEUE1 = "fanout_queue1";
        public static final String FANOUT_QUEUE2 = "fanout_queue2";
        // 发布订阅模式交换机
        public static final String FANOUT_EXCHANGE = "fanout_exchange";
    
        /**
         * 发布订阅模式队列1
         */
        @Bean
        public Queue fanoutQueue1() {
            return new Queue(FANOUT_QUEUE1);
        }
    
        /**
         * 发布订阅模式队列2
         */
        @Bean
        public Queue fanoutQueue2() {
            return new Queue(FANOUT_QUEUE2);
        }
    
        /**
         * 发布订阅模式交换机
         */
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange(FANOUT_EXCHANGE);
        }
    
        /**
         * 发布订阅模式队列1和交换机绑定
         */
        @Bean
        public Binding bindingFanoutExchange1() {
            return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
        }
    
        /**
         * 发布订阅模式队列2和交换机绑定
         */
        @Bean
        public Binding bindingFanoutExchange2() {
            return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    3.2、生产者

    注意:这里需要将第二个参数 RoutingKey 置为空

        private final RabbitTemplate rabbitTemplate;
    
        @RequestMapping("/fanoutSend")
        public void fanoutSend() {
            String message = "fanout Message";
            log.info("发布订阅模式生产者发送消息 :{}", message);
            rabbitTemplate.convertAndSend(RabbitMqConfig.FANOUT_EXCHANGE, "", message);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    3.3、消费者

    @Slf4j
    @Component
    public class RabbitMqConsumer {
    
    /**
         * 发布订阅模式
         *
         * @param message 消息内容
         */
        @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE1)
        public void fanoutConsumer1(String message) {
            log.info("发布订阅模式消费者1接收到消息:{}", message);
        }
    
        /**
         * 发布订阅模式
         *
         * @param message 消息内容
         */
        @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE2)
        public void fanoutConsumer2(String message) {
            log.info("发布订阅模式消费者2接收到消息:{}", message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    3.4、测试

    在这里插入图片描述

    4、RoutingKey 路由模式

    路由模式,比发布订阅模式多了一个 RoutingKey 的区分,队列需要与交换机绑定,同时需要给队列指定 routingKey

    比如,我们希望不同的队列消费不同的消息,A队列消费订单数据,B队列消费库存数据
    我们就需要分别给两个队列去指定属于订单和库存的 RoutingKey

    交换机会将消息根据 RoutingKey 分发到不同的队列

    4.1、创建配置文件,配置队列和交换机

    @Configuration
    public class RabbitMqConfig {
    
     /**
         * ********************************** routing 路由模式 **********************************
         */
    
        // routing 路由模式
        public static final String DIRECT_QUEUE1 = "direct_queue1";
        public static final String DIRECT_QUEUE2 = "direct_queue2";
        // 路由模式交换机
        public static final String DIRECT_EXCHANGE = "direct_exchange";
        // 路由模式 routing key
        public static final String DIRECT_ROUTING_KEY1 = "direct.routing.key1";
        public static final String DIRECT_ROUTING_KEY2 = "direct.routing.key2";
    
        /**
         * 路由模式队列1
         */
        @Bean
        public Queue directQueue1() {
            return new Queue(DIRECT_QUEUE1);
        }
    
        /**
         * 路由模式队列2
         */
        @Bean
        public Queue directQueue2() {
            return new Queue(DIRECT_QUEUE2);
        }
    
        /**
         * 路由模式交换机
         */
        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange(DIRECT_EXCHANGE);
        }
    
        /**
         * 路由模式队列1和交换机绑定
         */
        @Bean
        public Binding bindingDirectExchange1() {
            return BindingBuilder.bind(directQueue1()).to(directExchange()).with(DIRECT_ROUTING_KEY1);
        }
    
        /**
         * 路由模式队列2和交换机绑定
         */
        @Bean
        public Binding bindingDirectExchange2() {
            return BindingBuilder.bind(directQueue2()).to(directExchange()).with(DIRECT_ROUTING_KEY2);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    4.2、生产者

    消息发送时需要指定交换机和 RoutingKey

        private final RabbitTemplate rabbitTemplate;
    
    
        @RequestMapping("/routingSend")
        public void routingSend() {
            String routingKeyMessage1 = "routing Message 1";
            String routingKeyMessage2 = "routing Message 2";
            log.info("路由模式生产者发送消息 :{}", routingKeyMessage1);
            log.info("路由模式生产者发送消息 :{}", routingKeyMessage2);
            rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE, RabbitMqConfig.DIRECT_ROUTING_KEY1, routingKeyMessage1);
            rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE, RabbitMqConfig.DIRECT_ROUTING_KEY2, routingKeyMessage2);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    4.3、消费者

    @Slf4j
    @Component
    public class RabbitMqConsumer {
    
        /**
         * 路由模式
         *
         * @param message 消息内容
         */
        @RabbitListener(queues = RabbitMqConfig.DIRECT_QUEUE1)
        public void directConsumer1(String message) {
            log.info("路由模式消费者1接收到消息:{}", message);
        }
    
        /**
         * 路由模式
         *
         * @param message 消息内容
         */
        @RabbitListener(queues = RabbitMqConfig.DIRECT_QUEUE2)
        public void directConsumer2(String message) {
            log.info("路由模式消费者2接收到消息:{}", message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    4.4、测试

    在这里插入图片描述

    5、Topic 通配符模式

    通配符模式与路由模式较为类似,路由模式通过交换机+RoutingKey 将消息分发到对应的队列,而通配符模式则是类似于 RoutingKey 多了一个模糊查询
    通配符模式,队列绑定交换机时,需要指定对应的 RoutingKey 通配符

    RoutingKey 一般可能由一个或者多个单词组成,中间用 . 隔开,如:user.data

    通配符模式有两个通配符号,* 和 #
    星号: 表示匹配一个单词,例如,“user.*” 匹配 user.data , user.name 等等
    #号: 表示匹配多个单词,例如,”user.#“ 匹配 user.data.name , user.data.name.age

    5.1、创建配置文件,配置队列和交换机

    @Configuration
    public class RabbitMqConfig {
    
         /**
         * ********************************** topic 通配符模式 **********************************
         */
    
        // topic 通配符模式
        public static final String TOPIC_QUEUE1 = "topic_queue1";
        public static final String TOPIC_QUEUE2 = "topic_queue2";
        // 通配符模式交换机
        public static final String TOPIC_EXCHANGE = "topic_exchange";
        public static final String TOPIC_ROUTING_KEY1 = "topic.routing.key.*";
        public static final String TOPIC_ROUTING_KEY2 = "topic.routing.key.#";
    
        /**
         * 通配符模式队列1
         */
        @Bean
        public Queue topicQueue1() {
            return new Queue(TOPIC_QUEUE1);
        }
    
        /**
         * 通配符模式队列2
         */
        @Bean
        public Queue topicQueue2() {
            return new Queue(TOPIC_QUEUE2);
        }
    
        /**
         * 通配符模式交换机
         */
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(TOPIC_EXCHANGE);
        }
    
        /**
         * 通配符模式队列1和交换机绑定
         */
        @Bean
        public Binding bindingTopictExchange1() {
            return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(TOPIC_ROUTING_KEY1);
        }
    
        /**
         * 通配符模式队列2和交换机绑定
         */
        @Bean
        public Binding bindingTopicExchange2() {
            return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_ROUTING_KEY2);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    5.2、生产者

    消息发送指定交换机和 RoutingKey 通配符

        private final RabbitTemplate rabbitTemplate;
    
    
        @RequestMapping("/topicSend")
        public void topicSend() {
            String routingKeyMessage1 = "topic Message 1";
            String routingKeyMessage2 = "topic Message 2";
            String routingKeyMessage3 = "topic Message 3";
            log.info("通配符模式生产者发送消息 :{}", routingKeyMessage1);
            log.info("通配符模式生产者发送消息 :{}", routingKeyMessage2);
            log.info("通配符模式生产者发送消息 :{}", routingKeyMessage3);
            rabbitTemplate.convertAndSend(RabbitMqConfig.TOPIC_EXCHANGE, "topic.routing.key.value.1", routingKeyMessage1);
            rabbitTemplate.convertAndSend(RabbitMqConfig.TOPIC_EXCHANGE, "topic.routing.key.value.2", routingKeyMessage2);
            rabbitTemplate.convertAndSend(RabbitMqConfig.TOPIC_EXCHANGE, "topic.routing.key.value", routingKeyMessage3);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    5.3、消费者

    @Slf4j
    @Component
    public class RabbitMqConsumer {
    
        /**
         * 通配符模式
         *
         * @param message 消息内容
         */
        @RabbitListener(queues = RabbitMqConfig.TOPIC_QUEUE1)
        public void topicConsumer1(String message) {
            log.info("通配符模式消费者1接收到消息:{}", message);
        }
    
        /**
         * 通配符模式
         *
         * @param message 消息内容
         */
        @RabbitListener(queues = RabbitMqConfig.TOPIC_QUEUE2)
        public void topicConsumer2(String message) {
            log.info("通配符模式消费者2接收到消息:{}", message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    5.4、测试

    在这里插入图片描述

    源码

    注意,代码里面还包含了消息确认机制、死信队列、延时队列的内容,看的话,请仔细一点

    https://github.com/wxwhowever/springboot-notes/tree/master/rabbitmq-demo

  • 相关阅读:
    流程控制for和while循环语句
    反射机制了解
    Python多线程模块concurrent.futures使用方法
    两年独立开发经验程序员告诉我们赚钱的经验(听听真正赚到钱的高手做法)
    探究竟篇之React中的state
    英国消费“避之不及”,东南亚“爱不释手”,TikTok Shop为何?
    华为智慧屏 招一招新玩法,手机操控智慧屏,视频搜索,播放控制,截图分享轻松搞定
    软考高级软件架构师学习笔记一
    Linux权限的理解
    Java版工程项目管理系统平台+企业工程系统源码+助力工程企业实现数字化管理
  • 原文地址:https://blog.csdn.net/wxw1997a/article/details/126030879