简单模式(Simple):消息队列中的消息被消费后,消息就拿不到了。

工作模式(Work):多个消费者消费同一个队列中的消息,队列采用轮询的方式将消息平均发送给消费者。谁先拿到谁先消费。

发布订阅模式(Publish/Subscirbe):每个消费者监听自己的队列,生产者将消息由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。

路由模式(Routing):每个消费者监听自己的队列,并且设置routingkey生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列;

topic 主题模式(路由模式的一种),消息产生者产生消息,把消息交给交换机,交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

RocketMQ认为任意一条消息只需要被集群内的任意一个消费者处理即可。使用与消费之集群部署,每条消息只处理一次。
RocketMQ会将每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。
MQ来进行处理。
用 RabbitMQ 提供的事务功能,生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。
事务机制会对吞吐量有一定的影响,开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。
事务机制和 cnofirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的
RabbitMQ 提供的 ack 机制ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。Rabbitmq消息持久化RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。deliveryMode 设置为 2direct(默认模式):发送方把消息发送给订阅方,如果有多个订阅者,默认采取轮询的方式进行消息发送dequeOneConsumeOne received message: hello world
dequeOneConsumeTwo received message: hello world
dequeOneConsumeOne received message: hello world
dequeOneConsumeTwo received message: hello world
dequeOneConsumeOne received message: hello world
dequeOneConsumeTwo received message: hello world
headers:与direct类似,但是性能差,基本不用fanout:分发模式,将消息发送给所有订阅者 : dequeOne consume one received message: 我是交换机发出的消息
dequeTwo consume two message: (Body:'我是交换机发出的消息' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=ExchangeOne, receivedRoutingKey=, deliveryTag=10, consumerTag=amq.ctag-H6qjuEJTupM-ihNzwi-dnA, consumerQueue=TestDequeueTwo])
topic:匹配订阅模式,使用正则匹配到消息队列,能匹配到的都能接收到ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用。
Channel(信道):消息推送使用的通道。
Exchange(交换器):用于接受、分配消息。
Queue(队列):用于存储生产者的消息。
RoutingKey(路由键):用于把生成者的数据分配到交换器上。
BindingKey(绑定键):用于把交换器的消息绑定到队列上。
Virtual host:用于消息隔离(类似Redis 16个db这种概念),最上层的消息路由,一个包含若干Exchange和Queue,同一个里面Exchange
ymlrabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
# 发布消息成功到交换器后会触发回调方法
publisher-confirm-type: correlated
# 消息成功确认
publisher-confirms: true
# 指定消息确认模式为手动确认
template:
mandatory: true # 手动签收机制
# 消息失败确认
publisher-returns: true
publisher-confirm-type:NONE 值是禁用发布确认模式,是默认值,CORRELATED 值是发布消息成功到交换器后会触发回调方法。SIMPLE值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker;RabbitTemplate只允许设置一个callback方法,你可以将RabbitTemplate的bean设为单例然后设置回调,在controller、service都设置为 protype// 设置 bena 为多例
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
noSingleRabbitTemplate bean/**
* 注册多例 bean
*
* @param connectionFactory connectionFactory
* @return RabbitTemplate
*/
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate noSingleRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true);
return template;
}
test.java@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Service
public class RabbitMqServiceImpl implements RabbitMqService {
@Resource(name = "noSingleRabbitTemplate")
private RabbitTemplate rabbitTemplate;
@Override
public String testOne() {
RabbitMqProduceConfirm confirm = new RabbitMqProduceConfirm();
RabbitMqProduceReturnBack callback = new RabbitMqProduceReturnBack();
rabbitTemplate.convertAndSend(RabbitMqConst.DEQUE_ONE, "hello world");
// 生产者拿到没有到达消费者的消息
rabbitTemplate.setReturnsCallback(callback);
// 生产者得到消费者拿到消息的确认
rabbitTemplate.setConfirmCallback(confirm);
return null;
}
}
RabbitMqProduceConfirm.javapublic class RabbitMqProduceConfirm implements RabbitTemplate.ConfirmCallback {
private static final Logger logger = LoggerFactory.getLogger(RabbitMqProduceConfirm.class);
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
logger.info("ACK :{} ", JSON.toJSONString(correlationData));
} else {
logger.info("ACK error");
}
}
}
yml# 消息失败确认
publisher-returns: true
RabbitMqProduceReturnBack.javapublic class RabbitMqProduceReturnBack implements RabbitTemplate.ReturnsCallback{
private static final Logger logger = LoggerFactory.getLogger(RabbitMqProduceReturnBack.class);
@Override
public void returnedMessage(ReturnedMessage returned) {
logger.info("Confirmed RabbitMQ");
}
}
RabbitMQ消费者默认为自动确认,不会管消费者是否成功消费/处理了消息AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且 requeue = false(不重新入队列)ImmediateAcknowledgeAmqpException 异常,则消费者会被确认requeue = true(如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的)。可以通过 setDefaultRequeueRejected(默认是true)去设置Basic.Ack :用于确认当前消息Basic.Nack :用于否定当前消息Basic.Reject :用于拒绝当前消息RabbitMQ,必须先要安装一个RabbitMQ服务。这个服务就是Broker,中文叫做代理,因为MQ服务器帮我们对消息做了存储和转发。一般情况下为了保证服务的高可用,需要多个Broker。Confirm确认和消息回退设置RabbitMQ是基于信道Channel的方式来传输数据,排除了使用TCP链接来进行数据的传输,因为TCP链接创建和销毁对于系统性能的开销比较大,且并发能力受系统资源的限制,这样很容易造成rabbitMQ的性能瓶颈。RabbitMQ其实就是一个TCP链接,一旦链接创建成功之后,就会基于链接创建Channel,每个线程把持一个Channel,Channel复用TCP链接,减少了系统创建和销毁链接的消耗,提高了性能