• Flink1.15源码解析--启动JobManager----Dispatcher启动



    一、前言
    从上文 Flink1.15源码解析---- DispatcherResourceManagerComponent 我们知道ResourceManagerServiceImpl 的创建及启动大概流程

    接下来我们详细的梳理一下

    一、DispatcherRunner 创建

    // org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create
                final HistoryServerArchivist historyServerArchivist =
                        HistoryServerArchivist.createHistoryServerArchivist(
                                configuration, webMonitorEndpoint, ioExecutor);
    
                final DispatcherOperationCaches dispatcherOperationCaches =
                        new DispatcherOperationCaches(
                                configuration.get(RestOptions.ASYNC_OPERATION_STORE_DURATION));
    
                final PartialDispatcherServices partialDispatcherServices =
                        new PartialDispatcherServices(
                                configuration,
                                highAvailabilityServices,
                                resourceManagerGatewayRetriever,
                                blobServer,
                                heartbeatServices,
                                () ->
                                        JobManagerMetricGroup.createJobManagerMetricGroup(
                                                metricRegistry, hostname),
                                executionGraphInfoStore,
                                fatalErrorHandler,
                                historyServerArchivist,
                                metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
                                ioExecutor,
                                dispatcherOperationCaches);
    
                log.debug("Starting Dispatcher.");
                dispatcherRunner =
                        dispatcherRunnerFactory.createDispatcherRunner(
                                highAvailabilityServices.getDispatcherLeaderElectionService(),
                                fatalErrorHandler,
                                new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
                                ioExecutor,
                                rpcService,
                                partialDispatcherServices);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    通过工厂 DefaultDispatcherRunnerFactory 创建 DispatcherRunner

    // org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory#createDispatcherRunner
        @Override
        public DispatcherRunner createDispatcherRunner(
                LeaderElectionService leaderElectionService,
                FatalErrorHandler fatalErrorHandler,
                JobPersistenceComponentFactory jobPersistenceComponentFactory,
                Executor ioExecutor,
                RpcService rpcService,
                PartialDispatcherServices partialDispatcherServices)
                throws Exception {
    
            final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =
                    dispatcherLeaderProcessFactoryFactory.createFactory(
                            jobPersistenceComponentFactory,
                            ioExecutor,
                            rpcService,
                            partialDispatcherServices,
                            fatalErrorHandler);
    
            return DefaultDispatcherRunner.create(
                    leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    1.1、LeaderElectionService 初始化

     highAvailabilityServices.getDispatcherLeaderElectionService()
    
    // org.apache.flink.runtime.highavailability.AbstractHaServices#getDispatcherLeaderElectionService
        @Override
        public LeaderElectionService getDispatcherLeaderElectionService() {
            return createLeaderElectionService(getLeaderPathForDispatcher());
        }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    highAvailabilityServices 就是 ClusterEntrypoint 初始化的HA服务, 以 ZooKeeperHaServices 为例 DefaultLeaderElectionService

        @Override
        protected LeaderElectionService createLeaderElectionService(String leaderPath) {
            return ZooKeeperUtils.createLeaderElectionService(getCuratorFramework(), leaderPath);
        }
        public static DefaultLeaderElectionService createLeaderElectionService(
                final CuratorFramework client, final String path) {
            return new DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
  • 相关阅读:
    高斯推断推导
    游戏盾能防住几T的攻击吗
    利用Pinpoint搭建全链路监控系统
    SSO 基于token vue + element ui spring boot前端分离
    Spring Cloud alibaba 集成 nacos 以及整合 Ribbon 与 Feign 实现负载调用(3)
    【Git】Git cherry-pick
    LeetCode·每日一题·779.第K个语法符合·递归
    透过现象看本质,HuntingNFT缘何具备成为爆款链游的潜力?
    Android SdkManager简介
    索引和切片--numpy
  • 原文地址:https://blog.csdn.net/wuxintdrh/article/details/127858011