RabbitMQ是一种开源的消息代理软件,它实现了高级消息队列协议(AMQP)标准,可用于在应用程序之间传递消息。RabbitMQ最初由LShift开发,现在由Pivotal Software维护。
RabbitMQ可以在多个平台上运行,包括Windows、Mac OS X和各种Linux发行版。它提供了多种编程语言的客户端库,如Java、Python、Ruby、.NET等等。RabbitMQ的主要特点包括:

RabbitMQ的工作原理主要包括生产者(Producer)、消息队列(Queue)和消费者(Consumer)三个部分。
安装步骤如下
需要注意的是,安装 RabbitMQ 之前需要先安装 Erlang,而且版本要匹配。另外,如果在安装过程中出现问题,可以参考 RabbitMQ 的官方文档或者社区论坛来解决。
安装成功后 访问127.0.0.1:15672 出现登录页面安装成功。
首先通过idea准备springboot的项目,添加rabbitMQ的依赖
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-webartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-testartifactId>
dependency>
<dependency>
<groupId>mysqlgroupId>
<artifactId>mysql-connector-javaartifactId>
<scope>runtimescope>
dependency>
<dependency>
<groupId>org.mybatis.spring.bootgroupId>
<artifactId>mybatis-spring-boot-starterartifactId>
dependency>
<dependency>
<groupId>org.projectlombokgroupId>
<artifactId>lombokartifactId>
<scope>providedscope>
dependency>
配置rabbitMq如下
spring:
rabbitmq: #配置文件
host: ##### #ip
port: 5672
username: ###
password: ####
virtual-host: /
connection-timeout: 15000
publisher-confirm-type: correlated #开启 confirms 回调 P → Exchange
publisher-returns: true # 开启 returnedMessage 回调 Exchange → Queue
template:
mandatory: true # 抵达队列异步发送有效回调
listener:
simple:
acknowledge-mode: manual # 表示消费者消费成功消息以后需要手工的进行签收(ack),默认为auto
concurrency: 5 #当前线线程数
max-concurrency: 10 # 最大线程数
prefetch: 10
retry:
enabled: true
max-attempts: 5
max-interval: 10000ms # 重试最大间隔时间10s
initial-interval: 2000ms # 重试初始间隔时间2s
multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间,重试时间依次是2s,4s,8s,10s
#消息message
com.acrdpm.smart_topic_queue=smart.queue
com.acrdpm.smart_topic_exchange=smart.exchange
com.acrdpm.smart_topic_routingKey=smart.routing.key
#延迟队列
com.acrdpm.delayed_queue=delayed.queue
com.acrdpm.delayed_exchange=delayed.exchange
com.acrdpm.delayed_routingKey=delayed.routing.key
配置rabbitMq配置文件
@Configuration
@Slf4j
//启用rabbitmQ
@EnableRabbit
@Getter
public class RabbitConfig {
private final RabbitTemplate rabbitTemplate;
// 将配置文件封装成工具类
private final RabbitPropertiesConfig rabbitPropertiesConfig;
// 消息备份类
private final MsgLogService msgLogService;
public RabbitConfig(RabbitTemplate rabbitTemplate, RabbitPropertiesConfig rabbitPropertiesConfig, MsgLogService msgLogService) {
this.rabbitTemplate = rabbitTemplate;
this.rabbitPropertiesConfig = rabbitPropertiesConfig;
this.msgLogService = msgLogService;
}
/**
* 定义硬件需要的topic
* @return
*/
@Bean
public Queue smartQueue() {
return new Queue(rabbitPropertiesConfig.getSmart_topic_queue(), true);
}
@Bean
public TopicExchange smartExchange() {
return new TopicExchange(rabbitPropertiesConfig.getSmart_topic_exchange(), true, false);
}
@Bean
public Binding smartBinding() {
return BindingBuilder.bind( smartQueue()).to(smartExchange()).with(rabbitPropertiesConfig.getSmart_topic_routingKey());
}
/**
* 定义延迟队列
*/
@Bean
public Queue delayedQueue(){
return new Queue(rabbitPropertiesConfig.getDelayed_queue());
}
@Bean
public CustomExchange delayedExchange(){
Map<String, Object> args = new HashMap<>();
//自定义交换机的类型
args.put("x-delayed-type", "direct");
return new CustomExchange(rabbitPropertiesConfig.getDelayed_exchange(), "x-delayed-message", true, false,
args);
}
@Bean
public Binding bindingDelayedQueue(){
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(rabbitPropertiesConfig.getDelayed_routingKey()).noargs();
}
/**
* 定制RabbitTemplate
* 1、服务收到消息就会回调
* 1、spring.rabbitmq.publisher-confirms: true
* 2、设置确认回调
* 2、消息正确抵达队列就会进行回调
* 1、spring.rabbitmq.publisher-returns: true
* spring.rabbitmq.template.mandatory: true
* 2、设置确认回调ReturnCallback
*
* 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
* MyRabbitConfig对象创建完成以后,执行这个方法
*/
@PostConstruct
public void initRabbitTemplate() {
/**
* 1、只要消息抵达Broker就ack=true
* correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
* ack:消息是否成功收到
* cause:失败的原因
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("confirm...correlationData[,{}",correlationData);
log.info("ack是:{}",ack);
log.info("case是:{}",cause);
System.out.println("confirm...correlationData[" + correlationData + "]==>ack:[" + ack + "]==>cause:[" + cause + "]");
if (ack) {
log.info("消息成功发送到Exchange");
String msgId = correlationData.getId();
msgLogService.updateStatus(msgId, MsgLogStatus.DELIVER_SUCCESS);
} else {
log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
}
});
// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
rabbitTemplate.setMandatory(true);
/**
* 只要消息没有投递给指定的队列,就触发这个失败回调
* message:投递失败的消息详细信息
* replyCode:回复的状态码
* replyText:回复的文本内容
* exchange:当时这个消息发给哪个交换机
* routingKey:当时这个消息用哪个路邮键
* 修改数据库状态
*/
rabbitTemplate.setReturnsCallback((returnCallback) -> {
Message message = returnCallback.getMessage();
String exchange = returnCallback.getExchange();
int replyCode = returnCallback.getReplyCode();
String routingKey = returnCallback.getRoutingKey();
String replyText = returnCallback.getReplyText();
if(rabbitPropertiesConfig.getDelayed_exchange().equals(exchange)){
/**
* 使用了x-delayed-message 延迟插件,结果每次都强制触发returnedMessage回调方法
* 因为发送方确实没有投递到队列上,只是在交换器上暂存,等过期时间到了 才会发往队列。
* 并非是BUG,而是有原因的,所以使用利用if去拦截这个异常,判断延迟队列交换机名称,然后break;
*/
log.info("如果是延迟队列那么break");
return;
}
log.info("Fail Message[" + message + "]==>replyCode[" + replyCode + "]" +
"==>replyText[" + replyText + "]==>exchange[" + exchange + "]==>routingKey[" + routingKey + "]");
log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange,
routingKey,replyCode,replyText,message);
//todo 没有发送到指定的队列 数据暂存到数据库认定消费失败 再次重新上传
});
}
}
注意rabbitMq延迟队列需要安装插件,可参考官网
配置日志消息类
@Service
@Slf4j
public class MsgLogService {
private final MsgLogMapper msgLogMapper;
public MsgLogService(MsgLogMapper msgLogMapper) {
this.msgLogMapper = msgLogMapper;
}
public void saveMsg(MsgLog msgLog){
msgLogMapper.insert(msgLog);
}
public void updateStatus(String msgId, Integer status) {
log.info("执行");
msgLogMapper.updateStatus(msgId,status);
}
public MsgLog selectByMsgId(String msgId) {
if (!ObjectUtils.isEmpty(msgId)){
return msgLogMapper.seletMsgFormsgId(msgId);
}
return null;
}
public List<MsgLog> selectTimeoutMsg() {
return msgLogMapper.selectTimeOutMsg();
}
public void updateTryCount(String msgId, Integer tryCount) {
MsgLog msgLog = new MsgLog();
msgLog.setMsgId(msgId);
msgLog.setTryCount(tryCount);
msgLogMapper.updateByMsgId(msgLog);
}
}
@Data
@NoArgsConstructor
public class MsgLog {
private static final long serialVersionUID = 4990197789742500403L;
private String msgId;
private JSONObject msg;
private String exchange;
private String routingKey;
private Integer status;
private Integer tryCount;
private String nextTryTime;
private String createTime;
private String updateTime;
private String msgCase;
}
DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.cvdmp.dao.MsgLogMapper">
<insert id="insert" parameterType="com.cvdmp.domain.entity.MsgLog">
insert into msg_log
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="msgId != null">
msg_id,
if>
<if test="exchange != null">
exchange,
if>
<if test="routingKey != null">
routing_key,
if>
<if test="status != null">
status,
if>
<if test="tryCount != null">
try_count,
if>
<if test="nextTryTime != null">
next_try_time,
if>
<if test="createTime != null">
create_time,
if>
<if test="updateTime != null">
update_time,
if>
<if test="msg != null">
msg,
if>
<if test="msgCase!=null">
msg_case,
if>
trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="msgId != null">
#{msgId,jdbcType=VARCHAR},
if>
<if test="exchange != null">
#{exchange,jdbcType=VARCHAR},
if>
<if test="routingKey != null">
#{routingKey,jdbcType=VARCHAR},
if>
<if test="status != null">
#{status,jdbcType=INTEGER},
if>
<if test="tryCount != null">
#{tryCount,jdbcType=INTEGER},
if>
<if test="nextTryTime != null">
#{nextTryTime},
if>
<if test="createTime != null">
#{createTime},
if>
<if test="updateTime != null">
#{updateTime},
if>
<if test="msg != null">
#{msg,typeHandler=com.cvdmp.service.handler.JsonObjectTypeHandler},
if>
<if test="msgCase !=null">
#{msgCase},
if>
trim>
insert>
<update id="updateStatus" parameterType="map">
update msg_log set status = #{status}, update_time = now()
where msg_id = #{msgId}
update>
<select id="selectTimeOutMsg" resultType="com.cvdmp.domain.entity.MsgLog">
select
*
from msg_log
where status = 0
and next_try_time <= now()
select>
<select id="seletMsgFormsgId" parameterType="string" resultType="com.cvdmp.domain.entity.MsgLog">
select
*
from msg_log
where msg_id = #{msgId}
select>
<update id="updateByMsgId" parameterType="com.cvdmp.domain.entity.MsgLog">
update msg_log
<set>
<if test="exchange != null">
exchange = #{exchange,jdbcType=VARCHAR},
if>
<if test="routingKey != null">
routing_key = #{routingKey,jdbcType=VARCHAR},
if>
<if test="status != null">
status = #{status,jdbcType=INTEGER},
if>
<if test="tryCount != null">
try_count = #{tryCount,jdbcType=INTEGER},
if>
<if test="nextTryTime != null">
next_try_time = #{nextTryTime},
if>
<if test="createTime != null">
create_time = #{createTime},
if>
<if test="updateTime != null">
update_time = #{updateTime},
if>
<if test="msg != null">
msg = #{msg,typeHandler=com.cvdmp.service.handler.JsonObjectTypeHandler},
if>
set>
where msg_id = #{msgId,jdbcType=VARCHAR}
update>
mapper>
最后我们定义生产者和消费者
/**
* mq消息推送策略
* 1、通过rabbitmq完成消息的推送保证消息推送成功
* @author daizhihua
* @time 2023/4/25
*/
@Component(value = "mqStrategy")
public class MqStrategyService {
private final RabbitConfig rabbitConfig;
private final MsgLogService msgLogService;
public MqStrategyService(RabbitConfig rabbitConfig, MsgLogService msgLogService) {
this.rabbitConfig = rabbitConfig;
this.msgLogService = msgLogService;
}
public void sendMessage(JSONObject map, HttpServletRequest request) {
RabbitPropertiesConfig rabbitPropertiesConfig = rabbitConfig.getRabbitPropertiesConfig();
String msgId = RandomUtil.getRandomNumber(32);
//设置消息id
map.put("msgId",msgId);
MsgLog msgLog = new MsgLog();
msgLog.setMsgId(msgId);
msgLog.setMsg(map);
msgLog.setExchange(rabbitPropertiesConfig.getSmart_topic_exchange());
msgLog.setRoutingKey(rabbitPropertiesConfig.getSmart_topic_routingKey());
msgLog.setNextTryTime(DateUtil.getNow());
msgLogService.saveMsg(msgLog);
//生成消息的唯一id
CorrelationData correlationData = new CorrelationData(msgId);
RabbitTemplate rabbitTemplate = rabbitConfig.getRabbitTemplate();
// 发送消息
rabbitTemplate.convertAndSend(rabbitPropertiesConfig.getSmart_topic_exchange(),
rabbitPropertiesConfig.getSmart_topic_routingKey(), map, correlationData);
}
/**
* 发送延迟队列消息
* @param map
* @param delayTime
*/
public void sendMessageDelay(JSONObject map,int delayTime){
RabbitPropertiesConfig rabbitPropertiesConfig = rabbitConfig.getRabbitPropertiesConfig();
String msgId = RandomUtil.getRandomNumber(32);
//设置消息id
map.put("msgId",msgId);
MsgLog msgLog = new MsgLog();
msgLog.setMsgId(msgId);
msgLog.setMsg(map);
msgLog.setExchange(rabbitPropertiesConfig.getDelayed_exchange());
msgLog.setRoutingKey(rabbitPropertiesConfig.getDelayed_routingKey());
msgLog.setNextTryTime(DateUtil.getNow());
//生成消息的唯一id
CorrelationData correlationData = new CorrelationData(msgId);
RabbitTemplate rabbitTemplate = rabbitConfig.getRabbitTemplate();
rabbitTemplate.convertAndSend(rabbitPropertiesConfig.getDelayed_exchange(),
rabbitPropertiesConfig.getDelayed_routingKey(), map, message -> {
message.getMessageProperties().setDelay(delayTime);
return message;},correlationData);
}
}
延迟队列的消费
@Slf4j
@Component
@RabbitListener(queues = "${com.acrdpm.delayed_queue}")
public class MessageConsumer {
private final MqStrategyService mqStrategyService;
public MessageConsumer(MqStrategyService mqStrategyService) {
this.mqStrategyService = mqStrategyService;
}
@RabbitHandler
public void consume(Message message, JSONObject map, Channel channel) throws IOException {
System.out.println("First Queue received msg : " );
log.info("数据是:{}",map);
System.out.println(message);
System.out.println(channel);
long tag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(tag, false);
}
}
订阅消息的消费者
@Slf4j
@Component
@RabbitListener(queues = {"${com.acrdpm.smart_topic_queue}"})
public class SmartConsumer {
private MsgLogService msgLogService;
@RabbitHandler
public void consume(Message message, JSONObject mail, Channel channel) throws IOException {
log.info("接收到消息了");
log.info("消息 {}",message);
log.info("收到的消息是:{}",mail);
long tag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(tag, false);
String msgId = mail.getMsgId();
MsgLog msgLog = msgLogService.selectByMsgId(msgId);
if (null == msgLog || msgLog.getStatus().equals(MsgLogStatus.CONSUMED_SUCCESS)) {
// 消费幂等性:确定不是重复的消息:及消费完成的消息
log.info("重复消费, msgId: {}", msgId);
return;
}
//获取投送标签
long tag = message.getMessageProperties().getDeliveryTag();
// boolean success = false;
// if (success) {
// log.info("成功发送消息");
// msgLogService.updateStatus(msgId, MsgLogStatus.CONSUMED_SUCCESS);
// // 消费确认手动ack
// channel.basicAck(tag, false);
// } else {
// channel.basicAck(tag, false);
// }
// try {
boolean success = EmailUtil.sendEmail(mail);
//
// } catch (EmailException e) {
// log.error("email 发送异常" , e);
// } catch (IOException e) {
// log.error("消息处理异常" , e);
// }
}
}
在发送消息的过程中,肯定会出现网络异常等情况所以我们定义了发送消息的持久化,为了保证一致性,可参考如下时序图
