• Apache DolphinScheduler源码分析(超详细)


    01 引言

    Apache DolphinScheduler官方文档地址:https://dolphinscheduler.apache.org/zh-cn/index.html

    Apache DolphinScheduler是一个分布式去中心化,易扩展的可视化DAG工作流任务调度平台。致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。

    其原理图如下:
    在这里插入图片描述

    接下来,本文一步一步详细地讲解其源码。

    02 DolphinScheduler 项目结构

    2.1 结构分析

    首先使用IDEAGithub clone项目到本地,项目地址https://github.com/apache/dolphinscheduler
    在这里插入图片描述


    导入项目后,可以看到其主要核心模块如下:

    模块描述
    dolphinscheduler-alert告警模块,提供 AlertServer 服务。
    dolphinscheduler-api web应用模块,提供 ApiServer 服务。
    dolphinscheduler-common通用的常量枚举、工具类、数据结构或者基类
    dolphinscheduler-dao提供数据库访问等操作。
    dolphinscheduler-remote基于 netty 的客户端、服务端
    dolphinscheduler-server MasterServer 和 WorkerServer 服务
    dolphinscheduler-serviceservice模块,包含Quartz、Zookeeper、日志客户端访问服务,便于server模块和api模块调用
    dolphinscheduler-ui前端模块

    2.2 表分析

    在项目/dolphinscheduler-dao/src/main/resources/sql/create/release-1.0.0_schema/mysql目录下,有数据库初始化脚本dolphinscheduler_ddl.sql及dolphinscheduler_dml.sql脚本以及一些升级脚本:

    在这里插入图片描述
    执行完后,可以在数据库里看到有如下表:

    表名表信息
    t_ds_access_token访问ds后端的token
    t_ds_alert 告警信息
    t_ds_alertgroup 告警组
    t_ds_command 执行命令
    t_ds_datasource 数据源
    t_ds_error_command(核心表) 错误命令
    t_ds_process_definition(核心表) 流程定义
    t_ds_process_instance(核心表) 流程实例
    t_ds_project 项目
    t_ds_queue 队列
    t_ds_relation_datasource_user 用户关联数据源
    t_ds_relation_process_instance 子流程
    t_ds_relation_project_user 用户关联项目
    t_ds_relation_resources_user 用户关联资源
    t_ds_relation_udfs_user 用户关联UDF函数
    t_ds_relation_user_alertgroup用户关联告警组
    t_ds_resources 资源文件
    t_ds_schedules(核心表) 流程定时调度
    t_ds_session 用户登录的session
    t_ds_task_instance(核心表) 任务实例
    t_ds_tenant 租户
    t_ds_udfs UDF资源
    t_ds_user 用户
    t_ds_version ds版本信息

    核心表可以直接看文末附录。

    2.2.1 类关系图 (用户/队列/数据源)

    在这里插入图片描述
    描述如下:

    • 一个租户下可以有多个用户;
    • t_ds_user中的queue字段存储的是队列表中的queue_name信息;
    • t_ds_tenant下存的是queue_id,在流程定义执行过程中,用户队列优先级最高,用户队列为空则采用租户队列;
    • t_ds_datasource表中的user_id字段表示创建该数据源的用户;
    • t_ds_relation_datasource_user中的user_id表示,对数据源有权限的用户

    2.2.2 类关系图 (项目/资源/告警)

    在这里插入图片描述
    描述如下:

    • 一个用户可以有多个项目,用户项目授权通过t_ds_relation_project_user表完成project_id和user_id的关系绑定
    • t_ds_projcet表中的user_id表示创建该项目的用户;
    • t_ds_relation_project_user表中的user_id表示对项目有权限的用户;
    • t_ds_resources表中的user_id表示创建该资源的用户;
    • t_ds_relation_resources_user中的user_id表示对资源有权限的用户;
    • t_ds_udfs表中的user_id表示创建该UDF的用户;
    • t_ds_relation_udfs_user表中的user_id表示对UDF有权限的用户。

    2.2.3 类关系图 ( 命令/流程/任务)

    在这里插入图片描述


    在这里插入图片描述
    描述如下:

    • 一个项目有多个流程定义,一个流程定义可以生成多个流程实例,一个流程实例可以生成多个任务实例
    • t_ds_schedulers表存放流程定义的定时调度信息;
    • t_ds_relation_process_instance表存放的数据用于处理流程定义中含有子流程的情况,parent_process_instance_id表示含有子流程的主流程实例id,process_instance_id表示子流程实例的id,parent_task_instance_id表示子流程节点的任务实例id,流程实例表和任务实例表分别对应t_ds_process_instance表和t_ds_task_instance表

    03 DolphinScheduler 源码分析

    讲解源码前,先贴一份官网的启动流程图:
    在这里插入图片描述

    3.1 ExecutorController

    根据上述的流程图,可以看到,我们在UI层点击“启动按钮”后,会访问dophinscheduler-api目录的Api Server服务,会进入到ExecutorController(完整类路径:org.apache.dolphinscheduler.api.controller.ExecutorController),下面看看ExecutorController提供了哪些接口方法?

    在这里插入图片描述

    以下是对各接口的描述:

    接口描述
    /start-process-instance执行流程实例
    /batch-start-process-instance批量执行流程实例
    /execute 操作流程实例,如:暂停, 停止, 重跑, 从暂停恢复,从停止恢复
    /batch-execute批量操作流程实例
    /start-check检查流程定义或检查所有的子流程定义是否在线

    接下我们看看最核心的execute方法:

     /**
         * do action to process instance: pause, stop, repeat, recover from pause, recover from stop
         *
         * @param loginUser login user
         * @param projectCode project code
         * @param processInstanceId process instance id
         * @param executeType execute type
         * @return execute result code
         */
        @ApiOperation(value = "execute", notes = "EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES")
        @ApiImplicitParams({
                @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"),
                @ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType")
        })
        @PostMapping(value = "/execute")
        @ResponseStatus(HttpStatus.OK)
        @ApiException(EXECUTE_PROCESS_INSTANCE_ERROR)
        @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
        public Result execute(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                              @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
                              @RequestParam("processInstanceId") Integer processInstanceId,
                              @RequestParam("executeType") ExecuteType executeType
        ) {
            Map<String, Object> result = execService.execute(loginUser, projectCode, processInstanceId, executeType);
            return returnDataList(result);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    可以看到execute接口,是直接使用ExecService去执行了,下面分析下

    3.2 ExecService

    下面看看里面的execute方法,已经加好了注释:

    /**
     * 操作工作流实例
     *
     * @param loginUser         登录用户
     * @param projectCode       项目编码
     * @param processInstanceId 流程实例ID
     * @param executeType       执行类型(repeat running、resume pause、resume failure、stop、pause)
     * @return 执行结果
     */
    @Override
    public Map<String, Object> execute(User loginUser, long projectCode, Integer processInstanceId, ExecuteType executeType) {
    
        /*** 查询项目信息 **/
        Project project = projectMapper.queryByCode(projectCode);
        //check user access for project
    
        /*** 判断当前用户是否有操作权限 **/
        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, ApiFuncIdentificationConstant.map.get(executeType));
        if (result.get(Constants.STATUS) != Status.SUCCESS) {
            return result;
        }
    
        /*** 检查Master节点是否存在 **/
        if (!checkMasterExists(result)) {
            return result;
        }
    
        /*** 查询工作流实例详情 **/
        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
        if (processInstance == null) {
            putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
            return result;
        }
    
        /*** 根据工作流实例绑定的流程定义ID查询流程定义 **/
        ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
                processInstance.getProcessDefinitionVersion());
        if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) {
            /*** 校验工作流定义能否执行(工作流是否存在?是否上线状态?存在子工作流定义不是上线状态?) **/
            result = checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
            if (result.get(Constants.STATUS) != Status.SUCCESS) {
                return result;
            }
        }
    
        /*** 根据当前工作流实例的状态判断能否执行对应executeType类型的操作 **/
        result = checkExecuteType(processInstance, executeType);
        if (result.get(Constants.STATUS) != Status.SUCCESS) {
            return result;
        }
    
        /*** 判断是否已经选择了合适的租户 **/
        if (!checkTenantSuitable(processDefinition)) {
            logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
                    processDefinition.getId(), processDefinition.getName());
            putMsg(result, Status.TENANT_NOT_SUITABLE);
        }
    
        /*** 在executeType为重跑的状态下,获取用户指定的启动参数 **/
        Map<String, Object> commandMap = JSONUtils.parseObject(processInstance.getCommandParam(), new TypeReference<Map<String, Object>>() {
        });
        String startParams = null;
        if (MapUtils.isNotEmpty(commandMap) && executeType == ExecuteType.REPEAT_RUNNING) {
            Object startParamsJson = commandMap.get(Constants.CMD_PARAM_START_PARAMS);
            if (startParamsJson != null) {
                startParams = startParamsJson.toString();
            }
        }
    
        /*** 根据不同的ExecuteType去执行相应的操作 **/
        switch (executeType) {
            case REPEAT_RUNNING: // 重跑
                result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams);
                break;
            case RECOVER_SUSPENDED_PROCESS: // 恢复挂载的工作流
                result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
                break;
            case START_FAILURE_TASK_PROCESS: // 启动失败的工作流
                result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
                break;
            case STOP: // 停止
                if (processInstance.getState() == ExecutionStatus.READY_STOP) {
                    putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
                } else {
                    result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP);
                }
                break;
            case PAUSE: // 暂停
                if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
                    putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
                } else {
                    result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE);
                }
                break;
            default:
                logger.error("unknown execute type : {}", executeType);
                putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");
    
                break;
        }
        return result;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102

    可以看到,以上代码前半部分主要是做了校验的操作,后半部分是根据执行类型来做不同的操作,操作主要分为两部分:insertCommand以及updateProcessInstancePrepare

    3.2.1 insertCommand

    方法代码如下,其实主要就是把生成命令并插入t_ds_command(执行命令表),插入已经添加好注释:

    /**
     * 插入命令(re run, recovery (pause / failure) execution)
     *
     * @param loginUser             登录用户
     * @param instanceId            工作流实例id
     * @param processDefinitionCode 工作流定义id
     * @param processVersion        工作流版本
     * @param commandType           命令类型
     * @return 操作结果
     */
    private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, int processVersion, CommandType commandType, String startParams) {
        Map<String, Object> result = new HashMap<>();
    
        /*** 封装启动参数 **/
        Map<String, Object> cmdParam = new HashMap<>();
        cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId);
        if (!StringUtils.isEmpty(startParams)) {
            cmdParam.put(CMD_PARAM_START_PARAMS, startParams);
        }
    
        Command command = new Command();
        command.setCommandType(commandType);
        command.setProcessDefinitionCode(processDefinitionCode);
        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
        command.setExecutorId(loginUser.getId());
        command.setProcessDefinitionVersion(processVersion);
        command.setProcessInstanceId(instanceId);
    
        /*** 判断工作流实例是否正在执行 **/
        if (!processService.verifyIsNeedCreateCommand(command)) {
            putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(processDefinitionCode));
            return result;
        }
    
        /*** 保存命令 **/
        int create = processService.createCommand(command);
    
        if (create > 0) {
            putMsg(result, Status.SUCCESS);
        } else {
            putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
        }
    
        return result;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    3.2.2 updateProcessInstancePrepare

    方法代码如下,已经添加注释

    /**
     * 准备更新工作流实例的命令类型和状态
     *
     * @param processInstance 工作流实例
     * @param commandType     命令类型
     * @param executionStatus 执行状态
     * @return 更新结果
     */
    private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) {
        Map<String, Object> result = new HashMap<>();
    
        processInstance.setCommandType(commandType);
        processInstance.addHistoryCmd(commandType);
        processInstance.setState(executionStatus);
        int update = processService.updateProcessInstance(processInstance);
    
        // 判断流程是否正常
        if (update > 0) {
            StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
                    processInstance.getId(), 0, processInstance.getState(), processInstance.getId(), 0
            );
            Host host = new Host(processInstance.getHost());
            stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command());
            putMsg(result, Status.SUCCESS);
        } else {
            putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
        }
        return result;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    根据流程图,我们可以看到了已经执行了如下红框的代码,也就是把我们的command已经缓存到了DB。接下来需要看看Master的代码
    在这里插入图片描述

    3.3 MasterServer

    MasterSerer在项目/dolphinscheduler-dev/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java目录。它对应的是架构图的这一块:
    在这里插入图片描述
    代码及注释如下:

    @SpringBootApplication
    @ComponentScan("org.apache.dolphinscheduler")
    @EnableTransactionManagement
    @EnableCaching
    public class MasterServer implements IStoppable {
        private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);
    
        @Autowired
        private SpringApplicationContext springApplicationContext;
    
        @Autowired
        private MasterRegistryClient masterRegistryClient;
    
        @Autowired
        private TaskPluginManager taskPluginManager;
    
        @Autowired
        private MasterSchedulerService masterSchedulerService;
    
        @Autowired
        private SchedulerApi schedulerApi;
    
        @Autowired
        private EventExecuteService eventExecuteService;
    
        @Autowired
        private FailoverExecuteThread failoverExecuteThread;
    
        @Autowired
        private MasterRPCServer masterRPCServer;
    
        public static void main(String[] args) {
            Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
            SpringApplication.run(MasterServer.class);
        }
    
        /**
         * 启动 master server
         */
        @PostConstruct
        public void run() throws SchedulerException {
    
            // 初始化 RPC服务
            this.masterRPCServer.start();
    
            //安装任务插件
            this.taskPluginManager.installPlugin();
    
            /*** MasterServer 注册客户端,用于连接到注册表并传递注册表事件。
             * 当主节点启动时,它将在注册中心注册,并调度一个{@link HeartBeatTask}来更新注册表中的元数据**/
            this.masterRegistryClient.init();
            this.masterRegistryClient.start();
            this.masterRegistryClient.setRegistryStoppable(this);
    
            // 主调度程序线程,该线程将使用来自数据库的命令并触发执行的processInstance。
            this.masterSchedulerService.init();
            this.masterSchedulerService.start();
    
            this.eventExecuteService.start();
            this.failoverExecuteThread.start();
    
            //这是调度器的接口,包含操作调度任务的方法。
            this.schedulerApi.start();
    
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                if (Stopper.isRunning()) {
                    close("MasterServer shutdownHook");
                }
            }));
        }
    
        /**
         * 优雅的关闭方法
         *
         * @param cause 关闭的原因
         */
        public void close(String cause) {
    
            try {
                // set stop signal is true
                // execute only once
                if (!Stopper.stop()) {
                    logger.warn("MasterServer is already stopped, current cause: {}", cause);
                    return;
                }
    
                logger.info("Master server is stopping, current cause : {}", cause);
    
                // thread sleep 3 seconds for thread quietly stop
                ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
                // close
                this.schedulerApi.close();
                this.masterSchedulerService.close();
                this.masterRPCServer.close();
                this.masterRegistryClient.closeRegistry();
                // close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
                // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
                springApplicationContext.close();
    
                logger.info("MasterServer stopped, current cause: {}", cause);
            } catch (Exception e) {
                logger.error("MasterServer stop failed, current cause: {}", cause, e);
            }
        }
    
        @Override
        public void stop(String cause) {
            close(cause);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110

    在run方法里面,可以看到,主要依次执行了:

    • ① MasterRPCServer.start():启动master的rpc服务;
    • ② TaskPluginManager.installPlugin():安装任务插件;
    • ③ MasterRegistryClient.start():向Zookeeper注册MasterServer;
    • ④ MasterSchedulerService.start():主调度程序线程,该线程将使用来自数据库的命令并触发执行的processInstance。
    • ⑤ EventExecuteService.start():工作流实例执行情况
    • ⑥ FailoverExecuteThread():故障转移检测
    • ⑦ SchedulerApi.start():scheduler接口去操作任务实例

    3.3.1 MasterRPCServer

    Master RPC Server主要用来发送或接收请求给其它系统

    初始化方法如下:

    @PostConstruct
    private void init() {
        // 初始化远程服务
        NettyServerConfig serverConfig = new NettyServerConfig();
        serverConfig.setListenPort(masterConfig.getListenPort());
        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor);
    
        // 日志服务
        this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
    
        this.nettyRemotingServer.start();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    3.3.2 TaskPluginManager

    插件安装管理器,通过SPI机制加载数据源插件的,其中插件在如下目录:
    在这里插入图片描述
    只要每个数据源插件实现指定的工厂即可,如Spark:
    在这里插入图片描述
    核心代码如下,并通过loadTaskChannel ()方法,把factory缓存进Map<String, TaskChannel>集合:
    在这里插入图片描述

    3.3.3 MasterRegistryClient

    我们知道DolphinScheduler使用的是去中心化思想所以MasterRegistryClient主要的作用是注册MasterServer客户端,用于连接到注册表并传递注册表事件。当Master节点启动时,它将在注册中心注册,并调度一个HeartBeatTask来更新注册表中的元数据
    在这里插入图片描述
    进入RegistryClient里面的subscribe方法,可以看到有两种注册中心,一种是MySQL的,另一种是Zookeeper的,这里应该默认使用了ZookeeperRegistry。
    在这里插入图片描述

    3.3.4 MasterSchedulerService

    其init和run方法如下,init主要就是初始化一个工作流实例的队列

    在这里插入图片描述

    看看里面实际运行的run方法,可以看到里面是一个死循环,不断地去执行scheduleWorkflow() 方法:
    在这里插入图片描述
    看看里面的scheduleWorkflow()方法,已写好注释:

    /**
     * 从数据库中按槽位查询命令,转换为工作流实例,然后提交给workflowExecuteThreadPool。
     */
    private void scheduleWorkflow() throws InterruptedException, MasterException {
        // 从数据库中按槽位查询命令
        List<Command> commands = findCommands();
        if (CollectionUtils.isEmpty(commands)) {
            // indicate that no command ,sleep for 1s
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            return;
        }
    
        // 转换为工作流实例
        List<ProcessInstance> processInstances = command2ProcessInstance(commands);
        if (CollectionUtils.isEmpty(processInstances)) {
            // indicate that the command transform to processInstance error, sleep for 1s
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            return;
        }
        MasterServerMetrics.incMasterConsumeCommand(commands.size());
    
        for (ProcessInstance processInstance : processInstances) {
            //提交给workflowExecuteThreadPool
            submitProcessInstance(processInstance);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    提交工作流实例方法如下,注意提交到了workflowExecuteThreadPool

    /**
     * 提交工作流实例给 workflowExecuteThreadPool
     *
     * @param processInstance 工作流实例
     */
    private void submitProcessInstance(@NonNull ProcessInstance processInstance) {
        try {
            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
            logger.info("Master schedule service starting workflow instance");
    
            // 封装工作流实例Runnable
            final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
                    processInstance
                    , processService
                    , nettyExecutorManager
                    , processAlertManager
                    , masterConfig
                    , stateWheelExecuteThread
                    , curingGlobalParamsService);
    
            this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
            if (processInstance.getTimeout() > 0) {
                stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
            }
            ProcessInstanceMetrics.incProcessInstanceSubmit();
    
            // 提交封装好的工作流实例Runnable给workflowExecuteThreadPool
            CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture = CompletableFuture.supplyAsync(
                    workflowExecuteRunnable::call, workflowExecuteThreadPool);
            workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
                if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
                    // submit failed
                    processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
                    stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
                    submitFailedProcessInstances.add(processInstance);
                }
            });
            logger.info("Master schedule service started workflow instance");
    
        } catch (Exception ex) {
            processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
            stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
            logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
        } finally {
            LoggerUtils.removeWorkflowInstanceIdMDC();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    3.3.5 EventExecuteService

    EventExecuteService没有类注释,猜测应该是用来传递每个工作流实例执行情况事件的。
    在这里插入图片描述

    3.3.6 FailoverExecuteThread

    FailoverExecuteThread为故障转移检测线程。每隔10秒检测一次,核心方法在FailoverService里的failoverMasterWithLock()failoverMaster()方法:

    在这里插入图片描述

    3.3.7 SchedulerApi

    SchedulerApi是操作调度任务实例的接口,其主要功能是启动调度程序、插入或更新调度任务、删除调度任务、关闭调度任务和释放资源

    3.3.8 TaskPriorityQueueConsumer

    根据流程图,我们可以知道TaskConsumer从队列里获取分割的任务,然后转发到Worker执行。
    在这里插入图片描述

    我们看看TaskPriorityQueueConsumer里的核心代码,可以看到里面的批量分发任务方法:
    在这里插入图片描述
    进入batchDispatch方法,可以看到从队列里获取任务分发任务:
    在这里插入图片描述
    最后分发任务给worker:
    在这里插入图片描述

    ok,到这里,我们可以看worker部分的代码了。

    3.4 WorkerServer

    MasterSerer在项目/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java目录。它的主程序代码与MasterServer基本一致,代码及注释如下:

    @PostConstruct
    public void run() {
    	// worker rpc服务
        this.workerRpcServer.start();
    
    	// 任务插件安装
        this.taskPluginManager.installPlugin();
    
        // 向Zookeeper注册客户端
        this.workerRegistryClient.registry();
        this.workerRegistryClient.setRegistryStoppable(this);
        Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
        this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
    
       // 管理Worker线程
        this.workerManagerThread.start();
        
        // 报告状态线程
        this.retryReportTaskStatusThread.start();
    
        /*
         * registry hooks, which are called before the process exits
         */
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (Stopper.isRunning()) {
                close("WorkerServer shutdown hook");
            }
        }));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    根据官网提供的流程图,我们需要重点留意的是TaskExecutePorcessor:
    在这里插入图片描述

    3.4.1 TaskExecutePorcessor

    TaskExecuteProcessor/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java目录,毋庸置疑,其核心方法为:

    @Counted(value = "ds.task.execution.count", description = "task execute total count")
    @Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
    @Override
    public void process(Channel channel, Command command) {
     // code ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    其中里面有一段较为核心的代码,提交任务TaskExecuteThread给管理者:
    在这里插入图片描述
    接下来看看TaskExecuteThread的代码。

    3.4.2 TaskExecuteThread

    TaskExecuteThread就是最终执行任务的代码了,里面的run方法如下,已加好注释:

    
    @Override
    public void run() {
        // dry run 预演模式
        if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
            taskExecutionContext.setStartTime(new Date());
            taskExecutionContext.setEndTime(new Date());
            TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
            taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
            logger.info("[WorkflowInstance-{}][TaskInstance-{}] Task dry run success",
                taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
            return;
        }
        try {
            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
            logger.info("script path : {}", taskExecutionContext.getExecutePath());
            if (taskExecutionContext.getStartTime() == null) {
                taskExecutionContext.setStartTime(new Date());
            }
            logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
    
            //回调任务执行状态
            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
            taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext);
    
            // 拷贝 hdfs/minio 文件到本地
            List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources());
            if (!fileDownloads.isEmpty()) {
                downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
            }
    
            taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
    
            taskExecutionContext.setTaskAppId(String.format("%s_%s",
                    taskExecutionContext.getProcessInstanceId(),
                    taskExecutionContext.getTaskInstanceId()));
    
            TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
            if (null == taskChannel) {
                throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType()));
            }
            String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
                    taskExecutionContext.getProcessDefineCode(),
                    taskExecutionContext.getProcessDefineVersion(),
                    taskExecutionContext.getProcessInstanceId(),
                    taskExecutionContext.getTaskInstanceId());
            taskExecutionContext.setTaskLogName(taskLogName);
    
            // 给当前线程设置名称
            Thread.currentThread().setName(taskLogName);
    
            task = taskChannel.createTask(taskExecutionContext);
    
            // 执行任务插件方法 - init
            this.task.init();
    
            //init varPool
            this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
    
            // 执行任务插件方法 -  handle
            this.task.handle();
    
            // 判断是否需要发送告警
            if (this.task.getNeedAlert()) {
                sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode());
            }
    
            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode()));
            taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
            taskExecutionContext.setProcessId(this.task.getProcessId());
            taskExecutionContext.setAppIds(this.task.getAppIds());
            taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
            logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
        } catch (Throwable e) {
            logger.error("task scheduler failure", e);
            kill();
            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
            taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
            taskExecutionContext.setProcessId(this.task.getProcessId());
            taskExecutionContext.setAppIds(this.task.getAppIds());
        } finally {
            TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
            taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
            clearTaskExecPath();
            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    04 附录

    4.1 核心表

    ① t_ds_process_definition(流程定义表):

    字段类型注释
    idint主键
    namevarchar流程定义名称
    versionint流程定义版本
    release_statetinyint流程定义的发布状态:0 未上线 1已上线
    project_idint项目id
    user_idint流程定义所属用户id
    process_definition_jsonlongtext流程定义json串
    descriptiontext流程定义描述
    global_paramstext全局参数
    flagtinyint流程是否可用:0 不可用,1 可用
    locationstext节点坐标信息
    connectstext节点连线信息
    receiverstext收件人
    receivers_cctext抄送人
    create_timedatetime创建时间
    timeoutint超时时间
    tenant_idint租户id
    update_timedatetime更新时间
    modify_byvarchar修改用户
    resource_idsvarchar资源id集

    ② t_ds_process_instance(流程实例表):

    字段类型注释
    idint主键
    namevarchar流程实例名称
    process_definition_idint流程定义id
    statetinyint流程实例状态:0 提交成功,1 正在运行,2 准备暂停,3 暂停,4 准备停止,5 停止,6 失败,7 成功,8 需要容错,9 kill,10 等待线程,11 等待依赖完成
    recoverytinyint流程实例容错标识:0 正常,1 需要被容错重启
    start_timedatetime流程实例开始时间
    end_timedatetime流程实例结束时间
    run_timesint流程实例运行次数
    hostvarchar流程实例所在的机器
    command_typetinyint命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程
    command_paramtext命令的参数(json格式)
    task_depend_typetinyint节点依赖类型:0 当前节点,1 向前执行,2 向后执行
    max_try_timestinyint最大重试次数
    failure_strategytinyint失败策略 0 失败后结束,1 失败后继续
    warning_typetinyint告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发
    warning_group_idint告警组id
    schedule_timedatetime预期运行时间
    command_start_timedatetime开始命令时间
    global_paramstext全局参数(固化流程定义的参数)
    process_instance_jsonlongtext流程实例json(copy的流程定义的json)
    flagtinyint是否可用,1 可用,0不可用
    update_timetimestamp更新时间
    is_sub_processint是否是子工作流 1 是,0 不是
    executor_idint命令执行用户
    locationstext节点坐标信息
    connectstext节点连线信息
    history_cmdtext历史命令,记录所有对流程实例的操作
    dependence_schedule_timestext依赖节点的预估时间
    process_instance_priorityint流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
    worker_groupvarchar任务指定运行的worker分组
    timeoutint超时时间
    tenant_idint租户id

    ③ t_ds_task_instance(任务实例表):

    字段类型注释
    idint主键
    namevarchar任务名称
    task_typevarchar任务类型
    process_definition_idint流程定义id
    process_instance_idint流程实例id
    task_jsonlongtext任务节点json
    statetinyint任务实例状态:0 提交成功,1 正在运行,2 准备暂停,3 暂停,4 准备停止,5 停止,6 失败,7 成功,8 需要容错,9 kill,10 等待线程,11 等待依赖完成
    submit_timedatetime任务提交时间
    start_timedatetime任务开始时间
    end_timedatetime任务结束时间
    hostvarchar执行任务的机器
    execute_pathvarchar任务执行路径
    log_pathvarchar任务日志路径
    alert_flagtinyint是否告警
    retry_timesint重试次数
    pidint进程pid
    app_linkvarcharyarn app id
    flagtinyint是否可用:0 不可用,1 可用
    retry_intervalint重试间隔
    max_retry_timesint最大重试次数
    task_instance_priorityint任务实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
    worker_groupvarchar任务指定运行的worker分组

    ④ t_ds_schedules(流程定时调度表):

    字段类型注释
    idint主键
    process_definition_idint流程定义id
    start_timedatetime调度开始时间
    end_timedatetime调度结束时间
    crontabvarcharcrontab 表达式
    failure_strategytinyint失败策略: 0 结束,1 继续
    user_idint用户id
    release_statetinyint状态:0 未上线,1 上线
    warning_typetinyint告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发
    warning_group_idint告警组id
    process_instance_priorityint流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
    worker_groupvarchar任务指定运行的worker分组
    create_timedatetime创建时间
    update_timedatetime更新时间

    ⑤ t_ds_command(执行命令表):

    字段类型注释
    idint主键
    command_typetinyint命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程
    process_definition_idint流程定义id
    command_paramtext命令的参数(json格式)
    task_depend_typetinyint节点依赖类型:0 当前节点,1 向前执行,2 向后执行
    failure_strategytinyint失败策略:0结束,1继续
    warning_typetinyint告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发
    warning_group_idint告警组
    schedule_timedatetime预期运行时间
    start_timedatetime开始时间
    executor_idint执行用户id
    dependencevarchar依赖字段
    update_timedatetime更新时间
    process_instance_priorityint流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
    worker_groupvarchar任务指定运行的worker分组

    05 文末

    本文是个人阅读DolphinScheduler一些见解,后续或许会再更新,DolphinScheduler给我的感觉就是联通性不强,需要结合流程图才能去理解。最后在补上它的流程图,以做回顾吧
    在这里插入图片描述
    接下来的文章讲讲它的配置。

  • 相关阅读:
    Linux: C实现动态线程池。
    error Missing “key“ prop for element in array react/jsx-key
    HTTP 参数污染 (HPP) 和 HTTP 参数碎片 (HPF)
    【go/方法记录】局部坐标与世界坐标间的相互转换(位置/方向)
    AQS之ReentrantReadWriteLock分析 (九)
    没有root权限如何通过apt安装deb软件
    【ESP32】22.智能家居-DHT11温湿度采集(WiFi)
    =(等号)在Excel中的3个实用技巧
    【NOWCODER】- Python:内置函数(三)
    【JAVA】03 对象
  • 原文地址:https://blog.csdn.net/qq_20042935/article/details/125615561