• TaskManager的启动流程


    TaskManager是运行时的工作节点,提供了Job运行时所需的Slot计算资源。JobManager提交的Task实例,都会运行在TaskManager上。

    1. TaskManagerRunner的启动流程

    执行bin/start-cluster.sh脚本启动集群时,会同步运行TaskManager的启动脚本,脚本会调用执行TaskManagerRunner。TaskManagerRunner专门用于创建TaskManager

    /**
     * 启动TaskManager,其实就是对TaskExecutor内部服务进行初始化。
     * 初始化完毕后,TaskExecutor作为TaskManager就算是启动完毕了
     */
    public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception {
        // 初始化TaskManagerRunner实例(内部构造方法会同步创建TaskExecutor)
        // 最后一个参数是:TaskExecutorToServiceAdapter(TaskExecutorService接口的实现子类)
        final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId, TaskManagerRunner::createTaskExecutorService);
    
        // 这里调用的是TaskExecutorToServiceAdapter#start()方法
        // 最终调用的就是TaskExecutor#start()方法,由于TaskExecutor继承了RpcEndPoint,所以这会启动TaskExecutor对应的RpcServer。
        // 启动RpcServer会同步调用RpcEndPoint实现类的onStart方法,也就是TaskExecutor#onStart()方法
        taskManagerRunner.start();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在new TaskManagerRunner的过程中,会创建TaskExecutor。TaskExecutor是TaskManager的底层实现类,因此,TaskManager的能力都是由TaskExecutor提供的。

    /**
     * new出来TaskExecutor,并将其交给TaskExecutorService的成员变量“合法持有”
     */
    public static TaskExecutorService createTaskExecutorService(
        Configuration configuration,
        ResourceID resourceID,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        BlobCacheService blobCacheService,
        boolean localCommunicationOnly,
        FatalErrorHandler fatalErrorHandler) throws Exception {
    
        // 创建TaskExecutor实例(直接new TaskExecutor)
        final TaskExecutor taskExecutor = startTaskManager(
            configuration,
            resourceID,
            rpcService,
            highAvailabilityServices,
            heartbeatServices,
            metricRegistry,
            blobCacheService,
            localCommunicationOnly,
            fatalErrorHandler);
    
        // 将创建好的TaskExecutor实例,包装到TaskExecutorToServiceAdapter中,让它的成员变量持有
        // TaskExecutorToServiceAdapter是TaskExecutorService的实现子类,通过这种包装、持有,TaskManagerRunner才能顺利的启动TaskExecutor
        return TaskExecutorToServiceAdapter.createFor(taskExecutor);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    创建好的TaskExecutor实例,交给TaskExecutorToServiceAdapter的成员变量持有,而TaskExecutorToServiceAdapter又被包装到了TaskManagerRunner中。通过这种包装、持有操作,让TaskManagerRunner可以“合法持有”TaskExecutor实例。

    得到TaskManagerRunner后,就会调用它的start()方法。 通过上面分析过的“合法持有”关系,调用TaskManagerRunner#start()就是最终调用的TaskExecutor#start()方法。由于TaskExecutor继承了RpcEndPoint,所以这会启动TaskExecutor对应的RpcServer。启动RpcServer会同步调用RpcEndPoint实现类的onStart方法,也就是TaskExecutor#onStart()方法。

    /**
     * 启动TaskManager,实质上就是执行TaskExecutor#start()。
     * TaskExecutor继承了RpcEndpoint,所以启动TaskExecutor就是启动TaskExecutor对应的RpcServer服务。
     * 启动RpcServer服务会同步调用RpcEndpoint#onStart()方法,也就是调用TaskExecutor#onStart()
     */
    @Override
    public void onStart() throws Exception {
        try {
            // 启动TaskExecutor,本质上就是对TaskExecutor内部服务进行初始化
            startTaskExecutorServices();
        } catch (Exception e) {
            final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), e);
            onFatalError(exception);
            throw exception;
        }
    
        startRegistrationTimeout();
    }
    
    
    /**
     * TaskExecutor内部服务都初始化完毕后,就意味着TaskExecutor(作为TaskManager)启动完成了。
     */
    private void startTaskExecutorServices() throws Exception {
        try {
            // 通过LeaderRetrievalService服务,创建TaskExecutor与ResourceManager之间的RPC连接,此时会将TaskManager的资源信息汇报给ResourceManager
            // 这个监听器在TaskExecutor启动时会初始化调用一次,以后只有当ResourceManager的Leader地址发生变更时才会被调用
            resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    
            // 启动TaskSlotTable服务,它是用来管理TaskManager内的Slot计算资源
            taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
    
            // 启动JobLeaderService服务,它是TaskExecutor用来和JobManager进行RPC通信时,获取JobManager的Leader节点的
            jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
    
            // FileCache是用来存储Task执行过程中,从PermanentBlobService拉取来的文件,并将其放到/tmp_/路径下。
    		// 如果Task处于“非注册”状态的时间超过5s,就将这个临时文件clear掉
            fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
        } catch (Exception e) {
            handleStartTaskExecutorServicesException(e);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    TaskExecutor内部服务都初始化完毕后,就意味着TaskExecutor(作为TaskManager)启动完成了。此时ResourceManager会接收到来自TaskManager的注册信息,并对TaskManager提供的资源统一管理。

    总结:

    • 创建TaskManagerRunner,就是创建TaskExecutor
    • 启动TaskManagerRunner,就是启动TaskExecutor
    • 启动TaskExecutor,就是将TaskExecutor内部用到的服务进行初始化(负责RPC连接的、管理Slot的、管理Task执行过程中的临时文件的相关服务)

    2. 将TaskManager注册给ResourceManager

    在启动TaskExecutor(初始化TaskExecutor的内部服务)时,first step就是建立TaskExecutor和ResourceManager之间的RPC连接

    // 创建TaskExecutor与ResourceManager之间的RPC连接,此时会将TaskManager的资源信息汇报给ResourceManager
    // 这个监听器在TaskExecutor启动时会初始化调用一次,以后只有当ResourceManager的Leader地址发生变更时才会被调用
    resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    
    • 1
    • 2
    • 3

    LeaderRetrievalService服务通过ResourceManagerLeaderListener,可以获取ResourceManager的Leader地址。其中,该能力是由LeaderRetrievalListener接口提供的。这个监听器在TaskExecutor启动时会初始化调用一次,以后只有当ResourceManager的Leader地址发生变更时才会被调用

    /**
     * The listener for leader changes of the resource manager.
     * ResourceManager的Leader地址发生变更时,所用的监听器
     * LeaderRetrievalListener接口提供了监听方法
     */
    private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {
    
       @Override
       public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
          // 当ResourceManager的Leader地址发生变更时,会异步执行下面的方法块
          runAsync(
             // 根据新的Leader地址,重新和ResourceManager建立RPC连接
             () -> notifyOfNewResourceManagerLeader(
                leaderAddress,
                ResourceManagerId.fromUuidOrNull(leaderSessionID)));
       }
    
       /**
        * 异常处理
        */
       @Override
       public void handleError(Exception exception) {
          onFatalError(exception);
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    有了ResourceManager变更后的Leader地址,就可以创建一个新的“可以连接到ResourceManager的地址”。有了“新ResourceManager的Leader地址”,就能重新和ResourceManager建立RPC连接了

    /**
     * 根据ResourceManager变更后的Leader地址,重新和ResourceManager建立RPC连接
     */
    private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
       // 根据新的ResourceManager的Leader地址和id,创建新的ResourceManager的连接地址
       resourceManagerAddress = createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
       // 重新建立和ResourceManager之间的RPC连接
       reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    和ResourceManager重新建立连接,分为3步:

    • 关闭“旧的”连接
    • 注册超时等待时间,防止TaskExecutor和ResourceManager之间的RPC连接超时
    • 尝试再次连接“新的”ResourceManager
    /**
     * 重新和ResourceManager的Leader建立RPC连接
     */
    private void reconnectToResourceManager(Exception cause) {
       // 关闭之前的ResourceManager的Leader的“旧的”RPC连接
       // 如果是首次启动TaskExecutor,不会执行“关闭”操作
       closeResourceManagerConnection(cause);
       // 注册超时等待时间,防止TaskExecutor和ResourceManager之间出现RPC连接超时
       startRegistrationTimeout();
       // 尝试再次连接ResourceManager
       tryConnectToResourceManager();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    最重要的就是TaskExecutor重新和ResourceManager建立连接,核心行为就是创建TaskExecutor和ResourceManager之间的物理连接,然后start。

    /**
     * TaskExecutor重新和ResourceManager建立RPC连接
     */
    private void tryConnectToResourceManager() {
       if (resourceManagerAddress != null) {
          // 提供的Leader地址不为null,就开始建立RPC连接
          connectToResourceManager();
       }
    }
    
    /**
     * TaskExecutor重新和ResourceManager建立RPC连接
     */
    private void connectToResourceManager() {
        assert(resourceManagerAddress != null);
        assert(establishedResourceManagerConnection == null);
        assert(resourceManagerConnection == null);
    
        log.info("Connecting to ResourceManager {}.", resourceManagerAddress);
    
        // 创建TaskExecutorRegistration,用来存放TaskExecutor的基本信息
        final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
            getAddress(),
            getResourceID(),
            taskManagerLocation.dataPort(),
            hardwareDescription,
            taskManagerConfiguration.getDefaultSlotResourceProfile(),
            taskManagerConfiguration.getTotalResourceProfile()
        );
    
        // TaskExecutorToResourceManagerConnection是TaskExecutor和ResourceManager之间的物理连接
        resourceManagerConnection =
            new TaskExecutorToResourceManagerConnection(
            log,
            getRpcService(),
            taskManagerConfiguration.getRetryingRegistrationConfiguration(),
            resourceManagerAddress.getAddress(),
            resourceManagerAddress.getResourceManagerId(),
            getMainThreadExecutor(),
            new ResourceManagerRegistrationListener(),
            taskExecutorRegistration);
        // 启动TaskExecutor和ResourceManager之间的物理连接
        resourceManagerConnection.start();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    至此,TaskExecutor和ResourceManager之间的RPC连接就算是创建完毕了。此时的ResourceManager就可以通过ResourceManagerGateway接收TaskExecutor的注册信息。一旦注册成功的话,TaskExecutor就会创建TaskManager和ResourceManager之间的心跳连接。TaskExecutor就会通过心跳连接,将SlotReport发送给ResourceManager。

    3. TaskManager向ResourceManager汇报Slot

    上面已经创建好了TaskExecutor和ResourceManager之间的物理连接,并调用TaskExecutorToResourceManagerConnection#start()方法(实际是父类RegisteredRpcConnection的start()方法)。

    既然TaskExecutor会通过TaskExecutorToResourceManagerConnection#start()方法向ResourceManager注册,那就得有专门的方法来接收ResourceManager的反馈信息。

    /**
     * TaskExecutor向ResourceManager注册后,会收到ResourceManager的反馈
     */
    @Override
    protected void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
       log.info("Successful registration at resource manager {} under registration id {}.",
          getTargetAddress(), success.getRegistrationId());
    
       // 向TaskExecutor发送“TaskExecutor已经向ResourceManager注册成功”的消息
       registrationListener.onRegistrationSuccess(this, success);
    }
    
    @Override
    protected void onRegistrationFailure(Throwable failure) {
       log.info("Failed to register at resource manager {}.", getTargetAddress(), failure);
    
       registrationListener.onRegistrationFailure(failure);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    RegistrationConnectionListener接口专门用来接收“TaskExecutor向ResourceManager注册”后的反馈信息,唯一实现是TaskExecutor的内部类ResourceManagerRegistrationListener。

    /**
     * 当TaskExecutor向ResourceManager注册后,会收到来自ResourceManager的反馈信息。
     * 注册success or failure会通过这个监听器回调,通知给TaskExecutor
     */
    private final class ResourceManagerRegistrationListener implements RegistrationConnectionListener<TaskExecutorToResourceManagerConnection, TaskExecutorRegistrationSuccess> {
    
       @Override
       public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection connection, TaskExecutorRegistrationSuccess success) {
          // 获取“注册”相关的配置信息
          final ResourceID resourceManagerId = success.getResourceManagerId();
          final InstanceID taskExecutorRegistrationId = success.getRegistrationId();
          final ClusterInformation clusterInformation = success.getClusterInformation();
          // 从物理连接中,取出ResourceManagerGateway
          final ResourceManagerGateway resourceManagerGateway = connection.getTargetGateway();
    
          // 异步执行方法块:构建TaskExecutor和ResourceManager之间的网络连接
          runAsync(
             () -> {
                // filter out outdated connections
                //noinspection ObjectEquality
                if (resourceManagerConnection == connection) {
                   try {
                      establishResourceManagerConnection(
                         resourceManagerGateway,
                         resourceManagerId,
                         taskExecutorRegistrationId,
                         clusterInformation);
                   } catch (Throwable t) {
                      log.error("Establishing Resource Manager connection in Task Executor failed", t);
                   }
                }
             });
       }
    
       @Override
       public void onRegistrationFailure(Throwable failure) {
          onFatalError(failure);
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    一旦TaskExecutor向ResourceManager注册成功后,“成功消息”会通过监听器通知TaskExecutor。TaskExecutor收到“success消息”后,会调用以下方法,向ResourceManager汇报SlotReport信息

    /**
     * 一旦TaskExecutor向ResourceManager注册成功后,“成功消息”会通过监听器通知TaskExecutor。
     * TaskExecutor收到后,会调用该方法,向ResourceManager汇报SlotReport信息
     */
    private void establishResourceManagerConnection(
          ResourceManagerGateway resourceManagerGateway,
          ResourceID resourceManagerResourceId,
          InstanceID taskExecutorRegistrationId,
          ClusterInformation clusterInformation) {
    
       // 向ResourceManager汇报SlotReport信息
       // SlotReport包含当前TaskExecutor具备的Slot资源报告
       final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
          getResourceID(),
          taskExecutorRegistrationId,
          taskSlotTable.createSlotReport(getResourceID()),
          taskManagerConfiguration.getTimeout());
    
       // 执行SlotReport上报操作,如果出现异常,就重新和ResourceManager建立连接
       slotReportResponseFuture.whenCompleteAsync(
          (acknowledge, throwable) -> {
             if (throwable != null) {
                // 上报SlotReport出现异常,重新和ResourceManager建立连接
                reconnectToResourceManager(new TaskManagerException("Failed to send initial slot report to ResourceManager.", throwable));
             }
          }, getMainThreadExecutor());
    
       // monitor the resource manager as heartbeat target
       // 通过HeartbeatManager,建立TaskExecutor和ResourceManager之间的心跳连接
       // TaskExecutor会周期性的向ResourceManager进行心跳上报
       resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<TaskExecutorHeartbeatPayload>() {
          @Override
          public void receiveHeartbeat(ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {
             resourceManagerGateway.heartbeatFromTaskManager(resourceID, heartbeatPayload);
          }
    
          @Override
          public void requestHeartbeat(ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {
             // the TaskManager won't send heartbeat requests to the ResourceManager
          }
       });
    
       // set the propagated blob server address
       final InetSocketAddress blobServerAddress = new InetSocketAddress(
          clusterInformation.getBlobServerHostname(),
          clusterInformation.getBlobServerPort());
    
       blobCacheService.setBlobServerAddress(blobServerAddress);
    
       // EstablishedResourceManagerConnection被用来保存TaskExecutor和ResourceManager之间的连接信息,
       // 这些会保存在TaskExecutor内
       establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
          resourceManagerGateway,
          resourceManagerResourceId,
          taskExecutorRegistrationId);
    
       // 在TaskExecutor尝试和ResourceManager建立连接之前,曾经注册了一个超时等待时间(防止TaskExecutor和ResourceManager之间连接超时)
       // 此时停止注册超时监听,因为TaskExecutor和ResourceManager之间的连接已经建立完成,SlotReport也已经汇报给ResourceManager了
       stopRegistrationTimeout();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    TaskExecutor会通过ResourceManagerGateway,向ResourceManager汇报SlotReport信息。如果汇报过程出现异常,就重新建立和ResourceManager之间的网络连接。最后停止注册超时监听,在TaskExecutor尝试和ResourceManager建立连接之前,曾经注册了一个超时等待时间(防止TaskExecutor和ResourceManager之间连接超时)。此时停止注册超时监听,因为TaskExecutor和ResourceManager之间的连接已经建立完成,SlotReport也已经汇报给ResourceManager了。

    最终,TaskExecutor也启动了,和ResourceManager之间的连接也建立好了,TaskExecutor内的SlotReport也都汇报给ResourceManager了。这意味着,ResourceManager可以对TaskManager进行统一管理了,作业提交到集群后,会从已注册的TaskManager上分配Slot计算资源,并将Task运行在指定的TaskManager上。

  • 相关阅读:
    循环神经网络
    【小型物体测速仪】只有原理,无代码
    虚幻动画系统概述
    list转map
    R语言ggplot2可视化:使用ggpubr包的ggviolin函数可视化小提琴图
    Rowset 的元数据一直存储在内存中
    记一次 MySQL 主从同步异常的排查记录,百转千回
    ETLCloud制造业轻量级数据中台解决方案
    嵌入式分享合集107
    网络安全笔记-SSRF
  • 原文地址:https://blog.csdn.net/qq_36299025/article/details/127715766