Spring Boot 为 AMQP 提供了自动化配置依赖 Spring-boot-starter-amqp,因此首先创建 Spring Boot 项目并添加该依赖
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
项目创建后,在 application.properties 中配置 RabbitMQ 的基本连接信息
spring.rabbitmq.host=ip地址
spring.rabbitmq.port=5672
spring.rabbitmq.username=long
spring.rabbitmq.password=123
接下来进行RabbitMQ 配置,在 RabbitMQ 中,所有的消息盛传着提交的消息都会交由 Exchange 进行再分配,Exchange 会根据不同的策略将消息分发到不同的 Queue 中。
RabbitMQ 中一共提供了4种不同的 Exchange 策略,分别是 Direct、Fanout、Topic以及Hearder,其中前三种使用频率较高。
DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时,会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” ,DirectExchange 的配置如下:
@Configuration
public class RabbitDirectConfig {
public final static String DIRECTNAME = "long-direct";
@Bean
Queue queue() {
return new Queue("hello-queue");
}
@Bean
DirectExchange directExchange() {
return new DirectExchange(DIRECTNAME, true, false);
}
@Bean
Binding binding() {
return BindingBuilder.bind(queue()).to(directExchange()).with("direct");
}
}
代码解释:
然后配置一个消费者
@Component
public class DirectReceiver {
@RabbitListener(queues = "hello-queue")
public void handler1(String msg) {
System.out.println("===================================DirectReceiver:" + msg);
}
}
通过 @RabbitListener 注解指定一个方法是一个消息消费方法,方法参数就是所接收到的消息。然后在单元测试类中注入一个 RabbitTemplate 对象来进行消息发送
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void directTest() {
rabbitTemplate.convertAndSend("hello-queue","hello direct !");
}
}
确认 RabbitMQ 已经启动,然后启动 Spring Boot 项目,运行此单元测试方法,日志如下
===================================DirectReceiver:hello direct !
FanoutExchange 的数据交换策略是吧所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue ,在这种策略中,routingkey 将不起任何作用,FanoutExchange 的配置方法如下:
@Configuration
public class RabbitFanoutConfig {
public final static String FANOUTNAME = "long-fanout";
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUTNAME, true, false);
}
@Bean
Queue queueOne() {
return new Queue("queue-one");
}
@Bean
Queue queueTwo() {
return new Queue("queue-two");
}
@Bean
Binding bindingOne() {
return BindingBuilder.bind(queueOne()).to(fanoutExchange());
}
@Bean
Binding bindingTwo() {
return BindingBuilder.bind(queueTwo()).to(fanoutExchange());
}
}
在这里首先创建 FanoutExchange ,参数的含义与创建 DirectExchange 的参数的含义一致,然后创建两个 Queue ,再将这两个 Queue 绑定到 FanoutExchange 上。然后创建两个消费者,如下
@Component
public class FanoutReceiver {
@RabbitListener(queues = "queue-one")
public void handler1(String message) {
System.out.println("==========FanoutReceiver:handler1:" + message);
}
@RabbitListener(queues = "queue-two")
public void handler2(String message) {
System.out.println("==========FanoutReceiver:handler2:" + message);
}
}
两个消费者分别消费两个消息队列中的消息,然后在单元测试中发送消息
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void fanoutTest() {
rabbitTemplate.convertAndSend(RabbitFanoutConfig.FANOUTNAME,null,"hello fanout !");
}
}
注意,这里发送消息时不需要 routingkey,指定exchange 即可,routingkey 可以直接传一个 null。
重启项目,运行单元测试,打印日志如下:
==========FanoutReceiver:handler1:hello fanout !
==========FanoutReceiver:handler2:hello fanout !
可以看到,一条消息发送出去之后,所有和该 FanoutExchange 绑定的 Queue 都收到了消息。
TopicExchange 是比较复杂也比较灵活的一种路由策略,在 TopicExchange 中 ,Queue 通过 routingkey 绑定到 TopicExchange 上,当消息到达 TopicExchange 后,TopicExchange 根据消息的 routing 将消息路由到一个或者多个 Queue 上
@Configuration
public class RabbitTopicConfig {
public final static String TOPICNAME = "long-topic";
@Bean
TopicExchange topicExchange() {
return new TopicExchange(TOPICNAME, true, false);
}
@Bean
Queue xiaomi() {
return new Queue("xiaomi");
}
@Bean
Queue huawei() {
return new Queue("huawei");
}
@Bean
Queue phone() {
return new Queue("phone");
}
@Bean
Binding xiaomiBinding() {
return BindingBuilder.bind(xiaomi()).to(topicExchange())
.with("xiaomi.#");
}
@Bean
Binding huaweiBinding() {
return BindingBuilder.bind(huawei()).to(topicExchange())
.with("huawei.#");
}
@Bean
Binding phoneBinding() {
return BindingBuilder.bind(phone()).to(topicExchange())
.with("#.phone.#");
}
}
代码解释:
接下来针对三个 Queue 创建三个消费者
@Component
public class TopicReceiver {
@RabbitListener(queues = "phone")
public void handler1(String message) {
System.out.println("PhoneReceiver:"+ message);
}
@RabbitListener(queues = "xiaomi")
public void handler2(String message) {
System.out.println("XiaoMiReceiver:"+message);
}
@RabbitListener(queues = "huawei")
public void handler3(String message) {
System.out.println("HuaWeiReceiver:"+message);
}
}
然后在单元测试中进行消息的发送
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void topicTest() {
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.news","小米新闻(xiaomi接收)");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.news","华为新闻(huawei接收)");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.phone","小米手机(xiaomi、phone接收)");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.phone","华为手机(xiaomi、phone接收)");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"phone.news","手机新闻(phone接收)");
}
}
重启项目,运行单元测试方法,日志如下
XiaoMiReceiver:小米新闻(xiaomi接收)
HuaWeiReceiver:华为新闻(huawei接收)
PhoneReceiver:小米手机(xiaomi、phone接收)
XiaoMiReceiver:小米手机(xiaomi、phone接收)
PhoneReceiver:华为手机(xiaomi、phone接收)
HuaWeiReceiver:华为手机(xiaomi、phone接收)
PhoneReceiver:手机新闻(phone接收)
疑问:以上单元测试发送消息,如果是一个一个的发送(其它的注释掉)运行结果没问题。但如果是同时发送消息,有些消费者并没有消费到消息,如下:
HuaWeiReceiver:华为新闻(huawei接收)
XiaoMiReceiver:小米新闻(xiaomi接收)
PhoneReceiver:小米手机(xiaomi、phone接收)
PhoneReceiver:手机新闻(phone接收)
HeaderExchange 是一种较少的路由策略,HeaderExchange 会根据消息的 Header 将消息路由到不同的 Queue 上,这种策略也和 routingkey 无关,配置如下:
@Configuration
public class RabbitHeaderConfig {
public final static String HEADERNAME = "long-header";
@Bean
HeadersExchange headersExchange() {
return new HeadersExchange(HEADERNAME, true, false);
}
@Bean
Queue queueName() {
return new Queue("name-queue");
}
@Bean
Queue queueAge() {
return new Queue("age-queue");
}
@Bean
Binding bindingName() {
Map<String, Object> map = new HashMap<>();
map.put("name", "long");
return BindingBuilder.bind(queueName())
.to(headersExchange()).whereAny(map).match();
}
@Bean
Binding bindingAge() {
return BindingBuilder.bind(queueAge())
.to(headersExchange()).where("age").exists();
}
}
这里的配置大部分和前面介绍的一样,差别主要体现的 Binding 的配置上,第一个 bindingName 方法中,whereAny 表示消息的Header 中只要有一个 Header 匹配上 map 中的 key/value ,就把该消息路由到名为“name-queue”的 Queue 上,这里也可以使用 whereAll 方法,表示消息的所有 Header 都要匹配。whereAny 和 whereAll 实际上对应了一个名为 x-match 的属性。
bindingAge 中的配置则表示只要消息的 Header 中包含 age,无论 age 的值是多少,都将路由到名为“age-queue”的 Queue 上。
接下来创建两个消费者,如下
@Component
public class HeaderReceiver {
@RabbitListener(queues = "name-queue")
public void handler1(byte[] msg) {
System.out.println("HeaderReceiver:name:"
+ new String(msg, 0, msg.length));
}
@RabbitListener(queues = "age-queue")
public void handler2(byte[] msg) {
System.out.println("HeaderReceiver:age:"
+ new String(msg, 0, msg.length));
}
}
注意,这里的参数用 byte 数组接收。然后在单元测试中创建消息的发送方法,这里消息的发送也可 routingkey 无关。
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests1 {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void headerTest() {
Message nameMsg = MessageBuilder
.withBody("hello header! name-queue".getBytes())
.setHeader("name", "long").build();
Message ageMsg = MessageBuilder
.withBody("hello header! age-queue".getBytes())
.setHeader("age", "99").build();
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg);
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg);
}
}
这里创建两条消息,具有不同的 header ,将被发送到不同的 Queue 中。
重启项目,运行单元测试方法,日志如下:
HeaderReceiver:age:hello header! age-queue
HeaderReceiver:name:hello header! name-queue