一:解决办法 1、添加一个监听器
@RabbitListener(queues = "你的队列名")
public void handle(Message message){
}
2、或者加入如下bean创建一个connection
@Bean
ApplicationRunner runner(ConnectionFactory cf) {
return args -> cf.createConnection().close();
}
二、原理: RabbitMQ 的连接采用的是懒加载,应用启动时不会主动创建 connection,因此下面第6步中注册的 ConnectionListener 不会被触发(onCreate 不会被回调),交换机、队列、绑定关系也就不会被声明。所以需要主动创建一个连接: 通过 ConnectionFactory 接口调用 createConnection() 方法创建连接,从而触发监听器回调并执行声明
- public interface ConnectionFactory {
- Connection createConnection() throws AmqpException;
- public interface ConnectionListener {
- void onCreate(Connection var1);
-
- default void onClose(Connection connection) {
- }
-
- default void onShutDown(ShutdownSignalException signal) {
- }
-
- default void onFailed(Exception exception) {
- }
- }
下面是 Spring Boot 自动创建 RabbitMQ 的交换机、队列、绑定关系的原理
1、springBoot自动帮助我们创建RabbitMQ的Queue和Exchange是通过 RabbitAutoConfiguration类
- @AutoConfiguration
- @ConditionalOnClass({ RabbitTemplate.class, Channel.class })
- @EnableConfigurationProperties(RabbitProperties.class)
- @Import({ RabbitAnnotationDrivenConfiguration.class, RabbitStreamConfiguration.class })
- public class RabbitAutoConfiguration
2、该类中注入了AmqpAdmin,可以看到它是通过 RabbitAdmin 的构造方法(传入 ConnectionFactory)创建的
- @Bean
- @ConditionalOnSingleCandidate(ConnectionFactory.class)
- @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
- @ConditionalOnMissingBean
- public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
- return new RabbitAdmin(connectionFactory);
- }
3、查看 RabbitAdmin类,可以看到实现了InitializingBean接口,这个接口用于在bean的属性设置完成后执行初始化回调
-
- @ManagedResource(
- description = "Admin Tasks"
- )
- public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware, BeanNameAware, InitializingBean
4、查看InitializingBean接口,只有afterPropertiesSet一个方法
- package org.springframework.beans.factory;
-
- public interface InitializingBean {
- void afterPropertiesSet() throws Exception;
- }
5、查看RabbitAdmin中是如何实现这个接口
- public void afterPropertiesSet() {
- synchronized(this.lifecycleMonitor) {
- if (!this.running && this.autoStartup) {
- if (this.retryTemplate == null && !this.retryDisabled) {
- this.retryTemplate = new RetryTemplate();
- this.retryTemplate.setRetryPolicy(new SimpleRetryPolicy(5));
- ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
- backOffPolicy.setInitialInterval(1000L);
- backOffPolicy.setMultiplier(2.0D);
- backOffPolicy.setMaxInterval(5000L);
- this.retryTemplate.setBackOffPolicy(backOffPolicy);
- }
-
- if (this.connectionFactory instanceof CachingConnectionFactory && ((CachingConnectionFactory)this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
- this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
- } else {
- AtomicBoolean initializing = new AtomicBoolean(false);
- this.connectionFactory.addConnectionListener((connection) -> {
- if (initializing.compareAndSet(false, true)) {
- try {
- if (this.retryTemplate != null) {
- this.retryTemplate.execute((c) -> {
- this.initialize();
- return null;
- });
- } else {
- this.initialize();
- }
- } finally {
- initializing.compareAndSet(true, false);
- }
-
- }
- });
- this.running = true;
- }
- }
- }
- }
6、可以看到连接工厂添加了一个连接监听器,在里面调用了initialize方法
- this.connectionFactory.addConnectionListener((connection) -> {
- if (initializing.compareAndSet(false, true)) {
- try {
- if (this.retryTemplate != null) {
- this.retryTemplate.execute((c) -> {
- this.initialize();
- return null;
- });
- } else {
- this.initialize();
- }
- } finally {
- initializing.compareAndSet(true, false);
- }
-
- }
- });
7、查看initialize方法
- public void initialize() {
- if (this.applicationContext == null) {
- this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
- } else {
- this.logger.debug("Initializing declarations");
- Collection<Exchange> contextExchanges = new LinkedList(this.applicationContext.getBeansOfType(Exchange.class).values());
- Collection<Queue> contextQueues = new LinkedList(this.applicationContext.getBeansOfType(Queue.class).values());
- Collection<Binding> contextBindings = new LinkedList(this.applicationContext.getBeansOfType(Binding.class).values());
- Collection<DeclarableCustomizer> customizers = this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values();
- this.processDeclarables(contextExchanges, contextQueues, contextBindings);
- Collection<Exchange> exchanges = this.filterDeclarables(contextExchanges, customizers);
- Collection<Queue> queues = this.filterDeclarables(contextQueues, customizers);
- Collection<Binding> bindings = this.filterDeclarables(contextBindings, customizers);
- Iterator var8 = exchanges.iterator();
-
- while(true) {
- Exchange exchange;
- do {
- if (!var8.hasNext()) {
- var8 = queues.iterator();
-
- while(true) {
- Queue queue;
- do {
- if (!var8.hasNext()) {
- if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0 && this.manualDeclarables.size() == 0) {
- this.logger.debug("Nothing to declare");
- return;
- }
-
- this.rabbitTemplate.execute((channel) -> {
- this.declareExchanges(channel, (Exchange[])exchanges.toArray(new Exchange[exchanges.size()]));
- this.declareQueues(channel, (Queue[])queues.toArray(new Queue[queues.size()]));
- this.declareBindings(channel, (Binding[])bindings.toArray(new Binding[bindings.size()]));
- return null;
- });
- if (this.manualDeclarables.size() > 0) {
- synchronized(this.manualDeclarables) {
- this.logger.debug("Redeclaring manually declared Declarables");
- Iterator var14 = this.manualDeclarables.values().iterator();
-
- while(var14.hasNext()) {
- Declarable dec = (Declarable)var14.next();
- if (dec instanceof Queue) {
- this.declareQueue((Queue)dec);
- } else if (dec instanceof Exchange) {
- this.declareExchange((Exchange)dec);
- } else {
- this.declareBinding((Binding)dec);
- }
- }
- }
- }
-
- this.logger.debug("Declarations finished");
- return;
- }
-
- queue = (Queue)var8.next();
- } while(queue.isDurable() && !queue.isAutoDelete() && !queue.isExclusive());
-
- if (this.logger.isInfoEnabled()) {
- this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue (" + queue.getName() + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:" + queue.isExclusive() + ". It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.");
- }
- }
- }
-
- exchange = (Exchange)var8.next();
- } while(exchange.isDurable() && !exchange.isAutoDelete());
-
- if (this.logger.isInfoEnabled()) {
- this.logger.info("Auto-declaring a non-durable or auto-delete Exchange (" + exchange.getName() + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.");
- }
- }
- }
- }
8、可以看到,底层还是RabbitAdmin 在执行声明交换机,队列,绑定关系
- this.rabbitTemplate.execute((channel) -> {
- this.declareExchanges(channel, (Exchange[])exchanges.toArray(new Exchange[exchanges.size()]));
- this.declareQueues(channel, (Queue[])queues.toArray(new Queue[queues.size()]));
- this.declareBindings(channel, (Binding[])bindings.toArray(new Binding[bindings.size()]));
- return null;
- });
9、进入declareExchanges 方法
- private void declareExchanges(Channel channel, Exchange... exchanges) throws IOException {
- Exchange[] var3 = exchanges;
- int var4 = exchanges.length;
-
- for(int var5 = 0; var5 < var4; ++var5) {
- Exchange exchange = var3[var5];
- if (this.logger.isDebugEnabled()) {
- this.logger.debug("declaring Exchange '" + exchange.getName() + "'");
- }
-
- if (!this.isDeclaringDefaultExchange(exchange)) {
- try {
- if (exchange.isDelayed()) {
- Map<String, Object> arguments = exchange.getArguments();
- HashMap arguments;
- if (arguments == null) {
- arguments = new HashMap();
- } else {
- arguments = new HashMap(arguments);
- }
-
- arguments.put("x-delayed-type", exchange.getType());
- channel.exchangeDeclare(exchange.getName(), "x-delayed-message", exchange.isDurable(), exchange.isAutoDelete(), exchange.isInternal(), arguments);
- } else {
- channel.exchangeDeclare(exchange.getName(), exchange.getType(), exchange.isDurable(), exchange.isAutoDelete(), exchange.isInternal(), exchange.getArguments());
- }
- } catch (IOException var8) {
- this.logOrRethrowDeclarationException(exchange, "exchange", var8);
- }
- }
- }
-
- }
10、可以看到最底层调用了channel接口的实现
channel.exchangeDeclare(exchange.getName(), "x-delayed-message", exchange.isDurable(), exchange.isAutoDelete(), exchange.isInternal(), arguments);
11、可以进入Channel接口查看:
- DeclareOk exchangeDeclare(String var1, String var2) throws IOException;
-
- DeclareOk exchangeDeclare(String var1, BuiltinExchangeType var2) throws IOException;
-
- DeclareOk exchangeDeclare(String var1, String var2, boolean var3) throws IOException;
-
- DeclareOk exchangeDeclare(String var1, BuiltinExchangeType var2, boolean var3) throws IOException;
-
- DeclareOk exchangeDeclare(String var1, String var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;
-
- DeclareOk exchangeDeclare(String var1, BuiltinExchangeType var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;
-
- DeclareOk exchangeDeclare(String var1, String var2, boolean var3, boolean var4, boolean var5, Map<String, Object> var6) throws IOException;
-
- DeclareOk exchangeDeclare(String var1, BuiltinExchangeType var2, boolean var3, boolean var4, boolean var5, Map<String, Object> var6) throws IOException;