生产者:
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-testartifactId>
dependency>
消费者:
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
spring:
rabbitmq:
host: 127.0.0.1 # ip
port: 5672
username: zhangsan
password: 123456
virtual-host: /zhangsan
# 开启confirms回调 P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 开启returnedMessage回调 Exchange -> Queue
spring.rabbitmq.publisher-returns=true
@Configuration
public class RabbitMqConfig {
//定义交换机的名字
public static final String EXCHANGE_NAME = "boot_topic_exchange";
//定义队列的名字
public static final String QUEUE_NAME = "boot_queue";
//1、声明交换机
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//2、声明队列
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
//3、队列与交换机进行绑定
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
@Component
public class RabbitMQListener {
//定义方法进行信息的监听 RabbitListener中的参数用于表示监听的是哪一个队列
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(Message message){
System.out.println("message:"+message.getBody());
}
}
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
//注入 RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void send(){
rabbitTemplate.convertAndSend("boot_topic_exchange","boot.haha","boot mq...");
}
}
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景,RabbitMQ 提供了两种方式用来控制消息的投递可靠性模式。
rabbitmq 整个消息投递的路径为:
producer--->rabbitmq broker--->exchange--->queue--->consumer
可以将利用这两个 callback 控制消息的可靠性投递
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
publisher-returns="true"
/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm">rabbit:queue>
<rabbit:direct-exchange name="test_exchange_confirm">
<rabbit:bindings>
<rabbit:binding queue="test_queue_confirm" key="confirm">rabbit:binding>
rabbit:bindings>
rabbit:direct-exchange>
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer">entry>
rabbit:queue-arguments>
rabbit:queue>
<rabbit:topic-exchange name="test_exchange_ttl" >
<rabbit:bindings>
<rabbit:binding pattern="ttl.#" queue="test_queue_ttl">rabbit:binding>
rabbit:bindings>
rabbit:topic-exchange>
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="exchange_dlx" />
<entry key="x-dead-letter-routing-key" value="dlx.hehe" />
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
<entry key="x-max-length" value="10" value-type="java.lang.Integer" />
rabbit:queue-arguments>
rabbit:queue>
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx">rabbit:binding>
rabbit:bindings>
rabbit:topic-exchange>
<rabbit:queue name="queue_dlx" id="queue_dlx">rabbit:queue>
<rabbit:topic-exchange name="exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue_dlx">rabbit:binding>
rabbit:bindings>
rabbit:topic-exchange>
<rabbit:queue id="order_queue" name="order_queue">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
<entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
rabbit:queue-arguments>
rabbit:queue>
<rabbit:topic-exchange name="order_exchange">
<rabbit:bindings>
<rabbit:binding pattern="order.#" queue="order_queue">rabbit:binding>
rabbit:bindings>
rabbit:topic-exchange>
<rabbit:queue id="order_queue_dlx" name="order_queue_dlx">rabbit:queue>
<rabbit:topic-exchange name="order_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx">rabbit:binding>
rabbit:bindings>
rabbit:topic-exchange>
beans>
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testConfirm() {
//定义回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 相关配置信息
* @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//System.out.println("confirm方法被执行了...."+correlationData.getId());
//ack 为 true表示 消息已经到达交换机
if (ack) {
//接收成功
System.out.println("接收成功消息" + cause);
} else {
//接收失败
System.out.println("接收失败消息" + cause);
//做一些处理,让消息再次发送。
}
}
});
//进行消息发送
for (int i = 0; i < 5; i++) {
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message Confirm...");
}
//进行睡眠操作
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testReturn() {
//设置交换机处理失败消息的模式 为true的时候,消息达到不了 队列时,会将消息重新返回给生产者
rabbitTemplate.setMandatory(true);
//定义回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 消息对象
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 执行了....");
System.out.println("message:"+message);
System.out.println("replyCode:"+replyCode);
System.out.println("replyText:"+replyText);
System.out.println("exchange:"+exchange);
System.out.println("routingKey:"+routingKey);
//处理
}
});
//进行消息发送
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message return...");
//进行睡眠操作
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testQos(){
for (int i = 0; i < 10; i++) {
// 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
}
}
TTL 全称 Time To Live(存活时间/过期时间),当消息到达存活时间后,还没有被消费,会被自动清除,RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
如果两者都进行了设置,以时间短的为准。
@Test
public void testTtl() {
for (int i = 0; i < 10; i++) {
Message message = MessageBuilder
.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
//消息单独过期时间
.setExpiration("5000")
.build();
// 发送消息
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.zhangsan", message);
}
}
死信交换机和死信队列和普通的没有区别,当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列。

@Test
public void testDlx(){
Message message = MessageBuilder
.withBody("死信消息测试".getBytes(StandardCharsets.UTF_8))
//消息单独过期时间
.setExpiration("5000")
.build();
//测试过期时间,死信消息
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hehe",message);
//测试长度限制后,消息死信
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hehe",message);
}
//测试消息拒收
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.zhangsan",message);
}
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
例如:下单后,30分钟未支付,取消订单,回滚库存。
使用TTL+死信队列组合可以实现延迟队列的效果。

1、声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
2、声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
3、正常队列绑定死信交换机
设置两个参数:
x-dead-letter-exchange:死信交换机名称
x-dead-letter-routing-key:发送给死信交换机的routingkey
@Test
public void testDelay() throws InterruptedException {
Message message = MessageBuilder
.withBody("订单信息:id=1,time=2020年12月...".getBytes(StandardCharsets.UTF_8))
//消息单独过期时间
.setExpiration("5000")
.build();
//发送订单消息。 将来是在订单系统中,下单成功后,发送消息
rabbitTemplate.convertAndSend("order_exchange","order.msg",message);
//打印倒计时10秒
for (int i = 10; i > 0 ; i--) {
System.out.println(i+"...");
Thread.sleep(1000);
}
}
}
Consumer Ack
ack指Acknowledge确认,表示消费端收到消息后的确认方式,有三种确认方式:
自动确认:acknowledge="none"手动确认:acknowledge="manual"根据异常情况确认:acknowledge="auto"
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<context:component-scan base-package="com.zhangsan.listener" />
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="5">
<rabbit:listener ref="ackListener" queue-names="test_queue_confirm">rabbit:listener>
<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx">rabbit:listener>
<rabbit:listener ref="orderListener" queue-names="order_queue_dlx">rabbit:listener>
rabbit:listener-container>
beans>
ACK测试:
@Component
public class AckListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//获取消息的id
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//获取消息
System.out.println("message:"+new String(message.getBody()));
//进行业务处理
System.out.println("=====进行业务处理====");
//模拟出现异常
int i = 5/0;
//进行消息签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
//出现异常,拒绝签收
//第二个参数:是否应用于多消息
//第三个参数:requeue:重回队列。
//如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
channel.basicNack(deliveryTag, true, true);
}
}
}
首先消费端的确认模式一定为手动确认:acknowledge=“manual”
配置 prefetch属性设置消费端一次拉取多少消息:prefetch=“5”
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="5" >
在编程中一个幂等操作的特点是:任意多次执行所产生的结果与一次执行的产生的结果相同,在MQ中由于网络故障、客户端延迟消费、MQ自动重试过程中可能会导致消息的重复消费的问题。我们要保证一次或多次请求同一个资源,对资源本身产生的影响和第一次执行的影响相同。
例如:消费者在消费完一条消息后,向RabbitMQ 发送一个ACK 确认,但是此时网络断开或者其他原因导致RabbitMQ 没有收到这个ACK,那么RabbitMQ 并不会讲该条消息删除,而是重回队列,当客户端重新建立到连接后,消费者还是会再次收到该条消息,这就造成了消息的重复消费。