Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如:item.insert
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
举例:
item.#:能够匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert
图解:
红色Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到
黄色Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配
需求:
生产者:创建 topic 类型的 Exchange,以及绑定两个队列
也就是说 队列2 可以接收 item.insert、item.update、item.delete 三种 routing key 消息,而 队列1 只能接收 item.insert、item.update 两种 routing key 消息。
设置 队列1 绑定发送 item.insert 、item.update 两种 routing key 的消息
设置 队列2 绑定发送 item.* 的通配符 routing key 的消息
1)生产者
使用topic类型的Exchange,发送消息的routing key有3种:item.insert、item.update、item.delete:
- package com.lijw.producer;
-
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * @author Aron.li
- * @date 2022/3/3 8:16
- */
- public class Producer_Topic {
-
- //交换机名称
- static final String TOPIC_EXCHAGE = "topic_exchange";
- //队列名称
- static final String TOPIC_QUEUE_1 = "topic_queue_1";
- //队列名称
- static final String TOPIC_QUEUE_2 = "topic_queue_2";
-
- public static void main(String[] args) throws IOException, TimeoutException {
-
- //1.创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
-
- //2. 设置参数
- factory.setHost("127.0.0.1"); // ip 默认值 localhost
- factory.setPort(5672); //端口 默认值 5672
- factory.setVirtualHost("/test"); //虚拟机 默认值 /
- factory.setUsername("libai"); // 用户名 默认 guest
- factory.setPassword("libai"); //密码 默认值 guest
-
- //3. 创建连接 Connection
- Connection connection = factory.newConnection();
-
- //4. 创建Channel
- Channel channel = connection.createChannel();
-
- //5. 创建交换机
- /*
- exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map
arguments) - 参数:
- 1. exchange:交换机名称
- 2. type:交换机类型
- DIRECT("direct"):定向
- FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。
- TOPIC("topic") 通配符的方式
- HEADERS("headers") 参数匹配
- 3. durable:是否持久化
- 4. autoDelete:自动删除
- 5. internal:内部使用。 一般false
- 6. arguments:参数
- */
- channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC, true, false, false, null);
-
- // 6.声明(创建)队列
- /**
- * 参数1:队列名称
- * 参数2:是否定义持久化队列
- * 参数3:是否独占本次连接
- * 参数4:是否在不使用的时候自动删除队列
- * 参数5:队列其它参数
- */
- channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null);
- channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null);
-
- // 7. 绑定队列和交换机
- /*
- queueBind(String queue, String exchange, String routingKey)
- 参数:
- 1. queue:队列名称
- 2. exchange:交换机名称
- 3. routingKey:路由键,绑定规则
- 如果交换机的类型为fanout ,routingKey设置为""
- */
- // 7.1 队列1 设置 item.insert \ item.update 的 routing key
- channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHAGE, "item.insert");
- channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHAGE, "item.update");
- // 7.2 队列2 设置 item.* 的 routing key
- channel.queueBind(TOPIC_QUEUE_2, TOPIC_EXCHAGE, "item.*");
-
- //8. 发送消息至交换机,由交换机分发消息
- /**
- * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
- * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
- * 参数2:路由key,简单模式可以传递队列名称
- * 参数3:消息其它属性
- * 参数4:消息内容
- */
- // 发送信息
- String message = "新增了商品。Topic模式;routing key 为 item.insert " ;
- channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());
- System.out.println("已发送消息:" + message);
-
- // 发送信息
- message = "修改了商品。Topic模式;routing key 为 item.update" ;
- channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());
- System.out.println("已发送消息:" + message);
-
- // 发送信息
- message = "删除了商品。Topic模式;routing key 为 item.delete" ;
- channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());
- System.out.println("已发送消息:" + message);
-
- //9. 释放资源
- channel.close();
- connection.close();
-
- }
- }
执行如下:
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 topic_exchange 的交换机,可以查看到如下的绑定:
可以看到交换机与队列的绑定规则。下面我们进入队列看看接收到的消息,如下:
2)消费者1
接收两种类型的消息:新增商品和更新商品
- package com.lijw.consumer;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * @author Aron.li
- * @date 2022/3/2 16:16
- */
- public class Consumer_Topic1 {
-
- //队列名称
- static final String TOPIC_QUEUE_1 = "topic_queue_1";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //2. 设置参数
- factory.setHost("127.0.0.1"); // ip 默认值 localhost
- factory.setPort(5672); //端口 默认值 5672
- factory.setVirtualHost("/test"); //虚拟机 默认值 /
- factory.setUsername("libai"); // 用户名 默认 guest
- factory.setPassword("libai"); //密码 默认值 guest
- //3. 创建连接 Connection
- Connection connection = factory.newConnection();
- //4. 创建Channel
- Channel channel = connection.createChannel();
- //5. 创建队列Queue
- /*
- queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments) - 参数:
- 1. queue:队列名称
- 2. durable:是否持久化,当mq重启之后,还在
- 3. exclusive:
- * 是否独占。只能有一个消费者监听这队列
- * 当Connection关闭时,是否删除队列
- 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
- 5. arguments:参数。
- */
- channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null);
-
- /*
- basicConsume(String queue, boolean autoAck, Consumer callback)
- 参数:
- 1. queue:队列名称
- 2. autoAck:是否自动确认
- 3. callback:回调对象
- */
- // 接收消息
- Consumer consumer = new DefaultConsumer(channel){
- /*
- 回调方法,当收到消息后,会自动执行该方法
- 1. consumerTag:标识
- 2. envelope:获取一些信息,交换机,路由key...
- 3. properties:配置信息
- 4. body:数据
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("接收队列的数据 body: " + new String(body));
- }
- };
- channel.basicConsume(TOPIC_QUEUE_1,true,consumer);
-
- //不需要关闭资源,因为消费者需要持续监听队列信息
- }
- }
3)消费者2
接收所有类型的消息:新增商品,更新商品和删除商品。
- package com.lijw.consumer;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * @author Aron.li
- * @date 2022/3/2 16:16
- */
- public class Consumer_Topic2 {
-
- //队列名称
- static final String TOPIC_QUEUE_2 = "topic_queue_2";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //2. 设置参数
- factory.setHost("127.0.0.1"); // ip 默认值 localhost
- factory.setPort(5672); //端口 默认值 5672
- factory.setVirtualHost("/test"); //虚拟机 默认值 /
- factory.setUsername("libai"); // 用户名 默认 guest
- factory.setPassword("libai"); //密码 默认值 guest
- //3. 创建连接 Connection
- Connection connection = factory.newConnection();
- //4. 创建Channel
- Channel channel = connection.createChannel();
- //5. 创建队列Queue
- /*
- queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments) - 参数:
- 1. queue:队列名称
- 2. durable:是否持久化,当mq重启之后,还在
- 3. exclusive:
- * 是否独占。只能有一个消费者监听这队列
- * 当Connection关闭时,是否删除队列
- 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
- 5. arguments:参数。
- */
- channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null);
-
- /*
- basicConsume(String queue, boolean autoAck, Consumer callback)
- 参数:
- 1. queue:队列名称
- 2. autoAck:是否自动确认
- 3. callback:回调对象
- */
- // 接收消息
- Consumer consumer = new DefaultConsumer(channel){
- /*
- 回调方法,当收到消息后,会自动执行该方法
- 1. consumerTag:标识
- 2. envelope:获取一些信息,交换机,路由key...
- 3. properties:配置信息
- 4. body:数据
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("接收队列的数据 body: " + new String(body));
- }
- };
- channel.basicConsume(TOPIC_QUEUE_2,true,consumer);
-
- //不需要关闭资源,因为消费者需要持续监听队列信息
- }
- }
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果;并且这些routing key可以使用通配符。
消费者1:只可以接收 item.insert 和 item.update 消息
消费者2:可以接收 item.* 所有通配的消息
Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 和 Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。