• Dolphinscheduler3.0源码分析之XxlJob优化之路


    前言

    研究Dolphinscheduler也是机缘巧合,平时负责基于xxl-job二次开发出来的调度平台,因为遇到了并发性能瓶颈,到了不得不优化重构的地步,所以搜索市面上应用较广的调度平台以借鉴优化思路。在阅读完DolphinScheduler代码之后,便生出了将其设计与思考记录下来的念头,这边是此篇文章的来源。因为没有正式生产使用,业务理解不一定透彻,理解可能有偏差,欢迎大家交流讨论。

    1 DolphinScheduler的设计与策略

    大家能关注DolphinScheduler那么一定对调度系统有了一定的了解,对于调度所涉及的到一些专有名词在这里就不做过多的介绍,重点介绍一下流程定义,流程实例,任务定义,任务实例。(没有作业这个概念确实也很新奇,可能是不想和Quartz的JobDetail重叠)
    任务定义:各种类型的任务,是流程定义的关键组成,如sql,shell,spark,mr,python等
    任务实例:任务的实例化,标识着具体的任务执行状态
    流程定义:一组任务节点通过依赖关系建立的起来的有向无环图(DAG)
    流程实例:通过手动或者定时调度生成的流程实例
    定时调度:系统采用Quartz 分布式调度器,并同时支持cron表达式可视化的生成

    1.1 分布式设计

    分布式系统的架构设计基本分为中心化和去中心化两种,各有优劣,凭借各自的业务选择。

    1.1.1 中心化

    中心化设计比较简单,集群中的节点安装角色可以分为Master和slave两种,如下图:
    中心化设计架构
    Master: Master的角色主要负责任务分发并监督Slave的健康状态,可以动态的将任务均衡到Slave上,以致Slave节点不至于“忙死”或”闲死”的状态。
    Worker: Worker的角色主要负责任务的执行工作并维护和Master的心跳,以便Master可以分配任务给Slave。
    中心化设计存在一些问题。
    第一点,一旦Master出现了问题,则群龙无首,整个集群就会崩溃。为了解决这个问题,大多数Master/Slave架构模式都采用了主备Master的设计方案,可以是热备或者冷备,也可以是自动切换或手动切换,而且越来越多的新系统都开始具备自动选举切换Master的能力,以提升系统的可用性。
    第二点,如果Scheduler在Master上,虽然可以支持一个DAG中不同的任务运行在不同的机器上,但是会产生Master的过负载。如果Scheduler在Slave上,则一个DAG中所有的任务都只能在某一台机器上进行作业提交,则并行任务比较多的时候,Slave的压力可能会比较大。
    xxl-job就是采用这种设计方式,但是存在相应的问题。管理器(admin)宕机集群会崩溃,scheduler在管理器上,管理器负责所有任务的校验和分发,管理器存在过载的风险。需要开发者执行想方案解决。

    1.1.2 去中心化

    去中心化设计
    在去中心化设计里,通常没有Master/Slave的概念,所有的角色都是一样的,地位是平等的,去中心化设计的核心设计在于整个分布式系统中不存在一个区别于其他节点的”管理者”,因此不存在单点故障问题。但由于不存在” 管理者”节点所以每个节点都需要跟其他节点通信才得到必须要的机器信息,而分布式系统通信的不可靠性,则大大增加了上述功能的实现难度。实际上,真正去中心化的分布式系统并不多见。反而动态中心化分布式系统正在不断涌出。在这种架构下,集群中的管理者是被动态选择出来的,而不是预置的,并且集群在发生故障的时候,集群的节点会自发的举行"会议"来选举新的"管理者"去主持工作。一般都是基于Raft算法实现的选举策略。动态展示见链接:http://thesecretlivesofdata.com/
    Raft选举算法集群一般有三种角色:
    Leader: 即主节点,同一时刻只有一个leader,负责协调和管理其他节点
    Candidate:候选者,每个节点都可以成为Candidate,节点在该角色下才可以被选为新的Leader
    Follower: 追随者,不可以发起选举。
    选举流程可以分为一下几步:

    1. 初始化时,所有的节点都是Follower状态。
    2. 开始选主时,所有的节点状态都由Follower转换为Candidate,并向其他节点发送选举请求。
    3. 其他节点根据接收到的选举请求的先后顺序,回复是否同意成为主节点,这里需要注意的是,在每一轮选举中,一个节点只能投出一张票。
    4. 若发起选举请求的节点活的超过一半的投票,则成为主节点,他的状态转化为leader,其他的节点的状态则由Candidate降为Follower。Leader节点与Follower节点之间会定期发送心跳包,以检查主节点是否存活。
    5. 当leader节点的任期到了,或者有新节点加入集群,或者主节点挂了,会发起下一轮选举过程。
      基于Raft算法的去中心化选举架构图
      最典型的案例就是ZooKeeper及Go语言实现的Etcd。
      DolphinScheduler的去中心化是Master/Worker注册到Zookeeper中,实现Master集群和Worker集群无中心,并使用Zookeeper分布式锁来选举其中的一台Master或Worker为“管理者”来执行任务。

    1.2 DophinScheduler架构设计

    随手盗用一张官网的系统架构图,可以看到调度系统采用去中心化设计,由UI,API,MasterServer,Zookeeper,WorkServer,Alert,LoggerSever等几部分组成。
    系统架构图
    Api: API接口层,主要负责处理前端UI层的请求。该服务统一提供RESTful api向外部提供请求服务。 接口包括工作流的创建、定义、查询、修改、发布、下线、手工启动、停止、暂停、恢复、从该节点开始执行等等。
    MasterServer: MasterServer采用分布式无中心设计理念,MasterServer集成了Quartz,主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。 MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。
    WorkServer:WorkerServer也采用分布式无中心设计理念,WorkerServer主要负责任务的执行和提供日志服务。WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。
    ZooKeeper: ZooKeeper服务,系统中的MasterServer和WorkerServer节点都通过ZooKeeper来进行集群管理和容错。另外系统还基于ZooKeeper进行事件监听和分布式锁
    Alert:提供告警相关接口,接口主要包括两种类型的告警数据的存储、查询和通知功能。其中通知功能又有邮件通知和SNMP(暂未实现)两种。

    1.3 DophinScheduler分布式锁

    DolphinScheduler使用ZooKeeper分布式锁来实现同一时刻只有一台Master执行Scheduler,或者只有一台Worker执行任务的提交。DolphinScheduler的zookeeper分布式锁是依托于Curator框架利用zk顺序节点和监听器实现的,具体算法如下:
    第一步:客户端A发送加锁请求,会在指定路径/dolphinscheduler/lock下创建临时顺序节点,节点名称是curator框架生成的一长串字符,以10位数值结尾,比如0000000001;
    第二步:查询锁路径下的所有节点,并安卓顺序排序,判断当前节点是不是排在第一位,如果是就可以加锁了。
    第三步:客户端A加锁成功,客户端B也要加锁,那么也会在锁路径下创建一个临时顺序节点,然后获取锁路径下面的所有节点并排序,然后判断自己是不是排在第一个,此时不是第一个,加锁失败。
    第四步:客户端B监听上一个顺序节点的删除操作(客户端A释放锁),知道客户端A获取的锁使用完毕,并释放锁(也就是删除客户端A创建的临时顺序节点),如果客户端A释放锁,客户端B就回到第二步。
    注:客户端宕机的话,zk也会侦测到节点死亡,客户端获取到的锁会超时删除,不会阻塞后续节点争抢锁。
    1. 获取分布式锁的核心流程算法如下
    zk分布式锁实现

    2.DolphinScheduler中Scheduler线程分布式锁使用流程图
    分布式锁实现流程

    1.4 线程不足等待问题

    第一点,如果一个DAG中没有子流程,则如果Command中的数据条数大于线程池设置的阈值,则直接流程等待或失败。
    第二点, 如果一个大的DAG中嵌套了很多子流程,如下图则会产生“死等”状态:
    线程等待策略
    上图中MainFlowThread等待SubFlowThread1结束,SubFlowThread1等待SubFlowThread2结束, SubFlowThread2等待SubFlowThread3结束,而SubFlowThread3等待线程池有新线程,则整个DAG流程不能结束,从而其中的线程也不能释放。这样就形成的子父流程循环等待的状态。此时除非启动新的Master来增加线程来打破这样的”僵局”,否则调度集群将不能再使用。
    对于启动新Master来打破僵局,似乎有点差强人意,于是我们提出了以下三种方案来降低这种风险:

    1. 计算所有Master的线程总和,然后对每一个DAG需要计算其需要的线程数,也就是在DAG流程执行之前做预计算。因为是多Master线程池,所以总线程数不太可能实时获取。
    2. 对单Master线程池进行判断,如果线程池已经满了,则让线程直接失败。
    3. 增加一种资源不足的Command类型,如果线程池不足,则将主流程挂起。这样线程池就有了新的线程,可以让资源不足挂起的流程重新唤醒执行。
      **注意:**Master Scheduler线程在获取Command的时候是FIFO的方式执行的。
      DolphinScheduler采用了第三种方式来解决线程不足的问题。(代码未找到)

    1.5 容错问题

    容错分为服务宕机容错和任务重试,服务宕机容错又分为Master容错和Worker容错两种情况:

    1.5.1 宕机容错

    服务容错设计依赖于ZooKeeper的Watcher机制,实现原理如图:
    宕机容错
    其中Master监控其他Master和Worker的目录,如果监听到remove事件,则会根据具体的业务逻辑进行流程实例容错或者任务实例容错,具体如下所示。
    Master容错流程图
    Master容错流程
    ZooKeeper Master容错完成之后则重新由DolphinScheduler中Scheduler线程调度,遍历 DAG 找到”正在运行”和“提交成功”的任务,对”正在运行”的任务监控其任务实例的状态,对”提交成功”的任务需要判断Task Queue中是否已经存在,如果存在则同样监控任务实例的状态,如果不存在则重新提交任务实例。
    Worker容错流程图
    worker容错流程
    Master Scheduler线程一旦发现任务实例为” 需要容错”状态,则接管任务并进行重新提交。
    **注意:**由于” 网络抖动”可能会使得节点短时间内失去和ZooKeeper的心跳,从而发生节点的remove事件。对于这种情况,我们使用最简单的方式,那就是节点一旦和ZooKeeper发生超时连接,则直接将Master或Worker服务停掉。

    1.5.2 失败重试

    这里首先要区分任务失败重试、流程失败恢复、流程失败重跑的概念:

    1. 任务失败重试是任务级别的,是调度系统自动进行的,比如一个Shell任务设置重试次数为3次,那么在Shell任务运行失败后会自己再最多尝试运行3次。
    2. 流程失败恢复是流程级别的,是手动进行的,恢复是从只能从失败的节点开始执行或从当前节点开始执行。
    3. 流程失败重跑也是流程级别的,是手动进行的,重跑是从开始节点进行。

    接下来说正题,我们将工作流中的任务节点分了两种类型。

    1. 一种是业务节点,这种节点都对应一个实际的脚本或者处理语句,比如Shell节点,MR节点、Spark节点、依赖节点等。
    2. 还有一种是逻辑节点,这种节点不做实际的脚本或语句处理,只是整个流程流转的逻辑处理,比如子流程节等。

    每一个业务节点都可以配置失败重试的次数,当该任务节点失败,会自动重试,直到成功或者超过配置的重试次数。逻辑节点不支持失败重试。但是逻辑节点里的任务支持重试。
    如果工作流中有任务失败达到最大重试次数,工作流就会失败停止,失败的工作流可以手动进行重跑操作或者流程恢复操作。

    1.6 远程日志访问

    由于Web(UI)和Worker不一定在同一台机器上,所以查看日志不能像查询本地文件那样。有两种方案:

    1. 将日志放到ES搜索引擎上
    2. 通过netty通信获取远程日志信息

    介于考虑到尽可能的DolphinScheduler的轻量级性,所以选择了gRPC实现远程访问日志信息。具体代码时间见2.8;
    远程日志访问设计

    2 DolphinScheduler源码分析

    上一章的讲解可能初步看起来还不是很清晰,本章的主要目的是从代码层面一一介绍第一张讲解的功能。关于系统的安装在这里并不会设计,安装运行请大家自行探索。

    2.1 工程模块介绍与配置文件

    2.1.1 工程模块介绍

    dolphinscheduler-alert 告警模块,提供告警服务。
    dolphinscheduler-api web应用模块,提供 Rest Api 服务,供 UI 进行调用。
    dolphinscheduler-common 通用的常量枚举、工具类、数据结构或者基类
    dolphinscheduler-dao 提供数据库访问等操作。
    dolphinscheduler-remote 基于netty的客户端、服务端
    dolphinscheduler-server 日志与心跳服务
    dolphinscheduler-log-server LoggerServer 用于Rest Api通过RPC查看日志
    dolphinscheduler-master MasterServer服务,主要负责 DAG 的切分和任务状态的监控
    dolphinscheduler-worker WorkerServer服务,主要负责任务的提交、执行和任务状态的更新。
    dolphinscheduler-service service模块,包含Quartz、Zookeeper、日志客户端访问服务,便于server模块和api模块调用
    dolphinscheduler-ui 前端模块

    2.1.1 配置文件

    dolphinscheduler-common common.properties

    #本地工作目录,用于存放临时文件
    data.basedir.path=/tmp/dolphinscheduler
    #资源文件存储类型: HDFS,S3,NONE
    resource.storage.type=NONE
    #资源文件存储路径
    resource.upload.path=/dolphinscheduler
    #hadoop是否开启kerberos权限
    hadoop.security.authentication.startup.state=false
    #kerberos配置目录
    java.security.krb5.conf.path=/opt/krb5.conf
    #kerberos登录用户
    login.user.keytab.username=hdfs-mycluster@ESZ.COM
    
    #kerberos登录用户keytab
    login.user.keytab.path=/opt/hdfs.headless.keytab
    
    #kerberos过期时间,整数,单位为小时
    kerberos.expire.time=2
    #	如果存储类型为HDFS,需要配置拥有对应操作权限的用户
    hdfs.root.user=hdfs
    #请求地址如果resource.storage.type=S3,该值类似为: s3a://dolphinscheduler. 如果resource.storage.type=HDFS, 如果 hadoop 配置了 HA,需要复制core-site.xml 和 hdfs-site.xml 文件到conf目录
    fs.defaultFS=hdfs://mycluster:8020
    aws.access.key.id=minioadmin
    aws.secret.access.key=minioadmin
    aws.region=us-east-1
    aws.endpoint=http://localhost:9000
    # resourcemanager port, the default value is 8088 if not specified
    resource.manager.httpaddress.port=8088
    #yarn resourcemanager 地址, 如果resourcemanager开启了HA, 输入HA的IP地址(以逗号分隔),如果resourcemanager为单节点, 该值为空即可
    yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
    #如果resourcemanager开启了HA或者没有使用resourcemanager,保持默认值即可. 如果resourcemanager为单节点,你需要将ds1 配置为resourcemanager对应的hostname
    yarn.application.status.address=http://ds1:%s/ws/v1/cluster/apps/%s
    # job history status url when application number threshold is reached(default 10000, maybe it was set to 1000)
    yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s
    
    # datasource encryption enable
    datasource.encryption.enable=false
    
    # datasource encryption salt
    datasource.encryption.salt=!@#$%^&*
    
    # data quality option
    data-quality.jar.name=dolphinscheduler-data-quality-dev-SNAPSHOT.jar
    
    #data-quality.error.output.path=/tmp/data-quality-error-data
    
    # Network IP gets priority, default inner outer
    
    # Whether hive SQL is executed in the same session
    support.hive.oneSession=false
    
    # use sudo or not, if set true, executing user is tenant user and deploy user needs sudo permissions; if set false, executing user is the deploy user and doesn't need sudo permissions
    sudo.enable=true
    
    # network interface preferred like eth0, default: empty
    #dolphin.scheduler.network.interface.preferred=
    
    # network IP gets priority, default: inner outer
    #dolphin.scheduler.network.priority.strategy=default
    
    # system env path
    #dolphinscheduler.env.path=dolphinscheduler_env.sh
    
    #是否处于开发模式
    development.state=false
    
    # rpc port
    alert.rpc.port=50052
    
    # Url endpoint for zeppelin RESTful API
    zeppelin.rest.url=http://localhost:8080
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    dolphinscheduler-api application.yaml

    server:
      port: 12345
      servlet:
        session:
          timeout: 120m
        context-path: /dolphinscheduler/
      compression:
        enabled: true
        mime-types: text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xml
      jetty:
        max-http-form-post-size: 5000000
    
    spring:
      application:
        name: api-server
      banner:
        charset: UTF-8
      jackson:
        time-zone: UTC
        date-format: "yyyy-MM-dd HH:mm:ss"
      servlet:
        multipart:
          max-file-size: 1024MB
          max-request-size: 1024MB
      messages:
        basename: i18n/messages
      datasource:
    #    driver-class-name: org.postgresql.Driver
    #    url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
        username: root
        password: root
        hikari:
          connection-test-query: select 1
          minimum-idle: 5
          auto-commit: true
          validation-timeout: 3000
          pool-name: DolphinScheduler
          maximum-pool-size: 50
          connection-timeout: 30000
          idle-timeout: 600000
          leak-detection-threshold: 0
          initialization-fail-timeout: 1
      quartz:
        auto-startup: false
        job-store-type: jdbc
        jdbc:
          initialize-schema: never
        properties:
          org.quartz.threadPool:threadPriority: 5
          org.quartz.jobStore.isClustered: true
          org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
          org.quartz.scheduler.instanceId: AUTO
          org.quartz.jobStore.tablePrefix: QRTZ_
          org.quartz.jobStore.acquireTriggersWithinLock: true
          org.quartz.scheduler.instanceName: DolphinScheduler
          org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
          org.quartz.jobStore.useProperties: false
          org.quartz.threadPool.makeThreadsDaemons: true
          org.quartz.threadPool.threadCount: 25
          org.quartz.jobStore.misfireThreshold: 60000
          org.quartz.scheduler.makeSchedulerThreadDaemon: true
    #      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
          org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
          org.quartz.jobStore.clusterCheckinInterval: 5000
    
    management:
      endpoints:
        web:
          exposure:
            include: '*'
      metrics:
        tags:
          application: ${spring.application.name}
    
    registry:
      type: zookeeper
      zookeeper:
        namespace: dolphinscheduler
    #    connect-string: localhost:2181
        connect-string: 10.255.158.70:2181
        retry-policy:
          base-sleep-time: 60ms
          max-sleep: 300ms
          max-retries: 5
        session-timeout: 30s
        connection-timeout: 9s
        block-until-connected: 600ms
        digest: ~
    
    audit:
      enabled: false
    
    metrics:
      enabled: true
    
    python-gateway:
      # Weather enable python gateway server or not. The default value is true.
      enabled: true
      # The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different
      # between Python gateway server. It could be be specific to other address like `127.0.0.1` or `localhost`
      gateway-server-address: 0.0.0.0
      # The port of Python gateway server start. Define which port you could connect to Python gateway server from
      # Python API side.
      gateway-server-port: 25333
      # The address of Python callback client.
      python-address: 127.0.0.1
      # The port of Python callback client.
      python-port: 25334
      # Close connection of socket server if no other request accept after x milliseconds. Define value is (0 = infinite),
      # and socket server would never close even though no requests accept
      connect-timeout: 0
      # Close each active connection of socket server if python program not active after x milliseconds. Define value is
      # (0 = infinite), and socket server would never close even though no requests accept
      read-timeout: 0
    
    # Override by profile
    
    ---
    spring:
      config:
        activate:
          on-profile: mysql
      datasource:
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
      quartz:
        properties:
          org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131

    dolphinscheduler-master application.yaml

    spring:
      banner:
        charset: UTF-8
      application:
        name: master-server
      jackson:
        time-zone: UTC
        date-format: "yyyy-MM-dd HH:mm:ss"
      cache:
        # default enable cache, you can disable by `type: none`
        type: none
        cache-names:
          - tenant
          - user
          - processDefinition
          - processTaskRelation
          - taskDefinition
        caffeine:
          spec: maximumSize=100,expireAfterWrite=300s,recordStats
      datasource:
        #driver-class-name: org.postgresql.Driver
        #url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
        username: root
        password:
        hikari:
          connection-test-query: select 1
          minimum-idle: 5
          auto-commit: true
          validation-timeout: 3000
          pool-name: DolphinScheduler
          maximum-pool-size: 50
          connection-timeout: 30000
          idle-timeout: 600000
          leak-detection-threshold: 0
          initialization-fail-timeout: 1
      quartz:
        job-store-type: jdbc
        jdbc:
          initialize-schema: never
        properties:
          org.quartz.threadPool:threadPriority: 5
          org.quartz.jobStore.isClustered: true
          org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
          org.quartz.scheduler.instanceId: AUTO
          org.quartz.jobStore.tablePrefix: QRTZ_
          org.quartz.jobStore.acquireTriggersWithinLock: true
          org.quartz.scheduler.instanceName: DolphinScheduler
          org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
          org.quartz.jobStore.useProperties: false
          org.quartz.threadPool.makeThreadsDaemons: true
          org.quartz.threadPool.threadCount: 25
          org.quartz.jobStore.misfireThreshold: 60000
          org.quartz.scheduler.makeSchedulerThreadDaemon: true
    #      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
          org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
          org.quartz.jobStore.clusterCheckinInterval: 5000
    
    registry:
      type: zookeeper
      zookeeper:
        namespace: dolphinscheduler
    #    connect-string: localhost:2181
        connect-string: 10.255.158.70:2181
        retry-policy:
          base-sleep-time: 60ms
          max-sleep: 300ms
          max-retries: 5
        session-timeout: 30s
        connection-timeout: 9s
        block-until-connected: 600ms
        digest: ~
    
    master:
      listen-port: 5678
      # master fetch command num
      fetch-command-num: 10
      # master prepare execute thread number to limit handle commands in parallel
      pre-exec-threads: 10
      # master execute thread number to limit process instances in parallel
      exec-threads: 100
      # master dispatch task number per batch
      dispatch-task-number: 3
      # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
      host-selector: lower_weight
      # master heartbeat interval, the unit is second
      heartbeat-interval: 10
      # master commit task retry times
      task-commit-retry-times: 5
      # master commit task interval, the unit is millisecond
      task-commit-interval: 1000
      state-wheel-interval: 5
      # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
      max-cpu-load-avg: -1
      # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
      reserved-memory: 0.3
      # failover interval, the unit is minute
      failover-interval: 10
      # kill yarn jon when failover taskInstance, default true
      kill-yarn-job-when-task-failover: true
    
    server:
      port: 5679
    
    management:
      endpoints:
        web:
          exposure:
            include: '*'
      metrics:
        tags:
          application: ${spring.application.name}
    
    metrics:
      enabled: true
    
    # Override by profile
    
    ---
    spring:
      config:
        activate:
          on-profile: mysql
      datasource:
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
      quartz:
        properties:
          org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130

    dolphinscheduler-worker application.yaml

    spring:
      banner:
        charset: UTF-8
      application:
        name: worker-server
      jackson:
        time-zone: UTC
        date-format: "yyyy-MM-dd HH:mm:ss"
      datasource:
        #driver-class-name: org.postgresql.Driver
        #url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
        username: root
        #password: root
        password:
        hikari:
          connection-test-query: select 1
          minimum-idle: 5
          auto-commit: true
          validation-timeout: 3000
          pool-name: DolphinScheduler
          maximum-pool-size: 50
          connection-timeout: 30000
          idle-timeout: 600000
          leak-detection-threshold: 0
          initialization-fail-timeout: 1
    
    registry:
      type: zookeeper
      zookeeper:
        namespace: dolphinscheduler
    #    connect-string: localhost:2181
        connect-string: 10.255.158.70:2181
        retry-policy:
          base-sleep-time: 60ms
          max-sleep: 300ms
          max-retries: 5
        session-timeout: 30s
        connection-timeout: 9s
        block-until-connected: 600ms
        digest: ~
    
    worker:
      # worker listener port
      listen-port: 1234
      # worker execute thread number to limit task instances in parallel
      exec-threads: 100
      # worker heartbeat interval, the unit is second
      heartbeat-interval: 10
      # worker host weight to dispatch tasks, default value 100
      host-weight: 100
      # worker tenant auto create
      tenant-auto-create: true
      # worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value -1: the number of cpu cores * 2
      max-cpu-load-avg: -1
      # worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G
      reserved-memory: 0.3
      # default worker groups separated by comma, like 'worker.groups=default,test'
      groups:
        - default
      # alert server listen host
      alert-listen-host: localhost
      alert-listen-port: 50052
    
    server:
      port: 1235
    
    management:
      endpoints:
        web:
          exposure:
            include: '*'
      metrics:
        tags:
          application: ${spring.application.name}
    
    metrics:
      enabled: true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79

    主要关注数据库,quartz, zookeeper, masker, worker配置

    2.2 Api主要任务操作接口

    其他业务接口可以不用关注,只需要关注最最主要的流程上线功能接口,此接口可以发散出所有的任务调度相关的代码,接口如下:/dolphinscheduler/projects/{projectCode}/schedules/{id}/online;此接口会将定义的流程提交到Quartz调度框架;代码如下:

    public Map<String, Object> setScheduleState(User loginUser,
                                                    long projectCode,
                                                    Integer id,
                                                    ReleaseState scheduleStatus) {
       
            Map<String, Object> result = new HashMap<>();
    
            Project project = projectMapper.queryByCode(projectCode);
            // check project auth
            boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
            if (!hasProjectAndPerm) {
       
                return result;
            }
    
            // check schedule exists
            Schedule scheduleObj = scheduleMapper.selectById(id);
    
            if (scheduleObj == null) {
       
                putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
                return result;
            }
            // check schedule release state
            if (scheduleObj.getReleaseState() == scheduleStatus) {
       
                logger.info("schedule release is already {},needn't to change schedule id: {} from {} to {}",
                        scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus);
                putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
                return result;
            }
            ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
            if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
       
                putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode()));
                return result;
            }
            List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
            if (processTaskRelations.isEmpty()) {
       
                putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
                return result;
            }
            if (scheduleStatus == ReleaseState.ONLINE) {
       
                // check process definition release state
                if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
       
                    logger.info("not release process definition id: {} , name : {}",
                            processDefinition.getId(), processDefinition.getName());
                    putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
                    return result;
                }
                // check sub process definition release state
                List<Long> subProcessDefineCodes = new ArrayList<>();
                processService.recurseFindSubProcess(processDefinition.getCode(), subProcessDefineCodes);
                if (!subProcessDefineCodes.isEmpty()) {
       
                    List<ProcessDefinition> subProcessDefinitionList =
                            processDefinitionMapper.queryByCodes(subProcessDefineCodes);
                    if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) {
       
                        for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) {
       
                            /**
                             * if there is no online process, exit directly
                             */
                            if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
       
                                logger.info("not release process definition id: {} , name : {}",
                                        subProcessDefinition.getId(), subProcessDefinition.getName());
                                putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(subProcessDefinition.getId()));
                                return result;
                            }
                        }
                    }
                }
            }
    
            // check master server exists
            List<Server> masterServers = monitorService.getServerListFromRegistry(true);
    
            if (masterServers.isEmpty()) {
       
                putMsg(result, Status.MASTER_NOT_EXISTS);
                return result;
            }
    
            // set status
            scheduleObj.setReleaseState(scheduleStatus);
    
            scheduleMapper.updateById(scheduleObj);
    
            try {
       
                switch (scheduleStatus) {
       
                    case ONLINE:
                        logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
                        setSchedule(project.getId(), scheduleObj);
                        break;
                    case OFFLINE:
                        logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
                        deleteSchedule(project.getId(), id);
                        break;
                    default:
                        putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
                        return result;
                }
            } catch (Exception e) {
       
                result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
                throw new ServiceException(result.get(Constants.MSG).toString(), e);
            }
    
            putMsg(result, Status.SUCCESS);
            return result;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    public void setSchedule(int projectId, Schedule schedule) {
       
            logger.info("set schedule, project id: {}, scheduleId: {}", projectId, schedule.getId());
    
            quartzExecutor.addJob(ProcessScheduleJob.class, projectId, schedule);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    public void addJob(Class<? extends Job> clazz, int projectId, final Schedule schedule) {
       
            String jobName = this.buildJobName(schedule.getId());
            String jobGroupName = this.buildJobGroupName(projectId);
    
            Map<String, Object> jobDataMap = this.buildDataMap(projectId, schedule);
            String cronExpression = schedule.getCrontab();
            String timezoneId = schedule.getTimezoneId();
    
            /**
             * transform from server default timezone to schedule timezone
             * e.g. server default timezone is `UTC`
             * user set a schedule with startTime `2022-04-28 10:00:00`, timezone is `Asia/Shanghai`,
             * api skip to transform it and save into databases directly, startTime `2022-04-28 10:00:00`, timezone is `UTC`, which actually added 8 hours,
             * so when add job to quartz, it should recover by transform timezone
             */
            Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId);
            Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId);
    
            lock.writeLock().lock();
            try {
       
    
                JobKey jobKey = new JobKey(jobName, jobGroupName);
                JobDetail jobDetail;
                //add a task (if this task already exists, return this task directly)
                if (scheduler.checkExists(jobKey)) {
       
    
                    jobDetail = scheduler.getJobDetail(jobKey);
                    jobDetail.getJobDataMap().putAll(jobDataMap);
                } else {
       
                    jobDetail = newJob(clazz).withIdentity(jobKey).build();
    
                    jobDetail.getJobDataMap().putAll(jobDataMap);
    
                    scheduler.addJob(jobDetail, false, true);
    
                    logger.info("Add job, job name: {}, group name: {}",
                            jobName, jobGroupName);
                }
    
                TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName);
                /*
                 * Instruct
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
  • 相关阅读:
    AI绘图软件Stable Diffusion 安装和使用之二controlnet插件
    EN 14339地下消防栓—CE认证
    计算机毕业设计Java扶贫产品销售(源码+系统+mysql数据库+lw文档)
    JS ||(或运算)详解
    我已经搞了三年算法面试培训了
    FastDFS安装(含nginx)
    存内计算与扩散模型:下一代视觉AIGC能力提升的关键
    4.9 GHz异帧隔离间距研究
    TestNG与ExtentReport单元测试导出报告文档
    Flutter系列文章-Flutter UI进阶
  • 原文地址:https://blog.csdn.net/qq_26777585/article/details/126488465