主题模式采用 routingKey匹配的方式实现,只要生产者发送消息是设置的routingKey 满足消费者绑定时候设置的规则,消费者就能够接收到消息。
生产者端: 声明一个BuiltinExchangeType.TOPIC类型的交换机,然后发送消息时候设置routingkey
/**
* 声明一个交换机
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
/**
* 发送消息
* param1 发送到哪个交换机
* param2 routingKey
* param3 其他参数信息
* param4 发送的消息体
*/
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
消费者端:
private static final String ROUTING_KEY_ORANGE = "*.orange.*";
/**
* 绑定交换机和队列
* param1 队列名称
* param2 交换机名称
* param3 routingkey
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_ORANGE);
发送到类型是topic交换机的消息的routing_key不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.xmw”,“quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过255个字节。
在这个规则列表中,其中有两个替换符是大家需要注意的***(星号)可以代替一个单词,#(井号)可以替代零个或多个单词。**


当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像fanout 了
如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct了
###1. 生产者LogProducerTopic
package com.rabbitmqDemo.rabbitmq.seven;
import com.rabbitmq.client.Channel;
import com.rabbitmqDemo.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/**
* 将消息发送到交换机
*/
public class LogProducerTopic {
private static final String EXCHANGE_NAME = "logs_topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
Map<String, String> bindingKdyMap = new HashMap<>();
bindingKdyMap.put("quick.orange.rabbit", "被队列Q1Q2接收到");
bindingKdyMap.put("lazy.orange.elephant", "被队列Q1Q2接收到");
bindingKdyMap.put("quick.orange.fox", "被队列Q1接收到");
bindingKdyMap.put("lazy.brown.fox", "被队列Q2接收到");
bindingKdyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列Q2接收一次");
bindingKdyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKdyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
bindingKdyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配Q2");
for (Map.Entry<String, String> stringStringEntry : bindingKdyMap.entrySet()) {
String routingKey = stringStringEntry.getKey();
String message = stringStringEntry.getValue();
/**
* 发送消息
* param1 发送到哪个交换机
* param2 routingKey
* param3 其他参数信息
* param4 发送的消息体
*/
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println("message send end : " + message);
}
}
}
###2. 消费者Logworker05
package com.rabbitmqDemo.rabbitmq.seven;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqDemo.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Logworker05 {
private static final String EXCHANGE_NAME = "logs_topic_exchange";
private static final String QUEUE_NAME = "logs_topic_q1";
private static final String ROUTING_KEY_ORANGE = "*.orange.*";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
/**
* 声明一个交换机
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
/**
* 声明一个队列
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
* 绑定交换机和队列
* param1 队列名称
* param2 交换机名称
* param3 routingkey
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_ORANGE);
System.out.println("wait receive message ,print message to console... ");
//声明 消费者成功消费的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("Logworker05-message:" + new String(message.getBody(), "UTF-8") + ", queue name:" + QUEUE_NAME + " , routing key:" + message.getEnvelope().getRoutingKey());
};
//声明 取消消息时的回调
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Logworker05-消息消费被中断-" + consumerTag);
};
/**
* 消费者消费消息
* param1 队列名称
* param2 消费成功之后是否自动应答,true 代表自动应答,false表示不自动应答
* param3 消费者成功消费的回调
* param4 消费者取消消费回调
*/
System.out.println("Logworker05等待接收消息......");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
###3. 消费者Logworker06
package com.rabbitmqDemo.rabbitmq.seven;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqDemo.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Logworker06 {
private static final String EXCHANGE_NAME = "logs_topic_exchange";
private static final String QUEUE_NAME = "logs_topic_q2";
private static final String ROUTING_KEY_RABBIT = "*.*.rabbit";
private static final String ROUTING_KEY_LAZY = "lazy.#";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
/**
* 声明一个交换机
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
/**
* 声明一个队列
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
* 绑定交换机和队列
* param1 队列名称
* param2 交换机名称
* param3 routingkey
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_RABBIT);
/**
* 多重绑定
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_LAZY);
System.out.println("wait receive message ,print message to console... ");
//声明 消费者成功消费的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("Logworker06-message:" + new String(message.getBody(), "UTF-8") + ", queue name:" + QUEUE_NAME + " , routing key:" + message.getEnvelope().getRoutingKey());
};
//声明 取消消息时的回调
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Logworker06-消息消费被中断-" + consumerTag);
};
/**
* 消费者消费消息
* param1 队列名称
* param2 消费成功之后是否自动应答,true 代表自动应答,false表示不自动应答
* param3 消费者成功消费的回调
* param4 消费者取消消费回调
*/
System.out.println("Logworker06等待接收消息......");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}