• RabbitMQ(六)仲裁队列、流式队列、异地容灾(联邦队列Federation Queue)


    仲裁队列

    https://www.rabbitmq.com/docs/quorum-queues

    1、创建交换机

    和仲裁队列绑定的交换机没有特殊,我们还是创建一个direct交换机即可
    交换机名称:exchange.quorum.test
    在这里插入图片描述

    2、创建仲裁队列

    队列名称:queue.quorum.test
    经典队列类型为Classic
    在这里插入图片描述
    点击队列名称查看详细信息
    在这里插入图片描述
    绑定交换机
    在这里插入图片描述

    3、验证

    // 测试是否成功绑定
    @SpringBootTest
    public class RabbitMQTest {
        public static final String EXCHANGE_QUORUM_TEST = "exchange.quorum.test";
        public static final String ROUTING_KEY_QUORUM_TEST = "routing.key.quorum.test";
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSendMessageToQuorum() {
            rabbitTemplate.convertAndSend(EXCHANGE_QUORUM_TEST, ROUTING_KEY_QUORUM_TEST, "message test quorum ~~~ @@@");
        }
    }
    

    在这里插入图片描述
    基础配置参考

    @Component
    @Slf4j
    public class MyProcessor {
    
        public static final String QUEUE_QUORUM_TEST = "queue.quorum.test";
        
        @RabbitListener(queues = {QUEUE_QUORUM_TEST})
        public void quorumMessageProcess(String data, Message message, Channel channel) throws IOException {
            log.info("消费端:" + data);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    
    }
    

    主节点宕机不影响消息发送和接收

    流式队列(不推荐,Kafka主场)

    概念

    在这里插入图片描述

    • 从客户端支持角度来说,生态尚不健全
    • 从使用习惯角度来说,和原有队列用法不完全兼容
    • 从竞品角度来说,像Kafka但远远比不上Kafka
    • 从应用场景角度来说:
    • 经典队列:适用于系统内部异步通信场景
    • 流式队列:适用于系统间跨平台、大流量、实时计算场景(Kafka主场)
    • 使用建议:Stream队列在目前企业实际应用非常少,真有特定场景需要使用肯定会倾向于使用Kafka,而不是RabbitMQ Stream
    • 未来展望:Classic Queue已经有和Quorum Queue合二为一的趋势,Stream也有加入进来整合成一种队列的趋势,但Stream内部机制决定这很

    异地容灾

    Disaster Recovery at a Different Location

    • Federation插件
    • Shovel插件

    概述:是通过配置联邦交换机拉取上游节点消息,关于联邦交换机的配置都在下游节点配置upstream,上游节点为常规配置

    一、Federation插件

    官方链接:
    Federation Plugin:https://www.rabbitmq.com/docs/federation
    Federated-exchanges:https://www.rabbitmq.com/docs/federated-exchanges

    概述

    Federation插件的设计目标是使RabbitMQ在不同的Broker节点之间进行消息传递而无须建立集群。

    它可以在不同的管理域中的Broker或集群间传递消息,这些管理域可能设置了不同的用户和vhost,也可能运行在不同版本的RabbitMQ和Erlang上。Federation基于AMQP 0-9-1协议在不同的Broker之间进行通信,并且设计成能够容忍不稳定的网络连接情况。
    在这里插入图片描述

    二、Federation交换机

    1、总体说明

    • 各节点操作:启用联邦插件
    • 下游操作:
      • 添加上游连接端点
      • 创建控制策略

    2、准备工作

    为了执行相关测试,我们使用Docker创建两个RabbitMQ实例。

    特别提示:由于Federation机制的最大特点就是跨集群同步数据,所以这两个Docker容器中的RabbitMQ实例不加入集群!!!是两个独立的broker实例

    docker run -d \
    --name rabbitmq-shenzhen \
    -p 51000:5672 \
    -p 52000:15672 \
    -v rabbitmq-plugin:/plugins \
    -e RABBITMQ_DEFAULT_USER=guest \
    -e RABBITMQ_DEFAULT_PASS=123456 \
    rabbitmq:3.13-management
    
    docker run -d \
    --name rabbitmq-shanghai \
    -p 61000:5672 \
    -p 62000:15672 \
    -v rabbitmq-plugin:/plugins \
    -e RABBITMQ_DEFAULT_USER=guest \
    -e RABBITMQ_DEFAULT_PASS=123456 \
    rabbitmq:3.13-management
    

    使用

    docker ps
    

    在这里插入图片描述

    3、启用联邦插件

    在上游、下游节点中都需要开启。

    Docker容器中的RabbitMQ已经开启了rabbitmq_federation,还需要开启rabbitmq_federation_management

    rabbitmq-plugins enable rabbitmq_federation
    rabbitmq-plugins enable rabbitmq_federation_management
    

    rabbitmq_federation_management插件启用后会在Management UI的Admin选项卡下看到:
    在这里插入图片描述

    4、添加上游连接端点

    在下游节点填写上游节点的连接信息:

    # 
    
    
    # 
    amqp://guest:123456@192.168.217.134:51000
    

    在这里插入图片描述
    添加后
    在这里插入图片描述

    5、创建控制策略

    在这里插入图片描述
    在这里插入图片描述

    6、测试

    测试计划

    特别提示

    • 普通交换机和联邦交换机名称要一致
    • 交换机名称要能够和策略正则表达式匹配上
    • 发送消息时,两边使用的路由键也要一致
    • 队列名称不要求一致
      在这里插入图片描述

    创建组件

    交换机名称和路由建键相同,队列名称可不同

    所在机房交换机名称路由键队列名称
    深圳机房(上游)federated.exchange.demorouting.key.demo.testqueue.normal.shenzhen
    上海机房(下游)federated.exchange.demorouting.key.demo.testqueue.normal.shanghai

    创建组件后可以查看一下联邦状态,连接成功的联邦状态如下:
    查看状态
    在这里插入图片描述

    发布消息执行测试

    在上游节点向交换机发布消息:
    在这里插入图片描述
    上游
    在这里插入图片描述
    下游在这里插入图片描述

    三、Federation队列

    1、总体说明

    Federation队列和Federation交换机的最核心区别就是:

    • Federation Police作用在交换机上,就是Federation交换机
    • Federation Police作用在队列上,就是Federation队列

    2、创建控制策略

    queue.federation
    ^fed.queue.

    3、测试

    测试计划

    上游节点和下游节点中队列名称是相同的,只是下游队列中的节点附加了联邦策略而已

    所在机房交换机路由键队列
    深圳机房(上游)exchange.normal.shenzhenrouting.key.normal.shenzhenfed.queue.demo
    上海机房(下游)————fed.queue.demo

    创建组件

    上游节点都是常规操作,此处省略。重点需要关注的是下游节点的联邦队列创建时需要指定相关参数:

    创建组件后可以查看一下联邦状态,连接成功的联邦状态如下:
    在这里插入图片描述

    执行测试

    在上游节点向交换机发布消息:
    在这里插入图片描述
    注意:
    但此时发现下游节点中联邦队列并没有接收到消息,这是为什么呢?这里就体现出了联邦队列和联邦交换机工作逻辑的区别。

    对联邦队列来说,如果没有监听联邦队列的消费端程序,它是不会到上游去拉取消息的!

    如果有消费端监听联邦队列,那么首先消费联邦队列自身的消息;如果联邦队列为空,这时候才会到上游队列节点中拉取消息。

    所以现在的测试效果需要消费端程序配合才能看到:
    在这里插入图片描述
    消费端配置
    application.yml

    spring:
      rabbitmq:
        host: 192.168.217.134
        port: 61000
        username: guest
        password: 123456
        virtual-host: /
        listener:
          simple:
            acknowledge-mode: manual # 把消息确认模式改为手动确认
    

    消费端

    @Component
    @Slf4j
    public class MyMessageListener {
    
        public static final String QUEUE_NAME = "fed.queue.demo";
    
        @RabbitListener(queues = {QUEUE_NAME})
        public void processMessage(String dataString, Message message, Channel channel) throws IOException {
            log.info("[Federation]" + dataString);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    
    }
    
  • 相关阅读:
    wgcna 官网教程II.Consensus analysis of female and male liver expression data
    02 【基础篇-vim编辑器 网络配置 远程登录】
    【Linux】权限不足的情况下在指定环境运行command
    前端埋点方式
    java中的IO流之序列化与反序列化(对象数据和文件的读写交互)
    Systemverilog中使用interface连接testbench和dut的端口
    二十、商城 - 商家入驻审核-BCrypt 加密算法(8)
    MongoDB自学笔记(一)
    【漏洞分析】Reflection Token 反射型代币攻击事件通用分析思路
    [CTF] python的pip源更改及常用python库
  • 原文地址:https://blog.csdn.net/qq_45742250/article/details/139586937