• 7-RabbitMQ工作模式-Topics通配符模式


    7-RabbitMQ工作模式-Topics通配符模式

    Topics通配符模式

    1. 模式说明

    d0f7e47d799cb9475aab27f4be6f8701.png

    Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

    Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如:item.insert

    通配符规则:

    #:匹配一个或多个词

    *:匹配不多不少恰好1个词

    举例:

    item.#:能够匹配item.insert.abc 或者 item.insert

    item.*:只能匹配item.insert

    652c8aa881c6019f558de2caea5b1939.png

    图解:

    • 红色Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到

    • 黄色Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配

    2. 案例

    需求:

    • 生产者:创建 topic 类型的 Exchange,以及绑定两个队列

      也就是说 队列2 可以接收  item.insertitem.updateitem.delete 三种 routing key 消息,而 队列1 只能接收  item.insertitem.update 两种 routing key 消息。

      • 设置 队列1 绑定发送 item.insert 、item.update 两种 routing key 的消息

      • 设置 队列2 绑定发送 item.* 的通配符 routing key 的消息

    1)生产者

    使用topic类型的Exchange,发送消息的routing key有3种:item.insertitem.updateitem.delete

    df8e78f025dd7d3193d110c3f00d5aea.png
    1. package com.lijw.producer;
    2. import com.rabbitmq.client.BuiltinExchangeType;
    3. import com.rabbitmq.client.Channel;
    4. import com.rabbitmq.client.Connection;
    5. import com.rabbitmq.client.ConnectionFactory;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. /**
    9.  * @author Aron.li
    10.  * @date 2022/3/3 8:16
    11.  */
    12. public class Producer_Topic {
    13.     //交换机名称
    14.     static final String TOPIC_EXCHAGE = "topic_exchange";
    15.     //队列名称
    16.     static final String TOPIC_QUEUE_1 = "topic_queue_1";
    17.     //队列名称
    18.     static final String TOPIC_QUEUE_2 = "topic_queue_2";
    19.     public static void main(String[] args) throws IOException, TimeoutException {
    20.         //1.创建连接工厂
    21.         ConnectionFactory factory = new ConnectionFactory();
    22.         //2. 设置参数
    23.         factory.setHost("127.0.0.1"); // ip  默认值 localhost
    24.         factory.setPort(5672); //端口  默认值 5672
    25.         factory.setVirtualHost("/test"); //虚拟机 默认值 /
    26.         factory.setUsername("libai"); // 用户名 默认 guest
    27.         factory.setPassword("libai"); //密码 默认值 guest
    28.         //3. 创建连接 Connection
    29.         Connection connection = factory.newConnection();
    30.         //4. 创建Channel
    31.         Channel channel = connection.createChannel();
    32.         //5. 创建交换机
    33.         /*
    34.            exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments)
    35.            参数:
    36.             1. exchange:交换机名称
    37.             2. type:交换机类型
    38.                 DIRECT("direct"):定向
    39.                 FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。
    40.                 TOPIC("topic") 通配符的方式
    41.                 HEADERS("headers") 参数匹配
    42.             3. durable:是否持久化
    43.             4. autoDelete:自动删除
    44.             5. internal:内部使用。 一般false
    45.             6. arguments:参数
    46.         */
    47.         channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC, truefalsefalse, null);
    48.         // 6.声明(创建)队列
    49.         /**
    50.          * 参数1:队列名称
    51.          * 参数2:是否定义持久化队列
    52.          * 参数3:是否独占本次连接
    53.          * 参数4:是否在不使用的时候自动删除队列
    54.          * 参数5:队列其它参数
    55.          */
    56.         channel.queueDeclare(TOPIC_QUEUE_1, truefalsefalse, null);
    57.         channel.queueDeclare(TOPIC_QUEUE_2, truefalsefalse, null);
    58.         // 7. 绑定队列和交换机
    59.         /*
    60.             queueBind(String queue, String exchange, String routingKey)
    61.             参数:
    62.                 1. queue:队列名称
    63.                 2. exchange:交换机名称
    64.                 3. routingKey:路由键,绑定规则
    65.                     如果交换机的类型为fanout ,routingKey设置为""
    66.          */
    67.         // 7.1 队列1 设置 item.insert \ item.update 的 routing key
    68.         channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHAGE, "item.insert");
    69.         channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHAGE, "item.update");
    70.         // 7.2 队列2 设置 item.* 的 routing key
    71.         channel.queueBind(TOPIC_QUEUE_2, TOPIC_EXCHAGE, "item.*");
    72.         //8. 发送消息至交换机,由交换机分发消息
    73.         /**
    74.          * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
    75.          * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
    76.          * 参数2:路由key,简单模式可以传递队列名称
    77.          * 参数3:消息其它属性
    78.          * 参数4:消息内容
    79.          */
    80.         // 发送信息
    81.         String message = "新增了商品。Topic模式;routing key 为 item.insert " ;
    82.         channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());
    83.         System.out.println("已发送消息:" + message);
    84.         // 发送信息
    85.         message = "修改了商品。Topic模式;routing key 为 item.update" ;
    86.         channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());
    87.         System.out.println("已发送消息:" + message);
    88.         // 发送信息
    89.         message = "删除了商品。Topic模式;routing key 为 item.delete" ;
    90.         channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());
    91.         System.out.println("已发送消息:" + message);
    92.         //9. 释放资源
    93.         channel.close();
    94.         connection.close();
    95.     }
    96. }

    执行如下:

    430589b34f08d9afeb91dbb6c453a529.png

    在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 topic_exchange 的交换机,可以查看到如下的绑定:

    439af14a949ccd685c28614ee79cac03.png 85ad269e35d3c498c0b7b93649acf30c.png

    可以看到交换机与队列的绑定规则。下面我们进入队列看看接收到的消息,如下:

    e94128a6137584fbcd1edd9ae36e9b11.png

    2)消费者1

    接收两种类型的消息:新增商品和更新商品

    e23641b7e0a84ed4c338b115fd7ee873.png
    1. package com.lijw.consumer;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. /**
    6.  * @author Aron.li
    7.  * @date 2022/3/2 16:16
    8.  */
    9. public class Consumer_Topic1 {
    10.     //队列名称
    11.     static final String TOPIC_QUEUE_1 = "topic_queue_1";
    12.     public static void main(String[] args) throws IOException, TimeoutException {
    13.         //1.创建连接工厂
    14.         ConnectionFactory factory = new ConnectionFactory();
    15.         //2. 设置参数
    16.         factory.setHost("127.0.0.1"); // ip  默认值 localhost
    17.         factory.setPort(5672); //端口  默认值 5672
    18.         factory.setVirtualHost("/test"); //虚拟机 默认值 /
    19.         factory.setUsername("libai"); // 用户名 默认 guest
    20.         factory.setPassword("libai"); //密码 默认值 guest
    21.         //3. 创建连接 Connection
    22.         Connection connection = factory.newConnection();
    23.         //4. 创建Channel
    24.         Channel channel = connection.createChannel();
    25.         //5. 创建队列Queue
    26.         /*
    27.         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
    28.         参数:
    29.             1. queue:队列名称
    30.             2. durable:是否持久化,当mq重启之后,还在
    31.             3. exclusive:
    32.                 * 是否独占。只能有一个消费者监听这队列
    33.                 * 当Connection关闭时,是否删除队列
    34.             4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
    35.             5. arguments:参数。
    36.          */
    37.         channel.queueDeclare(TOPIC_QUEUE_1, truefalsefalse, null);
    38.         /*
    39.         basicConsume(String queue, boolean autoAck, Consumer callback)
    40.         参数:
    41.             1. queue:队列名称
    42.             2. autoAck:是否自动确认
    43.             3. callback:回调对象
    44.          */
    45.         // 接收消息
    46.         Consumer consumer = new DefaultConsumer(channel){
    47.             /*
    48.                 回调方法,当收到消息后,会自动执行该方法
    49.                 1. consumerTag:标识
    50.                 2. envelope:获取一些信息,交换机,路由key...
    51.                 3. properties:配置信息
    52.                 4. body:数据
    53.              */
    54.             @Override
    55.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    56.                 System.out.println("接收队列的数据 body: " + new String(body));
    57.             }
    58.         };
    59.         channel.basicConsume(TOPIC_QUEUE_1,true,consumer);
    60.         //不需要关闭资源,因为消费者需要持续监听队列信息
    61.     }
    62. }

    3)消费者2

    接收所有类型的消息:新增商品,更新商品和删除商品。

    ee3dd5627a12c9458e001e5510e2a41a.png
    1. package com.lijw.consumer;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. /**
    6.  * @author Aron.li
    7.  * @date 2022/3/2 16:16
    8.  */
    9. public class Consumer_Topic2 {
    10.     //队列名称
    11.     static final String TOPIC_QUEUE_2 = "topic_queue_2";
    12.     public static void main(String[] args) throws IOException, TimeoutException {
    13.         //1.创建连接工厂
    14.         ConnectionFactory factory = new ConnectionFactory();
    15.         //2. 设置参数
    16.         factory.setHost("127.0.0.1"); // ip  默认值 localhost
    17.         factory.setPort(5672); //端口  默认值 5672
    18.         factory.setVirtualHost("/test"); //虚拟机 默认值 /
    19.         factory.setUsername("libai"); // 用户名 默认 guest
    20.         factory.setPassword("libai"); //密码 默认值 guest
    21.         //3. 创建连接 Connection
    22.         Connection connection = factory.newConnection();
    23.         //4. 创建Channel
    24.         Channel channel = connection.createChannel();
    25.         //5. 创建队列Queue
    26.         /*
    27.         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
    28.         参数:
    29.             1. queue:队列名称
    30.             2. durable:是否持久化,当mq重启之后,还在
    31.             3. exclusive:
    32.                 * 是否独占。只能有一个消费者监听这队列
    33.                 * 当Connection关闭时,是否删除队列
    34.             4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
    35.             5. arguments:参数。
    36.          */
    37.         channel.queueDeclare(TOPIC_QUEUE_2, truefalsefalse, null);
    38.         /*
    39.         basicConsume(String queue, boolean autoAck, Consumer callback)
    40.         参数:
    41.             1. queue:队列名称
    42.             2. autoAck:是否自动确认
    43.             3. callback:回调对象
    44.          */
    45.         // 接收消息
    46.         Consumer consumer = new DefaultConsumer(channel){
    47.             /*
    48.                 回调方法,当收到消息后,会自动执行该方法
    49.                 1. consumerTag:标识
    50.                 2. envelope:获取一些信息,交换机,路由key...
    51.                 3. properties:配置信息
    52.                 4. body:数据
    53.              */
    54.             @Override
    55.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    56.                 System.out.println("接收队列的数据 body: " + new String(body));
    57.             }
    58.         };
    59.         channel.basicConsume(TOPIC_QUEUE_2,true,consumer);
    60.         //不需要关闭资源,因为消费者需要持续监听队列信息
    61.     }
    62. }

    3. 测试

    启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果;并且这些routing key可以使用通配符。

    • 消费者1:只可以接收 item.insert 和 item.update 消息

    0fc9e93280d1e65043ea9289560d2671.png
    • 消费者2:可以接收 item.* 所有通配的消息

    68ec18d844cbf7ee44b975665d9f8546.png

    4. 小结

    Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。

  • 相关阅读:
    Lianwei 安全周报|2024.06.17
    ROS 笔记(06)— 话题消息的定义和使用
    idea菜单栏任务栏放缩比例修改
    GBASE 8s onconfig文件格式
    C++:多态
    使用cephadm部署单节点ceph集群,后期可扩容(基于官方文档,靠谱,读起来舒服)
    第六章 数据流建模
    【DevOps】Docker 容器及其常用命令
    多行省略号
    【视频加水印】Video Watermark Pro视频添加动态水印(附工具下载地址)
  • 原文地址:https://blog.csdn.net/u012887259/article/details/126239526