Flink 1.15 Source Code Analysis -- Starting the JobManager ---- Starting the WebMonitorEndpoint


    1. Preface

    In the previous article, Flink 1.15 Source Code Analysis ---- DispatcherResourceManagerComponent, we saw where the WebMonitorEndpoint is created and started:

    org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create

                // Build a thread pool that executes the client requests received by the WebMonitorEndpoint
                final ScheduledExecutorService executor =
                        WebMonitorEndpoint.createExecutorService(
                                configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                                "DispatcherRestEndpoint");
    
                // Initialize the MetricFetcher; the default update interval is 10 s (0 disables fetching)
                final long updateInterval =
                        configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
                final MetricFetcher metricFetcher =
                        updateInterval == 0
                                ? VoidMetricFetcher.INSTANCE
                                : MetricFetcherImpl.fromConfiguration(
                                        configuration,
                                        metricQueryServiceRetriever,
                                        dispatcherGatewayRetriever,
                                        executor);
    
                // Create the WebMonitorEndpoint, one of the three core components
                webMonitorEndpoint =
                        restEndpointFactory.createRestEndpoint(
                                configuration,
                                dispatcherGatewayRetriever,
                                resourceManagerGatewayRetriever,
                                blobServer,
                                executor,
                                metricFetcher,
                                highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                                fatalErrorHandler);
                // Start the WebMonitorEndpoint, one of the three core components
                log.debug("Starting Dispatcher REST endpoint.");
                webMonitorEndpoint.start();
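The executor created above is a scheduled thread pool. Its size and thread priority come from RestOptions.SERVER_NUM_THREADS and RestOptions.SERVER_THREAD_PRIORITY, and its threads carry the component name. The following JDK-only sketch shows the same idea. It is not Flink's WebMonitorEndpoint.createExecutorService. The thread naming scheme "Flink-<component>-thread-<n>" and the daemon flag are assumptions made for illustration.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** JDK-only sketch of a REST request executor; not Flink's actual implementation. */
final class RestExecutorSketch {

    static ScheduledExecutorService createExecutorService(
            int numThreads, int threadPriority, String componentName) {
        if (threadPriority < Thread.MIN_PRIORITY || threadPriority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException("Invalid thread priority: " + threadPriority);
        }
        AtomicInteger threadCounter = new AtomicInteger();
        // Assumed naming scheme: "Flink-<component>-thread-<n>".
        ThreadFactory threadFactory =
                runnable -> {
                    Thread thread =
                            new Thread(
                                    runnable,
                                    "Flink-" + componentName + "-thread-"
                                            + threadCounter.incrementAndGet());
                    thread.setPriority(threadPriority);
                    thread.setDaemon(true); // do not keep the JVM alive just for this pool
                    return thread;
                };
        return new ScheduledThreadPoolExecutor(numThreads, threadFactory);
    }
}
```

Named threads make REST request handling easy to spot in thread dumps. A configurable priority lets operators deprioritize web requests relative to job-critical threads.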
    

    This article walks through how the WebMonitorEndpoint is built and started, in detail.

    2. Building the WebMonitorEndpoint

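    The WebMonitorEndpoint is built by restEndpointFactory. Which restEndpointFactory is used is fixed when the DefaultDispatcherResourceManagerComponentFactory is created, and it depends on how the cluster is started (the deployment mode).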

    Next, using StandaloneSessionClusterEntrypoint as an example, let's look at how restEndpointFactory is initialized.

    2.1 Initializing restEndpointFactory

    1. StandaloneSessionClusterEntrypoint creates a DefaultDispatcherResourceManagerComponentFactory
    org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint#createDispatcherResourceManagerComponentFactory

        @Override
        protected DefaultDispatcherResourceManagerComponentFactory
                createDispatcherResourceManagerComponentFactory(Configuration configuration) {
            return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
                    StandaloneResourceManagerFactory.getInstance());
        }
    

    2. createSessionComponentFactory creates the factories for all three core components
    org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#createSessionComponentFactory

        public static DefaultDispatcherResourceManagerComponentFactory createSessionComponentFactory(
                ResourceManagerFactory<?> resourceManagerFactory) {
            return new DefaultDispatcherResourceManagerComponentFactory(
                    DefaultDispatcherRunnerFactory.createSessionRunner(
                            SessionDispatcherFactory.INSTANCE),
                    resourceManagerFactory,
                    SessionRestEndpointFactory.INSTANCE);
        }
    

    So, for a session cluster, restEndpointFactory is SessionRestEndpointFactory.INSTANCE.
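SessionRestEndpointFactory is an enum with a single INSTANCE constant. This is the usual Java idiom for a stateless singleton. The component factory simply receives the variant that matches the deployment mode; for per-job clusters, createJobComponentFactory passes a JobRestEndpointFactory instead. Below is a minimal sketch of the pattern. All types in it (SketchRestEndpoint, EndpointFactory, SessionEndpointFactory, PerJobEndpointFactory, EndpointFactorySelector) are hypothetical, simplified stand-ins, not Flink classes.

```java
import java.util.Map;

/** Hypothetical stand-in for a REST endpoint; it only describes itself. */
interface SketchRestEndpoint {
    String describe();
}

/** Hypothetical stand-in for RestEndpointFactory. */
interface EndpointFactory {
    SketchRestEndpoint createRestEndpoint(Map<String, String> configuration);
}

/** Session clusters: an endpoint in front of the Dispatcher. */
enum SessionEndpointFactory implements EndpointFactory {
    INSTANCE;

    @Override
    public SketchRestEndpoint createRestEndpoint(Map<String, String> configuration) {
        String port = configuration.getOrDefault("rest.port", "8081");
        return () -> "DispatcherRestEndpoint on port " + port;
    }
}

/** Per-job clusters: a different enum singleton plays this role. */
enum PerJobEndpointFactory implements EndpointFactory {
    INSTANCE;

    @Override
    public SketchRestEndpoint createRestEndpoint(Map<String, String> configuration) {
        String port = configuration.getOrDefault("rest.port", "8081");
        return () -> "MiniDispatcherRestEndpoint on port " + port;
    }
}

final class EndpointFactorySelector {
    /** Picks the factory by deployment mode, the way the component factory is wired per mode. */
    static EndpointFactory forMode(String mode) {
        switch (mode) {
            case "session":
                return SessionEndpointFactory.INSTANCE;
            case "per-job":
                return PerJobEndpointFactory.INSTANCE;
            default:
                throw new IllegalArgumentException("Unknown deployment mode: " + mode);
        }
    }
}
```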

    2.2 createRestEndpoint creates the WebMonitorEndpoint

    SessionRestEndpointFactory creates a DispatcherRestEndpoint:

    /** {@link RestEndpointFactory} which creates a {@link DispatcherRestEndpoint}. */
    public enum SessionRestEndpointFactory implements RestEndpointFactory<DispatcherGateway> {
        INSTANCE;
    
        @Override
        public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
                Configuration configuration,
                LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
                LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
                TransientBlobService transientBlobService,
                ScheduledExecutorService executor,
                MetricFetcher metricFetcher,
                LeaderElectionService leaderElectionService,
                FatalErrorHandler fatalErrorHandler)
                throws Exception {
            final RestHandlerConfiguration restHandlerConfiguration =
                    RestHandlerConfiguration.fromConfiguration(configuration);
    
            // Create the DispatcherRestEndpoint
            return new DispatcherRestEndpoint(
                    dispatcherGatewayRetriever,
                    configuration,
                    restHandlerConfiguration,
                    resourceManagerGatewayRetriever,
                    transientBlobService,
                    executor,
                    metricFetcher,
                    leaderElectionService,
                    RestEndpointFactory.createExecutionGraphCache(restHandlerConfiguration),
                    fatalErrorHandler);
        }
    }
    

    The DispatcherRestEndpoint created here is the REST endpoint of the Dispatcher and a subclass of WebMonitorEndpoint:

    /** REST endpoint for the {@link Dispatcher} component. */
    public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway> {
    //......
    }
    

    3. Starting the WebMonitorEndpoint

    webMonitorEndpoint.start() actually runs org.apache.flink.runtime.rest.RestServerEndpoint#start, which WebMonitorEndpoint inherits.

    3.1 Router

                // 1. First create a Router, which parses client requests and finds the matching handler
                final Router router = new Router();
    

    3.2 Registering the handlers

                // 2. Register the handlers
                // 2.1 Initialize the handlers
                final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
                handlers = initializeHandlers(restAddressFuture);
    
                // 2.2 Sort the handlers: literal paths come before paths with parameters, so each URL maps to exactly one handler
                /* sort the handlers such that they are ordered the following:
                 * /jobs
                 * /jobs/overview
                 * /jobs/:jobid
                 * /jobs/:jobid/config
                 * /:*
                 */
                Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);
                // 2.3 After sorting, checkAllEndpointsAndHandlersAreUnique verifies that no endpoint is registered twice
                checkAllEndpointsAndHandlersAreUnique(handlers);
                // 2.4 Register the handlers with the router
                handlers.forEach(handler -> registerHandler(router, handler, log));
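The sort order in the comment above puts literal path segments before path parameters (segments starting with ':'). The uniqueness check rejects two handlers that would answer the same request. The sketch below re-implements both ideas in simplified form. It is not Flink's RestHandlerUrlComparator or checkAllEndpointsAndHandlersAreUnique: it ignores REST API versions and handler identity, and by assumption an endpoint is a plain "METHOD url" string.

```java
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

/** Simplified sketch of handler URL ordering and duplicate detection. */
final class RestUrlOrderSketch {

    /**
     * Case-insensitive, character-by-character comparison in which ':' (the start of a
     * path parameter) sorts after every other character; a prefix sorts first.
     */
    static final Comparator<String> URL_ORDER =
            (a, b) -> {
                int common = Math.min(a.length(), b.length());
                for (int i = 0; i < common; i++) {
                    char ca = Character.toLowerCase(a.charAt(i));
                    char cb = Character.toLowerCase(b.charAt(i));
                    if (ca == cb) {
                        continue;
                    }
                    if (ca == ':') {
                        return 1; // parameterized path goes behind the literal one
                    }
                    if (cb == ':') {
                        return -1;
                    }
                    return Character.compare(ca, cb);
                }
                return Integer.compare(a.length(), b.length());
            };

    /**
     * Throws if two endpoints ("METHOD url") collide once parameter names are ignored,
     * e.g. "GET /jobs/:jobid" and "GET /jobs/:id".
     */
    static void checkEndpointsAreUnique(List<String> endpoints) {
        Set<String> seen = new HashSet<>();
        for (String endpoint : endpoints) {
            String normalized =
                    endpoint.toLowerCase(Locale.ROOT).replaceAll(":[\\w-]+", ":param");
            if (!seen.add(normalized)) {
                throw new IllegalStateException("Duplicate REST endpoint: " + endpoint);
            }
        }
    }
}
```

Sorting ["/:*", "/jobs/:jobid/config", "/jobs", "/jobs/:jobid", "/jobs/overview"] with URL_ORDER yields the order listed in the source comment.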
    

    3.3 Starting Netty

    3.3.1 Initializing the ChannelInitializer

                // 3.1 Initialize the ChannelInitializer
                ChannelInitializer<SocketChannel> initializer =
                        new ChannelInitializer<SocketChannel>() {
    
                            @Override
                            protected void initChannel(SocketChannel ch) throws ConfigurationException {
                                RouterHandler handler = new RouterHandler(router, responseHeaders);
    
                                // SSL should be the first handler in the pipeline
                                if (isHttpsEnabled()) {
                                    ch.pipeline()
                                            .addLast(
                                                    "ssl",
                                                    new RedirectingSslHandler(
                                                            restAddress,
                                                            restAddressFuture,
                                                            sslHandlerFactory));
                                }
    
                                ch.pipeline()
                                        .addLast(new HttpServerCodec())
                                        .addLast(new FileUploadHandler(uploadDir))
                                        .addLast(
                                                new FlinkHttpObjectAggregator(
                                                        maxContentLength, responseHeaders));
    
                                for (InboundChannelHandlerFactory factory :
                                        inboundChannelHandlerFactories) {
                                    Optional<ChannelHandler> channelHandler =
                                            factory.createHandler(configuration, responseHeaders);
                                    if (channelHandler.isPresent()) {
                                        ch.pipeline().addLast(channelHandler.get());
                                    }
                                }
    
                                ch.pipeline()
                                        .addLast(new ChunkedWriteHandler())
                                        .addLast(handler.getName(), handler)
                                        .addLast(new PipelineErrorHandler(log, responseHeaders));
                            }
                        };
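The order in which initChannel adds handlers matters. Netty passes inbound events through the pipeline from first to last, so TLS must be decrypted before HTTP decoding, and requests must be aggregated before the RouterHandler dispatches them. The plain-Java sketch below models only that ordering, as a list of stage names. It uses no Netty classes, the one-line stage descriptions are simplified, and any extra handler names passed in are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Models the handler order built by initChannel; strings stand in for Netty handlers. */
final class PipelineOrderSketch {

    static List<String> pipelineFor(boolean httpsEnabled, List<String> pluggableInboundHandlers) {
        List<String> pipeline = new ArrayList<>();
        if (httpsEnabled) {
            pipeline.add("ssl"); // first, so later stages see decrypted bytes
        }
        pipeline.add("HttpServerCodec"); // bytes <-> HTTP messages
        pipeline.add("FileUploadHandler"); // handles multipart file uploads
        pipeline.add("FlinkHttpObjectAggregator"); // merges HTTP chunks into full requests
        pipeline.addAll(pluggableInboundHandlers); // from InboundChannelHandlerFactory instances
        pipeline.add("ChunkedWriteHandler"); // streams large responses
        pipeline.add("RouterHandler"); // dispatches to the matching REST handler
        pipeline.add("PipelineErrorHandler"); // last-resort error handling
        return pipeline;
    }
}
```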
    
    

    3.3.2 Initializing the NioEventLoopGroups

                NioEventLoopGroup bossGroup =
                        new NioEventLoopGroup(
                                1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
                NioEventLoopGroup workerGroup =
                        new NioEventLoopGroup(
                                0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));
    
                bootstrap = new ServerBootstrap();
                bootstrap
                        .group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(initializer);
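The boss group has a single thread that only accepts connections. The worker group performs the socket I/O of accepted channels. Creating it with 0 threads asks Netty for its default size, which in Netty 4 is twice the number of available processors unless overridden by configuration. The tiny sketch below shows that resolution rule. effectiveThreads is a hypothetical helper, not a Netty API.

```java
/** Hypothetical helper mirroring how a thread count of 0 resolves to a default size. */
final class EventLoopSizingSketch {

    static int effectiveThreads(int requestedThreads, int availableProcessors) {
        if (requestedThreads < 0) {
            throw new IllegalArgumentException("requestedThreads must be >= 0");
        }
        int defaultThreads = Math.max(1, availableProcessors * 2);
        return requestedThreads == 0 ? defaultThreads : requestedThreads;
    }

    static int effectiveThreadsOnThisMachine(int requestedThreads) {
        return effectiveThreads(requestedThreads, Runtime.getRuntime().availableProcessors());
    }
}
```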
    

    3.3.3 Binding the REST endpoint

                // 3.3 Bind the REST endpoint
                // 3.3.1 Parse the configured port range
                Iterator<Integer> portsIterator;
                try {
                    portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
                } catch (IllegalConfigurationException e) {
                    throw e;
                } catch (Exception e) {
                    throw new IllegalArgumentException(
                            "Invalid port range definition: " + restBindPortRange);
                }
    
                // 3.3.2 Handle port conflicts: try each port in turn until one can be bound
                int chosenPort = 0;
                while (portsIterator.hasNext()) {
                    try {
                        chosenPort = portsIterator.next();
                        final ChannelFuture channel;
                        // Bind to address:port and obtain the channel
                        if (restBindAddress == null) {
                            channel = bootstrap.bind(chosenPort);
                        } else {
                            channel = bootstrap.bind(restBindAddress, chosenPort);
                        }
                        serverChannel = channel.syncUninterruptibly().channel();
                        break;
                    } catch (final Exception e) {
                        // syncUninterruptibly() throws checked exceptions via Unsafe
                        // continue if the exception is due to the port being in use, fail early
                        // otherwise
                        if (!(e instanceof java.net.BindException)) {
                            throw e;
                        }
                    }
                }
    
                if (serverChannel == null) {
                    throw new BindException(
                            "Could not start rest endpoint on any port in port range "
                                    + restBindPortRange);
                }
    
                log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);
    
                final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
                final String advertisedAddress;
                if (bindAddress.getAddress().isAnyLocalAddress()) {
                    advertisedAddress = this.restAddress;
                } else {
                    advertisedAddress = bindAddress.getAddress().getHostAddress();
                }
    
                port = bindAddress.getPort();
    
                log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);
    
                restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();
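The binding logic comes down to five steps:

1. Expand the configured port range. rest.bind-port accepts a single port, a range such as 50100-50200, or a comma-separated list of both.
2. Try the ports one by one.
3. Skip any port that fails with a BindException.
4. Fail fast on any other error.
5. When the socket is bound to the wildcard address, advertise the configured address instead.

Below is a plain java.net sketch of those steps. It stands in for NetUtils.getPortRangeFromString and the Netty bind call; it is not Flink's implementation.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** java.net sketch of the port-range binding loop; not Flink's implementation. */
final class PortRangeBindSketch {

    /** Expands "8081", "50100-50200" or "50100-50102,50300" into a list of ports. */
    static List<Integer> parsePortRange(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String part : spec.split(",")) {
            String range = part.trim();
            int dash = range.indexOf('-');
            if (dash < 0) {
                ports.add(checkPort(Integer.parseInt(range)));
            } else {
                int start = checkPort(Integer.parseInt(range.substring(0, dash).trim()));
                int end = checkPort(Integer.parseInt(range.substring(dash + 1).trim()));
                if (start > end) {
                    throw new IllegalArgumentException("Invalid port range: " + range);
                }
                for (int port = start; port <= end; port++) {
                    ports.add(port);
                }
            }
        }
        return ports;
    }

    private static int checkPort(int port) {
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("Invalid port: " + port);
        }
        return port;
    }

    /** Tries each port in turn; only "address already in use" moves on to the next port. */
    static ServerSocket bindFirstAvailable(Iterator<Integer> ports, InetAddress bindAddress)
            throws IOException {
        while (ports.hasNext()) {
            int port = ports.next();
            ServerSocket socket = new ServerSocket();
            try {
                socket.bind(new InetSocketAddress(bindAddress, port));
                return socket;
            } catch (BindException e) {
                socket.close(); // port in use: try the next one
            } catch (IOException e) {
                socket.close();
                throw e; // any other failure is fatal
            }
        }
        throw new BindException("Could not bind to any port in the port range");
    }

    /** Advertises the configured address when bound to the wildcard address. */
    static String restBaseUrl(InetSocketAddress bound, String configuredAddress, String protocol)
            throws MalformedURLException {
        String advertised =
                bound.getAddress().isAnyLocalAddress()
                        ? configuredAddress
                        : bound.getAddress().getHostAddress();
        return new URL(protocol, advertised, bound.getPort(), "").toString();
    }
}
```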
    

    3.3.4 The REST address is now known: complete restAddressFuture

    restAddressFuture.complete(restBaseUrl);
    

    3.3.5 Set the endpoint state to RUNNING. At this point the WebMonitorEndpoint's Netty server is fully started.

    state = State.RUNNING;
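restAddressFuture exists because the channel initializer is wired up before the endpoint has bound a port. The RedirectingSslHandler therefore receives a future of the REST URL rather than the URL itself, and can read it only once the future has been completed here. A minimal CompletableFuture sketch of that hand-off follows. RedirectTarget is a hypothetical class standing in for the real handler.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/** Sketch: a component wired up before binding receives a future of the REST URL. */
final class RestAddressFutureSketch {

    /** Hypothetical consumer, standing in for a handler that needs the final URL. */
    static final class RedirectTarget {
        private final CompletableFuture<String> restAddressFuture;

        RedirectTarget(CompletableFuture<String> restAddressFuture) {
            this.restAddressFuture = restAddressFuture;
        }

        /** Empty until the endpoint has bound its port and completed the future. */
        Optional<String> redirectUrl(String path) {
            return Optional.ofNullable(restAddressFuture.getNow(null)).map(base -> base + path);
        }
    }
}
```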
    

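    3.4 A hook to start subclass-specific services

    Finally, RestServerEndpoint#start calls the abstract startInternal() hook so that subclasses can start their own services: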

    
        /**
         * Hook to start sub class specific services.
         *
         * @throws Exception if an error occurred
         */
        protected abstract void startInternal() throws Exception;
    
    

    Let's look at the subclass implementation of startInternal:
    org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#startInternal

        @Override
        public void startInternal() throws Exception {
            // 1. Leader election
            leaderElectionService.start(this);
            startExecutionGraphCacheCleanupTask();
    
            if (hasWebUI) {
                log.info("Web frontend listening at {}.", getRestBaseUrl());
            }
        }
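start() and startInternal() follow the template-method pattern. The base class runs the shared Netty start-up once, then calls an abstract hook that each subclass fills in. For WebMonitorEndpoint, the hook starts leader election and the ExecutionGraph cache clean-up task. The stripped-down sketch below uses hypothetical class names (SketchServerEndpoint, SketchWebMonitorEndpoint) and records events as strings instead of starting real services.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base class: shared start-up first, then a subclass hook. */
abstract class SketchServerEndpoint {
    enum State { CREATED, RUNNING }

    private final Object lock = new Object();
    private State state = State.CREATED;
    final List<String> events = new ArrayList<>();

    final void start() throws Exception {
        synchronized (lock) {
            if (state != State.CREATED) {
                throw new IllegalStateException("Endpoint can only be started once");
            }
            events.add("netty-started"); // stands in for router, handlers and binding
            state = State.RUNNING;
            startInternal(); // subclass-specific services
        }
    }

    /** Hook to start subclass-specific services. */
    protected abstract void startInternal() throws Exception;
}

/** Hypothetical subclass standing in for WebMonitorEndpoint. */
final class SketchWebMonitorEndpoint extends SketchServerEndpoint {
    @Override
    protected void startInternal() {
        events.add("leader-election-started");
        events.add("cache-cleanup-scheduled");
    }
}
```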
    

    3.4.1 Leader election

    When HighAvailabilityServices is initialized, a different implementation is created depending on the configured high-availability type.

    The leaderElectionService is obtained when the WebMonitorEndpoint is created, through this argument to createRestEndpoint:

    highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
    
    3.4.1.1 Standalone mode: StandaloneHaServices creates a StandaloneLeaderElectionService

        @Override
        public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
            synchronized (lock) {
                checkNotShutdown();
    
                return new StandaloneLeaderElectionService();
            }
        }
    

    StandaloneLeaderElectionService#start performs no real election: it grants leadership to the contender directly. Here the contender is the WebMonitorEndpoint.

        @Override
        public void start(LeaderContender newContender) throws Exception {
            if (contender != null) {
                // Service was already started
                throw new IllegalArgumentException(
                        "Leader election service cannot be started multiple times.");
            }
    
            contender = Preconditions.checkNotNull(newContender);
    
            // directly grant leadership to the given contender
            contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
        }
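A standalone cluster has exactly one JobManager, so there is nothing to elect. start() stores the contender and immediately grants it leadership with the fixed HighAvailabilityServices.DEFAULT_LEADER_ID. The sketch below reproduces that behaviour with simplified, hypothetical types. It assumes DEFAULT_LEADER_ID is the all-zero UUID (new UUID(0, 0)).

```java
import java.util.Objects;
import java.util.UUID;

/** Simplified stand-in for Flink's LeaderContender. */
interface SketchLeaderContender {
    void grantLeadership(UUID leaderSessionId);
}

/** Sketch of a "no election" service for single-JobManager setups. */
final class SketchStandaloneLeaderElectionService {
    /** Assumed value of the fixed leader ID used without HA. */
    static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

    private SketchLeaderContender contender;

    void start(SketchLeaderContender newContender) {
        if (contender != null) {
            throw new IllegalArgumentException(
                    "Leader election service cannot be started multiple times.");
        }
        contender = Objects.requireNonNull(newContender);
        // There are no competitors, so leadership is granted right away.
        contender.grantLeadership(DEFAULT_LEADER_ID);
    }
}
```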
    
    3.4.1.2 ZooKeeper HA mode: a DefaultLeaderElectionService is created

    org.apache.flink.runtime.highavailability.AbstractHaServices#getClusterRestEndpointLeaderElectionService

        @Override
        public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
            // Implemented by subclasses: create the leader election service
            return createLeaderElectionService(getLeaderPathForRestServer());
        }
    
    

    The ZooKeeper subclass implementation:

        //org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices#createLeaderElectionService
        @Override
        protected LeaderElectionService createLeaderElectionService(String leaderPath) {
            return ZooKeeperUtils.createLeaderElectionService(getCuratorFramework(), leaderPath);
        }
    
        // Create the DefaultLeaderElectionService
        // org.apache.flink.runtime.util.ZooKeeperUtils#createLeaderElectionService
        public static DefaultLeaderElectionService createLeaderElectionService(
                final CuratorFramework client, final String path) {
            return new DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path));
        }
    

    DefaultLeaderElectionService then starts the election. The contender passed in here is the WebMonitorEndpoint.

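    In ZooKeeper HA mode, Flink runs the election with the Curator framework. Each contender gets its own election driver (leaderElectionDriver). When the election result is known, Curator calls back into the driver: isLeader() when this contender gains leadership, and notLeader() when it loses leadership it previously held.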

    // org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#start
    
        @Override
        public final void start(LeaderContender contender) throws Exception {
            checkNotNull(contender, "Contender must not be null.");
            Preconditions.checkState(leaderContender == null, "Contender was already set.");
    
            synchronized (lock) {
                running = true;
                /*
                 * When called from WebMonitorEndpoint, the contender is the DispatcherRestEndpoint
                 * When called for the ResourceManager, the contender is ResourceManagerServiceImpl
                 * When called from DispatcherRunner, the contender is the DefaultDispatcherRunner
                 */
                leaderContender = contender;
                
                // Create a dedicated leaderElectionDriver for this contender
                leaderElectionDriver =
                        leaderElectionDriverFactory.createLeaderElectionDriver(
                                this, 
                                new LeaderElectionFatalErrorHandler(),
                                leaderContender.getDescription());
                LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
            }
        }
    


    ZooKeeperLeaderElectionDriverFactory creates a ZooKeeperLeaderElectionDriver. A LeaderElectionDriver runs the leader election and stores the leader information.

    // org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriverFactory#createLeaderElectionDriver
        @Override
        public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
                LeaderElectionEventHandler leaderEventHandler,   // the DefaultLeaderElectionService instance
                FatalErrorHandler fatalErrorHandler,	//  new LeaderElectionFatalErrorHandler()
                String leaderContenderDescription)
                throws Exception {
            return new ZooKeeperLeaderElectionDriver(
                    client, path, leaderEventHandler, fatalErrorHandler, leaderContenderDescription);
        }
        
        public ZooKeeperLeaderElectionDriver(
                CuratorFramework client,
                String path,
                LeaderElectionEventHandler leaderElectionEventHandler, // the DefaultLeaderElectionService is passed in
                FatalErrorHandler fatalErrorHandler,
                String leaderContenderDescription)
                throws Exception {
            checkNotNull(path);
            this.client = checkNotNull(client);
            this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path);
            this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
            this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
            this.leaderContenderDescription = checkNotNull(leaderContenderDescription);
    
            leaderLatchPath = ZooKeeperUtils.generateLeaderLatchPath(path);
            leaderLatch = new LeaderLatch(client, leaderLatchPath);
            this.cache =
                    ZooKeeperUtils.createTreeCache(
                            client,
                            connectionInformationPath,
                            this::retrieveLeaderInformationFromZooKeeper);
    
            running = true;
    
            // Start the election; Curator's LeaderLatch provides the election mechanism
            leaderLatch.addListener(this);
            leaderLatch.start();
    
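            /*
             * Once the election has started, the outcome arrives as a callback:
             *  1. if this contender gains leadership, isLeader() of this class is called
             *  2. if it later loses leadership, notLeader() of this class is called
             * Each contender has its own election driver.
             */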
            cache.start();
    
            client.getConnectionStateListenable().addListener(listener);
        }
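The driver is essentially an adapter. It listens to the latch's two callbacks and turns them into election events for the DefaultLeaderElectionService, issuing a fresh random session ID for every leadership term. Curator is not used here, so that the sketch runs on its own. FakeLeaderLatch is a hypothetical replacement whose leadership is flipped by hand. The listener interface only mirrors the shape of Curator's LeaderLatchListener (isLeader()/notLeader()), and the event-handler interface is a simplified stand-in.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Same shape as Curator's LeaderLatchListener: two no-arg callbacks. */
interface SketchLatchListener {
    void isLeader();

    void notLeader();
}

/** Simplified stand-in for the event handler implemented by DefaultLeaderElectionService. */
interface SketchElectionEventHandler {
    void onGrantLeadership(UUID newLeaderSessionId);

    void onRevokeLeadership();
}

/** Hypothetical latch whose leadership is flipped by hand instead of by ZooKeeper. */
final class FakeLeaderLatch {
    private final List<SketchLatchListener> listeners = new ArrayList<>();
    private boolean hasLeadership;

    void addListener(SketchLatchListener listener) {
        listeners.add(listener);
    }

    void setLeadership(boolean leader) {
        if (leader == hasLeadership) {
            return; // callbacks fire only on state changes
        }
        hasLeadership = leader;
        for (SketchLatchListener listener : listeners) {
            if (leader) {
                listener.isLeader();
            } else {
                listener.notLeader();
            }
        }
    }
}

/** Driver sketch: translates latch callbacks into election events. */
final class SketchLeaderElectionDriver implements SketchLatchListener {
    private final SketchElectionEventHandler handler;

    SketchLeaderElectionDriver(FakeLeaderLatch latch, SketchElectionEventHandler handler) {
        this.handler = handler;
        latch.addListener(this);
    }

    @Override
    public void isLeader() {
        handler.onGrantLeadership(UUID.randomUUID()); // new session ID per leadership term
    }

    @Override
    public void notLeader() {
        handler.onRevokeLeadership();
    }
}
```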
    

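    Here, leaderLatch.start() starts the election. As described above, a callback is invoked once the outcome is known, so let's look at this class's isLeader method: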

        @Override
        public void isLeader() {
            leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
        }
    
    // org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#onGrantLeadership
        @Override
        @GuardedBy("lock")
        public void onGrantLeadership(UUID newLeaderSessionId) {
            synchronized (lock) {
                if (running) {
                    issuedLeaderSessionID = newLeaderSessionId;
                    clearConfirmedLeaderInformation();
    
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(
                                "Grant leadership to contender {} with session ID {}.",
                                leaderContender.getDescription(),
                                issuedLeaderSessionID);
                    }
                    /*
                     * There are four kinds of LeaderContender:
                     *  1. Dispatcher: DefaultDispatcherRunner
                     *  2. JobMaster: JobMasterServiceLeadershipRunner
                     *  3. ResourceManager: ResourceManagerServiceImpl
                     *  4. WebMonitorEndpoint: WebMonitorEndpoint
                     */
                    leaderContender.grantLeadership(issuedLeaderSessionID);
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(
                                "Ignoring the grant leadership notification since the {} has "
                                        + "already been closed.",
                                leaderElectionDriver);
                    }
                }
            }
        }
    
    

    Next, step into leaderContender.grantLeadership. Because this is the WebMonitorEndpoint's election, we look at the WebMonitorEndpoint implementation:

        @Override
        public void grantLeadership(final UUID leaderSessionID) {
            log.info(
                    "{} was granted leadership with leaderSessionID={}",
                    getRestBaseUrl(),
                    leaderSessionID);
            leaderElectionService.confirmLeadership(leaderSessionID, getRestBaseUrl());
        }
    

    There is not much to it. Next, step into leaderElectionService.confirmLeadership and look at the DefaultLeaderElectionService implementation:

        @Override
        public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Confirm leader session ID {} for leader {}.", leaderSessionID, leaderAddress);
            }
    
            checkNotNull(leaderSessionID);
    
            synchronized (lock) {
                if (hasLeadership(leaderSessionID)) {
                    if (running) {
                        // Confirm the leader information and write it to ZooKeeper
                        confirmLeaderInformation(leaderSessionID, leaderAddress);
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(
                                    "Ignoring the leader session Id {} confirmation, since the "
                                            + "LeaderElectionService has already been stopped.",
                                    leaderSessionID);
                        }
                    }
                } else {
                    // Received an old confirmation call
                    if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(
                                    "Receive an old confirmation call of leader session ID {}, "
                                            + "current issued session ID is {}",
                                    leaderSessionID,
                                    issuedLeaderSessionID);
                        }
                    } else {
                        LOG.warn(
                                "The leader session ID {} was confirmed even though the "
                                        + "corresponding JobManager was not elected as the leader.",
                                leaderSessionID);
                    }
                }
            }
        }
    
    

    Inside confirmLeaderInformation:

    @GuardedBy("lock")
    private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
        confirmedLeaderSessionID = leaderSessionID;
        confirmedLeaderAddress = leaderAddress;
        leaderElectionDriver.writeLeaderInformation(
                LeaderInformation.known(confirmedLeaderSessionID, confirmedLeaderAddress));
    }
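Putting the pieces together gives a four-step handshake:

1. The driver issues a session ID.
2. The service hands that ID to the contender.
3. The contender echoes the ID back together with its address.
4. Only a confirmation carrying the currently issued ID is written.

The sketch below models that handshake in memory. The store is a hypothetical Map standing in for the ZooKeeper znode, and all class names are illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** In-memory sketch of the grant -> confirm -> write handshake. */
final class LeaderHandshakeSketch {

    /** Contender stand-in: confirms leadership with its own address, like WebMonitorEndpoint. */
    static final class Contender {
        private final String restBaseUrl;
        private ElectionService service;

        Contender(String restBaseUrl) {
            this.restBaseUrl = restBaseUrl;
        }

        void grantLeadership(UUID leaderSessionId) {
            service.confirmLeadership(leaderSessionId, restBaseUrl);
        }
    }

    /** Service stand-in: tracks the issued session ID and writes confirmed leader info. */
    static final class ElectionService {
        final Map<String, String> store = new HashMap<>(); // stands in for the znode
        private final Object lock = new Object();
        private Contender contender;
        private UUID issuedLeaderSessionId;
        private UUID confirmedLeaderSessionId;

        void start(Contender newContender) {
            synchronized (lock) {
                contender = newContender;
                contender.service = this;
            }
        }

        /** Called by the driver when this contender has gained leadership. */
        void onGrantLeadership(UUID newLeaderSessionId) {
            synchronized (lock) {
                issuedLeaderSessionId = newLeaderSessionId;
                confirmedLeaderSessionId = null; // a new term starts unconfirmed
                contender.grantLeadership(newLeaderSessionId); // re-enters lock on this thread
            }
        }

        void confirmLeadership(UUID leaderSessionId, String leaderAddress) {
            synchronized (lock) {
                if (!leaderSessionId.equals(issuedLeaderSessionId)) {
                    return; // stale confirmation from an older term: ignore it
                }
                confirmedLeaderSessionId = leaderSessionId;
                store.put("sessionId", confirmedLeaderSessionId.toString());
                store.put("address", leaderAddress);
            }
        }
    }
}
```

Java's intrinsic locks are reentrant, so the contender can confirm synchronously from inside onGrantLeadership without deadlocking.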
    

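    As you can see, after winning the election the WebMonitorEndpoint does nothing special. Through the election driver, it only writes its leader information to ZooKeeper: the leader session ID and its REST base URL.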

    Once this information has been written, the WebMonitorEndpoint has finished starting.

    4. Summary

    The WebMonitorEndpoint startup process is not complicated. In summary, it does the following:

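    • Initializes the handlers
    • Starts the Netty server and registers the handlers with the router
    • Starts its internal services, which runs the leader election, because the WebMonitorEndpoint itself acts as a LeaderContender
      - On winning the election, it merely writes the WebMonitorEndpoint's address and the leader session ID into a znode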


  • Original article: https://blog.csdn.net/wuxintdrh/article/details/127829390