• springboot对接rabbitmq并且实现动态创建队列和消费


    背景

    1、对接多个节点上的MQ(如master-MQ,slave-MQ),若读者需要自己模拟出两个MQ,可以部署多个VM然后参考 docker 安装rabbitmq_Steven-Russell的博客-CSDN博客

    2、队列名称不是固定的,需要接受外部参数,并且通过模板进行格式化,才能够得到队列名称

    3、需要在master-MQ上延迟一段时间,然后将消息再转发给slave-MQ

    问题

    1、采用springboot的自动注入bean需要事先知道队列的名称,但是队列名称是动态的情况下,无法实现自动注入

    2、mq弱依赖,在没有master-mq或者slave-mq时,不能影响到现有能力

    解决方案

    1、由于MQ的队列声明、exchange声明以及队列和exchange的绑定操作都是可重入(幂等)的,所以不依赖自动注入,而是通过 ConnectionFactory 创建连接和 channel,在代码中手动声明

    2、增加自定义条件OnMqCondition,防止不必要的bean创建

    总体流程

    实施过程

    搭建springboot项目

    参考 搭建最简单的SpringBoot项目_Steven-Russell的博客-CSDN博客

    引入amqp依赖

    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    

    引入后续会用到的工具类依赖

    
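        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.28</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.40</version>
        </dependency>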
    

    创建配置文件

    在application.yml中增加如下配置

    mq:
      master:
        addresses: 192.168.30.128:5672
        username: guest
        password: guest
        vhost: /
      slave:
        addresses: 192.168.30.131:5672
        username: guest
        password: guest
        vhost: /
    

    创建自定义Condition注解和注解实现

    1. package com.wd.config.condition;
    2. import org.springframework.context.annotation.Conditional;
    3. import java.lang.annotation.*;
    4. @Target({ElementType.TYPE, ElementType.METHOD})
    5. @Retention(RetentionPolicy.RUNTIME)
    6. @Documented
    7. @Conditional(OnMqCondition.class)
    8. public @interface MqConditional {
    9. String[] keys();
    10. }
    1. package com.wd.config.condition;
    2. import org.springframework.context.annotation.Condition;
    3. import org.springframework.context.annotation.ConditionContext;
    4. import org.springframework.core.type.AnnotatedTypeMetadata;
    5. import org.springframework.lang.NonNull;
    6. import org.springframework.util.ObjectUtils;
    7. import java.util.Map;
    8. public class OnMqCondition implements Condition {
    9. @Override
    10. public boolean matches(@NonNull ConditionContext context, @NonNull AnnotatedTypeMetadata metadata) {
    11. Map annotationAttributes = metadata.getAnnotationAttributes(MqConditional.class.getName());
    12. if (annotationAttributes == null || annotationAttributes.isEmpty()) {
    13. // 为空则不进行校验了
    14. return true;
    15. }
    16. String[] keys = (String[])annotationAttributes.get("keys");
    17. for (String key : keys) {
    18. String property = context.getEnvironment().getProperty(key);
    19. if (ObjectUtils.isEmpty(property)) {
    20. return false;
    21. }
    22. }
    23. return true;
    24. }
    25. }

    创建多个连接工厂 ConnectionFactory

    1. package com.wd.config;
    2. import com.wd.config.condition.MqConditional;
    3. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    4. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    5. import org.springframework.beans.factory.annotation.Value;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. import org.springframework.context.annotation.Primary;
    9. @Configuration
    10. public class MqConnectionFactory {
    11. @Value("${mq.master.addresses}")
    12. private String masterAddresses;
    13. @Value("${mq.master.username}")
    14. private String masterUsername;
    15. @Value("${mq.master.password}")
    16. private String masterPassword;
    17. @Value("${mq.master.vhost}")
    18. private String masterVhost;
    19. @Value("${mq.slave.addresses}")
    20. private String slaveAddresses;
    21. @Value("${mq.slave.username}")
    22. private String slaveUsername;
    23. @Value("${mq.slave.password}")
    24. private String slavePassword;
    25. @Value("${mq.slave.vhost}")
    26. private String slaveVhost;
    27. @Bean
    28. @Primary
    29. @MqConditional(keys = {"mq.master.addresses", "mq.master.vhost", "mq.master.username", "mq.master.password"})
    30. public ConnectionFactory masterConnectionFactory() {
    31. return doCreateConnectionFactory(masterAddresses, masterUsername, masterPassword, masterVhost);
    32. }
    33. @Bean
    34. @MqConditional(keys = {"mq.slave.addresses", "mq.slave.vhost", "mq.slave.username", "mq.slave.password"})
    35. public ConnectionFactory slaveConnectionFactory() {
    36. return doCreateConnectionFactory(slaveAddresses, slaveUsername, slavePassword, slaveVhost);
    37. }
    38. private ConnectionFactory doCreateConnectionFactory(String addresses,
    39. String username,
    40. String password,
    41. String vhost) {
    42. CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
    43. cachingConnectionFactory.setAddresses(addresses);
    44. cachingConnectionFactory.setUsername(username);
    45. cachingConnectionFactory.setPassword(password);
    46. cachingConnectionFactory.setVirtualHost(vhost);
    47. return cachingConnectionFactory;
    48. }
    49. }

    创建交换机名称枚举 DeclareQueueExchange

    1. package com.wd.config;
    /** Names of the exchanges this application declares. */
    public enum DeclareQueueExchange {

        /** Exchange the forwarded messages are published to. */
        EXCHANGE("exchange"),

        /** Exchange configured as the dead-letter exchange of the delay queues. */
        DEAD_EXCHANGE("deadExchange"),

        /** Exchange the delay queues are bound to. */
        DELAY_EXCHANGE("delayExchange");

        private final String value;

        DeclareQueueExchange(String value) {
            this.value = value;
        }

        /**
         * @return the exchange name used on the broker
         */
        public String getExchangeName() {
            return this.value;
        }
    }

    创建消息队列模板枚举 DeclareQueueName

    1. package com.wd.config;
    /** Queue name template and the suffixes appended to it for derived queues. */
    public enum DeclareQueueName {

        /** Suffix appended to the base queue name to get the delay queue name. */
        DELAY_QUEUE_NAME_SUFFIX("_delay"),

        /** Suffix appended to the base queue name to get the dead-letter queue name. */
        DEAD_QUEUE_NAME_SUFFIX("_dead"),

        /** {@link java.text.MessageFormat} pattern; {@code {0}} is replaced by the queue id. */
        QUEUE_NAME_TEMPLATE("wd.simple.queue.{0}");

        private final String value;

        DeclareQueueName(String value) {
            this.value = value;
        }

        /**
         * @return the template or suffix text of this constant
         */
        public String getQueueName() {
            return this.value;
        }
    }

    创建消息VO和消息

    1. package com.wd.controller.vo;
    2. import com.wd.pojo.Phone;
    3. import lombok.Data;
    4. @Data
    5. public class DelayMsgVo {
    6. private String queueId;
    7. private Phone phone;
    8. }
    1. package com.wd.pojo;
    2. import lombok.AllArgsConstructor;
    3. import lombok.Data;
    4. import lombok.NoArgsConstructor;
    5. import java.io.Serializable;
    6. import java.util.Date;
    7. import java.util.List;
    8. @Data
    9. @AllArgsConstructor
    10. @NoArgsConstructor
    11. public class Phone implements Serializable {
    12. private static final long serialVersionUID = -1L;
    13. private String id;
    14. private String name;
    15. private Date createTime;
    16. private List userList;
    17. }
    1. package com.wd.pojo;
    2. import lombok.AllArgsConstructor;
    3. import lombok.Data;
    4. import lombok.NoArgsConstructor;
    5. import java.io.Serializable;
    6. import java.util.Date;
    7. @Data
    8. @AllArgsConstructor
    9. @NoArgsConstructor
    10. public class User implements Serializable {
    11. private static final long serialVersionUID = -1L;
    12. private String username;
    13. private Date create;
    14. }

    定义队列id列表缓存,用于替换三方缓存,用于队列名称模板初始化

    1. package com.wd.config;
    2. import java.util.ArrayList;
    3. import java.util.List;
    /**
     * Holds the queue ids the scheduled subscribers iterate over.
     */
    public interface QueueIdListConfig {

        /**
         * Queue ids kept in a local, in-memory list for now.
         *
         * <p>Declared as {@code List<Integer>} so that callers can iterate it as
         * {@code Integer}. It is built as a plain {@link ArrayList}; the previous double-brace
         * initializer created an anonymous {@code ArrayList} subclass. The list stays mutable.
         */
        List<Integer> QUEUE_ID_LIST = new ArrayList<>(java.util.Arrays.asList(111, 222, 333));
    }

    创建消息接收入口 controller

    注意:此处就以web用户输入为入口,所以创建controller

    1. package com.wd.controller;
    2. import com.alibaba.fastjson2.JSONObject;
    3. import com.rabbitmq.client.*;
    4. import com.wd.config.DeclareQueueExchange;
    5. import com.wd.config.DeclareQueueName;
    6. import com.wd.controller.vo.DelayMsgVo;
    7. import org.springframework.amqp.rabbit.connection.Connection;
    8. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    9. import org.springframework.beans.factory.annotation.Qualifier;
    10. import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
    11. import org.springframework.web.bind.annotation.*;
    12. import java.io.IOException;
    13. import java.nio.charset.StandardCharsets;
    14. import java.text.MessageFormat;
    15. import java.util.HashMap;
    16. import java.util.Map;
    17. import java.util.concurrent.TimeoutException;
    18. @RestController
    19. @ConditionalOnBean(value = ConnectionFactory.class, name = "masterConnectionFactory")
    20. public class DynamicCreateQueueController {
    21. private final ConnectionFactory masterConnectionFactory;
    22. public DynamicCreateQueueController(@Qualifier(value = "masterConnectionFactory") ConnectionFactory masterConnectionFactory) {
    23. this.masterConnectionFactory = masterConnectionFactory;
    24. }
    25. @PostMapping(value = "sendDelayMsg")
    26. public String sendMsg2DelayQueue(@RequestBody DelayMsgVo delayMsgVo) throws IOException, TimeoutException {
    27. doSendMsg2DelayQueue(delayMsgVo);
    28. return "success";
    29. }
    30. private void doSendMsg2DelayQueue(DelayMsgVo delayMsgVo) throws IOException, TimeoutException {
    31. // 根据id 动态生成队列名称
    32. String queueNameTemplate = DeclareQueueName.QUEUE_NAME_TEMPLATE.getQueueName();
    33. String queueName = MessageFormat.format(queueNameTemplate, delayMsgVo.getQueueId());
    34. String delayQueueName = queueName + DeclareQueueName.DELAY_QUEUE_NAME_SUFFIX.getQueueName();
    35. String deadQueueName = queueName + DeclareQueueName.DEAD_QUEUE_NAME_SUFFIX.getQueueName();
    36. // 注意:下述声明交换机和队列的操作是可以重入的,MQ并不会报错
    37. try (Connection connection = masterConnectionFactory.createConnection();
    38. Channel channel = connection.createChannel(false)){
    39. // 声明死信交换机
    40. channel.exchangeDeclare(DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);
    41. // 声明死信队列
    42. AMQP.Queue.DeclareOk deadQueueDeclareOk = channel.queueDeclare(deadQueueName,
    43. true, false, false, null);
    44. // 定时任务 绑定消费者,避免出现多个消费者以及重启后无法消费存量消息的问题
    45. // 注意:因为需要保证消费顺序,所以此处仅声明一个消费者
    46. // 死信队列和交换机绑定
    47. channel.queueBind(deadQueueName, DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName(), deadQueueName);
    48. // 声明延迟队列
    49. Map args = new HashMap<>();
    50. //设置延迟队列绑定的死信交换机
    51. args.put("x-dead-letter-exchange", DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName());
    52. //设置延迟队列绑定的死信路由键
    53. args.put("x-dead-letter-routing-key", deadQueueName);
    54. //设置延迟队列的 TTL 消息存活时间
    55. args.put("x-message-ttl", 10 * 1000);
    56. channel.queueDeclare(delayQueueName, true, false, false, args);
    57. channel.exchangeDeclare(DeclareQueueExchange.DELAY_EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);
    58. channel.queueBind(delayQueueName, DeclareQueueExchange.DELAY_EXCHANGE.getExchangeName(), delayQueueName);
    59. // 发送消息到延迟队列
    60. channel.basicPublish(DeclareQueueExchange.DELAY_EXCHANGE.getExchangeName(), delayQueueName, null,
    61. JSONObject.toJSONString(delayMsgVo.getPhone()).getBytes(StandardCharsets.UTF_8));
    62. }
    63. }
    64. }

    创建master延迟消息消费者

    1. package com.wd.mq.consumer;
    2. import com.rabbitmq.client.*;
    3. import com.wd.config.DeclareQueueExchange;
    4. import com.wd.config.DeclareQueueName;
    5. import org.springframework.amqp.rabbit.connection.Connection;
    6. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    7. import java.io.IOException;
    8. import java.util.concurrent.TimeoutException;
    9. /**
    10. * 死信消费者,消费消息转发给targetConnectionFactory对应的目标MQ
    11. */
    12. public class MasterDeadQueueConsumer extends DefaultConsumer {
    13. private final ConnectionFactory targetConnectionFactory;
    14. public MasterDeadQueueConsumer(Channel channel, ConnectionFactory targetConnectionFactory) {
    15. super(channel);
    16. this.targetConnectionFactory = targetConnectionFactory;
    17. }
    18. @Override
    19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    20. // 从死信队列的名称中截取队列名称,作为后续队列的名称
    21. String routingKey = envelope.getRoutingKey();
    22. String targetQueueName = routingKey.substring(0, routingKey.length() - DeclareQueueName.DEAD_QUEUE_NAME_SUFFIX.getQueueName().length());
    23. try (Connection targetConnection = targetConnectionFactory.createConnection();
    24. Channel targetChannel = targetConnection.createChannel(false)){
    25. // 声明交换机和队列
    26. targetChannel.exchangeDeclare(DeclareQueueExchange.EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);
    27. targetChannel.queueDeclare(targetQueueName, true, false, false, null);
    28. targetChannel.queueBind(targetQueueName, DeclareQueueExchange.EXCHANGE.getExchangeName(), targetQueueName);
    29. // 转发消息
    30. targetChannel.basicPublish(DeclareQueueExchange.EXCHANGE.getExchangeName(), targetQueueName, properties, body);
    31. } catch (TimeoutException e) {
    32. e.printStackTrace();
    33. // 注意此处获取的源队列的channel
    34. getChannel().basicNack(envelope.getDeliveryTag(), false, true);
    35. }
    36. // 注意此处获取的源队列的channel
    37. getChannel().basicAck(envelope.getDeliveryTag(), false);
    38. }
    39. }

    创建slave队列消息消费者

    1. package com.wd.mq.consumer;
    2. import com.alibaba.fastjson2.JSONObject;
    3. import com.rabbitmq.client.AMQP;
    4. import com.rabbitmq.client.Channel;
    5. import com.rabbitmq.client.DefaultConsumer;
    6. import com.rabbitmq.client.Envelope;
    7. import com.wd.pojo.Phone;
    8. import java.io.IOException;
    9. public class SlaveQueueConsumer extends DefaultConsumer {
    10. public SlaveQueueConsumer(Channel channel) {
    11. super(channel);
    12. }
    13. @Override
    14. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    15. Phone phone = JSONObject.parseObject(new String(body), Phone.class);
    16. System.out.println("SlaveQueueConsumer consume ==> " + phone);
    17. getChannel().basicAck(envelope.getDeliveryTag(), false);
    18. }
    19. }

    创建定时任务,消费延迟消息

    注意:因为采用的是死信队列的方式实现的延迟效果,此处只需要消费对应的死信队列即可

    1. package com.wd.mq.quartz;
    2. import com.rabbitmq.client.AMQP;
    3. import com.rabbitmq.client.BuiltinExchangeType;
    4. import com.rabbitmq.client.Channel;
    5. import com.wd.config.DeclareQueueExchange;
    6. import com.wd.config.DeclareQueueName;
    7. import com.wd.config.QueueIdListConfig;
    8. import com.wd.mq.consumer.MasterDeadQueueConsumer;
    9. import org.springframework.amqp.rabbit.connection.Connection;
    10. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    11. import org.springframework.beans.factory.annotation.Qualifier;
    12. import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
    13. import org.springframework.context.annotation.Configuration;
    14. import org.springframework.scheduling.annotation.Scheduled;
    15. import java.io.IOException;
    16. import java.text.MessageFormat;
    17. import java.util.concurrent.TimeoutException;
    18. @Configuration
    19. @ConditionalOnBean(value = ConnectionFactory.class, name = {"slaveConnectionFactory", "masterConnectionFactory"})
    20. public class MasterDeadQueueSubscribeProcessor {
    21. private final ConnectionFactory masterConnectionFactory;
    22. private final ConnectionFactory slaveConnectionFactory;
    23. public MasterDeadQueueSubscribeProcessor(@Qualifier(value = "masterConnectionFactory") ConnectionFactory masterConnectionFactory,
    24. @Qualifier(value = "slaveConnectionFactory") ConnectionFactory slaveConnectionFactory) {
    25. this.masterConnectionFactory = masterConnectionFactory;
    26. this.slaveConnectionFactory = slaveConnectionFactory;
    27. }
    28. /**
    29. * 消费死信队列信息,并且转发到其他mq
    30. */
    31. @Scheduled(fixedDelay = 10 * 1000)
    32. public void subscribeMasterDeadQueue() throws IOException, TimeoutException {
    33. // 根据id 动态生成队列名称
    34. // 此处的queueIdList可以从第三方缓存查询得到,并且和sendDelayMsg接口保持同步刷新,此处先用本地缓存代替,id同步刷新机制不是重点,此处暂不讨论
    35. for (Integer id : QueueIdListConfig.QUEUE_ID_LIST) {
    36. String queueNameTemplate = DeclareQueueName.QUEUE_NAME_TEMPLATE.getQueueName();
    37. String deadQueueName = MessageFormat.format(queueNameTemplate, id) + DeclareQueueName.DEAD_QUEUE_NAME_SUFFIX.getQueueName();
    38. try (Connection connection = masterConnectionFactory.createConnection();
    39. Channel channel = connection.createChannel(false)){
    40. AMQP.Queue.DeclareOk queueDeclare = channel.queueDeclare(deadQueueName, true, false, false, null);
    41. if (queueDeclare.getConsumerCount() == 0) {
    42. channel.exchangeDeclare(DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);
    43. }
    44. channel.queueBind(deadQueueName, DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName(), deadQueueName);
    45. channel.basicConsume(deadQueueName, false, new MasterDeadQueueConsumer(channel, slaveConnectionFactory));
    46. }
    47. }
    48. }
    49. }

    创建定时任务,消费slave队列的消息

    1. package com.wd.mq.quartz;
    2. import com.rabbitmq.client.AMQP;
    3. import com.rabbitmq.client.BuiltinExchangeType;
    4. import com.rabbitmq.client.Channel;
    5. import com.wd.config.DeclareQueueExchange;
    6. import com.wd.config.DeclareQueueName;
    7. import com.wd.config.QueueIdListConfig;
    8. import com.wd.mq.consumer.SlaveQueueConsumer;
    9. import org.springframework.amqp.rabbit.connection.Connection;
    10. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    11. import org.springframework.beans.factory.annotation.Qualifier;
    12. import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
    13. import org.springframework.context.annotation.Configuration;
    14. import org.springframework.scheduling.annotation.Scheduled;
    15. import java.io.IOException;
    16. import java.text.MessageFormat;
    17. import java.util.concurrent.TimeoutException;
    18. @Configuration
    19. @ConditionalOnBean(value = ConnectionFactory.class, name = "slaveConnectionFactory")
    20. public class SlaveQueueSubscribeProcessor {
    21. private final ConnectionFactory slaveConnectionFactory;
    22. public SlaveQueueSubscribeProcessor(@Qualifier(value = "slaveConnectionFactory") ConnectionFactory slaveConnectionFactory) {
    23. this.slaveConnectionFactory = slaveConnectionFactory;
    24. }
    25. /**
    26. * 消费队列信息
    27. */
    28. @Scheduled(fixedDelay = 10 * 1000)
    29. public void subscribeSlaveDeadQueue() throws IOException, TimeoutException {
    30. // 根据id 动态生成队列名称
    31. // 此处的queueIdList可以从第三方缓存查询得到,并且和sendDelayMsg接口保持同步刷新,此处先用本地缓存代替
    32. for (Integer id : QueueIdListConfig.QUEUE_ID_LIST) {
    33. String queueNameTemplate = DeclareQueueName.QUEUE_NAME_TEMPLATE.getQueueName();
    34. String queueName = MessageFormat.format(queueNameTemplate, id);
    35. try (Connection connection = slaveConnectionFactory.createConnection();
    36. Channel channel = connection.createChannel(false)){
    37. AMQP.Queue.DeclareOk queueDeclare = channel.queueDeclare(queueName, true, false, false, null);
    38. if (queueDeclare.getConsumerCount() == 0) {
    39. channel.basicConsume(queueName, false, new SlaveQueueConsumer(channel));
    40. }
    41. channel.exchangeDeclare(DeclareQueueExchange.EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);
    42. channel.queueBind(queueName, DeclareQueueExchange.EXCHANGE.getExchangeName(), queueName);
    43. }
    44. }
    45. }
    46. }

    启动项目

    请求接口发送消息 http://localhost:8080/sendDelayMsg

    检查消息传递过程

    先在master-mq延迟队列发现消息

    再到master-mq死信队列中发现消息

    再到slave-mq中发现消息

    检查日志打印

    发现SlaveQueueConsumer打印如下日志:

    结论

    消息传递流程如下,验证通过

  • 相关阅读:
    DeepExploit——基于强化学习的自动渗透工具
    UE5 ChaosVehicles载具 增加方向盘动画 (连载三)
    union all 和 union 的区别,mysql union全连接查询
    Qt Installation命名问题导致vs编译Qt项目出错
    LibAlias
    猿创征文|程序员都应该了解的十款开发神器
    js继承的几种方式(原型链继承、构造函数继承、组合式继承、寄生组合式继承、ES6的Class类继承)
    浅析SR隧道路径批量构造方法
    半结构化数据
    【LeetCode】No.75. Sort Colors -- Java Version
  • 原文地址:https://blog.csdn.net/weixin_43317111/article/details/133106940