• springboot集成rabbitmmq多数据源,解决对源码不熟悉导致多个源出现同样队列,交换机等问题


    多数据源配置类

    1. /**
    2. *
    3. * @ClassName: RabbitMqMultiDataSourceConfig
    4. * @Description: 集成rabbitmq 多数据源
    5. * @author cqj
    6. * @date 2022/7/01
    7. */
    8. @Configuration
    9. public class RabbitMqMultiDataSourceConfig {
    10. @Bean(name = "secondConnectionFactory")
    11. public CachingConnectionFactory mqEquipControlConnectionFactory(
    12. @Value("${mq.second.host}") String host,
    13. @Value("${mq.second.port}") int port,
    14. @Value("${mq.second.username}") String username,
    15. @Value("${mq.second.password}") String password) {
    16. return connectionFactory(host, port, username, password);
    17. }
    18. public CachingConnectionFactory connectionFactory(String host, int port, String username, String password) {
    19. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    20. connectionFactory.setHost(host);
    21. connectionFactory.setPort(port);
    22. connectionFactory.setUsername(username);
    23. connectionFactory.setPassword(password);
    24. return connectionFactory;
    25. }
    26. @Bean(name = "secondFactory")
    27. public SimpleRabbitListenerContainerFactory secondFactory(
    28. SimpleRabbitListenerContainerFactoryConfigurer configurer,
    29. @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
    30. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    31. factory.setMaxConcurrentConsumers(20);
    32. factory.setAcknowledgeMode(AcknowledgeMode.NONE);
    33. factory.setConcurrentConsumers(10);
    34. configurer.configure(factory, connectionFactory);
    35. return factory;
    36. }
    37. @Bean(name = "secondRabbitTemplate")
    38. public RabbitTemplate secondRabbitTemplate(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
    39. RabbitTemplate template = new RabbitTemplate(connectionFactory);
    40. // 这里的转换器设置实现了发送消息时自动序列化消息对象为message body
    41. template.setMessageConverter(new Jackson2JsonMessageConverter());
    42. return template;
    43. }
    44. @Bean(name = "secondRabbitAdmin")
    45. public RabbitAdmin rabbitAdmin(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
    46. RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    47. rabbitAdmin.setAutoStartup(false);
    48. //设置忽略声明异常
    49. rabbitAdmin.setIgnoreDeclarationExceptions(true);
    50. return rabbitAdmin;
    51. }
    52. @Bean(name = "primaryConnectionFactory")
    53. @Primary
    54. public CachingConnectionFactory primaryConnectionFactory(
    55. @Value("${spring.rabbitmq.host}") String host,
    56. @Value("${spring.rabbitmq.port}") int port,
    57. @Value("${spring.rabbitmq.username}") String username,
    58. @Value("${spring.rabbitmq.password}") String password) {
    59. return connectionFactory(host, port, username, password);
    60. }
    61. @Bean(name = "primaryFactory")
    62. @Primary
    63. public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(@Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory) {
    64. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    65. factory.setConnectionFactory(connectionFactory);
    66. //当前的消费者数量
    67. factory.setConcurrentConsumers(1);
    68. factory.setMaxConcurrentConsumers(1);
    69. //是否重回队列
    70. factory.setDefaultRequeueRejected(true);
    71. return factory;
    72. }
    73. @Bean
    74. @Primary
    75. public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
    76. RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    77. //设置忽略声明异常
    78. rabbitAdmin.setIgnoreDeclarationExceptions(true);
    79. return rabbitAdmin;
    80. }
    81. }

    上述代码就是springboot多数据源配置

    集成多数据源RabbitAdmin这玩意大家不陌生吧,这里需要说明的是

    如果RabbitAdmin不设置autoStartup属性(true/false)默认是true。会默认项目启动时候自动创建队列跟交换机。
    
    源码位置
    
    1. /**
    2. * If {@link #setAutoStartup(boolean) autoStartup} is set to true, registers a callback on the
    3. * {@link ConnectionFactory} to declare all exchanges and queues in the enclosing application context. If the
    4. * callback fails then it may cause other clients of the connection factory to fail, but since only exchanges,
    5. * queues and bindings are declared failure is not expected.
    6. *
    7. * @see InitializingBean#afterPropertiesSet()
    8. * @see #initialize()
    9. */
    10. @Override
    11. public void afterPropertiesSet() {
    12. synchronized (this.lifecycleMonitor) {
    13. if (this.running || !this.autoStartup) {
    14. return;
    15. }
    16. if (this.retryTemplate == null && !this.retryDisabled) {
    17. this.retryTemplate = new RetryTemplate();
    18. this.retryTemplate.setRetryPolicy(new SimpleRetryPolicy(DECLARE_MAX_ATTEMPTS));
    19. ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    20. backOffPolicy.setInitialInterval(DECLARE_INITIAL_RETRY_INTERVAL);
    21. backOffPolicy.setMultiplier(DECLARE_RETRY_MULTIPLIER);
    22. backOffPolicy.setMaxInterval(DECLARE_MAX_RETRY_INTERVAL);
    23. this.retryTemplate.setBackOffPolicy(backOffPolicy);
    24. }
    25. if (this.connectionFactory instanceof CachingConnectionFactory &&
    26. ((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
    27. this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
    28. return;
    29. }
    30. // Prevent stack overflow...
    31. final AtomicBoolean initializing = new AtomicBoolean(false);
    32. this.connectionFactory.addConnectionListener(connection -> {
    33. if (!initializing.compareAndSet(false, true)) {
    34. // If we are already initializing, we don't need to do it again...
    35. return;
    36. }
    37. try {
    38. /*
    39. * ...but it is possible for this to happen twice in the same ConnectionFactory (if more than
    40. * one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network
    41. * chatter). In fact it might even be a good thing: exclusive queues only make sense if they are
    42. * declared for every connection. If anyone has a problem with it: use auto-startup="false".
    43. */
    44. if (this.retryTemplate != null) {
    45. this.retryTemplate.execute(c -> {
    46. initialize();
    47. return null;
    48. });
    49. }
    50. else {
    51. initialize();
    52. }
    53. }
    54. finally {
    55. initializing.compareAndSet(true, false);
    56. }
    57. });
    58. this.running = true;
    59. }
    60. }
    上面代码initialize()方法会去初始化队列交换机等。源码如下:
    1. /**
    2. * Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe
    3. * (but unnecessary) to call this method more than once.
    4. */
    5. @Override // NOSONAR complexity
    6. public void initialize() {
    7. if (this.applicationContext == null) {
    8. this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
    9. return;
    10. }
    11. this.logger.debug("Initializing declarations");
    12. Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
    13. this.applicationContext.getBeansOfType(Exchange.class).values());
    14. Collection<Queue> contextQueues = new LinkedList<Queue>(
    15. this.applicationContext.getBeansOfType(Queue.class).values());
    16. Collection<Binding> contextBindings = new LinkedList<Binding>(
    17. this.applicationContext.getBeansOfType(Binding.class).values());
    18. Collection<DeclarableCustomizer> customizers =
    19. this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values();
    20. processDeclarables(contextExchanges, contextQueues, contextBindings);
    21. final Collection<Exchange> exchanges = filterDeclarables(contextExchanges, customizers);
    22. final Collection<Queue> queues = filterDeclarables(contextQueues, customizers);
    23. final Collection<Binding> bindings = filterDeclarables(contextBindings, customizers);
    24. for (Exchange exchange : exchanges) {
    25. if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
    26. this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
    27. + exchange.getName()
    28. + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
    29. + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
    30. + "reopening the connection.");
    31. }
    32. }
    33. for (Queue queue : queues) {
    34. if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
    35. this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
    36. + queue.getName()
    37. + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
    38. + queue.isExclusive() + ". "
    39. + "It will be redeclared if the broker stops and is restarted while the connection factory is "
    40. + "alive, but all messages will be lost.");
    41. }
    42. }
    43. if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
    44. this.logger.debug("Nothing to declare");
    45. return;
    46. }
    47. this.rabbitTemplate.execute(channel -> {
    48. declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
    49. declareQueues(channel, queues.toArray(new Queue[queues.size()]));
    50. declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
    51. return null;
    52. });
    53. this.logger.debug("Declarations finished");
    54. }
    上述源码就是在自动初始化队列跟交换机等。
    
    
    这里需要说明一下多数据源一般情况都是在原有项目的基础在在添加信息rabbitmmq数据源,一般情况会默认自动创建队列,如果手动创建当我没说。
    

    如果是多数据源情况第二数据源或者更多数据源配置信息RabbitAdmin类里面的autoStartup属性设置为false,这时候就不会自动创建队列跟交换机,需要手动取创建。这样就不会出现多个数据源初始化创建相同的队列跟交换机。

    这里特别强调一下@RabbitListener等绑定队列交换机是不会创建的,如下代码第二数据源并不对创建队列

    1. /**
    2. * MQ接收刀具补偿消息
    3. *
    4. * @param msg
    5. */
    6. @RabbitListener(bindings = @QueueBinding(
    7. value = @Queue(value = "test", durable = "true", autoDelete = "false"),
    8. exchange = @Exchange(value = "testExchange", type = ExchangeTypes.DIRECT), key = "testRouting"
    9. ), containerFactory = "secondFactory")
    10. @RabbitHandler
    11. public void testMsg(Message msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
    12. try {
    13. if (msg.getBody() != null) {
    14. Date date = new Date();
    15. String strJson = new String(msg.getBody());
    16. channel.basicAck(deliveryTag, false);
    17. log.error("MQ接收到刀具补偿消息:{}", msg);
    18. }
    19. } catch (Exception e) {
    20. log.error("MQ接收到刀具补偿消息失败,消息内容:{}", e);
    21. log.error("MQ接收到刀具补偿消息失败,消息内容:{}", new String(msg.getBody()));
    22. }
    23. }

    如下是我写的一个第二数据源手动创建队列demo

    1. /**
    2. * @ClassName: RabbitSecondQueueConfig
    3. * @Description: []
    4. * @Author: cqj
    5. * @Date: 2022/7/1
    6. * @Version 1.0
    7. */
    8. @Configuration
    9. public class RabbitSecondQueueConfig {
    10. @Resource(name = "secondRabbitAdmin")
    11. private RabbitAdmin secondRabbitAdmin;
    12. @Bean
    13. void EcuOperationLogQueueDirect() {
    14. Queue queue = new Queue("test",true);
    15. secondRabbitAdmin.declareQueue(queue);
    16. DirectExchange directExchange = new DirectExchange("testExchange",true,false);
    17. secondRabbitAdmin.declareExchange(directExchange);
    18. secondRabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(directExchange).with("testRouting"));
    19. }
    20. }

    然后写监听消费队列测试代码如下:

    1. @RabbitListener(queues = "test", containerFactory = "secondFactory")
    2. @RabbitHandler
    3. public void testMsg(Message msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
    4. try {
    5. if (msg.getBody() != null) {
    6. Date date = new Date();
    7. String strJson = new String(msg.getBody());
    8. channel.basicAck(deliveryTag, false);
    9. log.error("消息:{}", msg);
    10. }
    11. } catch (Exception e) {
    12. log.error("MQ接收消息失败,消息内容:{}", e);
    13. log.error("MQ接消息失败,消息内容:{}", new String(msg.getBody()));
    14. }
    15. }

    如上代码均为测试代码,感兴趣的同伴可以自己测试,根据自己情况来更改,不过大体思路就是这样。

    本人技术有限,以前没仔细研究源码,所以一直困惑着。大佬不喜勿喷,哪里不足请多指点,小弟我会十分感谢。

     
    
    
                    
  • 相关阅读:
    Spark 连接 Mongodb 批量读取数据
    visual studio code 创建 SSH 远程连接
    谁懂啊!自制的科普安全手册居然火了
    [北大集训2021]经典游戏
    基于Xilinx平台MicroBlaze的SPI方式FatFs移植
    Linux 文件系统Ramfs, rootfs and initramfs
    CTO不让用Calendar,那用啥?
    vue内置组件keep-alive多级路由缓存最佳实践
    java中的try-with-resource语法
    vue小技能:组件的 data 选项、methods 选项、计算属性、侦听器
  • 原文地址:https://blog.csdn.net/chenqunjian/article/details/125550954