• RabbitMQ基础篇 笔记


    RabbitMQ

    余额支付

    同步调用

    一步一步的来,支付业务写完后,如果之后加需求,还需要增加代码,不符合开闭原则

    性能上也有问题,openfeign是同步调用,性能太差。

    同步调用耦合太多。

    同步的优势是可以立即得到结果,例如查询,查到了就能知道结果。但是拓展性差,性能下降,级联失败。

    异步调用

    异步调用时基于消息通知的方式,一般包括3个角色。

    • 消息发送者

    • 消息代理

    • 消息接收值

    发送者发送东西到消息代理,消息接受者监听消息代理。(类似于外卖柜)

    故障也会隔离。

    缓存消息,起到流量削峰填谷的功能。(流量整形)

    缺点

    拿不到对方执行的结果。不确定有没有执行成功。

    安全依赖于mq的可靠性。(broker)


    对于运行结果不关心的场景,性能要求较高就可以使用异步。

    MQ技术选型

    Message Queue(MQ)先进先出的消息队列。MQ的技术有很多实现方案,就用RabbitMQ erlang编译的

    消费者与queue绑定。

    生产者与exchange(交换机)绑定

    交换机路由给queue,exchange与queue构成了broker(RabbitMQ)

    公司可能搭建一个mq,然后所有的服务都用这个。因此为了隔离创建了VirtualHost(类似于数据库)

    快速入门

    交换机是负责路由转发消息的,它本身没有存储消息的能力。
    必须让队列和交换机产生关系(绑定)

    数据隔离

    有虚拟主机的概念
    首先创建一个用户。
    添加用户

    添加虚拟主机
    不同的虚拟主机是互相隔离的。创建一个新的虚拟主机,那么所有的东西都是新的,类似与数据库。

    使用Java操作

    Spring AMQP

    amqp是一种消息通信协议,它是协议。Spring 提供了一套统一的amqp协议标准。定义了接口,没实现,只有Spring Rabbit实现了。

    1. 引入依赖
    2. 在每个微服务中引入MQ服务端信息,5672端口。
    3. rabbitTemplate发送和接收。

    生产者代码

    @SpringBootTest
    public class SpringAmqpTest {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        void testSendMessage2Queue() {
            String queueName = "simple.queue";
            String msg = "hello, amqp!";
            rabbitTemplate.convertAndSend(queueName, msg);
        }
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    消费者代码

        @RabbitListener(queues = "simple.queue")
        public void listenSimpleQueue(String msg){
            System.out.println("消费者收到了simple.queue的消息:【" + msg +"】");
        }
    
    • 1
    • 2
    • 3
    • 4

    Work Queues

    多个消费者绑定同一个queue。
    每个消息只会被一个消费者消费掉。
    生产者投递的消息,不会考虑到消费者的处理能力。 所以需要添加 prefetch 参数。
    每次只能取一条消息,处理完成才能获得下一个。这样就是能者多劳。
    解决消息堆积问题。

    Fanout 交换机(广播)

    真正的环境一定会有交换机的,而不是直接发送到队列。交换机是有路由功能的,比如多个服务监听队列,只有一个服务能收到信息。但是交换机不同,多个服务都可以监听到队列。
    他会将接收到的消息分发给与它绑定的每一个队列。
    可以为每个微服务创建队列,这样每个微服务就都收到了。

        @Test
        void testSendFanout() {
            String exchangeName = "hmall.fanout2";
            String msg = "hello, everyone!";
            //交换机名字,routingKey,消息
            rabbitTemplate.convertAndSend(exchangeName, null, msg);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    交换机就是接受消息,路由转发消息,fanout就是广播。

    Direct交换机(定向路由)

    消息发给不同的人。

    每一个Queue都与Exchange 设置一个 routingKey。
    发送者发送消息时,制定消息的 routingKey
    交换机只会给相同的 routingKey的队列投递消息。

    Topic 交换机

    与Direct交换机类似,区别是routingKey可以是多个单词的列表,并且按照.分割。#类似于正则中的*。而* 则代表一个单词。
    例如china.*表示所有关于china的内容。

    使用SpringAMQP创建交换机,队列

    
    @Configuration
    public class FanoutConfiguration {
    
        @Bean
        public FanoutExchange fanoutExchange(){
            // ExchangeBuilder.fanoutExchange("").build();
            return new FanoutExchange("hmall.fanout2");
        }
    
        @Bean
        public Queue fanoutQueue3(){
            // QueueBuilder.durable("ff").build();
            return new Queue("fanout.queue3");
        }
    
        @Bean
        public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange){
            return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
        }
    
        @Bean
        public Queue fanoutQueue4(){
            return new Queue("fanout.queue4");
        }
    
        @Bean
        public Binding fanoutBinding4(){
            return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    这种绑定太麻烦,可以通过注解绑定

      @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "direct.queue1", durable = "true"),
                exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
                key = {"red", "blue"}
        ))
        public void listenDirectQueue1(String msg) throws InterruptedException {
            System.out.println("消费者1 收到了 direct.queue1的消息:【" + msg +"】");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    消息转换器

    发送一个对象类型的消息,接受后变成了一堆乱码,使用了Java的序列化方式。我们需要将它转为JSON格式的。
    在有 @Configuration 注解的类上

        @Bean
        public MessageConverter jacksonMessageConvertor(){
            return new Jackson2JsonMessageConverter();
        }
    
    • 1
    • 2
    • 3
    • 4
  • 相关阅读:
    【初阶数据结构】——堆排序和TopK问题
    动态分析股票走势算法图,股票趋势预测算法
    MySQL 多表查询
    泛型擦除的理解
    C++编程功底和常用规则
    Linux工具的基本使用
    java毕业设计在线航班订票系统Mybatis+系统+数据库+调试部署
    前端大文件上传如何实现(文件切片)
    SQL抛出数据集用游标单条逐一执行
    2023最新短剧小程序搭建,短剧分销系统功能介绍
  • 原文地址:https://blog.csdn.net/qq_15046309/article/details/133998998