• RabbitMq安装和Springboot整合


    一、安装

    docker run -d --name rabbitmq \
    -p 5671:5671 -p 5672:5672 -p 4369:4369 \
    -p 25672:25672 -p 15671:15671 -p 15672:15672 \
    rabbitmq:management
    
    • 1
    • 2
    • 3
    • 4

    4369, 25672 (Erlang发现&集群端口) 5672, 5671 (AMQP端口) 15672 (web管理后台端口)
    61613, 61614 (STOMP协议端口) 1883, 8883 (MQTT协议端口)
    https://www.rabbitmq.com/networking.html

    二、Exchange 类型

    1. Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、 fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接 看另外三种类型:

    Direct Exchange

    1. 消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器 就将消息发到对应的队列中。路由键与队 列名完全匹配,如果一个队列绑定到交换 机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发 “dog.puppy”,也不会转发“dog.guard” 等等。它是完全匹配、单播的模式。

    Fanout Exchange

    1. 每个发到 fanout 类型交换器的消息都 会分到所有绑定的队列上去。fanout 交 换器不处理路由键,只是简单的将队列 绑定到交换器上,每个发送到交换器的 消息都会被转发到与该交换器绑定的所 有队列上。很像子网广播,每台子网内 的主机都获得了一份复制的消息。 fanout 类型转发消息是最快的。

    Topic Exchange

    1. topic 交换器通过模式匹配分配消息的 路由键属性,将路由键和某个模式进行 匹配,此时队列需要绑定到一个模式上。 它将路由键和绑定键的字符串切分成单 词,这些单词之间用点隔开。它同样也 会识别两个通配符:符号“#”和符号 “*”。#匹配0个或多个单词,*匹配一 个单词。

    三、Springboot整合

    /**
     * 1使用RabbitMQ
     * 2引入amqp场景,RabbitAutoConfiguration生效
     * 3 容器中自动配置了
     *      RabbitTemplate AmqpAdmin RabbitMessagingTemplate CachingConnectionFactory
     *      所有的属性在@ConfigurationProperties(prefix = "spring.rabbitmq")配置
     * 4 @EnableRabbit 开启功能
     * 5 @RabbitListener 监听消息,(必须要开启@EnableRabbit),类+方法上
     * 6 @RabbitHandler 标在方法上(重载)
     * @author MrChen
     * @create 2022-06-05 19:25
     */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    1. 引入 spring-boot-starter-amqp

     <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
     </dependency>
    
    • 1
    • 2
    • 3
    • 4

    2. application.yml配置

    spring:
      rabbitmq:
        host: 192.168.182.130
        port: 5672
        virtual-host: /
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3. 测试RabbitMQ

    1. AmqpAdmin:管理组件
    2. RabbitTemplate:消息发送处理组件
    3. @RabbitListener 监听消息的方法可以有三种参数(不分数量,顺序)

    创建exchange、queue、和绑定、生产者发送消息

    方式1(推荐)

    @Configuration
    public class MyMQBuildConf {
        /**
         * 延时接收队列
         * @return
         */
        @Bean
        public Queue orderDelayQueue(){
            HashMap<String, Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange", "order-event-exchange");
            arguments.put("x-dead-letter-routing-key", "order.release.order");
            arguments.put("x-message-ttl", 15000);//2分钟发送给死信队列
            /**
             * Construct a new queue, given a name, durability flag, and auto-delete flag, and arguments.
             * @param name the name of the queue - must not be null; set to "" to have the broker generate the name.
             * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
             * @param exclusive true if we are declaring an exclusive queue (the queue will only be used by the declarer's
             * connection)
             * @param autoDelete true if the server should delete the queue when it is no longer in use
             * @param arguments the arguments used to declare the queue
             */
            return new Queue("order.delay.queue", true, false,false, arguments);
        }
    
        /**
         * 接收队列
         * @return
         */
        @Bean
        public Queue orderReleaseOrderQueue(){
            return new Queue("order.release.order.queue", true,false,false,null);
        }
    
        /**
         * top交换机
         * @return
         */
        @Bean
        public Exchange orderEventExchange(){
            return new TopicExchange("order-event-exchange", true,false);
        }
    
        /**
         * 绑定队列order.delay.queue和交换机order-event-exchange
         * @return
         */
        @Bean
        public Binding orderCreateOrderBind(){
            return new Binding("order.delay.queue",
                    Binding.DestinationType.QUEUE,
                    "order-event-exchange",
                    "order.create.order",
                    null);
        }
    
        /**
         * 绑定队列order.release.order.queue和交换机order-event-exchange
         * @return
         */
        @Bean
        public Binding orderReleaseOrderBind(){
            return new Binding("order.release.order.queue",
                    Binding.DestinationType.QUEUE,
                    "order-event-exchange",
                    "order.release.order",
                    null);
        }
    
        /**
         *
         * 订单交换机和库存延时队列进行绑定,路由key是order.release.other.#
         * @return
         */
        @Bean
        public Binding orderReleaseOtherBind(){
            return new Binding("stock.release.stock.queue",
                    Binding.DestinationType.QUEUE,
                    "order-event-exchange",
                    "order.release.other.#",
                    null);
        }
    
        @Bean
        public Queue orderSeckillOrderQueue(){
            return new Queue("order.seckill.order.queue",true,false,false,null);
        }
    
        //order.seckill.order
        @Bean
        public Binding orderSeckillOrderBind(){
            return new Binding("order.seckill.order.queue",
                    Binding.DestinationType.QUEUE,
                    "order-event-exchange",
                    "order.seckill.order",
                    null);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97

    方式 2 (不推荐)

    @Slf4j
    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class OrderTest {
        @Autowired
        RabbitTemplate rabbitTemplate;
        @Autowired
        AmqpAdmin amqpAdmin;
        /**
         * 1 创建Exchange Queue Binding
         *      1) 使用amqp进行创建exchange
         *      2)创建一个队列queue
         * 2 如何收发消息
         */
        @Test
        public void createExchange(){
            amqpAdmin.declareExchange(new DirectExchange("hello-java-exchange",true,false));
            log.info("exchange created[{}]", "hello-java-exchange");
    
        }
        @Test
        public void createQueue(){
            // public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
            //@param exclusive true if we are declaring an exclusive queue (the queue will only be used by the declarer's connection)
            amqpAdmin.declareQueue(new Queue("hello-java-queue",true,false,true));
            log.info("Queue created[{}]", "hello-java-queue");
    
        }
        /**
         * destination;目的地
         * destinationType: 目的类型
         * exchange: 交换机
         * routingKey: 路由键
         * arguments: 参数
         */
        @Test
        public void createBindRelation(){
            //public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,Map arguments)
            //将exchange和destination绑定,使用routingKey路由键
            amqpAdmin.declareBinding(new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null));
            log.info("BindRelation created[{}]", "hello-java-bind");
    
        }
        @Test
        public void send(){
            for (int i = 0; i < 10; i++) {
                if(i%2==0){
                    OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
                    orderReturnReasonEntity.setId(1L);
                    orderReturnReasonEntity.setCreateTime(new Date());
                    orderReturnReasonEntity.setSort(1);
                    orderReturnReasonEntity.setStatus(1);
                    orderReturnReasonEntity.setName("order-test:"+i);
                    rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderReturnReasonEntity);
                }else{
                    OrderEntity orderEntity = new OrderEntity();
                    rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity);
                }
                log.info("send[{}]", "send success");
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    消费者接收消息

    /**
     * 
     * 用在类+方法上 @RabbitListener(queues = {"hello-java-queue"} )监听多个对列,(前提必须要开启@EnableRabbit),
     */
    @RabbitListener(queues = {"hello-java-queue"} )
    @Service("orderItemService")
    public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
        /**
         *
         * @param message
         * @param content 原生消息
         * @param channel 传输数据的通道
         * Queue 可以很多人监听,只要收到消息,队列删除消息,而且只能有一个收到此消息
         *                 1) 订单服务启动多个,同一个消息,只能一个客户端收到
         *                2) 只有一个消息处理完,方法运行结束,就可以接收到下一个消息
         */
    //    @RabbitListener(queues = {"hello-java-queue"} )	
    	/**
    	*@RabbitHandler 标在方法上(重载)
    	/
        @RabbitHandler
        public void receiveMessage(Message message,
                                   OrderReturnReasonEntity content,
                                   Channel channel){
            //{"id":1,"name":"order-test","sort":1,"status":1,"createTime":1661780724613}获取消息体
            byte[] body = message.getBody();
            MessageProperties messageProperties = message.getMessageProperties();
            System.out.println("接收消息:"+"   body:"+content);
        }
        /**
    	*@RabbitHandler 标在方法上(重载)
    	/
        @RabbitHandler
        public void receiveMessage2(OrderEntity content,
                                    Channel channel){
            System.out.println("接收消息2:"+"   body:"+content);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    四、发送端确认机制(可靠抵达)

    yml

    yml:spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true

    写配置类config

    @Configuration
    public class RabbitConfirmConfig {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        
        //json序列化,这个配置最主要
        @Bean
        MessageConverter messageConverter(){
            return new Jackson2JsonMessageConverter();
        }
        //回调消息配置
        @PostConstruct  //RabbitConfirmConfig 对象创建完成后,执行这个方法
        public void initRabbitMQ(){
        	/**
        	 *消息只要成功发送到队列中,无论是否有cline接收都会调用这个回调函数
        	 *可靠抵达-ConfirmCallback
        	 */
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    System.out.println("confirtm:"+correlationData+" ack="+ack+" cause="+cause);
                }
            });
            /**
             * 消息没有投递队列,就触发这个失败回调
             * 可靠抵达-ReturnCallback
             */
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    System.out.println("message:"+message+" replyCode="+replyCode+" replyText="+replyText+" exchange:"+exchange+" routingKey:"+routingKey);
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    五、消费端的消息确认机制

    1.•消费者获取到消息,成功处理,可以回复Ack给Broker
    1.1•basic.ack用于肯定确认;broker将移除此消息
    1.2•basic.nack用于否定确认;可以指定broker是否丢弃此消息,可以批量
    1.3•basic.reject用于否定确认;同上,但不能批量
    2•默认自动ack,消息被消费者收到,就会从broker的queue中移除
    3•queue无消费者,消息依然会被存储,直到消费者消费
    4•消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成, 或者成功处理。我们可以开启手动ack模式
    4.1•消息处理成功,ack(),接受下一个消息,此消息broker就会移除
    4.2•消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
    4.2•消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户 端断开,消息不会被broker移除,会投递给别人

    yml:spring.rabbitmq.listener.simple.acknowledge-mode=manual #手动确认接收消息
    在监听消息的业务中设置channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    @RabbitListener(queues = {"hello-java-queue"} )
    public class OrderItemService{
    	@RabbitHandler
        public void receiveMessage(Message message,
                                   OrderReturnReasonEntity content,
                                   Channel channel) {
            System.out.println("接收消息:"+"   body:"+content);
            //消息头属性
            MessageProperties messageProperties = message.getMessageProperties();
            /**
             * 手动签收,非批量签收
             * channel内DeliveryTag按顺序自增
             */
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    六、集群搭建

    普通模式(默认)

    消息只存在一个集群中的一个节点,对消费者来说,若消息存在A节点的Queue中,当从B节点拉去时,消息要从A中取出,经过B发送给消费者。缺点是,当A节点出现宕机后,消息会丢失,B就无法拉取消息了。一般适用于消息无持久化的场合,如日志队列

    镜像模式(建议)

    消息实体是主动在节点间同步,而不是在拉去数据时临时拉去,高可用场景,如下单,库存队列

    安装集群

    创建目录

     mkdir /mydata/rabbitmq
     cd /mydata/rabbitmq
     mkdir rabbitmq01 rabbitmq02 rabbitmq03
    
    • 1
    • 2
    • 3

    安装
    都是映射15762和5762
    cluster为自己自定义的名字
    RABBITMQ_ERLANG_COOKIE='cluster'

    #master01
    docker run -d --hostname rabbitmq01 --name rabbitmq01 \
    -v /mydata/rabbitmq/rabbitmq01:/var/lib/rabbitmq \
    -p 15673:15672 -p 5763:5762 \
    -e RABBITMQ_ERLANG_COOKIE='cluster'rabbitmq:management
    
    #slaver02
    docker run -d --hostname rabbitmq02 --name rabbitmq02 \
    -v /mydata/rabbitmq/rabbitmq02:/var/lib/rabbitmq \
    -p 15674:15672 -p 5764:5762 \
    -e RABBITMQ_ERLANG_COOKIE='cluster' \
    --link rabbitmq01:rabbitmq01 rabbitmq:management
    
    #slaver03
    docker run -d --hostname rabbitmq03 --name rabbitmq03 \
    -v /mydata/rabbitmq/rabbitmq02:/var/lib/rabbitmq \
    -p 15675:15672 -p 5765:5762 \
    -e RABBITMQ_ERLANG_COOKIE='cluster' \
    --link rabbitmq01:rabbitmq01 \
    --link rabbitmq02:rabbitmq02 rabbitmq:management
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    初始化

    #rabbitmq01 
    docker exec -it rabbitmq01 /bin/bash
     rabbitmqctl stop_app
     rabbitmqctl reset
     rabbitmqctl  start_app
    
    #rabbitmq02
    docker exec -it rabbitmq02 /bin/bash
    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmqctl join_cluster --ram rabbit@rabbitmq01
    rabbitmqctl  start_app
    
    #rabbitmq03
    docker exec -it rabbitmq03 /bin/bash
    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmqctl join_cluster --ram rabbit@rabbitmq01
    rabbitmqctl  start_app
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    同步镜像

    #进入容器
    docker exec -it rabbitmq01 /bin/bash
    #高可用策略
    rabbitmqctl  set_policy -p / ha "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
    #查询当前虚拟主机策略
    rabbitmqctl list_policies -p /
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    安装完成,登录192.168.182.130:15673查看有3个节点了

  • 相关阅读:
    VR全景展示的功能有哪些?你了解多少?
    Linux之奇怪的知识---supervisor超级守护进程的意义和使用方法
    小样本学习跨域(Cross-Domain)问题总结
    百度文心一率先言向全社会开放 应用商店搜“文心一言”可直接下载
    【翻译】一种减小运动伪影的新方法基于AS-LMS自适应滤波器的PPG信号
    集群数据库系统的配置及安装过程
    Git的一些常用概念与操作方法分享
    Springboot毕业设计毕设作品,微信垃圾分类小程序系统设计与实现
    PG大小版本升级步骤
    黑猫带你学UFS协议栈第2篇:DME reset详解
  • 原文地址:https://blog.csdn.net/weixin_45031570/article/details/126591486