• 快速入门RabbitMQ并且加入项目实战


    📣 📣 📣 📢📢📢
    ☀️☀️你好啊!小伙伴,我是小冷。是一个兴趣驱动自学练习两年半的的Java工程师。
    📒 一位十分喜欢将知识分享出来的Java博主⭐️⭐️⭐️,擅长使用Java技术开发web项目和工具
    📒 文章内容丰富:覆盖大部分java必学技术栈,前端,计算机基础,容器等方面的文章
    📒 如果你也对Java感兴趣,关注小冷吧,一起探索Java技术的生态与进步,一起讨论Java技术的使用与学习
    ✏️高质量技术专栏专栏链接: 微服务数据结构netty单点登录SSMSpringCloudAlibaba
    😝公众号😝想全栈的小冷,分享一些技术上的文章,以及解决问题的经验
    当前专栏RabbitMQ系列

    文章目录

    一、MQ概述

    精短简介

    Java中有队列数据结构,但是是基于内存的,只有本JVM可以使用
    
    MQ中间件是一个多客户端节点可以操作的队列结构
    
    • 1
    • 2
    • 3

    简单案例

    异步消息处理(注册)

    1639403611783

    发送邮件、发送注册短信使用异步消息的模式,使得注册操作快速响应
    
    • 1

    应用解耦(订单_库存)

    1639403799134

    扣减库存接口可能会根据库存系统的升级而升级,而不得不导致订单系统也需要升级(重新部署)
    
    使用消息队列后,库存系统订阅队列即可,无需关心库存系统接口升级的问题
    
    • 1
    • 2
    • 3

    流量控制

    1639403877743

    高并发场景下(秒杀),将请求存入mq,后台系统按照自己的处理能力来消费任务,达到削峰的目的
    
    • 1

    概述

    1639404125733

    消息代理(message broker)

    消息代理:指安装了消息中间件的服务器,用于接收消息和发送消息
    
    • 1

    目的地(destination)

    通俗解释:消息代理接收到消息后会将消息继续发给目的地(生产者发送消息)
    目的地主要有两种形式:队列、主题
    
    • 1
    • 2
    队列(queue)【单播_点对点消息通信】
    点对点消息通信(point-to-point)
    1.消息发送者发送消息,消息代理将其放入一个队列中,消息接受者从队列中获取消息内容,消息读取后被移出队列
    2.队列可以被多个消费者监听,但一条消息只会被一个消费者成功消费
    
    • 1
    • 2
    • 3
    主题(topic)【广播_发布/订阅】
    发布(publish)/订阅(subscribe)消息通信
    1.发送者发送消息到主题,多个订阅者订阅该主题,多个消费者会同时收到消息
    
    • 1
    • 2

    两种规范

    JMS(JAVA消息服务)

    JMS:(Java Message Service)
    JAVA消息服务,基于JVM信息代理的规范。ActiveMQ、HornetMQ是JMS实现
    
    • 1
    • 2

    AMQP(高级消息队列协议)

    AMQP:(Advanced Message Queuing Protocol)
    高级消息队列协议,也是一个消息代理的规范,兼容JMS
    RabbitMQ是AMQP的实现
    
    • 1
    • 2
    • 3

    基于以上两种规范的分析

    支持消息类型:byte[]=》只要能支持byte[]就可以传输,例如将对象转换为json,然后转二进制流传输即可
    
    五种消息模型:重要,direct exchange其实就是队列
                后四种就是主题的变种
    
    • 1
    • 2
    • 3
    • 4

    1639405192579

    Spring支持与SpringBoot自动装配

    1639405624744

    使用@EnableRabbit开启自动配置
    
    • 1

    二、RabbitMQ安装与测试

    文档:https://www.rabbitmq.com/networking.html
    
    • 1

    docker安装

    4369,25672(Erlang发现&集群端口)
    5672,5671(AMQP端口)
    15672(web管理后台端口)
    61613,61614(STOMP协议端口)
    1883,8883(MQTT协议端口)
    https://www.rabbitmq.com/networking.html 
    
    1.下载镜像并启动
    docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
    
    2.修改为自动重启
    docker update rabbitmq --restart=always
    
    3.登录管理页面
    http://192.168.56.10:15672/
    guest
    guest
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    RabbitMQ运行机制

    1639486954176

    消息路由名词解释

    1.新增了Exchange和Binding角色
    2.消息路由:消息到达Exchange,并根据Binding关系发布到Queue队列中的过程,被称为消息路由
    
    • 1
    • 2

    Exchange类型

    direct
    fanout
    topic
    1.Exchange名词解释:是AMQP高级消息队列协议的划分
    
    2.RabbitMQ有四种Exchange类型:
    	2.1.点对点通信
    		2.1.1.direct
    			解析:直接交换机【单播模式,按照路由关系精确匹配】
    			例:路由键是dog,direct Exchange绑定了两个队列,队列1:dog、队列2:dogTom
    			使用direct Exchange只会把消息发送给队列1
    		2.1.2.headers【性能比较低,不会用到,与direct几乎一致】
    
    	2.2.发布/订阅:
    		2.2.1.fanout
    			解析:扇形【广播模式,不关心路由键】
    			例:消息会发送给fanout Exchange绑定的所有队列
    		2.2.2.topic
    			解析:主题【部分广播,区分路由键】
    			例:binding.key=usa.**处必须有一个单词
                   binding.key=#.news:#处有0个或多个单词
    			注:不能匹配字母,只能匹配单词
    
    3.
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    direct案例:

    1639487844649

    fanout案例:

    1639487913417

    Topic案例:

    1639487923953

    测试案例

    测试案例1

    http://192.168.56.10:15672/#/exchanges
    
    1.新建交换机
    Exchanges=》Add a new Exchange=》Add Exchange
    
    2.新建队列
    Queues=》Add a new Queue=》Add Queue
    
    3.在交换机表格中点击新建的交换机,绑定刚刚新建的队列
    Bindings=》Add binding from this exchange=》To queue=》Bind
    
    4.发送消息
    =》publishe Message
    
    5.获取消息
    Nack message requeue true
    Automatic ack
    Reject requeue true
    Reject requeue false
    
    6.解除binding
    =》unbind
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    1.新建交换机:

    1639488692329

    2.新建队列:

    1639488927909

    3.在交换机表格中点击新建的交换机:

    1639488787531

    1639489151587

    4.发送消息

    1639490481854

    消息发送成功:

    1639490629311

    5.获取消息
    Nack message requeue true
    不回复消息,并且消息重新入队
    
    • 1
    Automatic ack
    自动回复消息,消息不重新入队
    
    • 1
    Reject requeue true
    
    
    • 1
    Reject requeue false
    
    
    • 1
    6.解除binding

    1639489487003

    测试案例2

    1639489298287

    根据上表
    1.建立交换机:
    	exchange.direct【点对点】
    	exchange.fanout【广播,消息会发送给binding的多个队列中】
    	exchange.topic【广播,会先找到存在binding关系的队列,然后按照binding关系的路由规则与路由键进行模糊匹配】
    
    2.建立队列:
    	atguigu
    	atguigu.news
    	atguigu.emps
    	gulixueyuan.news
    
    3.创建binding关系
    	exchange.direct绑定所有队列【绑定规则是根据路由键与路由精确绑定决定消息进入哪个队列】
    	exchange.fanout绑定所有队列【消息会发送给每个队列】
    	exchange.topic绑定所有队列【并设置不同的路由键atguigu.#、*.news】
    
    4.发送消息
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    三、RabbitMQ概念

    1639407551973

    1.简介

    1639405993748

    1639406310372

    1639406273053

    1.1.Message消息

    每条消息都需要在消息头中指定route-key
    
    • 1

    1.2.publisher生产者

    1.3.Exchange交换器

    Exchange:交换器,每一个交换器都连接一个队列(可以看作局域网中的交换机的端口,每一个端口都连接一台电脑)
    
    • 1

    1.4.Queue队列

    Queue队列:用于存储生产者发送的消息
    
    • 1

    1.5.Binding绑定

    交换器与Queue之间存在绑定关系,一个交换器可以绑定多个队列,存在多种绑定关系
    根据消息头中的路由键指定的绑定关系发送到匹配的队列中
    
    • 1
    • 2

    1.6.Connection连接

    网络连接
    	生产者与Broker、消费者与Broker通过连接传输消息
    	一个客户端只会建立一条连接
    
    • 1
    • 2
    • 3

    1.7.Channel通道

    一个客户端建立一条连接,一条连接内存储多个通道用于监听不同队列
    
    • 1

    1.8.Virtual Host虚拟主机

    只需要安装一个rabbitmq,但是可以分离出多个微主机,互相之间配置隔离,使用不同的url访问
    
    1.例如dev、test可以使用不同VHost
    2.不同的系统使用不同的VHost
    
    • 1
    • 2
    • 3
    • 4

    2.消息确认(可靠消息)

    1639582496156

    2.1.可靠抵达_发送端确认

    简介:
    	1.可靠抵达:消息可靠抵达MQ,消息可靠抵达消费者
    	2.事务消息:性能下降250倍,所以采用确认机制代替
    
    文档:https://www.rabbitmq.com/confirms.html#publisher-confirms
    Reliable Delivery=》Acknowledgements and Confirms=>Publisher confirms
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    confirmCallback回调

    1639583499327

    简介:
    	1.生产者发送消息到Queue会经过两个两个过程【确认机制看做一种协议】
    		1)消息从publisher到达Broker(到达后会回调confirmCallback,消费者被告知消息是否抵达服务器)
    			【集群情况下必须所有的broker接收到才会调用confirmCallback】
    		2)消息从Exchange投递到Queue(失败后会回调returnCallback,消费者被告知消息是否抵达Queue)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    # 开启发送端确认:
    spring.rabbitmq.publisher-returns=true
    # 消息在没有被队列接收时是否强行退回
    spring.rabbitmq.template.mandatory=true
    
    • 1
    • 2
    • 3
    • 4
    // 测试步骤,调用单元测试中发送消息的方法,触发回调
    
    @Configuration
    public class MyRabbitConfig {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @Bean
        public MessageConverter messageConverter() {
            // 使用json序列化器来序列化消息,发送消息时,消息对象会被序列化成json格式
            return new Jackson2JsonMessageConverter();
        }
    
        /**
         * 定制RabbitTemplate
         * 1、服务收到消息就会回调
         * 1、spring.rabbitmq.publisher-confirms: true
         * 2、设置确认回调
         * 2、消息正确抵达队列就会进行回调
         * 1、spring.rabbitmq.publisher-returns: true
         * spring.rabbitmq.template.mandatory: true
         * 2、设置确认回调ReturnCallback
         * 

    * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息) */ @PostConstruct // (MyRabbitConfig对象创建完成以后,执行这个方法) public void initRabbitTemplate() { /** * 发送消息触发confirmCallback回调 * @param correlationData:当前消息的唯一关联数据(如果发送消息时未指定此值,则回调时返回null) * @param ack:消息是否成功收到(ack=true,消息抵达Broker) * @param cause:失败的原因 */ rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { System.out.println("发送消息触发confirmCallback回调" + "\ncorrelationData ===> " + correlationData + "\nack ===> " + ack + "" + "\ncause ===> " + cause); }); } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    returnCallback回调
    简介:
    	1.生产者发送消息到Queue会经过两个两个过程【确认机制看做一种协议】
    		1)消息从publisher到达Broker(到达后会回调confirmCallback,消费者被告知消息是否抵达服务器)
    		2)消息从Exchange投递到Queue(失败后会回调returnCallback,消费者被告知消息是否抵达Queue)
    
    • 1
    • 2
    • 3
    • 4
    # 开启发送端抵达队列确认
    spring.rabbitmq.publisher-returns=true
    
    • 1
    • 2
    @Configuration
    public class MyRabbitConfig {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @Bean
        public MessageConverter messageConverter() {
            // 使用json序列化器来序列化消息,发送消息时,消息对象会被序列化成json格式
            return new Jackson2JsonMessageConverter();
        }
    
        /**
         * 定制RabbitTemplate
         * 1、服务收到消息就会回调
         * 1、spring.rabbitmq.publisher-confirms: true
         * 2、设置确认回调
         * 2、消息正确抵达队列就会进行回调
         * 1、spring.rabbitmq.publisher-returns: true
         * spring.rabbitmq.template.mandatory: true
         * 2、设置确认回调ReturnCallback
         * 

    * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息) */ @PostConstruct // (MyRabbitConfig对象创建完成以后,执行这个方法) public void initRabbitTemplate() { /** * 消息未到达队列触发returnCallback回调 * 只要消息没有投递给指定的队列,就触发这个失败回调 * @param message:投递失败的消息详细信息 * @param replyCode:回复的状态码 * @param replyText:回复的文本内容 * @param exchange:接收消息的交换机 * @param routingKey:接收消息的路由键 */ rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 需要修改数据库 消息的状态【后期定期重发消息】 System.out.println("消息未到达队列触发returnCallback回调" + "\nmessage ===> " + message + "\nreplyCode ===> " + replyCode + "\nreplyText ===> " + replyText + "\nexchange ===> " + exchange + "\nroutingKey ===> " + routingKey); }); } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    2.2.可靠接收_消费端确认

    ack机制

    1639904298481

    简介:
    	1.消费者接收消息也会经过ack消息确认机制,只有消费者成功接收消息,broker才允许删除消息
    	2.默认情况下消息抵达客户端后自动确认,服务端消息自动删除
    	3.手动确认模式下,消费者接收消息后但是不执行ack/nack进行确认,服务端队列中的消息会从unacked状态变为ready状态等待下一次消费(即使consumer宕机消息也不会丢失)
    	4.注意:消息到达消费端,消息会进入unacked状态,如果手动确认模式下,消费者因宕机而未执行ack/nack,消息会一直处于unacked状态不会被删除(持久化了)
    
    	5.签收,ack
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    # 消费者手动确认模式,关闭自动确认,否则会消息丢失
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    
    • 1
    • 2
    @RabbitListener(queues = {"hello-java-queue"})
    @Service("orderItemService")
    public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
    
    	@RabbitHandler
        public void revieveMessage(Message message, OrderEntity entity, Channel channel) {
            // 请求体,序列化存储(本例中已使用Jackson2JsonMessageConverter序列化器作JSON序列化存储)
            byte[] body = message.getBody();
            // 请求头
            MessageProperties properties = message.getMessageProperties();
            // channel内按顺序自增的long类型消息标签
            long deliveryTag = properties.getDeliveryTag();
            // JSON反序列得到消息内容对象
            OrderEntity reason = JSONObject.parseObject(body, OrderEntity.class);
            System.out.println("接受到的消息对象" + message);
            System.out.println("接受到的消息内容" + reason);
            System.out.println("接受到的消息内容" + entity);
            try {
                if (deliveryTag == 2) {
                    // 手动确认,消息会从unacked中删除,total数量减1
                    // boolean multiple:是否批量签收
                    channel.basicAck(deliveryTag, false);
                } else {
                    // 手动拒签
                    // boolean multiple:是否批量拒签
                    // boolean requeue:当前拒签消息是否发回服务器重新入队
                    channel.basicNack(deliveryTag, false, true);
                }
            } catch (IOException e) {
                // 网络中断
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    2.3.本地事务表

    3.延时队列(柔性事务)

    3.1.使用场景

    场景:
    	未付款订单,超时后,系统自动取消订单并释放占有物品
    
    常用解决方案:
    	Spring的Schedule定时任务轮询数据库
    缺点:
    	消耗系统内存、增加数据库的压力、存在较大的时间误差
    
    解决:
    	在保证可靠消息的前提下,使用延时队列+死信队列,达到最终一致性(柔性事务)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    3.2.TTL和死信Exchange

    TTL

    TTL(Time To Live)

    • 消息的TTL就是消息的存活时间
    • RabbitMQ中对队列、消息都可以设置TTL
      • 对队列设置TTL,就是队列没有消费者连着的保留时间;对消息设置TTL,超过了这个时间,消息就死了,称之为死信。
      • 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是—样的效果。
    死信Exchange
    • 可以进入死信路由的情况
      • 被consumer拒收的消息,并且reject方法的参数里requeue是false(不会重新入队)
      • TTL过期的消息
      • 队列消息满了,排在前面的消息会被丢弃或进入死信路由
    死信Exchange其实就是普通的Exchange
    队列设置好自己的Dead Letter Exchange,当此队列中的消息过期后会被转发到这个路由,被称为死信路由
    
    • 1
    • 2
    延时队列
    TTL消息 + 死信Exchange
    使用一个队列接收死信Exchange中的TTL消息,这样的队列被称为延时队列
    
    注意:存放TTL消息的队列不要让客户端监听(这个队列和延时队列不是同一个,延时队列是存储已经超时的TTL消息)
    
    • 1
    • 2
    • 3
    • 4

    3.3.延时队列实现方案

    • 实现1:给队列设置TTL

      • 采用方案1,因为队列是先进先出的。如果存在一个消息已经超时但是队首的消息未超时,已超时的消息无法出队
    arguments.put("x-dead-letter-exchange", "order-event-exchange");// 死信路由
    arguments.put("x-dead-letter-routing-key", "order.release.order");// 死信路由键
    arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
    
    • 1
    • 2
    • 3
    • 实现2:给消息设置TTL
      QQ图片20220102211522
      QQ图片20220102213900

    4.可靠消息(柔性事务)

    场景:
    	配合延时队列+死信队列实现最终一致性(柔性事务)
    
    • 1
    • 2

    创建mq_message表

    CREATE TABLE `mq_message` (
      `message_id` char(32) NOT NULL,
      `content` json,
      `to_exchane` varchar(255) DEFAULT NULL,
      `routing_key` varchar(255) DEFAULT NULL,
      `class_type` varchar(255) DEFAULT NULL,
      `message_status` int(1) DEFAULT '0' COMMENT '0-新建 1-已发送 2-错误抵达 3-已抵达',
      `create_time` datetime DEFAULT NULL,
      `update_time` datetime DEFAULT NULL,
      PRIMARY KEY (`message_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    4.1.消息丢失

    QQ图片20220103220617.jpg

    前提:
        使用定时器扫描mq_message定时重发
    
    情况1:网络连接失败,消息未抵达Broker
        解决:发送消息时同时将消息持久化到MQ中并设定状态为已抵达
             当出现异常时在catch处修改消息状态为错误抵达
    
    情况2:消息抵达Broker,但为抵达queue,消息会丢失(只有抵达了queue消息才会持久化)
    	解决:开启生产者确认机制,将触发returnCallback的returnedMessage的消息状态修改为错误抵达
    
    情况3:消费者未ack时宕机,导致消息丢失
    	解决:开启消费者手动ack
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    4.2.消息重复

    QQ图片20220103231446.png

    情况1:业务逻辑已经执行,但是ack时宕机,消息由unack变为ready,消息重新入队
      解决:将接口设计成幂等性,例如库存解锁时判断工作单的状态,已解锁则无操作
      解决2:防重表
    
    • 1
    • 2
    • 3

    4.3.消息积压

    情况1:生产者流量太大
      解决:减慢发送消息速率(验证码、防刷、重定向、削峰)
      
    情况2:消费者能力不足或宕机
      解决:上线更多消费者
      解决2:上线专门的队列消费服务,批量取出消息入库,离线处理业务慢慢处理
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    优化方案

    可以添加一个消息服务,各模块调用发送消息API即可
    实现消息存库+异常修改状态
    
    思考:如果feign调用失败没有问题,做好本地事务,feign调用失败回滚即可
    
    • 1
    • 2
    • 3
    • 4

    5.队列削峰(高并发_秒杀)

    场景:
    	高并发秒杀模块,将秒杀成功创建订单的消息发送给MQ,订单模块按照自己的能力消费生成订单
    	这个过程就是队列削峰(不走购物车逻辑,否则秒杀的高并发流量会带给订单模块)
    
    • 1
    • 2
    • 3

    四、Springboot整合RabbitMQ

    1639491342358

    1.引入spring-boot-starter-amqp依赖

    1.订单服务中引入依赖,场景启动器,引入后RabbitAutoConfiguration自动生效
    <!--rabbitmq-->
    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    2.RabbitAutoConfiguration生效后自动注入多个容器:
    	CachingConnectionFactory:
    	RabbitTemplate:
    	AmqpAdmin:
    	RabbitMessagingTemplate:
    
    3.RabbitProperties:配置类
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    CachingConnectionFactory

    
    
    • 1

    RabbitTemplate

    
    
    • 1

    AmqpAdmin

    用于创建Exchange、Queue、Binding
    
    • 1

    RabbitMessagingTemplate

    
    
    • 1

    2.开启Rabbit

    启动类标注:(只有需要用到监听消息时才需要该注解,开启后可以使用@RabbitListener)
    @EnableRabbit
    
    • 1
    • 2

    3.配置属性

    spring:
      rabbitmq:
        host: 192.168.56.10
        port: 5672
        # 虚拟主机
        virtual-host: /
        # 开启发送端抵达队列确认【发送端确认机制+本地事务表】
        publisher-returns: true
        # 开启发送确认【发送端确认机制+本地事务表】
        publisher-confirm-type: correlated
        # 只要抵达队列,优先回调return confirm
        template:
          mandatory: true
        # 使用手动确认模式,关闭自动确认【消息丢失】
        listener:
          simple:
            acknowledge-mode: manual
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    4.配置类

    @Configuration
    public class MyRabbitConfig {
    
        @Bean
        public MessageConverter messageConverter() {
            // 使用json序列化器来序列化消息,发送消息时,消息对象会被序列化成json格式
            return new Jackson2JsonMessageConverter();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    5.单元测试

    创建Exchange

    @Autowired
    AmqpAdmin amqpAdmin;
    
    @Test
    void createExchange() {
        // 创建交换机
        // String name, boolean durable, boolean autoDelete
        DirectExchange exchange = new DirectExchange("hello-java-exchange", true, false);
        amqpAdmin.declareExchange(exchange);
        log.info("Exchange创建[{}]成功", "hello-java-exchange");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    创建Queue

    @Autowired
    AmqpAdmin amqpAdmin;
    
    @Test
    void createQueue() {
        // 创建队列
        // String name, boolean durable, boolean exclusive, boolean autoDelete
        // exclusive:是否排他,true:只有一个连接可以使用此队列,其他连接无法连上此队列
        Queue queue = new Queue("hello-java-queue", true, false, false);
        amqpAdmin.declareQueue(queue);
        log.info("Queue创建[{}]成功", "hello-java-queue");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    创建Binding_交换机绑定

    @Autowired
    AmqpAdmin amqpAdmin;
    
    @Test
    void createBinding() {
        // 创建绑定,交换机绑定目的地
        // String destination:目的地name
        // DestinationType destinationType:目的地类型【queue或exchange(路由)】
        // String exchange:待绑定交换机
        // String routingKey:路由键
        Binding bind = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hello-java-exchange", "hello.java", null)
        amqpAdmin.declareBinding(bind);
        log.info("Binding创建[{}]成功", "hello-java-binding");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    发送消息 1639498067759

    @Autowired
    RabbitTemplate rabbitTemplate;
    
    @Test
    void sendMsg() {
        // 如果发送的消息是个对象,会使用序列化机制,将对象写出去。对象类必须实现 serializable
        // 如果使用JSON序列化器,则不需要类实现Serializable
        // String exchange:交换机
        // String routingKey:路由键
        // final Object object:消息
        // CorrelationData correlationData:可指定消息ID
        
        // 消息对象,可以是任意类型,类必须实现serializable,消息会以序列化的方式写入流中
        OrderReturnReasonEntity message = new OrderReturnReasonEntity();// 退货原因
        message.setId(1L);
        message.setCreateTime(new Date());
        message.setName("哈哈");
        // 消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", message, correlationData);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    接收消息

    @RabbitListener
    简介:
    	1.用于标注在监听类或监听方法上,接收消息,需要指定监听的队列(数组)
    	2.使用该注解之前,需要在启动类加上该注解:@EnableRabbit
    	3.@RabbitListener即可以标注在方法上又可以标注在类上
    		标注在类上:表示该类是监听类,使得@RabbitHandler注解生效
    		标注在方法上:表示该方法时监听方法,会监听指定队列获得消息
    	4.一般只标注在方法上,并配合@RabbitHandler使用,重载的方式接收不同消息对象
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    import com.rabbitmq.client.Channel;
    
    @Service("orderItemService")
    public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
    
        /**
         * queues:声明需要监听的队列
         * channel:当前传输数据的通道
         * 获取实际消息内容有两种方式:
         *  方式一:在方法参数列表中直接声明出来
         *  方式二:从请求体中取出消息的二进制形式,然后通过JSON反序列化即可
         */
        @RabbitListener(queues = {"hello-java-queue"})
        public void revieveMessage(Message message, OrderReturnReasonEntity entity, Channel channel) {
            // 请求体,序列化存储(本例中已使用Jackson2JsonMessageConverter序列化器作JSON序列化存储)
            byte[] body = message.getBody();
            // 请求头
            MessageProperties messageProperties = message.getMessageProperties();
            // JSON反序列得到消息内容对象
            OrderReturnReasonEntity reason = JSONObject.parseObject(body, OrderReturnReasonEntity.class);
            System.out.println("接受到的消息对象" + message);
            System.out.println("接受到的消息内容" + reason);
            System.out.println("接受到的消息内容" + entity);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    测试集群多客户端监听接收消息

    1639580605368

    简介:
    	复制一份订单配置9011
    	-Xmx100m
    	--server.port=9011
    	同时运行多个订单模块,然后测试发送多条消息
    	
    结论:
    	1.多个客户端可以共同监听同一队列
    	2.一条消息同时只能被一个客户端接收
    	3.同一个客户端接收消息是串行的,revieveMessage方法执行完后才会继续接收下一条消息
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    9010接收消息:

    1639580888252

    9011接收消息:

    1639580848532

    @RabbitHandler
    作用:
    	配合@RabbitListener,使用方法重载的方法接收不同的消息类型
    简介:
    	1.用于标注在监听方法上,接收消息,不需要指定监听的队列
    	2.使用该注解之前,需要在启动类加上该注解:@EnableRabbit
    	3.@RabbitListener只可以标注在方法,重载的方式接收不同消息对象
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    @Slf4j
    @SpringBootTest
    class GulimallOrderApplicationTests {
    
        @Autowired
    	RabbitTemplate rabbitTemplate;
    
        @Test
        void sendMsg2() {
            // 该测试案例向同一队列发送了两个不同类型的消息对象
            // 消息一:
            OrderReturnReasonEntity message1 = new OrderReturnReasonEntity();// 退货原因
            message1.setId(1L);
            message1.setCreateTime(new Date());
            message1.setName("哈哈");
            CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());// 消息ID
            rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", message1, correlationData1);
    
            // 消息二:
            OrderEntity message2 = new OrderEntity();// 退货原因
            message2.setId(1L);
            message2.setCreateTime(new Date());
            message2.setOrderSn("哈哈");
            CorrelationData correlationData2 = new CorrelationData(UUID.randomUUID().toString());// 消息ID
            rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", message2, correlationData2);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    @RabbitListener(queues = {"hello-java-queue"})
    @Service("orderItemService")
    public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
    
        /**
         * 方法重载1
         */
        @RabbitHandler
        public void revieveMessage(Message message, OrderReturnReasonEntity entity, Channel channel) {
            // 请求体,序列化存储(本例中已使用Jackson2JsonMessageConverter序列化器作JSON序列化存储)
            byte[] body = message.getBody();
            // 请求头
            MessageProperties messageProperties = message.getMessageProperties();
            // JSON反序列得到消息内容对象
            OrderReturnReasonEntity reason = JSONObject.parseObject(body, OrderReturnReasonEntity.class);
            System.out.println("接受到的消息对象" + message);
            System.out.println("接受到的消息内容" + reason);
            System.out.println("接受到的消息内容" + entity);
        }
    
        /**
         * 方法重载2
         */
        @RabbitHandler
        public void revieveMessage(Message message, OrderEntity entity, Channel channel) {
            // 请求体,序列化存储(本例中已使用Jackson2JsonMessageConverter序列化器作JSON序列化存储)
            byte[] body = message.getBody();
            // 请求头
            MessageProperties messageProperties = message.getMessageProperties();
            // JSON反序列得到消息内容对象
            OrderEntity reason = JSONObject.parseObject(body, OrderEntity.class);
            System.out.println("接受到的消息对象" + message);
            System.out.println("接受到的消息内容" + reason);
            System.out.println("接受到的消息内容" + entity);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    6.消息确认

    spring:
      rabbitmq:
        host: 192.168.56.10
        port: 5672
        # 虚拟主机
        virtual-host: /
        # 开启发送端发送确认,无论是否到达broker都会触发回调【发送端确认机制+本地事务表】
        publisher-confirm-type: correlated
        # 开启发送端抵达队列确认,消息未被队列接收时触发回调【发送端确认机制+本地事务表】
        publisher-returns: true
        # 消息在没有被队列接收时是否强行退回
        template:
          mandatory: true
        # 消费者手动确认模式,关闭自动确认,否则会消息丢失
        listener:
          simple:
            acknowledge-mode: manual
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    7.延时队列、死信队列

    QQ图片20220102220024.jpg

    描述:
    	可以共用一个exchange,指定不同的路由分别绑定延时队列和死信队列
    
    • 1
    • 2
    /**
     * 创建队列,交换机,延时队列,绑定关系 的configuration
     * 1.Broker中的Queue、Exchange、Binding不存在的情况下,会自动创建(在RabbitMQ),不会重复创建覆盖
     * 2.懒加载,只有第一次使用的时候才会创建(例如监听队列)
     */
    @Configuration
    public class MyRabbitMQConfig {
    
        /**
         * 延时队列
         */
        @Bean
        public Queue orderDelayQueue() {
            /**
             * Queue(String name,  队列名字
             *       boolean durable,  是否持久化
             *       boolean exclusive,  是否排他
             *       boolean autoDelete, 是否自动删除
             *       Map arguments) 属性【TTL、死信路由、死信路由键】
             */
            HashMap<String, Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange", "order-event-exchange");// 死信路由
            arguments.put("x-dead-letter-routing-key", "order.release.order");// 死信路由键
            arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
            return new Queue("order.delay.queue", true, false, false, arguments);
        }
    
        /**
         * 交换机(死信路由)
         */
        @Bean
        public Exchange orderEventExchange() {
            return new TopicExchange("order-event-exchange", true, false);
        }
    
        /**
         * 死信队列
         */
        @Bean
        public Queue orderReleaseQueue() {
            return new Queue("order.release.order.queue", true, false, false);
        }
    
        /**
         * 绑定:交换机与订单解锁延迟队列
         */
        @Bean
        public Binding orderCreateBinding() {
            /**
             * String destination, 目的地(队列名或者交换机名字)
             * DestinationType destinationType, 目的地类型(Queue、Exhcange)
             * String exchange,
             * String routingKey,
             * Map arguments
             **/
            return new Binding("order.delay.queue",
                    Binding.DestinationType.QUEUE,
                    "order-event-exchange",
                    "order.create.order",
                    null);
        }
    
        /**
         * 绑定:交换机与订单解锁死信队列
         */
        @Bean
        public Binding orderReleaseBinding() {
            return new Binding("order.release.order.queue",
                    Binding.DestinationType.QUEUE,
                    "order-event-exchange",
                    "order.release.order",
                    null);
        }
    
        /**
         * 绑定:交换机与库存解锁
         */
        @Bean
        public Binding orderReleaseOtherBinding() {
            return new Binding("stock.release.stock.queue",
                    Binding.DestinationType.QUEUE,
                    "order-event-exchange",
                    "order.release.other.#",
                  
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
  • 相关阅读:
    day30 SQL注入&CTF&二次&堆叠&DNS带外
    软件工程概述
    [GYCTF2020]Ezsqli 绕过or information_schema 无列名注入
    【python】正则匹配国内手机号
    java学习笔记day04
    QTabWidget 类 (选项卡部件)
    RocketMq(三)springBoot集成rocketMq
    LeetCode练习4——删除有序数组中的重复项
    小程序如何搭建在服务器上
    Python刷CSDN博客访问量的N种方法 , 亲测可用
  • 原文地址:https://blog.csdn.net/doomwatcher/article/details/127606960