• Spring Boot with RabbitMQ: exchanges and queues created by registering beans are injected into the container successfully, but do not show up on the RabbitMQ server


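    I. Solution
    1. Add a listener:
    @RabbitListener(queues = "your-queue-name")
    public void handle(Message message) {
        // Starting the listener container opens a connection to the broker.
    }
    2. Or add the following bean, which opens a connection once at startup:
    @Bean
    ApplicationRunner runner(ConnectionFactory cf) {
        return args -> cf.createConnection().close();
    }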
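
For context, here is a minimal sketch of a configuration in which this problem can arise, combined with the second fix. The class name, bean names, and the `demo.queue`, `demo.exchange`, and `demo.key` values are placeholders chosen for illustration; they are not from the original setup.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitDeclarationConfig {

    // Placeholder names; replace with your own queue, exchange, and routing key.
    @Bean
    Queue demoQueue() {
        return new Queue("demo.queue", true); // durable queue
    }

    @Bean
    DirectExchange demoExchange() {
        return new DirectExchange("demo.exchange");
    }

    @Bean
    Binding demoBinding(Queue demoQueue, DirectExchange demoExchange) {
        return BindingBuilder.bind(demoQueue).to(demoExchange).with("demo.key");
    }

    // Without a listener or a publisher, nothing opens a connection,
    // so the beans above are never declared on the broker.
    // Opening (and closing) one connection at startup triggers the declaration.
    @Bean
    ApplicationRunner openConnectionOnStartup(ConnectionFactory cf) {
        return args -> cf.createConnection().close();
    }
}
```

If the application already has a `@RabbitListener` or sends a message at startup, the runner bean should not be needed, since either of those opens a connection as well.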
    
    
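    II. How it works
    Spring AMQP opens its connection lazily: no connection to the broker is created at startup. As a result, the ConnectionListener registered in step 6 below is never invoked, and the declarations it performs never run. Calling createConnection() on the ConnectionFactory opens a connection, which fires the listener and, with it, the declaration of the exchanges, queues, and bindings.
    public interface ConnectionFactory {
        Connection createConnection() throws AmqpException;
        // other methods omitted
    }

    public interface ConnectionListener {
        void onCreate(Connection connection);
        default void onClose(Connection connection) {
        }
        default void onShutDown(ShutdownSignalException signal) {
        }
        default void onFailed(Exception exception) {
        }
    }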
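
The lazy behavior can be illustrated without Spring or a broker. The sketch below is a plain-Java model, not Spring AMQP code: `FakeConnection`, `LazyFactory`, and `declaredNames` are hypothetical names invented for this example. It only shows that a listener registered on a lazily connecting factory does nothing until someone asks for a connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LazyConnectionDemo {

    /** Hypothetical stand-in for a broker connection. */
    static final class FakeConnection {
    }

    /** Hypothetical factory that connects only on demand. */
    static final class LazyFactory {
        private final List<Consumer<FakeConnection>> listeners = new ArrayList<>();
        private FakeConnection connection; // null until createConnection() is first called

        void addConnectionListener(Consumer<FakeConnection> listener) {
            listeners.add(listener);
        }

        FakeConnection createConnection() {
            if (connection == null) {
                connection = new FakeConnection();
                // Listeners are notified only when a connection is actually opened.
                for (Consumer<FakeConnection> listener : listeners) {
                    listener.accept(connection);
                }
            }
            return connection;
        }
    }

    /** Returns the names "declared" by the listener, optionally after opening a connection. */
    static List<String> declaredNames(boolean openConnection) {
        List<String> declared = new ArrayList<>();
        LazyFactory factory = new LazyFactory();
        // Plays the role of the admin's listener: declare when a connection is created.
        factory.addConnectionListener(connection -> declared.add("demo.queue"));
        if (openConnection) {
            factory.createConnection();
        }
        return declared;
    }

    public static void main(String[] args) {
        System.out.println("without a connection: " + declaredNames(false));
        System.out.println("after createConnection(): " + declaredNames(true));
    }
}
```

In this model the listener is registered in both cases, yet the declaration only happens in the second one, which mirrors why the beans exist in the container while the broker stays empty.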
     
    

    The steps below trace how Spring Boot automatically declares RabbitMQ exchanges, queues, and bindings.

    1. Spring Boot declares the RabbitMQ queues and exchanges for us through the RabbitAutoConfiguration class:

    @AutoConfiguration
    @ConditionalOnClass({ RabbitTemplate.class, Channel.class })
    @EnableConfigurationProperties(RabbitProperties.class)
    @Import({ RabbitAnnotationDrivenConfiguration.class, RabbitStreamConfiguration.class })
    public class RabbitAutoConfiguration

    2. This class registers an AmqpAdmin bean, which is a RabbitAdmin built around the ConnectionFactory:

    @Bean
    @ConditionalOnSingleCandidate(ConnectionFactory.class)
    @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
    @ConditionalOnMissingBean
    public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    3. The RabbitAdmin class implements the InitializingBean interface, a callback that Spring invokes once the bean's properties have been set:

    @ManagedResource(description = "Admin Tasks")
    public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware, BeanNameAware, InitializingBean

    4. The InitializingBean interface declares a single method, afterPropertiesSet:

    package org.springframework.beans.factory;

    public interface InitializingBean {
        void afterPropertiesSet() throws Exception;
    }

    5. RabbitAdmin implements that method as follows:

    public void afterPropertiesSet() {
        synchronized (this.lifecycleMonitor) {
            if (!this.running && this.autoStartup) {
                // Default retry: up to 5 attempts with exponential back-off (1 s initial, x2, capped at 5 s).
                if (this.retryTemplate == null && !this.retryDisabled) {
                    this.retryTemplate = new RetryTemplate();
                    this.retryTemplate.setRetryPolicy(new SimpleRetryPolicy(5));
                    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
                    backOffPolicy.setInitialInterval(1000L);
                    backOffPolicy.setMultiplier(2.0D);
                    backOffPolicy.setMaxInterval(5000L);
                    this.retryTemplate.setBackOffPolicy(backOffPolicy);
                }
                if (this.connectionFactory instanceof CachingConnectionFactory && ((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
                    this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
                } else {
                    AtomicBoolean initializing = new AtomicBoolean(false);
                    // Nothing is declared here; the work is deferred until a connection is created.
                    this.connectionFactory.addConnectionListener((connection) -> {
                        if (initializing.compareAndSet(false, true)) {
                            try {
                                if (this.retryTemplate != null) {
                                    this.retryTemplate.execute((c) -> {
                                        this.initialize();
                                        return null;
                                    });
                                } else {
                                    this.initialize();
                                }
                            } finally {
                                initializing.compareAndSet(true, false);
                            }
                        }
                    });
                    this.running = true;
                }
            }
        }
    }
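
To make the retry settings above concrete, the following plain-Java sketch works out the waits between attempts for 5 attempts, a 1000 ms initial interval, a multiplier of 2.0, and a 5000 ms cap. It is only arithmetic under the assumption that each wait is the current interval capped at the maximum; `BackoffScheduleDemo` and `waits` are hypothetical names, and no Spring Retry API is used.

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffScheduleDemo {

    /**
     * Computes the waits (in ms) between attempts.
     * With maxAttempts attempts there are maxAttempts - 1 waits.
     */
    static List<Long> waits(int maxAttempts, long initial, double multiplier, long max) {
        List<Long> result = new ArrayList<>();
        long interval = initial;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            result.add(Math.min(interval, max));      // cap each wait at the maximum
            interval = (long) (interval * multiplier); // grow the interval for the next wait
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the afterPropertiesSet() listing above.
        System.out.println(waits(5, 1000L, 2.0, 5000L));
    }
}
```

Under these assumptions the declaration is retried for roughly 12 seconds in total before the failure is propagated.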

    6. As shown, a connection listener is added to the connection factory, and that listener calls the initialize method:

    this.connectionFactory.addConnectionListener((connection) -> {
        if (initializing.compareAndSet(false, true)) {
            try {
                if (this.retryTemplate != null) {
                    this.retryTemplate.execute((c) -> {
                        this.initialize();
                        return null;
                    });
                } else {
                    this.initialize();
                }
            } finally {
                initializing.compareAndSet(true, false);
            }
        }
    });
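
The `compareAndSet` pair in the listener acts as a re-entrancy guard: a callback that arrives while initialization is already running is skipped, and the flag is reset afterwards so that a later connection triggers initialization again. The plain-Java sketch below models only that guard; `InitGuardDemo` and its members are hypothetical names, and the reason the guard exists in RabbitAdmin (presumably to avoid recursive initialization) is an assumption rather than something stated in the listing.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class InitGuardDemo {

    private final AtomicBoolean initializing = new AtomicBoolean(false);
    private final AtomicInteger runs = new AtomicInteger();

    /** Mirrors the listener body: do the work only if no run is already in progress. */
    void onCreate(boolean reenter) {
        if (initializing.compareAndSet(false, true)) {
            try {
                runs.incrementAndGet(); // stands in for initialize()
                if (reenter) {
                    onCreate(false);    // simulated nested callback; the guard skips it
                }
            } finally {
                initializing.compareAndSet(true, false); // allow the next connection to re-run
            }
        }
    }

    int runs() {
        return runs.get();
    }

    public static void main(String[] args) {
        InitGuardDemo demo = new InitGuardDemo();
        demo.onCreate(true);   // nested call is ignored
        demo.onCreate(false);  // a later "connection" runs initialization again
        System.out.println("initialize ran " + demo.runs() + " times");
    }
}
```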

    7. The initialize method:

    public void initialize() {
        if (this.applicationContext == null) {
            this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
            return;
        }
        this.logger.debug("Initializing declarations");
        // Collect every Exchange, Queue, and Binding bean from the application context.
        Collection<Exchange> contextExchanges = new LinkedList<>(this.applicationContext.getBeansOfType(Exchange.class).values());
        Collection<Queue> contextQueues = new LinkedList<>(this.applicationContext.getBeansOfType(Queue.class).values());
        Collection<Binding> contextBindings = new LinkedList<>(this.applicationContext.getBeansOfType(Binding.class).values());
        Collection<DeclarableCustomizer> customizers = this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values();
        this.processDeclarables(contextExchanges, contextQueues, contextBindings);
        Collection<Exchange> exchanges = this.filterDeclarables(contextExchanges, customizers);
        Collection<Queue> queues = this.filterDeclarables(contextQueues, customizers);
        Collection<Binding> bindings = this.filterDeclarables(contextBindings, customizers);
        // Log exchanges that will not survive a broker restart.
        for (Exchange exchange : exchanges) {
            if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
                this.logger.info("Auto-declaring a non-durable or auto-delete Exchange (" + exchange.getName() + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.");
            }
        }
        // Log queues that will not survive a broker restart.
        for (Queue queue : queues) {
            if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
                this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue (" + queue.getName() + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:" + queue.isExclusive() + ". It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.");
            }
        }
        if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0 && this.manualDeclarables.size() == 0) {
            this.logger.debug("Nothing to declare");
            return;
        }
        // Declare everything on the broker over a single channel.
        this.rabbitTemplate.execute((channel) -> {
            this.declareExchanges(channel, (Exchange[]) exchanges.toArray(new Exchange[exchanges.size()]));
            this.declareQueues(channel, (Queue[]) queues.toArray(new Queue[queues.size()]));
            this.declareBindings(channel, (Binding[]) bindings.toArray(new Binding[bindings.size()]));
            return null;
        });
        if (this.manualDeclarables.size() > 0) {
            synchronized (this.manualDeclarables) {
                this.logger.debug("Redeclaring manually declared Declarables");
                for (Declarable dec : this.manualDeclarables.values()) {
                    if (dec instanceof Queue) {
                        this.declareQueue((Queue) dec);
                    } else if (dec instanceof Exchange) {
                        this.declareExchange((Exchange) dec);
                    } else {
                        this.declareBinding((Binding) dec);
                    }
                }
            }
        }
        this.logger.debug("Declarations finished");
    }

    8. So it is ultimately RabbitAdmin that declares the exchanges, queues, and bindings:

    this.rabbitTemplate.execute((channel) -> {
        this.declareExchanges(channel, (Exchange[]) exchanges.toArray(new Exchange[exchanges.size()]));
        this.declareQueues(channel, (Queue[]) queues.toArray(new Queue[queues.size()]));
        this.declareBindings(channel, (Binding[]) bindings.toArray(new Binding[bindings.size()]));
        return null;
    });

    9. Stepping into the declareExchanges method:

    private void declareExchanges(Channel channel, Exchange... exchanges) throws IOException {
        for (Exchange exchange : exchanges) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("declaring Exchange '" + exchange.getName() + "'");
            }
            if (!this.isDeclaringDefaultExchange(exchange)) {
                try {
                    if (exchange.isDelayed()) {
                        // Copy the arguments so the exchange's own map is not modified.
                        Map<String, Object> arguments = exchange.getArguments();
                        if (arguments == null) {
                            arguments = new HashMap<>();
                        } else {
                            arguments = new HashMap<>(arguments);
                        }
                        // The real type moves into an argument; the declared type is "x-delayed-message".
                        arguments.put("x-delayed-type", exchange.getType());
                        channel.exchangeDeclare(exchange.getName(), "x-delayed-message", exchange.isDurable(), exchange.isAutoDelete(), exchange.isInternal(), arguments);
                    } else {
                        channel.exchangeDeclare(exchange.getName(), exchange.getType(), exchange.isDurable(), exchange.isAutoDelete(), exchange.isInternal(), exchange.getArguments());
                    }
                } catch (IOException e) {
                    this.logOrRethrowDeclarationException(exchange, "exchange", e);
                }
            }
        }
    }
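
The delayed branch above is easy to misread, so here is a small plain-Java sketch of just the argument handling. `DelayedArgumentsDemo` and `delayedArguments` are hypothetical names for this example; the only details taken from the listing are the defensive copy and the `x-delayed-type` key.

```java
import java.util.HashMap;
import java.util.Map;

public class DelayedArgumentsDemo {

    /**
     * Builds the arguments for declaring a delayed exchange:
     * a copy of the original arguments plus the real exchange type under "x-delayed-type".
     */
    static Map<String, Object> delayedArguments(Map<String, Object> original, String realType) {
        Map<String, Object> arguments = (original == null) ? new HashMap<>() : new HashMap<>(original);
        arguments.put("x-delayed-type", realType);
        return arguments;
    }

    public static void main(String[] args) {
        Map<String, Object> original = new HashMap<>();
        original.put("alternate-exchange", "demo.fallback"); // placeholder argument
        Map<String, Object> declared = delayedArguments(original, "direct");
        System.out.println("declared with: " + declared);
        System.out.println("original untouched: " + !original.containsKey("x-delayed-type"));
    }
}
```

Copying first means the arguments held by the exchange bean stay unchanged, so repeated declarations (for example after a reconnect) start from the same input each time.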

    10. At the lowest level, this calls the exchangeDeclare method of the Channel interface:

    channel.exchangeDeclare(exchange.getName(), "x-delayed-message", exchange.isDurable(), exchange.isAutoDelete(), exchange.isInternal(), arguments);

    11. The Channel interface declares the following overloads:

    DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
    DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException;
    DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
    DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException;
    DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException;
    DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException;
    DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
    DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;

  • Original article: https://blog.csdn.net/liulala16/article/details/127119742