docker run -d --name rabbitmq \
-p 5671:5671 -p 5672:5672 -p 4369:4369 \
-p 25672:25672 -p 15671:15671 -p 15672:15672 \
rabbitmq:management
4369, 25672 (Erlang发现&集群端口) 5672, 5671 (AMQP端口) 15672 (web管理后台端口)
61613, 61614 (STOMP协议端口) 1883, 8883 (MQTT协议端口)
https://www.rabbitmq.com/networking.html
/**
* 1使用RabbitMQ
* 2引入amqp场景,RabbitAutoConfiguration生效
* 3 容器中自动配置了
* RabbitTemplate AmqpAdmin RabbitMessagingTemplate CachingConnectionFactory
* 所有的属性在@ConfigurationProperties(prefix = "spring.rabbitmq")配置
* 4 @EnableRabbit 开启功能
* 5 @RabbitListener 监听消息,(必须要开启@EnableRabbit),类+方法上
* 6 @RabbitHandler 标在方法上(重载)
* @author MrChen
* @create 2022-06-05 19:25
*/
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 192.168.182.130
port: 5672
virtual-host: /
方式1(推荐)
@Configuration
public class MyMQBuildConf {
/**
* 延时接收队列
* @return
*/
@Bean
public Queue orderDelayQueue(){
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "order-event-exchange");
arguments.put("x-dead-letter-routing-key", "order.release.order");
arguments.put("x-message-ttl", 15000);//2分钟发送给死信队列
/**
* Construct a new queue, given a name, durability flag, and auto-delete flag, and arguments.
* @param name the name of the queue - must not be null; set to "" to have the broker generate the name.
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (the queue will only be used by the declarer's
* connection)
* @param autoDelete true if the server should delete the queue when it is no longer in use
* @param arguments the arguments used to declare the queue
*/
return new Queue("order.delay.queue", true, false,false, arguments);
}
/**
* 接收队列
* @return
*/
@Bean
public Queue orderReleaseOrderQueue(){
return new Queue("order.release.order.queue", true,false,false,null);
}
/**
* top交换机
* @return
*/
@Bean
public Exchange orderEventExchange(){
return new TopicExchange("order-event-exchange", true,false);
}
/**
* 绑定队列order.delay.queue和交换机order-event-exchange
* @return
*/
@Bean
public Binding orderCreateOrderBind(){
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null);
}
/**
* 绑定队列order.release.order.queue和交换机order-event-exchange
* @return
*/
@Bean
public Binding orderReleaseOrderBind(){
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}
/**
*
* 订单交换机和库存延时队列进行绑定,路由key是order.release.other.#
* @return
*/
@Bean
public Binding orderReleaseOtherBind(){
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.other.#",
null);
}
@Bean
public Queue orderSeckillOrderQueue(){
return new Queue("order.seckill.order.queue",true,false,false,null);
}
//order.seckill.order
@Bean
public Binding orderSeckillOrderBind(){
return new Binding("order.seckill.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.seckill.order",
null);
}
}
方式 2 (不推荐)
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class OrderTest {
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
AmqpAdmin amqpAdmin;
/**
* 1 创建Exchange Queue Binding
* 1) 使用amqp进行创建exchange
* 2)创建一个队列queue
* 2 如何收发消息
*/
@Test
public void createExchange(){
amqpAdmin.declareExchange(new DirectExchange("hello-java-exchange",true,false));
log.info("exchange created[{}]", "hello-java-exchange");
}
@Test
public void createQueue(){
// public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
//@param exclusive true if we are declaring an exclusive queue (the queue will only be used by the declarer's connection)
amqpAdmin.declareQueue(new Queue("hello-java-queue",true,false,true));
log.info("Queue created[{}]", "hello-java-queue");
}
/**
* destination;目的地
* destinationType: 目的类型
* exchange: 交换机
* routingKey: 路由键
* arguments: 参数
*/
@Test
public void createBindRelation(){
//public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,Map arguments)
//将exchange和destination绑定,使用routingKey路由键
amqpAdmin.declareBinding(new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null));
log.info("BindRelation created[{}]", "hello-java-bind");
}
@Test
public void send(){
for (int i = 0; i < 10; i++) {
if(i%2==0){
OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
orderReturnReasonEntity.setId(1L);
orderReturnReasonEntity.setCreateTime(new Date());
orderReturnReasonEntity.setSort(1);
orderReturnReasonEntity.setStatus(1);
orderReturnReasonEntity.setName("order-test:"+i);
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderReturnReasonEntity);
}else{
OrderEntity orderEntity = new OrderEntity();
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity);
}
log.info("send[{}]", "send success");
}
}
}
/**
*
* 用在类+方法上 @RabbitListener(queues = {"hello-java-queue"} )监听多个对列,(前提必须要开启@EnableRabbit),
*/
@RabbitListener(queues = {"hello-java-queue"} )
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
/**
*
* @param message
* @param content 原生消息
* @param channel 传输数据的通道
* Queue 可以很多人监听,只要收到消息,队列删除消息,而且只能有一个收到此消息
* 1) 订单服务启动多个,同一个消息,只能一个客户端收到
* 2) 只有一个消息处理完,方法运行结束,就可以接收到下一个消息
*/
// @RabbitListener(queues = {"hello-java-queue"} )
/**
*@RabbitHandler 标在方法上(重载)
/
@RabbitHandler
public void receiveMessage(Message message,
OrderReturnReasonEntity content,
Channel channel){
//{"id":1,"name":"order-test","sort":1,"status":1,"createTime":1661780724613}获取消息体
byte[] body = message.getBody();
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("接收消息:"+" body:"+content);
}
/**
*@RabbitHandler 标在方法上(重载)
/
@RabbitHandler
public void receiveMessage2(OrderEntity content,
Channel channel){
System.out.println("接收消息2:"+" body:"+content);
}
}
yml:spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
@Configuration
public class RabbitConfirmConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
//json序列化,这个配置最主要
@Bean
MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
//回调消息配置
@PostConstruct //RabbitConfirmConfig 对象创建完成后,执行这个方法
public void initRabbitMQ(){
/**
*消息只要成功发送到队列中,无论是否有cline接收都会调用这个回调函数
*可靠抵达-ConfirmCallback
*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirtm:"+correlationData+" ack="+ack+" cause="+cause);
}
});
/**
* 消息没有投递队列,就触发这个失败回调
* 可靠抵达-ReturnCallback
*/
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("message:"+message+" replyCode="+replyCode+" replyText="+replyText+" exchange:"+exchange+" routingKey:"+routingKey);
}
});
}
}
1.•消费者获取到消息,成功处理,可以回复Ack给Broker
1.1•basic.ack用于肯定确认;broker将移除此消息
1.2•basic.nack用于否定确认;可以指定broker是否丢弃此消息,可以批量
1.3•basic.reject用于否定确认;同上,但不能批量
2•默认自动ack,消息被消费者收到,就会从broker的queue中移除
3•queue无消费者,消息依然会被存储,直到消费者消费
4•消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成, 或者成功处理。我们可以开启手动ack模式
4.1•消息处理成功,ack(),接受下一个消息,此消息broker就会移除
4.2•消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
4.2•消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户 端断开,消息不会被broker移除,会投递给别人
yml:spring.rabbitmq.listener.simple.acknowledge-mode=manual #手动确认接收消息
在监听消息的业务中设置channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
@RabbitListener(queues = {"hello-java-queue"} )
public class OrderItemService{
@RabbitHandler
public void receiveMessage(Message message,
OrderReturnReasonEntity content,
Channel channel) {
System.out.println("接收消息:"+" body:"+content);
//消息头属性
MessageProperties messageProperties = message.getMessageProperties();
/**
* 手动签收,非批量签收
* channel内DeliveryTag按顺序自增
*/
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
消息只存在一个集群中的一个节点,对消费者来说,若消息存在A节点的Queue中,当从B节点拉去时,消息要从A中取出,经过B发送给消费者。缺点是,当A节点出现宕机后,消息会丢失,B就无法拉取消息了。一般适用于消息无持久化的场合,如日志队列。
消息实体是主动在节点间同步,而不是在拉去数据时临时拉去,高可用场景,如下单,库存队列
创建目录
mkdir /mydata/rabbitmq
cd /mydata/rabbitmq
mkdir rabbitmq01 rabbitmq02 rabbitmq03
安装
都是映射15762和5762
cluster为自己自定义的名字
RABBITMQ_ERLANG_COOKIE='cluster'
#master01
docker run -d --hostname rabbitmq01 --name rabbitmq01 \
-v /mydata/rabbitmq/rabbitmq01:/var/lib/rabbitmq \
-p 15673:15672 -p 5763:5762 \
-e RABBITMQ_ERLANG_COOKIE='cluster'rabbitmq:management
#slaver02
docker run -d --hostname rabbitmq02 --name rabbitmq02 \
-v /mydata/rabbitmq/rabbitmq02:/var/lib/rabbitmq \
-p 15674:15672 -p 5764:5762 \
-e RABBITMQ_ERLANG_COOKIE='cluster' \
--link rabbitmq01:rabbitmq01 rabbitmq:management
#slaver03
docker run -d --hostname rabbitmq03 --name rabbitmq03 \
-v /mydata/rabbitmq/rabbitmq02:/var/lib/rabbitmq \
-p 15675:15672 -p 5765:5762 \
-e RABBITMQ_ERLANG_COOKIE='cluster' \
--link rabbitmq01:rabbitmq01 \
--link rabbitmq02:rabbitmq02 rabbitmq:management
#rabbitmq01
docker exec -it rabbitmq01 /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
#rabbitmq02
docker exec -it rabbitmq02 /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbitmq01
rabbitmqctl start_app
#rabbitmq03
docker exec -it rabbitmq03 /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbitmq01
rabbitmqctl start_app
#进入容器
docker exec -it rabbitmq01 /bin/bash
#高可用策略
rabbitmqctl set_policy -p / ha "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
#查询当前虚拟主机策略
rabbitmqctl list_policies -p /