• RocketMQ Source Code Analysis (3): Broker Startup Flow


    I. Broker Startup

      The broker's main function lives in the org.apache.rocketmq.broker.BrokerStartup class. The code it executes is as follows:

        public static void main(String[] args) {
            start(createBrokerController(args));
        }
    
        public static BrokerController start(BrokerController controller) {
            try {
    
                controller.start();
    
                String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
                    + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
    
                if (null != controller.getBrokerConfig().getNamesrvAddr()) {
                    tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
                }
    
                log.info(tip);
                System.out.printf("%s%n", tip);
                return controller;
            } catch (Throwable e) {
                e.printStackTrace();
                System.exit(-1);
            }
    
            return null;
        }
    

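      Broker startup can be divided into three steps:

    • Parse the configuration file and load its values into the BrokerConfig, NettyServerConfig, NettyClientConfig, and MessageStoreConfig instances
    • Create a BrokerController from those config instances, initialize it, and register the shutdown hook
    • Start the BrokerController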

    II. Creating the BrokerController Instance

    1. Reading the configuration

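      As with the NameServer, the -c startup option specifies the location of the broker configuration file. The key-value pairs in that file are parsed into a java.util.Properties object, which is then applied to the brokerConfig, nettyServerConfig, nettyClientConfig, and messageStoreConfig objects.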

        public static BrokerController createBrokerController(String[] args) {
            // ........
            final BrokerConfig brokerConfig = new BrokerConfig();
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            final NettyClientConfig nettyClientConfig = new NettyClientConfig();
            // whether to enable TLS
            nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
                String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
            // default listen port of the Netty server
            nettyServerConfig.setListenPort(10911);
            final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

            if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
                // for a SLAVE broker, lower the max ratio of messages accessed in memory by 10 percentage points (default is 40)
                int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
                messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
            }

            if (commandLine.hasOption('c')) {
                // read the config file given by the -c option and apply it to the corresponding config objects
                String file = commandLine.getOptionValue('c');
                if (file != null) {
                    configFile = file;
                    InputStream in = new BufferedInputStream(new FileInputStream(file));
                    properties = new Properties();
                    properties.load(in);

                    properties2SystemEnv(properties);
                    MixAll.properties2Object(properties, brokerConfig);
                    MixAll.properties2Object(properties, nettyServerConfig);
                    MixAll.properties2Object(properties, nettyClientConfig);
                    MixAll.properties2Object(properties, messageStoreConfig);

                    BrokerPathConfigHelper.setBrokerConfigPath(file);
                    in.close();
                }
            }
            // apply the options parsed from the command line to brokerConfig
            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
            // ...........
        }
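
      To make the "Properties to config object" step more concrete, here is a minimal sketch of how such a mapping can be done with reflection. It is not RocketMQ's MixAll.properties2Object implementation: PropertyBinder and DemoBrokerConfig are hypothetical names invented for this illustration, the sketch assumes that a key "xxx" maps to a setter "setXxx", and it only handles String, int, and boolean setters.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical helper: copies matching properties into a bean through its setters.
class PropertyBinder {
    static void bind(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            // Only consider single-argument setters such as setListenPort(int).
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            // Assumption of this sketch: setListenPort -> key "listenPort".
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue; // key not present, keep the default value
            }
            Class<?> type = m.getParameterTypes()[0];
            try {
                if (type == String.class) {
                    m.invoke(target, value);
                } else if (type == int.class) {
                    m.invoke(target, Integer.parseInt(value.trim()));
                } else if (type == boolean.class) {
                    m.invoke(target, Boolean.parseBoolean(value.trim()));
                }
                // Other parameter types are ignored in this sketch.
            } catch (ReflectiveOperationException | NumberFormatException e) {
                System.err.println("skipping " + key + ": " + e);
            }
        }
    }

    // Parses key=value text (the same format as a .properties file) and binds it.
    static DemoBrokerConfig loadFrom(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        DemoBrokerConfig config = new DemoBrokerConfig();
        bind(props, config);
        return config;
    }

    public static void main(String[] args) {
        DemoBrokerConfig config = loadFrom("brokerName=broker-a\nlistenPort=10921\n");
        System.out.println(config.getBrokerName() + " listens on " + config.getListenPort());
    }
}

// Hypothetical config bean with defaults, standing in for a class like BrokerConfig.
class DemoBrokerConfig {
    private String brokerName = "DefaultBroker";
    private int listenPort = 10911;
    private boolean autoCreateTopicEnable = true;

    public String getBrokerName() { return brokerName; }
    public void setBrokerName(String brokerName) { this.brokerName = brokerName; }
    public int getListenPort() { return listenPort; }
    public void setListenPort(int listenPort) { this.listenPort = listenPort; }
    public boolean isAutoCreateTopicEnable() { return autoCreateTopicEnable; }
    public void setAutoCreateTopicEnable(boolean enable) { this.autoCreateTopicEnable = enable; }
}
```

      Because the same Properties object is applied to every config object in turn, each object simply picks up the keys it has setters for and ignores the rest, which is why a single broker config file can feed all four config classes.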
    

    2. Creating the BrokerController

      Once brokerConfig, nettyServerConfig, nettyClientConfig, and messageStoreConfig have been populated, a BrokerController instance is created from them and then initialized.

        public static BrokerController createBrokerController(String[] args) {
            // ........
            final BrokerController controller = new BrokerController(
                brokerConfig,
                nettyServerConfig,
                nettyClientConfig,
                messageStoreConfig);
            // remember all configs to prevent discard
            controller.getConfiguration().registerConfig(properties);
            // initialize the brokerController; exit if initialization fails
            boolean initResult = controller.initialize();
            if (!initResult) {
                controller.shutdown();
                System.exit(-3);
            }
            // ........
        }
    

    1) BrokerController initialization

      The BrokerController initialization method mainly does the following:

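    • Load the topic configuration, consumer offsets, subscription groups, and consumer filter configuration
    • Load the message store
    • Create and register the thread pools used for sending messages, querying messages, client heartbeats, and similar functions
    • Create the various scheduled tasks
    • Initialize transactions, ACL, and rpcHooks

        public boolean initialize() throws CloneNotSupportedException {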
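            boolean result = this.topicConfigManager.load(); // load the topic configuration; if it is missing, fall back to the .bak file

            result = result && this.consumerOffsetManager.load(); // load the consumer offsets
            result = result && this.subscriptionGroupManager.load(); // load the subscription group configuration
            result = result && this.consumerFilterManager.load(); // load the consumer filter configuration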
    
            if (result) {
                try {
                    // create the message store
                    this.messageStore =
                        new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                            this.brokerConfig);
                    if (messageStoreConfig.isEnableDLegerCommitLog()) {
                        // if DLedger is enabled, add a handler for master/slave role changes
                        DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                        ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
                    }
                    this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                    //load plugin
                    MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                    this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                    this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
                } catch (IOException e) {
                    result = false;
                    log.error("Failed to initialize", e);
                }
            }
    
            result = result && this.messageStore.load();
    
            if (result) {
                this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
                NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
                fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
                this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
                
                // ... creation of the individual thread pools omitted
    
                this.registerProcessor();
    
                final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
                final long period = 1000 * 60 * 60 * 24;
                // record broker stats once a day
                this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                    try {
                        BrokerController.this.getBrokerStats().record();
                    } catch (Throwable e) {
                        log.error("schedule record error.", e);
                    }
                }, initialDelay, period, TimeUnit.MILLISECONDS);
                // persist consumer offsets every flushConsumerOffsetInterval ms (5s by default), after a 10s initial delay
                this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
                // persist the consumer filter configuration every 10s
                this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                    try {
                        BrokerController.this.consumerFilterManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumer filter error.", e);
                    }
                }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
                // every 3 minutes, check consumer progress and stop consumption for subscription groups with a message backlog
                this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                    try {
                        BrokerController.this.protectBroker();
                    } catch (Throwable e) {
                        log.error("protectBroker error.", e);
                    }
                }, 3, 3, TimeUnit.MINUTES);
    
                this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                    try {
                        BrokerController.this.printWaterMark();
                    } catch (Throwable e) {
                        log.error("printWaterMark error.", e);
                    }
                }, 10, 1, TimeUnit.SECONDS);
    
                this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                    try {
                        log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                    } catch (Throwable e) {
                        log.error("schedule dispatchBehindBytes error.", e);
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
    
                if (this.brokerConfig.getNamesrvAddr() != null) {
                    // set the NameServer address list at startup, then refresh it every 2 minutes
                    this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
                    log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
                    this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                        try {
                            BrokerController.this.brokerOuterAPI.updateNameServerAddressList(BrokerController.this.brokerConfig.getNamesrvAddr());
                        } catch (Throwable e) {
                            log.error("ScheduledTask updateNameServerAddr exception", e);
                        }
                    }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
                } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                    this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                        try {
                            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                        } catch (Throwable e) {
                            log.error("ScheduledTask fetchNameServerAddr exception", e);
                        }
                    }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
                }
    
                if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                        if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                            this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                            this.updateMasterHAServerAddrPeriodically = false;
                        } else {
                            this.updateMasterHAServerAddrPeriodically = true;
                        }
                    } else {
                        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                            try {
                                BrokerController.this.printMasterAndSlaveDiff();
                            } catch (Throwable e) {
                                log.error("schedule printMasterAndSlaveDiff error.", e);
                            }
                        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
                    }
                }
    
                // ... tlsMode setup omitted
                
                // initialize transaction support
                initialTransaction();
                initialAcl();
                initialRpcHooks();
            }
            return result;
        }
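
      The daily stats task above uses an initial delay that lines the first run up with the start of the next day, then repeats every 24 hours. The sketch below shows that scheduling pattern in isolation. It assumes that "next morning" means the next local midnight and ignores daylight-saving shifts; DailyTaskSketch and millisUntilNextMidnight are hypothetical names, not RocketMQ code. The try/catch inside the task mirrors the source: per the ScheduledExecutorService documentation, if one execution of a fixed-rate task throws, subsequent executions are suppressed.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class DailyTaskSketch {
    // Milliseconds from `now` until the start of the following day (local time).
    static long millisUntilNextMidnight(LocalDateTime now) {
        LocalDateTime nextMidnight = now.toLocalDate().plusDays(1).atStartOfDay();
        return Duration.between(now, nextMidnight).toMillis();
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        long initialDelay = millisUntilNextMidnight(LocalDateTime.now());
        long period = TimeUnit.DAYS.toMillis(1);

        scheduler.scheduleAtFixedRate(() -> {
            try {
                System.out.println("recording daily stats");
            } catch (Throwable e) {
                // Swallow and log so that one failure does not cancel future runs.
                System.err.println("daily task failed: " + e);
            }
        }, initialDelay, period, TimeUnit.MILLISECONDS);

        System.out.println("first run in " + initialDelay + " ms, then every " + period + " ms");
        scheduler.shutdownNow(); // demo only: stop right away instead of waiting until midnight
    }
}
```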
    

    3. Shutdown hook

      The hook shuts down the thread pools and tasks that were started when the broker booted.

        public static BrokerController createBrokerController(String[] args) {
            // ........
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                private volatile boolean hasShutdown = false;
                private AtomicInteger shutdownTimes = new AtomicInteger(0);

                @Override
                public void run() {
                    synchronized (this) {
                        log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                        if (!this.hasShutdown) {
                            this.hasShutdown = true;
                            long beginTime = System.currentTimeMillis();
                            controller.shutdown();
                            long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                            log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                        }
                    }
                }
            }, "ShutdownHook"));
            // ........
        }
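
      The important property of this hook is that controller.shutdown() runs at most once, no matter how many times run() is invoked. The sketch below isolates that guard. Note that it swaps in a different technique from the source: an AtomicBoolean with compareAndSet instead of a synchronized block with a volatile flag. ShutdownHookSketch, DemoController, and onceOnlyShutdown are hypothetical names used only for this illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownHookSketch {
    // Hypothetical stand-in for BrokerController; it only counts shutdown() calls.
    static class DemoController {
        final AtomicInteger shutdownCalls = new AtomicInteger();

        void shutdown() {
            shutdownCalls.incrementAndGet();
        }
    }

    // Returns a Runnable that calls controller.shutdown() on its first run only.
    static Runnable onceOnlyShutdown(DemoController controller) {
        AtomicBoolean hasShutdown = new AtomicBoolean(false);
        return () -> {
            // compareAndSet succeeds for exactly one caller, even under concurrency.
            if (hasShutdown.compareAndSet(false, true)) {
                long beginTime = System.currentTimeMillis();
                controller.shutdown();
                long elapsed = System.currentTimeMillis() - beginTime;
                System.out.println("shutdown finished in " + elapsed + " ms");
            }
        };
    }

    public static void main(String[] args) {
        DemoController controller = new DemoController();
        // The JVM runs registered hooks when it exits normally or receives a termination signal.
        Runtime.getRuntime().addShutdownHook(new Thread(onceOnlyShutdown(controller), "ShutdownHook"));
        System.out.println("hook registered, exiting main");
    }
}
```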
    

    4. Starting the BrokerController

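      When the broker starts, besides starting its own thread pools and tasks for message storage, remoting, and so on, there is one more important step: registering itself with the NameServer. The core code is as follows: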

        public void start() throws Exception {
            // ... startup of the various thread pools and tasks omitted

            if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                startProcessorByHa(messageStoreConfig.getBrokerRole());
                handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
                this.registerBrokerAll(true, false, true);
            }
            // by default, re-register with the NameServer every 30s (the period is clamped to the range 10s to 60s)
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        // register this broker's information with the NameServer
                        BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                    } catch (Throwable e) {
                        log.error("registerBrokerAll Exception", e);
                    }
                }
            }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

            if (this.brokerStatsManager != null) {
                this.brokerStatsManager.start();
            }

            if (this.brokerFastFailure != null) {
                this.brokerFastFailure.start();
            }
        }
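
      The period expression Math.max(10000, Math.min(period, 60000)) clamps the configured registration period to the range of 10 to 60 seconds, so a misconfigured value can neither flood the NameServer nor leave it with stale data for too long. The small sketch below evaluates the same expression for a few inputs; RegisterPeriodSketch and effectivePeriodMillis are hypothetical names used only here.

```java
class RegisterPeriodSketch {
    // Same clamping expression as in start(): the result always lies in [10000, 60000] ms.
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(10000, Math.min(configuredMillis, 60000));
    }

    public static void main(String[] args) {
        long[] samples = {5000, 30000, 120000};
        for (long configured : samples) {
            System.out.println(configured + " ms configured -> "
                + effectivePeriodMillis(configured) + " ms used");
        }
        // Prints:
        // 5000 ms configured -> 10000 ms used
        // 30000 ms configured -> 30000 ms used
        // 120000 ms configured -> 60000 ms used
    }
}
```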
    

      At this point, the whole broker startup process is complete.

  • Original article: https://blog.csdn.net/u012786993/article/details/126822410