• 十二、消息服务(3)


    本章概要

    • Spring Boot 整合 RabbitMQ

    1. 创建项目

    Spring Boot 为 AMQP 提供了自动化配置依赖 Spring-boot-starter-amqp,因此首先创建 Spring Boot 项目并添加该依赖

    <dependency>
      <groupId>org.springframework.bootgroupId>
      <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4

    项目创建后,在 application.properties 中配置 RabbitMQ 的基本连接信息

    spring.rabbitmq.host=ip地址
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=long
    spring.rabbitmq.password=123
    
    • 1
    • 2
    • 3
    • 4

    接下来进行RabbitMQ 配置,在 RabbitMQ 中,所有的消息盛传着提交的消息都会交由 Exchange 进行再分配,Exchange 会根据不同的策略将消息分发到不同的 Queue 中。
    RabbitMQ 中一共提供了4种不同的 Exchange 策略,分别是 Direct、Fanout、Topic以及Hearder,其中前三种使用频率较高。

    2. 使用 Direct 策略

    DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时,会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” ,DirectExchange 的配置如下:

    @Configuration
    public class RabbitDirectConfig {
        public final static String DIRECTNAME = "long-direct";
        @Bean
        Queue queue() {
            return new Queue("hello-queue");
        }
        @Bean
        DirectExchange directExchange() {
            return new DirectExchange(DIRECTNAME, true, false);
        }
        @Bean
        Binding binding() {
            return BindingBuilder.bind(queue()).to(directExchange()).with("direct");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    代码解释:

    • 首先提供一个消息队列 Queue ,然后创建一个 DirectExchange 对象,三个参数分别是名字、重启后是否依然有效、长期未使用时是否删除
    • 创建一个 Binding 对象,将 DirectExchange 和 Queue 绑定在一起
    • DirectExchange 和 Binding 两个 Bean 的配置可以省略掉,即如果使用 DirectExchange ,只配置一个 Queue 的实例即可

    然后配置一个消费者

    @Component
    public class DirectReceiver {
        @RabbitListener(queues = "hello-queue")
        public void handler1(String msg) {
            System.out.println("===================================DirectReceiver:" + msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    通过 @RabbitListener 注解指定一个方法是一个消息消费方法,方法参数就是所接收到的消息。然后在单元测试类中注入一个 RabbitTemplate 对象来进行消息发送

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitmqApplicationTests {
        @Autowired
        RabbitTemplate rabbitTemplate;
        @Test
        public void directTest() {
            rabbitTemplate.convertAndSend("hello-queue","hello direct !");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    确认 RabbitMQ 已经启动,然后启动 Spring Boot 项目,运行此单元测试方法,日志如下

    ===================================DirectReceiver:hello direct !
    
    • 1

    3. 使用 Fanout 策略

    FanoutExchange 的数据交换策略是吧所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue ,在这种策略中,routingkey 将不起任何作用,FanoutExchange 的配置方法如下:

    @Configuration
    public class RabbitFanoutConfig {
        public final static String FANOUTNAME = "long-fanout";
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange(FANOUTNAME, true, false);
        }
        @Bean
        Queue queueOne() {
            return new Queue("queue-one");
        }
        @Bean
        Queue queueTwo() {
            return new Queue("queue-two");
        }
        @Bean
        Binding bindingOne() {
            return BindingBuilder.bind(queueOne()).to(fanoutExchange());
        }
        @Bean
        Binding bindingTwo() {
            return BindingBuilder.bind(queueTwo()).to(fanoutExchange());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    在这里首先创建 FanoutExchange ,参数的含义与创建 DirectExchange 的参数的含义一致,然后创建两个 Queue ,再将这两个 Queue 绑定到 FanoutExchange 上。然后创建两个消费者,如下

    @Component
    public class FanoutReceiver {
        @RabbitListener(queues = "queue-one")
        public void handler1(String message) {
            System.out.println("==========FanoutReceiver:handler1:" + message);
        }
        @RabbitListener(queues = "queue-two")
        public void handler2(String message) {
            System.out.println("==========FanoutReceiver:handler2:" + message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    两个消费者分别消费两个消息队列中的消息,然后在单元测试中发送消息

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitmqApplicationTests {
        @Autowired
        RabbitTemplate rabbitTemplate;
        @Test
        public void fanoutTest() {
            rabbitTemplate.convertAndSend(RabbitFanoutConfig.FANOUTNAME,null,"hello fanout !");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    注意,这里发送消息时不需要 routingkey,指定exchange 即可,routingkey 可以直接传一个 null。
    重启项目,运行单元测试,打印日志如下:

    ==========FanoutReceiver:handler1:hello fanout !
    ==========FanoutReceiver:handler2:hello fanout !
    
    • 1
    • 2

    可以看到,一条消息发送出去之后,所有和该 FanoutExchange 绑定的 Queue 都收到了消息。

    4. 使用 Topic 策略

    TopicExchange 是比较复杂也比较灵活的一种路由策略,在 TopicExchange 中 ,Queue 通过 routingkey 绑定到 TopicExchange 上,当消息到达 TopicExchange 后,TopicExchange 根据消息的 routing 将消息路由到一个或者多个 Queue 上

    @Configuration
    public class RabbitTopicConfig {
        public final static String TOPICNAME = "long-topic";
        @Bean
        TopicExchange topicExchange() {
            return new TopicExchange(TOPICNAME, true, false);
        }
        @Bean
        Queue xiaomi() {
            return new Queue("xiaomi");
        }
        @Bean
        Queue huawei() {
            return new Queue("huawei");
        }
        @Bean
        Queue phone() {
            return new Queue("phone");
        }
        @Bean
        Binding xiaomiBinding() {
            return BindingBuilder.bind(xiaomi()).to(topicExchange())
                    .with("xiaomi.#");
        }
        @Bean
        Binding huaweiBinding() {
            return BindingBuilder.bind(huawei()).to(topicExchange())
                    .with("huawei.#");
        }
        @Bean
        Binding phoneBinding() {
            return BindingBuilder.bind(phone()).to(topicExchange())
                    .with("#.phone.#");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    代码解释:

    • 首先创建 TopicExchange ,参数和前面的一致。然后创建三个 Queue ,第一个 Queue 用来存储和 “xiaomi”有关的消息,另外两个类似
    • 将三个 Queue 分别绑定到 TopicExchange 上,第一个 Binding 中的“小米.#”表示消息的 routingkey 凡是以“xiaomi”开头的,都将被路由到名称为“xiaomi”的 Queue 上,另外两个类似

    接下来针对三个 Queue 创建三个消费者

    @Component
    public class TopicReceiver {
        @RabbitListener(queues = "phone")
        public void handler1(String message) {
            System.out.println("PhoneReceiver:"+ message);
        }
        @RabbitListener(queues = "xiaomi")
        public void handler2(String message) {
            System.out.println("XiaoMiReceiver:"+message);
        }
        @RabbitListener(queues = "huawei")
        public void handler3(String message) {
            System.out.println("HuaWeiReceiver:"+message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    然后在单元测试中进行消息的发送

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitmqApplicationTests {
        @Autowired
        RabbitTemplate rabbitTemplate;
        @Test
        public void topicTest() {
            rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.news","小米新闻(xiaomi接收)");
            rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.news","华为新闻(huawei接收)");
            rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.phone","小米手机(xiaomi、phone接收)");
            rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.phone","华为手机(xiaomi、phone接收)");
            rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"phone.news","手机新闻(phone接收)");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    重启项目,运行单元测试方法,日志如下

    XiaoMiReceiver:小米新闻(xiaomi接收)
    HuaWeiReceiver:华为新闻(huawei接收)
    PhoneReceiver:小米手机(xiaomi、phone接收)
    XiaoMiReceiver:小米手机(xiaomi、phone接收)
    PhoneReceiver:华为手机(xiaomi、phone接收)
    HuaWeiReceiver:华为手机(xiaomi、phone接收)
    PhoneReceiver:手机新闻(phone接收)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    疑问:以上单元测试发送消息,如果是一个一个的发送(其它的注释掉)运行结果没问题。但如果是同时发送消息,有些消费者并没有消费到消息,如下:

    HuaWeiReceiver:华为新闻(huawei接收)
    XiaoMiReceiver:小米新闻(xiaomi接收)
    PhoneReceiver:小米手机(xiaomi、phone接收)
    PhoneReceiver:手机新闻(phone接收)
    
    • 1
    • 2
    • 3
    • 4

    5. 使用 Header 策略

    HeaderExchange 是一种较少的路由策略,HeaderExchange 会根据消息的 Header 将消息路由到不同的 Queue 上,这种策略也和 routingkey 无关,配置如下:

    @Configuration
    public class RabbitHeaderConfig {
        public final static String HEADERNAME = "long-header";
        @Bean
        HeadersExchange headersExchange() {
            return new HeadersExchange(HEADERNAME, true, false);
        }
        @Bean
        Queue queueName() {
            return new Queue("name-queue");
        }
        @Bean
        Queue queueAge() {
            return new Queue("age-queue");
        }
        @Bean
        Binding bindingName() {
            Map<String, Object> map = new HashMap<>();
            map.put("name", "long");
            return BindingBuilder.bind(queueName())
                    .to(headersExchange()).whereAny(map).match();
        }
        @Bean
        Binding bindingAge() {
            return BindingBuilder.bind(queueAge())
                    .to(headersExchange()).where("age").exists();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    这里的配置大部分和前面介绍的一样,差别主要体现的 Binding 的配置上,第一个 bindingName 方法中,whereAny 表示消息的Header 中只要有一个 Header 匹配上 map 中的 key/value ,就把该消息路由到名为“name-queue”的 Queue 上,这里也可以使用 whereAll 方法,表示消息的所有 Header 都要匹配。whereAny 和 whereAll 实际上对应了一个名为 x-match 的属性。
    bindingAge 中的配置则表示只要消息的 Header 中包含 age,无论 age 的值是多少,都将路由到名为“age-queue”的 Queue 上。
    接下来创建两个消费者,如下

    @Component
    public class HeaderReceiver {
        @RabbitListener(queues = "name-queue")
        public void handler1(byte[] msg) {
            System.out.println("HeaderReceiver:name:"
                    + new String(msg, 0, msg.length));
        }
    
        @RabbitListener(queues = "age-queue")
        public void handler2(byte[] msg) {
            System.out.println("HeaderReceiver:age:"
                    + new String(msg, 0, msg.length));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    注意,这里的参数用 byte 数组接收。然后在单元测试中创建消息的发送方法,这里消息的发送也可 routingkey 无关。

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitmqApplicationTests1 {
        @Autowired
        RabbitTemplate rabbitTemplate;
        @Test
        public void headerTest() {
            Message nameMsg = MessageBuilder
                    .withBody("hello header! name-queue".getBytes())
                    .setHeader("name", "long").build();
            Message ageMsg = MessageBuilder
                    .withBody("hello header! age-queue".getBytes())
                    .setHeader("age", "99").build();
            rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg);
            rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    这里创建两条消息,具有不同的 header ,将被发送到不同的 Queue 中。
    重启项目,运行单元测试方法,日志如下:

    HeaderReceiver:age:hello header! age-queue
    HeaderReceiver:name:hello header! name-queue
    
    • 1
    • 2
  • 相关阅读:
    蓝牙耳机学生党推荐,性价比高的学生党蓝牙耳机推荐
    10-io java
    【论文阅读】面向抽取和理解基于Transformer的自动作文评分模型的隐式评价标准(实验结果部分)
    bootstrap柵格
    Hi3798MV200 恩兔N2 NS-1 (一): 设备介绍和刷机说明
    解决方案| anyRTC远程检修应用场景
    实战项目:瑞吉外卖开发笔记
    【GAMES103学习笔记】刚体(Rigid Body)
    JVM基础05_执行引擎
    每日一题|2022-11-7|816. 模糊坐标|字符串枚举|Golang
  • 原文地址:https://blog.csdn.net/GXL_1012/article/details/126366638