• Flink YARN Per-Job 模式 - 创建并启动 Dispatcher、ResourceManager（RM）和 JobManager


    图片

    Per-Job 模式下，AM（ApplicationMaster）container 的加载运行入口是 YarnJobClusterEntrypoint 中的 main() 方法

    YarnClusterDescriptor

    /**
     * Deploys a dedicated (per-job) Flink cluster on YARN for the given job graph.
     *
     * <p>Delegates to {@code deployInternal} with the application name "Flink per-job cluster"
     * and the per-job YARN entry point class. Any exception raised while deploying is wrapped in
     * a {@link ClusterDeploymentException}.
     *
     * @param clusterSpecification specification of the cluster to deploy
     * @param jobGraph the job the cluster is deployed for
     * @param detached detached flag, passed through to {@code deployInternal}
     * @return the provider returned by {@code deployInternal}
     * @throws ClusterDeploymentException if deployment fails for any reason
     */
    public ClusterClientProvider deployJobCluster(
        ClusterSpecification clusterSpecification,
        JobGraph jobGraph,
        boolean detached) throws ClusterDeploymentException {
      final String applicationName = "Flink per-job cluster";
      try {
        return deployInternal(
          clusterSpecification, applicationName, getYarnJobClusterEntrypoint(), jobGraph, detached);
      } catch (Exception cause) {
        throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", cause);
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    YarnJobClusterEntrypoint 的 main 方法（YarnClusterDescriptor 在 deployJobCluster 中通过 getYarnJobClusterEntrypoint() 把它指定为 AM 的入口类）

    YarnJobClusterEntrypoint

    // ------------------------------------------------------------------------
    //  The executable entry point for the Yarn Application Master Process
    //  for a single Flink job.
    // ------------------------------------------------------------------------
    
    public static void main(String[] args) {
      // startup checks and logging
      EnvironmentInformation.logEnvironmentInfo(LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);
      SignalHandler.register(LOG);
      JvmShutdownSafeguard.installAsShutdownHook(LOG);
    
      Map env = System.getenv();
    
      final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
      Preconditions.checkArgument(
        workingDirectory != null,
        "Working directory variable (%s) not set",
        ApplicationConstants.Environment.PWD.key());
    
      try {
        YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
      } catch (IOException e) {
        LOG.warn("Could not log YARN environment information.", e);
      }
    
      final Configuration dynamicParameters = ClusterEntrypointUtils.parseParametersOrExit(
        args,
        new DynamicParametersConfigurationParserFactory(),
        YarnJobClusterEntrypoint.class);
      final Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env);
    
      YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(configuration);
      // 重要
      ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    ClusterEntrypoint

    // --------------------------------------------------
    // Helper methods
    // --------------------------------------------------
    
    /**
     * Starts the given cluster entrypoint by calling its {@code startCluster()} method.
     *
     * <p>Excerpt: the handling of {@code ClusterEntrypointException} is elided below;
     * {@code clusterEntrypointName} is presumably used there (e.g. in log messages) — confirm
     * against the full source.
     *
     * @param clusterEntrypoint the entrypoint to start, e.g. a YarnJobClusterEntrypoint
     */
    public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
    
      final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
      try {
        // Key step: start the cluster.
        clusterEntrypoint.startCluster();
      } catch (ClusterEntrypointException e) {
     ... ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    /**
     * Starts the cluster.
     *
     * <p>Calls {@code replaceGracefulExitWithHaltIfConfigured}, creates the plugin manager from
     * the configuration, configures the file systems, installs the security context and runs
     * {@code runCluster} inside that security context.
     *
     * <p>Excerpt: the catch block of the {@code try} is elided.
     *
     * @throws ClusterEntrypointException declared by the method; presumably thrown from the
     *     elided catch block — confirm
     */
    public void startCluster() throws ClusterEntrypointException {
      LOG.info("Starting {}.", getClass().getSimpleName());
    
      try {
        replaceGracefulExitWithHaltIfConfigured(configuration);
        PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
        configureFileSystems(configuration, pluginManager);
    
        SecurityContext securityContext = installSecurityContext(configuration);
    
        // NOTE(review): raw Callable cast; upstream presumably uses Callable<Void> (the generic
        // argument was likely stripped when this code was pasted) — confirm.
        securityContext.runSecured((Callable) () -> {
          // Key step: initialize the services and create the Dispatcher/ResourceManager/REST
          // components, all under the installed security context.
          runCluster(configuration, pluginManager);
    
          return null;
        });
      } 
      ... ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    /**
     * Brings up the JobManager process while holding {@code lock}.
     *
     * <p>The steps are:
     * <ol>
     *   <li>initialize the services;</li>
     *   <li>write the RPC address and port into the configuration;</li>
     *   <li>create the Dispatcher/ResourceManager component via the factory;</li>
     *   <li>wire the component's shutdown future to {@code shutDownAsync}.</li>
     * </ol>
     *
     * @param configuration cluster configuration; JobManagerOptions.ADDRESS/PORT are overwritten
     * @param pluginManager plugin manager passed on to {@code initializeServices}
     * @throws Exception declared by the method
     */
    private void runCluster(Configuration configuration, PluginManager pluginManager) throws Exception {
      synchronized (lock) {
    
        // Initialize services (RPC-related and others). The fields read below (commonRpcService,
        // haServices, blobServer, heartbeatServices, metricRegistry, archivedExecutionGraphStore,
        // ioExecutor) are presumably assigned here — initializeServices is not shown.
        initializeServices(configuration, pluginManager);
    
        // write host information into configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
        // Factory that creates the Dispatcher and ResourceManager (and REST endpoint) components.
        final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
    
        // Create and start the JobManager components (REST endpoint, ResourceManager, Dispatcher).
        // The RpcService, HA services, BlobServer, HeartbeatServices, MetricRegistry and
        // ExecutionGraphStore are NOT started here; they are passed in and only used by create().
        clusterComponent = dispatcherResourceManagerComponentFactory.create(
          configuration,
          ioExecutor,
          commonRpcService,
          haServices,
          blobServer,
          heartbeatServices,
          metricRegistry,
          archivedExecutionGraphStore,
          new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
          this);
    
        // Shut the entrypoint down asynchronously once the component's shutdown future completes:
        // with UNKNOWN status on failure, otherwise with the reported application status.
        clusterComponent.getShutDownFuture().whenComplete(
          (ApplicationStatus applicationStatus, Throwable throwable) -> {
            if (throwable != null) {
              shutDownAsync(
                ApplicationStatus.UNKNOWN,
                ExceptionUtils.stringifyException(throwable),
                false);
            } else {
              // This is the general shutdown path. If a separate more specific shutdown was
              // already triggered, this will do nothing
              shutDownAsync(
                applicationStatus,
                null,
                true);
            }
          });
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 初始化 RPC 等相关服务（initializeServices）

    • 将host信息写入配置

    • 创建 dispatcher、ResourceManager 对象的工厂类

    • 创建和启动 JobManager 里的组件（WebMonitorEndpoint、ResourceManager、Dispatcher）；下列服务并不是在这一步启动的，而是作为参数传入 create()：

    • RpcService

    • HAService

    • BlobServer

    • HeartbeatServices

    • MetricRegistry

    • ExecutionGraphStore

    JobManager 里面的三个组件：WebMonitorEndpoint（REST 端点）、ResourceManager、Dispatcher

    DefaultDispatcherResourceManagerComponentFactory
    
    • 1
    /**
     * Creates and starts the JobManager-side components and bundles them into a
     * DispatcherResourceManagerComponent.
     *
     * <p>Order in this method:
     * <ol>
     *   <li>obtain the Dispatcher and ResourceManager leader retrieval services;</li>
     *   <li>build gateway retrievers for both;</li>
     *   <li>create and start the REST endpoint (WebMonitorEndpoint);</li>
     *   <li>create the ResourceManager;</li>
     *   <li>create the DispatcherRunner (which starts Dispatcher leader election);</li>
     *   <li>start the ResourceManager;</li>
     *   <li>start both leader retrieval services.</li>
     * </ol>
     *
     * <p>Excerpt: the catch block is elided. The null-initialized locals are presumably used
     * there to clean up partially created components — confirm against upstream.
     *
     * @return component wrapping the dispatcher runner, the ResourceManager service, both
     *     leader retrieval services and the REST endpoint
     * @throws Exception declared by the method
     */
    @Override
    public DispatcherResourceManagerComponent create(
        Configuration configuration,
        Executor ioExecutor,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        BlobServer blobServer,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        ArchivedExecutionGraphStore archivedExecutionGraphStore,
        MetricQueryServiceRetriever metricQueryServiceRetriever,
        FatalErrorHandler fatalErrorHandler) throws Exception {
    
      LeaderRetrievalService dispatcherLeaderRetrievalService = null;
      LeaderRetrievalService resourceManagerRetrievalService = null;
      WebMonitorEndpoint webMonitorEndpoint = null;
      // NOTE(review): raw ResourceManager / LeaderGatewayRetriever types below; the generic
      // arguments were likely stripped when this code was pasted — confirm against upstream.
      ResourceManager resourceManager = null;
      DispatcherRunner dispatcherRunner = null;
    
      try {
        dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
    
        resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
    
        // Gateway retrievers for the Dispatcher and the ResourceManager. The retry strategy
        // arguments (12, 10 ms, 50 ms) presumably mean 12 retries with backoff between 10 ms
        // and 50 ms — confirm.
        final LeaderGatewayRetriever dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
          rpcService,
          DispatcherGateway.class,
          DispatcherId::fromUuid,
          new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));
    
        final LeaderGatewayRetriever resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
          rpcService,
          ResourceManagerGateway.class,
          ResourceManagerId::fromUuid,
          new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));
    
        final ScheduledExecutorService executor = WebMonitorEndpoint.createExecutorService(
          configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
          configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
          "DispatcherRestEndpoint");
    
        // An update interval of 0 selects VoidMetricFetcher (presumably disables metric fetching).
        final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
        final MetricFetcher metricFetcher = updateInterval == 0
          ? VoidMetricFetcher.INSTANCE
          : MetricFetcherImpl.fromConfiguration(
            configuration,
            metricQueryServiceRetriever,
            dispatcherGatewayRetriever,
            executor);
        // Create the REST endpoint that serves front-end requests (e.g. submissions from the web UI).
        webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
          configuration,
          dispatcherGatewayRetriever,
          resourceManagerGatewayRetriever,
          blobServer,
          executor,
          metricFetcher,
          highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
          fatalErrorHandler);
    
        log.debug("Starting Dispatcher REST endpoint.");
        webMonitorEndpoint.start();
    
        final String hostname = RpcUtils.getHostname(rpcService);
    
        // Create the ResourceManager (the YARN one in YARN deployments). It is only started
        // further below, after the DispatcherRunner has been created.
        resourceManager = resourceManagerFactory.createResourceManager(
          configuration,
          ResourceID.generate(),
          rpcService,
          highAvailabilityServices,
          heartbeatServices,
          fatalErrorHandler,
          new ClusterInformation(hostname, blobServer.getPort()),
          webMonitorEndpoint.getRestBaseUrl(),
          metricRegistry,
          hostname,
          ioExecutor);
    
        final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint, ioExecutor);
    
        final PartialDispatcherServices partialDispatcherServices = new PartialDispatcherServices(
          configuration,
          highAvailabilityServices,
          resourceManagerGatewayRetriever,
          blobServer,
          heartbeatServices,
          () -> MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, hostname),
          archivedExecutionGraphStore,
          fatalErrorHandler,
          historyServerArchivist,
          metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
          ioExecutor);
    
        log.debug("Starting Dispatcher.");
        // Create and start the Dispatcher runner; the Dispatcher in turn creates and starts the JobMaster.
        dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
          highAvailabilityServices.getDispatcherLeaderElectionService(),
          fatalErrorHandler,
          new HaServicesJobGraphStoreFactory(highAvailabilityServices),
          ioExecutor,
          rpcService,
          partialDispatcherServices);
    
        log.debug("Starting ResourceManager.");
        // Start the ResourceManager RPC endpoint.
        resourceManager.start();
    
        resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
        dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
    
        return new DispatcherResourceManagerComponent(
          dispatcherRunner,
          DefaultResourceManagerService.createFor(resourceManager),
          dispatcherLeaderRetrievalService,
          resourceManagerRetrievalService,
          webMonitorEndpoint,
          fatalErrorHandler);
    
      } 
      ... ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 创建接收前端 REST 请求的 WebMonitorEndpoint（Web 页面提交的应用等请求由它处理）

    • 创建 Yarn模式的 ResourceManager

    • 创建和启动 Dispatcher => dispatcher会创建和启动JobMaster

    • 启动 ResourceManager

    创建 YarnResourceManager

    ResourceManagerFactory
    
    • 1
    /**
     * Creates a ResourceManager.
     *
     * <p>First creates the ResourceManager and SlotManager metric groups. It then builds the
     * runtime services (slot manager and job leader id service) and delegates to the abstract
     * {@code createResourceManager} overload that concrete factories implement.
     *
     * @return the ResourceManager built by the abstract overload
     * @throws Exception propagated from service creation or the abstract overload
     */
    public ResourceManager createResourceManager(
        Configuration configuration,
        ResourceID resourceId,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        FatalErrorHandler fatalErrorHandler,
        ClusterInformation clusterInformation,
        @Nullable String webInterfaceUrl,
        MetricRegistry metricRegistry,
        String hostname,
        Executor ioExecutor) throws Exception {
      final ResourceManagerMetricGroup rmMetricGroup =
        ResourceManagerMetricGroup.create(metricRegistry, hostname);
      final SlotManagerMetricGroup smMetricGroup =
        SlotManagerMetricGroup.create(metricRegistry, hostname);

      // Key step: the runtime services contain the SlotManager used by the ResourceManager.
      final ResourceManagerRuntimeServices runtimeServices =
        createResourceManagerRuntimeServices(configuration, rpcService, highAvailabilityServices, smMetricGroup);

      return createResourceManager(
        configuration, resourceId, rpcService, highAvailabilityServices, heartbeatServices,
        fatalErrorHandler, clusterInformation, webInterfaceUrl, rmMetricGroup, runtimeServices,
        ioExecutor);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    ResourceManagerFactory

      /**
       * Creates the concrete ResourceManager from the prepared metric group and runtime
       * services. This method is abstract; deployment-specific factory subclasses implement it.
       *
       * @throws Exception if the ResourceManager cannot be created
       */
      protected abstract ResourceManager createResourceManager(
          Configuration configuration,
          ResourceID resourceId,
          RpcService rpcService,
          HighAvailabilityServices highAvailabilityServices,
          HeartbeatServices heartbeatServices,
          FatalErrorHandler fatalErrorHandler,
          ClusterInformation clusterInformation,
          @Nullable String webInterfaceUrl,
          ResourceManagerMetricGroup resourceManagerMetricGroup,
          ResourceManagerRuntimeServices resourceManagerRuntimeServices,
          Executor ioExecutor) throws Exception;
    
      /**
       * Builds the ResourceManager runtime services.
       *
       * <p>Derives the runtime-services configuration from {@code configuration}, then calls
       * {@code ResourceManagerRuntimeServices.fromConfiguration} with the HA services, the RPC
       * service's scheduled executor and the slot manager metric group.
       *
       * @throws ConfigurationException if deriving the runtime-services configuration fails
       */
      private ResourceManagerRuntimeServices createResourceManagerRuntimeServices(
          Configuration configuration,
          RpcService rpcService,
          HighAvailabilityServices highAvailabilityServices,
          SlotManagerMetricGroup slotManagerMetricGroup) throws ConfigurationException {
        
        // Key step: creates the SlotManager and the JobLeaderIdService.
        return ResourceManagerRuntimeServices.fromConfiguration(
          createResourceManagerRuntimeServicesConfiguration(configuration),
          highAvailabilityServices,
          rpcService.getScheduledExecutor(),
          slotManagerMetricGroup);
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    YarnResourceManagerFactory

    protected ResourceManagerDriver createResourceManagerDriver(Configuration configuration, String webInterfaceUrl, String rpcAddress) {
      final YarnResourceManagerDriverConfiguration yarnResourceManagerDriverConfiguration = new YarnResourceManagerDriverConfiguration(System.getenv(), rpcAddress, webInterfaceUrl);
    
      return new YarnResourceManagerDriver(
        configuration,
        yarnResourceManagerDriverConfiguration,
        DefaultYarnResourceManagerClientFactory.getInstance(),
        DefaultYarnNodeManagerClientFactory.getInstance());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    YarnResourceManagerDriver

    /**
     * Creates the YARN ResourceManager driver.
     *
     * <p>Reads the AM heartbeat interval from the Flink configuration and YARN's AM expiry
     * interval from a fresh YarnConfiguration. It warns when the heartbeat interval is not below
     * the expiry interval, stores the container-request heartbeat interval, and keeps the YARN
     * RM/NM client factories for later use.
     *
     * @param flinkConfig Flink configuration
     * @param configuration YARN driver configuration; its current dir is used to load the
     *     global configuration passed to the super constructor
     * @param yarnResourceManagerClientFactory factory for the YARN ResourceManager client
     * @param yarnNodeManagerClientFactory factory for the YARN NodeManager client
     */
    public YarnResourceManagerDriver(
      Configuration flinkConfig,
      YarnResourceManagerDriverConfiguration configuration,
      YarnResourceManagerClientFactory yarnResourceManagerClientFactory,
      YarnNodeManagerClientFactory yarnNodeManagerClientFactory) {
      super(flinkConfig, GlobalConfiguration.loadConfiguration(configuration.getCurrentDir()));
    
      this.yarnConfig = new YarnConfiguration();
      this.requestResourceFutures = new HashMap<>();
      this.configuration = configuration;
    
      // YARN heartbeat interval (yarn.heartbeat.interval): configured in seconds, converted to ms.
      // NOTE(review): int arithmetic; extremely large second values would overflow.
      final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
        YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
    
      // YARN's AM expiry interval in ms, read from the YARN configuration.
      final long yarnExpiryIntervalMS = yarnConfig.getLong(
        YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
        YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
    
      // NOTE(review): the condition is >= while the message says "greater than".
      if (yarnHeartbeatIntervalMS >= yarnExpiryIntervalMS) {
        log.warn("The heartbeat interval of the Flink Application master ({}) is greater " +
            "than YARN's expiry interval ({}). The application is likely to be killed by YARN.",
          yarnHeartbeatIntervalMS, yarnExpiryIntervalMS);
      }
      yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
      containerRequestHeartbeatIntervalMillis = flinkConfig.getInteger(YarnConfigOptions.CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS);
    
      this.registerApplicationMasterResponseReflector = new RegisterApplicationMasterResponseReflector(log);
    
      this.yarnResourceManagerClientFactory = yarnResourceManagerClientFactory;
      this.yarnNodeManagerClientFactory = yarnNodeManagerClientFactory;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    创建 SlotManager（ResourceManagerRuntimeServices.fromConfiguration）

    /**
     * Builds the ResourceManager runtime services.
     *
     * <p>The services consist of a SlotManager created from {@code configuration} and a
     * JobLeaderIdService that uses the HA services, the scheduled executor and the configured
     * job timeout. The SlotManager is created first.
     *
     * @return runtime services bundling the slot manager and the job leader id service
     */
    public static ResourceManagerRuntimeServices fromConfiguration(
        ResourceManagerRuntimeServicesConfiguration configuration,
        HighAvailabilityServices highAvailabilityServices,
        ScheduledExecutor scheduledExecutor,
        SlotManagerMetricGroup slotManagerMetricGroup) {
      return new ResourceManagerRuntimeServices(
        createSlotManager(configuration, scheduledExecutor, slotManagerMetricGroup),
        new JobLeaderIdService(highAvailabilityServices, scheduledExecutor, configuration.getJobTimeout()));
    }


    创建和启动 Dispatcher

    DefaultDispatcherResourceManagerComponentFactory

    // Create the DispatcherRunner. Creating it starts Dispatcher leader election; the Dispatcher
    // itself is created once leadership is granted.
    dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
      highAvailabilityServices.getDispatcherLeaderElectionService(),
      fatalErrorHandler,
      new HaServicesJobGraphStoreFactory(highAvailabilityServices),
      ioExecutor,
      rpcService,
      partialDispatcherServices);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    DefaultDispatcherRunnerFactory implements DispatcherRunnerFactory
    
    • 1
    public DispatcherRunner createDispatcherRunner(
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler,
        JobGraphStoreFactory jobGraphStoreFactory,
        Executor ioExecutor,
        RpcService rpcService,
        PartialDispatcherServices partialDispatcherServices) throws Exception {
    
      final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory = dispatcherLeaderProcessFactoryFactory.createFactory(
        jobGraphStoreFactory,
        ioExecutor,
        rpcService,
        partialDispatcherServices,
        fatalErrorHandler);
    
     // 重要
      return DefaultDispatcherRunner.create(
        leaderElectionService,
        fatalErrorHandler,
        dispatcherLeaderProcessFactory);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    DefaultDispatcherRunner

    /**
     * Creates a DefaultDispatcherRunner and wraps it with
     * DispatcherRunnerLeaderElectionLifecycleManager. The wrapper starts leader election with the
     * runner as the contender.
     *
     * @throws Exception if starting the leader election fails
     */
    public static DispatcherRunner create(
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler,
        DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory) throws Exception {
      return DispatcherRunnerLeaderElectionLifecycleManager.createFor(
        new DefaultDispatcherRunner(leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory),
        leaderElectionService);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    DispatcherRunnerLeaderElectionLifecycleManager

    /**
     * Wraps {@code dispatcherRunner} so that its lifecycle is tied to leader election, and starts
     * the election with the runner as the contender.
     *
     * <p>The method declares its own type parameter: the static context cannot use the class's
     * {@code T}, and an undeclared {@code T} does not compile.
     *
     * @param dispatcherRunner runner that also acts as the leader contender
     * @param leaderElectionService election service to start
     * @param <T> runner type; must be both a DispatcherRunner and a LeaderContender
     * @return the lifecycle-managing wrapper
     * @throws Exception if starting the leader election service fails
     */
    public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
      return new DispatcherRunnerLeaderElectionLifecycleManager<>(dispatcherRunner, leaderElectionService);
    }

    private DispatcherRunnerLeaderElectionLifecycleManager(T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
      this.dispatcherRunner = dispatcherRunner;
      this.leaderElectionService = leaderElectionService;
      // Start Dispatcher leader election with the runner as the contender.
      leaderElectionService.start(dispatcherRunner);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    StandaloneLeaderElectionService implements LeaderElectionService
    
    • 1
    /**
     * Starts the service with {@code newContender} and immediately grants it leadership with
     * {@code HighAvailabilityServices.DEFAULT_LEADER_ID}; no actual election takes place.
     *
     * <p>A null contender is rejected through {@code Preconditions.checkNotNull}.
     *
     * @throws IllegalArgumentException if the service has already been started
     */
    @Override
    public void start(LeaderContender newContender) throws Exception {
      if (contender == null) {
        contender = Preconditions.checkNotNull(newContender);
        // The single contender directly becomes the leader.
        contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
      } else {
        // Service was already started
        throw new IllegalArgumentException("Leader election service cannot be started multiple times.");
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    DefaultDispatcherRunner
    
    • 1
    /**
     * Called when this runner is granted leadership. It starts a new Dispatcher leader process
     * for {@code leaderSessionID}; runActionIfRunning presumably skips this once the runner has
     * been stopped — confirm.
     */
    @Override
    public void grantLeadership(UUID leaderSessionID) {
      runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID));
    }
    
    /**
     * Replaces the current Dispatcher leader process with a new one for {@code leaderSessionID}.
     * The new process is started only after the previous one has terminated.
     */
    private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
      // Stop the current leader process first. This presumably also updates
      // previousDispatcherLeaderProcessTerminationFuture — not shown here, confirm.
      stopDispatcherLeaderProcess();
      // Create the Dispatcher leader process for this leader session.
      dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);
      final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
      // Continue: start the new process once the previous one has terminated.
      FutureUtils.assertNoException(
        previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    AbstractDispatcherLeaderProcess

    /**
     * Starts this leader process. {@code startInternal} runs only if the process is in state
     * {@code CREATED}, as checked by {@code runIfStateIs}.
     */
    public final void start() {
      runIfStateIs(State.CREATED, () -> startInternal());
    }

    /** Logs the start, switches the state to RUNNING and then invokes the {@code onStart()} hook. */
    private void startInternal() {
      final String processName = getClass().getSimpleName();
      log.info("Start {}.", processName);
      state = State.RUNNING;
      onStart();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    JobDispatcherLeaderProcess

    /**
     * Creates the Dispatcher gateway service for the current leader session and completes the
     * dispatcher setup with it.
     *
     * <p>The service is seeded with this process's single job graph. The job graph writer is
     * {@code ThrowingJobGraphWriter.INSTANCE}, which presumably rejects writes — confirm.
     */
    @Override
    protected void onStart() {
      final DispatcherId dispatcherId = DispatcherId.fromUuid(getLeaderSessionId());
      completeDispatcherSetup(
        dispatcherGatewayServiceFactory.create(
          dispatcherId,
          Collections.singleton(jobGraph),
          ThrowingJobGraphWriter.INSTANCE));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    DefaultDispatcherGatewayServiceFactory implements AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory
    
    • 1
    @Override
    public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
      DispatcherId fencingToken,
      Collection recoveredJobs,
      JobGraphWriter jobGraphWriter) {
    
      final Dispatcher dispatcher;
      try {
        // 创建Dispatcher
        dispatcher = dispatcherFactory.createDispatcher(
          rpcService,
          fencingToken,
          recoveredJobs,
          (dispatcherGateway, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(),
          PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter));
      } catch (Exception e) {
        throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
      }
    
      // 启动 Dispatcher,接着看 onStart()
      dispatcher.start();
    
      return DefaultDispatcherGatewayService.from(dispatcher);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    RpcEndpoint

      /**
       * Triggers start of the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready
       * to process remote procedure calls.
       */
      public final void start() {
        // Starting the endpoint is delegated to its own RpcServer (its self gateway).
        rpcServer.start();
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, RpcServer
    
    • 1
      /** Starts the endpoint by telling it the START control message, with no sender. */
      @Override
      public void start() {
        rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
      }
    
      /** Stops the endpoint by telling it the STOP control message, with no sender. */
      @Override
      public void stop() {
        rpcEndpoint.tell(ControlMessages.STOP, ActorRef.noSender());
      }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    启动 ResourceManager

    DefaultDispatcherResourceManagerComponentFactory

        log.debug("Starting ResourceManager.");
        // Start the ResourceManager RPC endpoint.
        resourceManager.start();
    
    • 1
    • 2
    • 3

    ResourceManager

      /**
       * Starts the ResourceManager services. Any failure is wrapped in a
       * ResourceManagerException, reported via {@code onFatalError} and rethrown.
       */
      @Override
      public final void onStart() throws Exception {
        try {
          startResourceManagerServices();
        } catch (Throwable t) {
          final ResourceManagerException exception = new ResourceManagerException(String.format("Could not start the ResourceManager %s", getAddress()), t);
          onFatalError(exception);
          throw exception;
        }
      }
    
      /**
       * Obtains the leader election service, calls {@code initialize()}, starts leader election
       * with this ResourceManager as the contender, starts the job leader id service and
       * registers the task executor metrics. Exceptions are passed to
       * {@code handleStartResourceManagerServicesException}.
       */
      private void startResourceManagerServices() throws Exception {
        try {
          leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
    
          // Deployment-specific initialization. For YARN this presumably reaches
          // YarnResourceManagerDriver#initializeInternal, which creates, initializes and starts
          // the YARN RM and NM clients — confirm.
          initialize();
    
          // Start leader election with this ResourceManager as the contender.
          leaderElectionService.start(this);
    
          jobLeaderIdService.start(new JobLeaderIdActionsImpl());
    
          registerTaskExecutorMetrics();
        } catch (Exception e) {
          handleStartResourceManagerServicesException(e);
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    YarnResourceManagerDriver

    /**
     * Initializes the YARN-specific parts of the driver.
     *
     * <p>The YARN ResourceManager client is created, initialized and started. The driver then
     * calls {@code registerApplicationMaster()} and {@code getContainersFromPreviousAttempts}
     * and builds the container resource/priority adapter from the registration response.
     * Finally the YARN NodeManager client is created, initialized and started. Both clients
     * share one YarnContainerEventHandler.
     *
     * @throws ResourceManagerException wrapping any failure before the NodeManager client step
     * @throws Exception from the NodeManager client step, which is not wrapped
     */
    @Override
    protected void initializeInternal() throws Exception {
      final YarnContainerEventHandler yarnContainerEventHandler = new YarnContainerEventHandler();
      try {
        // Create the YARN ResourceManager client, then initialize and start it.
        resourceManagerClient = yarnResourceManagerClientFactory.createResourceManagerClient(
          yarnHeartbeatIntervalMillis,
          yarnContainerEventHandler);
        resourceManagerClient.init(yarnConfig);
        resourceManagerClient.start();
    
        final RegisterApplicationMasterResponse registerApplicationMasterResponse = registerApplicationMaster();
        getContainersFromPreviousAttempts(registerApplicationMasterResponse);
        taskExecutorProcessSpecContainerResourcePriorityAdapter =
          new TaskExecutorProcessSpecContainerResourcePriorityAdapter(
            registerApplicationMasterResponse.getMaximumResourceCapability(),
            ExternalResourceUtils.getExternalResources(flinkConfig, YarnConfigOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX));
      } catch (Exception e) {
        throw new ResourceManagerException("Could not start resource manager client.", e);
      }
    
      // Create the YARN NodeManager client, then initialize and start it.
      nodeManagerClient = yarnNodeManagerClientFactory.createNodeManagerClient(yarnContainerEventHandler);
      nodeManagerClient.init(yarnConfig);
      nodeManagerClient.start();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    创建Yarn的ResourceManager的客户端,并且初始化和启动

    创建yarn的 NodeManager的客户端,并且初始化和启动

    Dispatcher 启动 JobManager

    Dispatcher

      /**
       * Starts the Dispatcher.
       *
       * <p>Starts the dispatcher services; a failure there is wrapped in a DispatcherException,
       * reported via {@code onFatalError} and rethrown. It then runs the recovered jobs (in
       * per-job mode, the single submitted job) and finally creates the dispatcher bootstrap.
       */
      @Override
      public void onStart() throws Exception {
        try {
          // Start the dispatcher services.
          startDispatcherServices();
        } catch (Throwable t) {
          final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), t);
          onFatalError(exception);
          throw exception;
        }
    
        // Run the recovered jobs; this creates and starts their JobManagerRunners (JobMasters).
        startRecoveredJobs();
        this.dispatcherBootstrap = this.dispatcherBootstrapFactory.create(
            getSelfGateway(DispatcherGateway.class),
            this.getRpcService().getScheduledExecutor() ,
            this::onFatalError);
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    启动 dispatcher服务

    启动JobMaster

    private void startRecoveredJobs() {
      for (JobGraph recoveredJob : recoveredJobs) {
        // 下追
        runRecoveredJob(recoveredJob);
      }
      recoveredJobs.clear();
    }
    
    private void runRecoveredJob(final JobGraph recoveredJob) {
      checkNotNull(recoveredJob);
      try {
       // 下追
        runJob(recoveredJob, ExecutionType.RECOVERY);
      } catch (Throwable throwable) {
        onFatalError(new DispatcherException(String.format("Could not start recovered job %s.", recoveredJob.getJobID()), throwable));
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    /**
     * Runs {@code jobGraph}.
     *
     * <p>Checks that no job with the same id is already running. It then requests asynchronous
     * creation of the JobManagerRunner and registers a DispatcherJob for it in
     * {@code runningJobs}.
     *
     * <p>Excerpt: the remainder of the method is elided.
     */
    private void runJob(JobGraph jobGraph, ExecutionType executionType) {
      Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
      long initializationTimestamp = System.currentTimeMillis();
      // Create (asynchronously) the JobManagerRunner for this job.
      // NOTE(review): raw CompletableFuture; upstream presumably uses
      // CompletableFuture<JobManagerRunner> — confirm.
      CompletableFuture jobManagerRunnerFuture = createJobManagerRunner(jobGraph, initializationTimestamp);
    
      DispatcherJob dispatcherJob = DispatcherJob.createFor(
          jobManagerRunnerFuture,
          jobGraph.getJobID(),
          jobGraph.getName(),
          initializationTimestamp);
      runningJobs.put(jobGraph.getJobID(), dispatcherJob);
    
      final JobID jobId = jobGraph.getJobID();
    
    ... ...
    
    }
    
    /**
     * Asynchronously creates and starts the JobManagerRunner for the given job.
     *
     * <p>The work runs on ioExecutor rather than the main thread executor, so the
     * Dispatcher is not blocked while the JobManager is created. Any exception thrown
     * during creation or start is wrapped in a JobInitializationException inside a
     * CompletionException, so the returned future completes exceptionally.
     *
     * @param jobGraph the job to create a runner for
     * @param initializationTimestamp timestamp passed through to the runner factory
     * @return a future completed with the started runner
     */
    CompletableFuture createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp) {
      final RpcService rpcService = getRpcService();
      return CompletableFuture.supplyAsync(
        () -> {
          try {
            // Create the JobManagerRunner (its constructor creates the JobMaster service)
            JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
              jobGraph,
              configuration,
              rpcService,
              highAvailabilityServices,
              heartbeatServices,
              jobManagerSharedServices,
              new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
              fatalErrorHandler,
              initializationTimestamp);
            // Start the runner, which leads to the JobMaster being started
            runner.start();
            return runner;
          } catch (Exception e) {
            throw new CompletionException(new JobInitializationException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
          }
        },
        ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on JobManager creation
    }
    

    创建JobMaster

    启动JobMaster

    DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory
    
      /**
       * Builds a JobManagerRunnerImpl for the given job.
       *
       * <p>Derives the JobMasterConfiguration, SlotPoolFactory, SchedulerNGFactory and
       * ShuffleMaster from the configuration and bundles them into a
       * DefaultJobMasterServiceFactory. That factory is handed to the runner together with
       * a class-loader lease registered for the job's ID.
       */
      @Override
      public JobManagerRunner createJobManagerRunner(
          JobGraph jobGraph,
          Configuration configuration,
          RpcService rpcService,
          HighAvailabilityServices highAvailabilityServices,
          HeartbeatServices heartbeatServices,
          JobManagerSharedServices jobManagerServices,
          JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
          FatalErrorHandler fatalErrorHandler,
          long initializationTimestamp) throws Exception {
    
        final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
    
        final SlotPoolFactory slotPoolFactory = SlotPoolFactory.fromConfiguration(configuration);
        final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
        final ShuffleMaster shuffleMaster = ShuffleServiceLoader.loadShuffleServiceFactory(configuration).createShuffleMaster(configuration);
    
        // Factory the runner later uses to create the JobMaster
        final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
          jobMasterConfiguration,
          slotPoolFactory,
          rpcService,
          highAvailabilityServices,
          jobManagerServices,
          heartbeatServices,
          jobManagerJobMetricGroupFactory,
          fatalErrorHandler,
          schedulerNGFactory,
          shuffleMaster);
      //  Drill down: see the JobManagerRunnerImpl constructor
        return new JobManagerRunnerImpl(
          jobGraph,
          jobMasterFactory,
          highAvailabilityServices,
          jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(jobGraph.getJobID()),
          jobManagerServices.getScheduledExecutorService(),
          fatalErrorHandler,
          initializationTimestamp);
      }
    

    JobManagerRunnerImpl

      public JobManagerRunnerImpl(
          final JobGraph jobGraph,
          final JobMasterServiceFactory jobMasterFactory,
          final HighAvailabilityServices haServices,
          final LibraryCacheManager.ClassLoaderLease classLoaderLease,
          final Executor executor,
          final FatalErrorHandler fatalErrorHandler,
          long initializationTimestamp) throws Exception {
    
    ... ... 
        // now start the JobManager
        this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader, initializationTimestamp);
      }
    
    DefaultJobMasterServiceFactory implements JobMasterServiceFactory
    
    /**
     * Creates a new JobMaster for the given job.
     *
     * <p>Most collaborators come from this factory's fields. A fresh ResourceID is
     * generated on every call, and the JobMaster is additionally given a
     * JobMasterPartitionTracker factory, a new DefaultExecutionDeploymentTracker, and a
     * DefaultExecutionDeploymentReconciler factory.
     */
    @Override
    public JobMaster createJobMasterService(
        JobGraph jobGraph,
        OnCompletionActions jobCompletionActions,
        ClassLoader userCodeClassloader,
        long initializationTimestamp) throws Exception {
    
      return new JobMaster(
        rpcService,
        jobMasterConfiguration,
        ResourceID.generate(),
        jobGraph,
        haServices,
        slotPoolFactory,
        jobManagerSharedServices,
        heartbeatServices,
        jobManagerJobMetricGroupFactory,
        jobCompletionActions,
        fatalErrorHandler,
        userCodeClassloader,
        schedulerNGFactory,
        shuffleMaster,
        // Partition tracker factory; receives a lookup function supplied by the JobMaster
        lookup -> new JobMasterPartitionTrackerImpl(
          jobGraph.getJobID(),
          shuffleMaster,
          lookup
        ),
        new DefaultExecutionDeploymentTracker(),
        DefaultExecutionDeploymentReconciler::new,
        initializationTimestamp);
    }
    

    在JobManager中,最终创建JobMaster来执行任务

    JobMaster 的构造器

    public JobMaster(
    ... ...) throws Exception {
    
      super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME), null);
    ... ...
    
      this.executionDeploymentTracker = executionDeploymentTracker;
      this.executionDeploymentReconciler = executionDeploymentReconcilerFactory.create(executionStateReconciliationHandler);
    
      this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
      this.resourceId = checkNotNull(resourceId);
      this.jobGraph = checkNotNull(jobGraph);
      this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
      this.highAvailabilityServices = checkNotNull(highAvailabilityService);
      this.blobWriter = jobManagerSharedServices.getBlobWriter();
      this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
      this.jobCompletionActions = checkNotNull(jobCompletionActions);
      this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
      this.userCodeLoader = checkNotNull(userCodeLoader);
      this.schedulerNGFactory = checkNotNull(schedulerNGFactory);
      this.heartbeatServices = checkNotNull(heartbeatServices);
      this.jobMetricGroupFactory = checkNotNull(jobMetricGroupFactory);
      this.initializationTimestamp = initializationTimestamp;
      this.retrieveTaskManagerHostName = jobMasterConfiguration.getConfiguration()
          .getBoolean(JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME);
    
      final String jobName = jobGraph.getName();
      final JobID jid = jobGraph.getJobID();
    
      log.info("Initializing job {} ({}).", jobName, jid);
    
      resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
    
      this.slotPool = checkNotNull(slotPoolFactory).createSlotPool(jid);
    
      this.registeredTaskManagers = new HashMap<>(4);
      this.partitionTracker = checkNotNull(partitionTrackerFactory)
        .create(resourceID -> {
          Tuple2 taskManagerInfo = registeredTaskManagers.get(resourceID);
          if (taskManagerInfo == null) {
            return Optional.empty();
          }
    
          return Optional.of(taskManagerInfo.f1);
        });
    
      this.backPressureStatsTracker = checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker());
    
      this.shuffleMaster = checkNotNull(shuffleMaster);
    
      this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
      // 创建 调度器,创建的时候把 JobGraph转换成 ExecutionGraph
      this.schedulerNG = createScheduler(executionDeploymentTracker, jobManagerJobMetricGroup);
      this.jobStatusListener = null;
    
      this.resourceManagerConnection = null;
      this.establishedResourceManagerConnection = null;
    
      this.accumulators = new HashMap<>();
      this.taskManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
      this.resourceManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
    }
    

    创建 调度器,创建的时候把 JobGraph转换成 ExecutionGraph

    SchedulerBase

    public SchedulerBase(
    ... ...) throws Exception {
    
      this.log = checkNotNull(log);
      this.jobGraph = checkNotNull(jobGraph);
    ... ... 
    // 下追
      this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker), checkNotNull(executionDeploymentTracker), initializationTimestamp);
    
      this.schedulingTopology = executionGraph.getSchedulingTopology();
    
      stateLocationRetriever =
        executionVertexId -> getExecutionVertex(executionVertexId).getPreferredLocationBasedOnState();
      inputsLocationsRetriever = new ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph);
    
      this.coordinatorMap = createCoordinatorMap();
    }
    
    private ExecutionGraph createAndRestoreExecutionGraph(
      JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
      ShuffleMaster shuffleMaster,
      JobMasterPartitionTracker partitionTracker,
      ExecutionDeploymentTracker executionDeploymentTracker,
      long initializationTimestamp) throws Exception {
    // 下追
      ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker, executionDeploymentTracker, initializationTimestamp);
    ... ...
    
      return newExecutionGraph;
    }
    
    private ExecutionGraph createExecutionGraph(
    ... ...
      return ExecutionGraphBuilder.buildGraph(
        null,
        jobGraph,
    ... ...);
    }
    

    ExecutionGraphBuilder

    public static ExecutionGraph buildGraph(
    ... ...
      checkNotNull(jobGraph, "job graph cannot be null");
    
      final String jobName = jobGraph.getName();
      final JobID jobId = jobGraph.getJobID();
    ... ... 
      // create a new execution graph, if none exists so far
      final ExecutionGraph executionGraph;
      try {
        // 如果不存在执⾏图,就创建⼀个新的执⾏图
        executionGraph = (prior != null) ? prior :
          new ExecutionGraph(
            jobInformation,
            futureExecutor,
            ioExecutor,
            rpcTimeout,
            restartStrategy,
            maxPriorAttemptsHistoryLength,
            failoverStrategyFactory,
            slotProvider,
            classLoader,
            blobWriter,
            allocationTimeout,
            partitionReleaseStrategyFactory,
            shuffleMaster,
            partitionTracker,
            jobGraph.getScheduleMode(),
            executionDeploymentListener,
            executionStateUpdateListener,
            initializationTimestamp);
      } catch (IOException e) {
        throw new JobException("Could not create the ExecutionGraph.", e);
      }
    
      // set the basic properties
      try {
        executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
      }
      catch (Throwable t) {
        log.warn("Cannot create JSON plan for job", t);
        // give the graph an empty plan
        executionGraph.setJsonPlan("{}");
      }
    
      // initialize the vertices that have a master initialization hook
      // file output formats create directories here, input formats create splits
      final long initMasterStart = System.nanoTime();
      log.info("Running initialization on master for job {} ({}).", jobName, jobId);
    
      for (JobVertex vertex : jobGraph.getVertices()) {
        String executableClass = vertex.getInvokableClassName();
        if (executableClass == null || executableClass.isEmpty()) {
          throw new JobSubmissionException(jobId,
              "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
        }
    
        try {
          vertex.initializeOnMaster(classLoader);
        }
        catch (Throwable t) {
            throw new JobExecutionException(jobId,
                "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
        }
      }
    
      log.info("Successfully ran initialization on master in {} ms.",
          (System.nanoTime() - initMasterStart) / 1_000_000);
    
      // topologically sort the job vertices and attach the graph to the existing one
    //  对JobGraph进⾏拓扑排序,获取所有的JobVertex列表
      List sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
      if (log.isDebugEnabled()) {
        log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
      }
    // 将拓扑排序过的JobGraph添加到 executionGraph数据结构中。
      executionGraph.attachJobGraph(sortedTopology);
    
      if (log.isDebugEnabled()) {
        log.debug("Successfully created execution graph from job graph {} ({}).", jobName, jobId);
      }
    
     ... ...
        executionGraph.enableCheckpointing(
          chkConfig,
          triggerVertices,
          ackVertices,
          confirmVertices,
          hooks,
          checkpointIdCounter,
          completedCheckpoints,
          rootBackend,
          checkpointStatsTracker);
      }
    
      // create all the metrics for the Execution Graph
    
      metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
      metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
      metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));
    
      executionGraph.getFailoverStrategy().registerMetrics(metrics);
    
      return executionGraph;
    }
    

    如果不存在执行图,就创建一个新的执行图

    对JobGraph进行拓扑排序,获取所有的JobVertex列表

    将拓扑排序过的JobGraph添加到 executionGraph数据结构中。

    JobManagerRunnerImpl

      /**
       * Starts the runner by registering it as the contender with leaderElectionService.
       * The JobMaster itself is started later, once leadership is granted through
       * grantLeadership.
       */
      @Override
      public void start() throws Exception {
          leaderElectionService.start(this);
    
      }
    

    StandaloneLeaderElectionService

    /**
     * Registers the contender and immediately grants it leadership with
     * HighAvailabilityServices.DEFAULT_LEADER_ID; no actual election takes place.
     *
     * @param newContender the contender to register; must not be null
     * @throws IllegalArgumentException if a contender has already been registered
     */
    public void start(LeaderContender newContender) throws Exception {
      if (contender != null) {
        // Service was already started
        throw new IllegalArgumentException("Leader election service cannot be started multiple times.");
      }
    
      contender = Preconditions.checkNotNull(newContender);
    
      // directly grant leadership to the given contender
      contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
    }
    
    @Override
    public void grantLeadership(final UUID leaderSessionID) {
    ... ...
     return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
    ... ...
    }
    
    /**
     * Looks up the job's scheduling status and then either completes via jobAlreadyDone()
     * when the status is DONE, or starts the JobMaster with the given leader session ID.
     *
     * @param leaderSessionId the leader session ID granted to this runner
     * @return the future produced by whichever branch is taken
     */
    private CompletableFuture verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
      final CompletableFuture jobSchedulingStatusFuture = getJobSchedulingStatus();
    
      return jobSchedulingStatusFuture.thenCompose(
        jobSchedulingStatus -> {
          if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
            return jobAlreadyDone();
          } else {
          // Start the JobMaster
            return startJobMaster(leaderSessionId);
          }
        });
    }
    
    
    private CompletionStage startJobMaster(UUID leaderSessionId) {
      log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
        jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, jobMasterService.getAddress());
    
    
    ... ...
      final CompletableFuture startFuture;
      try {
      // 启动JobMaster服务
        startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
      } catch (Exception e) {
        return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
      }
    
      final CompletableFuture currentLeaderGatewayFuture = leaderGatewayFuture;
      return startFuture.thenAcceptAsync(
        (Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(
          leaderSessionId,
          jobMasterService.getAddress(),
          currentLeaderGatewayFuture),
        executor);
    }
    

    启动JobMaster服务

    JobMaster

    /**
     * Starts this JobMaster's RPC endpoint, then asynchronously invokes
     * startJobExecution(newJobMasterId) through callAsyncWithoutFencing with
     * RpcUtils.INF_TIMEOUT.
     *
     * @param newJobMasterId the JobMasterId passed to startJobExecution
     * @return the future returned by callAsyncWithoutFencing
     */
    public CompletableFuture start(final JobMasterId newJobMasterId) throws Exception {
      // make sure we receive RPC and async calls
      start();
      // Non-blocking asynchronous call
      return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
    }
    

    图片

  • 相关阅读:
    web上构建3d效果 基于three.js的实例
    视频结构化 AI 推理流程
    谁说爬虫只能Python?看我用C#快速简单实现爬虫开发和演示!
    分享一些我技术成长的感悟
    第20章 原子操作实验(iTOP-RK3568开发板驱动开发指南 )
    调度器/调度程序
    Golang协程的概念、用法、场景及案例
    初识 My Batis一 什么是My Batis,JDBC缺点,My Batis简化,Mapper 代理开发,My Batis 核心配置文件
    数仓建模—OneID
    startsWith()方法的使用
  • 原文地址:https://blog.csdn.net/hyunbar/article/details/126129241