pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.13.RELEASE</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>02_rabbitmq_springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>02_rabbitmq_springboot</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
application.yml
# 应用名,此场景没多大用,在微服务中作用比较大
spring:
  application:
    name: rabbitmq-spring
  rabbitmq:
    host: 192.168.19.129
    port: 5672
    username: ems
    password: 123456
    virtual-host: /ems
提示:以下是本篇文章正文内容,下面案例可供参考

代码如下(示例):
package com.example;
import com.example.helloworld.Application;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Producer-side test for the "hello world" messaging model.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class TestRabbitMQ {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a single message using the routing key "hello".
     *
     * <p>Author's note: running this producer alone does not make a queue
     * appear; a consumer has to exist before the queue shows up.
     */
    @Test
    public void testHelloWorld() {
        final String routingKey = "hello";
        final String payload = "hello world";
        // Two-argument form: routing key first, message body second.
        rabbitTemplate.convertAndSend(routingKey, payload);
    }
}
代码如下(示例):
package com.example.helloworld;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer for the "hello world" model.
 *
 * <p>Listens on the queue "hello", which is declared through
 * {@code queuesToDeclare}. Per the original comment the queue is durable,
 * non-exclusive and not auto-deleted.
 */
@RabbitListener(queuesToDeclare = @Queue("hello"))
@Component
public class HelloCustomer {

    /**
     * Handler method for messages taken from the queue; prints the payload.
     *
     * @param message the received message body
     */
    @RabbitHandler
    public void receive(String message) {
        final String line = "message = " + message;
        System.out.println(line);
    }
}



代码如下(示例):
package com.example;
import com.example.helloworld.Application;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Producer-side test for the "work" messaging model.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@ComponentScan("com.example")
@EnableRabbit
public class TestRabbitMQ {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes ten numbered messages (0..9) with the routing key "work".
     */
    @Test
    public void testWork() {
        int index = 0;
        while (index < 10) {
            final String payload = "work模型" + index;
            rabbitTemplate.convertAndSend("work", payload);
            index++;
        }
    }
}
代码如下(示例):
package com.example.work;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 说明:默认在Spring AMQP实现中Work这种方式就是公平调度,如果需要实现能者多劳需要额外配置
*/
/**
 * Two consumers competing on the same "work" queue.
 *
 * <p>Original author's note: with Spring AMQP the work model distributes
 * messages evenly between consumers by default; letting the faster consumer
 * take more work requires additional configuration.
 */
@Component
public class WorkCustomer {

    /**
     * Consumer 1: listens on the queue "work" and prints each message.
     *
     * @param message the received message body
     */
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message) {
        print("消费者1:", message);
    }

    /**
     * Consumer 2: listens on the queue "work" and prints each message.
     *
     * @param message the received message body
     */
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message) {
        print("消费者2:", message);
    }

    /** Writes the consumer label followed by the message to standard output. */
    private static void print(String label, String message) {
        System.out.println(label + message);
    }
}


在work模型中,需要使用到@EnableRabbit和@ComponentScan("com.example")

package com.example;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Producer-side test for the fanout (broadcast) messaging model.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@ComponentScan("com.example")
@EnableRabbit
public class TestRabbitMQ {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes one message to the exchange "logs" with an empty routing key.
     *
     * <p>Author's note: after only the producer has run, the "logs" exchange
     * is not visible in the RabbitMQ management console; it appears once a
     * consumer is running.
     */
    @Test
    public void testFanout() {
        final String exchange = "logs";
        final String routingKey = "";
        final String payload = "Fanout模型发送的消息";
        // Three-argument form: exchange name, routing key, message body.
        rabbitTemplate.convertAndSend(exchange, routingKey, payload);
    }
}
package com.example.fanout;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Three consumers for the fanout model.
 *
 * <p>Each listener binds its own {@code @Queue} without a name (a temporary
 * queue, per the original comments) to the fanout exchange "logs".
 */
@Component
public class FanoutCustomer {

    /**
     * Consumer 1: prints every message received from its queue.
     *
     * @param message the received message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = "logs", type = ExchangeTypes.FANOUT)))
    public void receive1(String message) {
        print("消费者1:", message);
    }

    /**
     * Consumer 2: prints every message received from its queue.
     *
     * @param message the received message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = "logs", type = ExchangeTypes.FANOUT)))
    public void receive2(String message) {
        print("消费者2:", message);
    }

    /**
     * Consumer 3: prints every message received from its queue.
     *
     * @param message the received message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = "logs", type = ExchangeTypes.FANOUT)))
    public void receive3(String message) {
        print("消费者3:", message);
    }

    /** Writes the consumer label followed by the message to standard output. */
    private static void print(String label, String message) {
        System.out.println(label + message);
    }
}


在管理界面中看不到临时队列:@Queue不指定名称时声明的是匿名、独占、自动删除的队列,消费者连接断开(例如测试运行结束)后队列就会被自动删除。


package com.example;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Producer-side test for the routing (direct exchange) messaging model.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@ComponentScan("com.example")
@EnableRabbit
public class TestRabbitMQ {

    /** Name of the exchange both tests publish to. */
    private static final String EXCHANGE = "directs";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Publishes one message to "directs" with the routing key "info". */
    @Test
    public void testRouteInfo() {
        final String payload = "发送info的key的路由信息";
        rabbitTemplate.convertAndSend(EXCHANGE, "info", payload);
    }

    /** Publishes one message to "directs" with the routing key "error". */
    @Test
    public void testRouteAll() {
        final String payload = "发送error的key的路由信息";
        rabbitTemplate.convertAndSend(EXCHANGE, "error", payload);
    }
}
package com.example.route;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumers for the routing model, bound to the direct exchange "directs".
 *
 * <p>Each listener binds its own {@code @Queue} without a name (a temporary
 * queue, per the original comments).
 */
@Component
public class RouteConsumer {

    /**
     * Bound with the keys "info", "error" and "warn"; prints each message.
     *
     * @param message the received message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = "directs", type = ExchangeTypes.DIRECT),
            key = {"info", "error", "warn"}))
    public void receive1(String message) {
        final String line = "都可以拿到的消费者:" + message;
        System.out.println(line);
    }

    /**
     * Bound with the key "error" only; prints each message.
     *
     * @param message the received message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = "directs", type = ExchangeTypes.DIRECT),
            key = "error"))
    public void receive2(String message) {
        final String line = "只能拿到error的消费者:" + message;
        System.out.println(line);
    }
}




package com.example;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Producer-side test for the topic (dynamic routing) messaging model.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@ComponentScan("com.example")
@EnableRabbit
public class TestRabbitMQ {

    /** Name of the exchange both tests publish to. */
    private static final String EXCHANGE = "topics";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Publishes one message to "topics" with the routing key "user.save". */
    @Test
    public void testTopic1() {
        final String payload = "user.save 路由消息";
        rabbitTemplate.convertAndSend(EXCHANGE, "user.save", payload);
    }

    /** Publishes one message to "topics" with the routing key "order.save.base". */
    @Test
    public void testTopic2() {
        final String payload = "order.save.base 路由消息";
        rabbitTemplate.convertAndSend(EXCHANGE, "order.save.base", payload);
    }
}
package com.example.topic;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumers for the topic model, bound to the topic exchange "topics".
 *
 * <p>Each listener binds its own {@code @Queue} declared without a name.
 */
@Component
public class TopicConsumer {

    /**
     * Bound with the keys "user.save" and "user.*"; prints each message.
     *
     * @param message the received message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "topics", type = ExchangeTypes.TOPIC),
            key = {"user.save", "user.*"}))
    public void receive1(String message) {
        final String line = "message1 = " + message;
        System.out.println(line);
    }

    /**
     * Bound with the keys "order.#", "produce.#" and "user.*"; prints each message.
     *
     * @param message the received message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "topics", type = ExchangeTypes.TOPIC),
            key = {"order.#", "produce.#", "user.*"}))
    public void receive2(String message) {
        final String line = "message2 = " + message;
        System.out.println(line);
    }
}



提示:这里对文章进行总结:
1、在hello world模式中,生产端指定的routingKey是hello。这里指定的是路由键(路由规则),
并不是老师说的hello队列。队列是在消费端声明的;生产端没有指定交换机,只传了routingKey
和Object,所以消息会发送到默认交换机(AMQP default),而消费端声明的hello队列会自动绑定到这个默认交换机上。
默认交换机有一个特点:只要routingKey与某个队列同名,消息就会被自动路由到该队列。
生产端的routingKey叫hello,消费端声明了hello队列,于是两者就路由上了。

点击默认交换机进去,会看到一条话

The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.
默认交换机隐式绑定到每个队列,其路由键等于队列名。不能显式地绑定到默认交换机,也不能从默认交换机解绑;默认交换机也不能被删除。