• Java定时任务及常见框架


    Java定时任务在实际开发中还是用到很多的,像刷新大屏可视化数据、电商下单付款计时、发送邮件等。

    实现方法大致可以分为两大类吧,

    • 使用JDK自带方法或一些类来实现
    • 使用一些比较成熟的任务框架来实现

    1. 单机定时任务技术选型

    1.1 Timer

    java.util.Timer是 JDK 1.3 开始就已经支持的一种定时任务的实现方式。

    Timer 内部使用一个叫做 TaskQueue 的类存放定时任务,它是一个基于最小堆实现的优先级队列。TaskQueue 会按照任务距离下一次执行时间的大小将任务排序,保证在堆顶的任务最先执行。这样在需要执行任务时,每次只需要取出堆顶的任务运行即可!

    TimerTimerTask用于在后台线程中调度任务的java.util类。TimerTask负责任务的执行,Timer负责任务的调度。

    Timer提供了三种定时模式:

    • 一次性任务
    • 按照固定的延迟执行(fixed delay)
    • 按照固定的周期执行(fixed rate

    1.1.1 执行一次

    //在当前时间往后delay个毫秒开始执行
    public void schedule(TimerTask task, long delay) {...}
    //在指定的time时间点执行
    public void schedule(TimerTask task, Date time) {...}

    1. public static void main(String[] args) {
    2. //定义一个Timer
    3. Timer timer = new Timer("test-timer");
    4. //定义一个TimerTask
    5. TimerTask task = new TimerTask() {
    6. @Override
    7. public void run() {
    8. System.out.println("任务执行时间:" + new Date() + "------------"
    9. + "线程:" + Thread.currentThread().getName());
    10. }
    11. };
    12. long delay = 3000L;
    13. timer.schedule(task, delay);
    14. System.out.println("任务添加时间:" + new Date() + "------------"
    15. + "线程:" + Thread.currentThread().getName());
    16. }

    工作方式:当达到我们指定的时间,执行一次结束

    任务虽然运行结束,但进程没有被销毁。并且执行任务的线程名为我们定义的Timer的名称。我们看一下源码:

    1. public class Timer {
    2. //小顶堆,用来存放timeTask
    3. private final TaskQueue queue = new TaskQueue();
    4. private final TimerThread thread = new TimerThread(queue);
    5. public Timer(String name) {
    6. thread.setName(name);
    7. thread.start();
    8. }
    9. }
    10. public abstract class TimerTask implements Runnable {
    11. long nextExecutionTime;
    12. long period = 0;
    13. public abstract void run();
    14. }
    • TaskQueue:基于小顶堆实现,用来存放timerTask
    • TimerThread:任务执行线程,继承Thread
    • nextExecutionTime:假如任务需要多次执行表示下一次执行时间
    • period:每次任务执行间隔时间
    • run():我们执行任务的内容

    创建一个 Timer 对象就是新启动了一个线程,但是这个新启动的线程,并不是守护线程,它一直在后台运行,通过如下 可以将新启动的 Timer 线程设置为守护线程。我们可以使用以下构造方法(public Timer(boolean isDaemon)public Timer(String name, boolean isDaemon))来设置。

    1.1.2 Fixed Delay模式(固定间隔)

    //从当前时间开始delay个毫秒数开始定期执行,周期是period个毫秒数
    public void schedule(TimerTask task, long delay, long period) {...}
    //从指定的firstTime开始定期执行,往后每次执行的周期是period个毫秒数
    public void schedule(TimerTask task, Date firstTime, long period){...}

    1. public static void main(String[] args) {
    2. Timer timer = new Timer("test-timer");
    3. MyTimerTask task1 = new MyTimerTask("任务1");
    4. MyTimerTask task2 = new MyTimerTask("任务2");
    5. long delay = 1000L;
    6. long period = 2000L;
    7. timer.schedule(task1, delay, period);
    8. timer.schedule(task2, new Date(), period);
    9. }
    10. static class MyTimerTask extends TimerTask {
    11. private String taskName;
    12. public MyTimerTask(String taskName) {
    13. this.taskName = taskName;
    14. }
    15. @Override
    16. public void run() {
    17. try {
    18. Thread.sleep(3000L);
    19. } catch (InterruptedException e) {
    20. throw new RuntimeException(e);
    21. }
    22. System.out.println(taskName + "执行时间:" + new Date() + "------------"
    23. + "线程:" + Thread.currentThread().getName());
    24. }
    25. }

    工作方式:

    • 第一次执行,按照指定时间开始(如果此时TimerThread没有执行其他任务),如有其他任务在执行,那就需要等到其他任务执行完成才能执行
    • 第二次执行,每次任务是上一次任务开始执行时间加上执行的period时间。

    根据任务运行结果来看,任务1和任务2并没有按照我们所预期的间隔2秒来执行,基本上间隔都是在6秒。而且我们注册在同一Timer的任务,都是使用同一个在同一个线程上执行。TimerTask 是以队列的方式一个一个被顺序运行的,所以执行的时间和预期的时间可能不一致,因为前面的任务可能消耗的时间较长,则后面的任务运行的时间会被延迟。延迟的任务具体开始的时间,就是依据前面任务的"结束时间" 。

    1.1.3 Fixed Rate模式(固定速率)

    //从当前时间开始delay个毫秒数开始定期执行,周期是period个毫秒数
    public void scheduleAtFixedRate(TimerTask task, long delay, long period) {...}
    //从指定的firstTime开始定期执行,往后每次执行的周期是period个毫秒数
    public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period){...}

    1. public static void main(String[] args) {
    2. Timer timer = new Timer("test-timer");
    3. MyTimerTask task1 = new MyTimerTask("任务1");
    4. MyTimerTask task2 = new MyTimerTask("任务2");
    5. long delay = 1000L;
    6. long period = 5000L;
    7. timer.scheduleAtFixedRate(task1, delay, period);
    8. timer.scheduleAtFixedRate(task2, new Date(System.currentTimeMillis() - 1000L), period);
    9. }
    10. static class MyTimerTask extends TimerTask {
    11. private String taskName;
    12. public MyTimerTask(String taskName) {
    13. this.taskName = taskName;
    14. }
    15. @Override
    16. public void run() {
    17. try {
    18. Thread.sleep(2000L);
    19. } catch (InterruptedException e) {
    20. throw new RuntimeException(e);
    21. }
    22. System.out.println(taskName + "执行时间:" + new Date() + "------------"
    23. + "线程:" + Thread.currentThread().getName());
    24. }
    25. }

    工作方式:

    一般情况下和schedule()方法没有什么区别,我们可以观察结果发现任务2第一次和第二次执行相差4秒,我们设置开始时间为当前时间前1秒,scheduleAtFixedRate()当计划时间早于当前时间,则任务立即被运行。

    • FixedRate:不管前一个任务执行是否执行完毕,总是匀速执行新任务。
    • FixedDelay:当前一个任务执行完毕后,等待固定的时间间隔,再执行下一次任务。

    1.2 ScheduledExecutorService

    通过上面分析,Java的定时调度可以通过Timer&TimerTask来实现。由于其实现的方式为单线程,因此从JDK1.3发布之后就一直存在一些问题,大致如下:

    1. 多个任务之间会相互影响
    2. 多个任务的执行是串行的,性能较低

    ScheduledExecutorService在设计之初就是为了解决Timer&TimerTask的这些问题。因为天生就是基于多线程机制,所以任务之间不会相互影响(只要线程数足够。当线程数不足时,有些任务会复用同一个线程)。

    除此之外,因为其内部使用的延迟队列,本身就是基于等待/唤醒机制实现的,所以CPU并不会一直繁忙。同时,多线程带来的CPU资源复用也能极大地提升性能。

    因为ScheduledExecutorService继承于ExecutorService,所以本身支持线程池的所有功能。额外还提供了4种方法,我们来看看其作用。

    1. /**
    2. * 带延迟时间的调度,只执行一次
    3. * 调度之后可通过Future.get()阻塞直至任务执行完毕
    4. */
    5. 1. public ScheduledFuture schedule(Runnable command,
    6. long delay, TimeUnit unit);
    7. /**
    8. * 带延迟时间的调度,只执行一次
    9. * 调度之后可通过Future.get()阻塞直至任务执行完毕,并且可以获取执行结果
    10. */
    11. 2. public ScheduledFuture schedule(Callable callable,
    12. long delay, TimeUnit unit);
    13. /**
    14. * 带延迟时间的调度,循环执行,固定频率
    15. */
    16. 3. public ScheduledFuture scheduleAtFixedRate(Runnable command,
    17. long initialDelay,
    18. long period,
    19. TimeUnit unit);
    20. /**
    21. * 带延迟时间的调度,循环执行,固定延迟
    22. */
    23. 4. public ScheduledFuture scheduleWithFixedDelay(Runnable command,
    24. long initialDelay,
    25. long delay,
    26. TimeUnit unit);

    具体不多做分析了,可以理解为Time的多线程版。

    1.3 Spring Task

    Spring Task、是Spring3.0内置的定时任务框架,支持Cron表达式来指定定时任务执行时间。

    下面介绍在SpringBoot中使用Spring Task。

    使用SpringBoot创建定时任务非常简单,目前主要有以下三种创建方式:

    1. 基于注解(@Scheduled)
    2. 基于接口(SchedulingConfigurer) 前者相信大家都很熟悉,但是实际使用中我们往往想从数据库中读取指定时间来动态执行定时任务,这时候基于接口的定时任务就派上用场了。
    3. 基于注解设定多线程定时任务

    1.3.1 基于注解

    @Scheduled注解和@EnableScheduling注解的使用

    基于注解@Scheduled默认为单线程,开启多个任务时,任务的执行时机会受上一个任务执行时间的影响。

    • @EnableScheduling注解: 在配置类上使用,开启计划任务的支持(类上)。
    • @Scheduled注解: 来声明这是一个任务,包括 cron,fixDelay,fixRate 等类型(方法上,需先开启计划任务的支持)
    1. @SpringBootApplication
    2. @EnableScheduling //开启定时任务
    3. public class ScheduledDemoApplication
    4. {
    5. public static void main(String[] args)
    6. {
    7. SpringApplication.run(ScheduledDemoApplication.class, args);
    8. }
    9. }
    10. /**
    11. * 创建定时任务,并使用 @Scheduled 注解。
    12. * @author pan_junbiao
    13. **/
    14. @Component
    15. public class Task
    16. {
    17. @Scheduled(cron="0/5 * * * * ? ") //每5秒执行一次
    18. public void execute(){
    19. SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //设置日期格式
    20. System.out.println("欢迎访问 pan_junbiao的博客 " + df.format(new Date()));
    21. }
    22. }

    @Scheduled注解各参数讲解

    源码如下:

    1. package org.springframework.scheduling.annotation;
    2. import java.lang.annotation.Documented;
    3. import java.lang.annotation.ElementType;
    4. import java.lang.annotation.Repeatable;
    5. import java.lang.annotation.Retention;
    6. import java.lang.annotation.RetentionPolicy;
    7. import java.lang.annotation.Target;
    8. @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
    9. @Retention(RetentionPolicy.RUNTIME)
    10. @Documented
    11. @Repeatable(Schedules.class)
    12. public @interface Scheduled {
    13. String CRON_DISABLED = "-";
    14. String cron() default "";
    15. String zone() default "";
    16. long fixedDelay() default -1L;
    17. String fixedDelayString() default "";
    18. long fixedRate() default -1L;
    19. String fixedRateString() default "";
    20. long initialDelay() default -1L;
    21. String initialDelayString() default "";
    22. }

     (1)cron

    该参数接收一个cron表达式,cron表达式是一个字符串,字符串以5或6个空格隔开,分开共6或7个域,每一个域代表一个含义。

    cron 表达式语法:

    格式:[秒] [分] [小时] [日] [月] [周] [年]

    * 表示所有值. 例如:在分的字段上设置 "*",表示每一分钟都会触发。

    ? 表示不指定值。使用的场景为不需要关心当前设置这个字段的值。例如:要在每月的10号触发一个操作,但不关心是周几,所以需要周位置的那个字段设置为"?" 具体设置为 0 0 0 10 * ?

    - 表示区间。例如 在小时上设置 "10-12",表示 10,11,12点都会触发。

    , 表示指定多个值,例如在周字段上设置 "MON,WED,FRI" 表示周一,周三和周五触发

    / 用于递增触发。如在秒上面设置"5/15" 表示从5秒开始,每增15秒触发(5,20,35,50)。在月字段上设置'1/3'所示每月1号开始,每隔三天触发一次。

    L 表示最后的意思。在日字段设置上,表示当月的最后一天(依据当前月份,如果是二月还会依据是否是润年[leap]), 在周字段上表示星期六,相当于"7"或"SAT"。如果在"L"前加上数字,则表示该数据的最后一个。例如在周字段上设置"6L"这样的格式,则表示“本月最后一个星期五"

    W 表示离指定日期的最近那个工作日(周一至周五). 例如在日字段上设置"15W",表示离每月15号最近的那个工作日触发。如果15号正好是周六,则找最近的周五(14号)触发, 如果15号是周未,则找最近的下周一(16号)触发.如果15号正好在工作日(周一至周五),则就在该天触发。如果指定格式为 "1W",它则表示每月1号往后最近的工作日触发。如果1号正是周六,则将在3号下周一触发。(注,"W"前只能设置具体的数字,不允许区间"-").

    # 序号(表示每月的第几个周几),例如在周字段上设置"6#3"表示在每月的第三个周六.注意如果指定"#5",正好第五周没有周六,则不会触发该配置(用在母亲节和父亲节再合适不过了)

    可通过在线生成Cron表达式的工具:在线Cron表达式生成器 来生成自己想要的表达式。

    cron表达式使用占位符

    另外,cron属性接收的cron表达式支持占位符。eg:

    配置文件:

    time:
      cron: */5 * * * * *
      interval: 5

    每5秒执行一次:

    1. @Scheduled(cron="${time.cron}")
    2. void testPlaceholder1() {
    3. System.out.println("Execute at " + System.currentTimeMillis());
    4. }
    5. @Scheduled(cron="*/${time.interval} * * * * *")
    6. void testPlaceholder2() {
    7. System.out.println("Execute at " + System.currentTimeMillis());
    8. }

     (2)zone

     时区,接收一个 java.util.TimeZone#ID。cron表达式会基于该时区解析。默认是一个空字符串,即取服务器所在地的时区。比如我们一般使用的时区Asia/Shanghai。该字段我们一般留空。

    (3)fixedDelay

    上一次执行完毕时间点之后多长时间再执行。如:

    @Scheduled(fixedDelay = 5000) //上一次执行完毕时间点之后5秒再执行

     (4)fixedDelayString

    与 fixedDelay 意思相同,只是使用字符串的形式。唯一不同的是支持占位符。如:

    @Scheduled(fixedDelayString = "5000") //上一次执行完毕时间点之后5秒再执行

    占位符的使用:

    在 application.yml 配置文件中添加如下配置:

    time:
      fixedDelay: 5000

    1. /**
    2. * 定时任务的使用
    3. * @author pan_junbiao
    4. **/
    5. @Component
    6. public class Task
    7. {
    8. @Scheduled(fixedDelayString = "${time.fixedDelay}")
    9. void testFixedDelayString()
    10. {
    11. System.out.println("欢迎访问 pan_junbiao的博客 " + System.currentTimeMillis());
    12. }
    13. }

     (5)fixedRate

    上一次开始执行时间点之后多长时间再执行。如:

    @Scheduled(fixedRate = 5000) //上一次开始执行时间点之后5秒再执行

    (6)fixedRateString 

    与 fixedRate 意思相同,只是使用字符串的形式。唯一不同的是支持占位符。

    (7) initialDelay

    第一次延迟多长时间后再执行。如:

    @Scheduled(initialDelay=1000, fixedRate=5000) //第一次延迟1秒后执行,之后按fixedRate的规则每5秒执行一次

    (8) initialDelayString 

    与 initialDelay 意思相同,只是使用字符串的形式。唯一不同的是支持占位符。

    1.3.2 动态:基于接口(SchedulingConfigurer)

    (1)在MySQL数据库中创建 cron 表,并添加数据。

    DROP TABLE IF EXISTS cron;
    CREATE TABLE cron  (
      cron_id VARCHAR(30) NOT NULL PRIMARY KEY,
      cron VARCHAR(30) NOT NULL  
    );
     
    INSERT INTO cron VALUES ('1', '0/5 * * * * ?');

    (2)添加pom.xml配置信息

    在pom.xml配置文件中添加MyBatis、 MySQL的JDBC数据库驱动依赖。

    1. <dependency>
    2. <groupId>org.mybatis.spring.bootgroupId>
    3. <artifactId>mybatis-spring-boot-starterartifactId>
    4. <version>2.1.3version>
    5. dependency>
    6. <dependency>
    7. <groupId>mysqlgroupId>
    8. <artifactId>mysql-connector-javaartifactId>
    9. <version>8.0.20version>
    10. dependency>

    (3)配置相关信息

    将项目默认的application.properties文件的后缀修改为“.yml”,即配置文件名称为:application.yml,并配置以下信息:

    1. spring:
    2. #DataSource数据源
    3. datasource:
    4. url: jdbc:mysql://localhost:3306/db_admin?useSSL=false&
    5. username: root
    6. password: 123456
    7. driver-class-name: com.mysql.cj.jdbc.Driver
    8. #MyBatis配置
    9. mybatis:
    10. type-aliases-package: com.pjb.entity #别名定义
    11. configuration:
    12. log-impl: org.apache.ibatis.logging.stdout.StdOutImpl #指定 MyBatis 所用日志的具体实现,未指定时将自动查找
    13. map-underscore-to-camel-case: true #开启自动驼峰命名规则(camel case)映射
    14. lazy-loading-enabled: true #开启延时加载开关
    15. aggressive-lazy-loading: false #将积极加载改为消极加载(即按需加载),默认值就是false
    16. lazy-load-trigger-methods: "" #阻挡不相干的操作触发,实现懒加载
    17. cache-enabled: true #打开全局缓存开关(二级环境),默认值就是true

    4)创建定时器

    数据库准备好数据之后,我们编写定时任务,注意这里添加的是TriggerTask,目的是循环读取我们在数据库设置好的执行周期,以及执行相关定时任务的内容。具体代码如下:

    1. /**
    2. * 动态定时任务配置类
    3. * @author pan_junbiao
    4. **/
    5. @Configuration //1.主要用于标记配置类,兼备Component的效果
    6. @EnableScheduling //2.开启定时任务
    7. public class DynamicScheduleConfigurer implements SchedulingConfigurer
    8. {
    9. @Mapper
    10. public interface CronMapper {
    11. @Select("select cron from cron limit 1")
    12. public String getCron();
    13. }
    14. //注入mapper
    15. @Autowired
    16. @SuppressWarnings("all")
    17. CronMapper cronMapper;
    18. /**
    19. * 执行定时任务.
    20. */
    21. @Override
    22. public void configureTasks(ScheduledTaskRegistrar taskRegistrar)
    23. {
    24. taskRegistrar.addTriggerTask(
    25. //1.添加任务内容(Runnable)
    26. () -> System.out.println("欢迎访问 pan_junbiao的博客: " + LocalDateTime.now().toLocalTime()),
    27. //2.设置执行周期(Trigger)
    28. triggerContext -> {
    29. //2.1 从数据库获取执行周期
    30. String cron = cronMapper.getCron();
    31. //2.2 合法性校验.
    32. if (StringUtils.isEmpty(cron)) {
    33. // Omitted Code ..
    34. }
    35. //2.3 返回执行周期(Date)
    36. return new CronTrigger(cron).nextExecutionTime(triggerContext);
    37. }
    38. );
    39. }
    40. }

     1.3.3 基于注解设定多线程定时任务

    1. /**
    2. * 基于注解设定多线程定时任务
    3. * @author pan_junbiao
    4. */
    5. @Component
    6. @EnableScheduling // 1.开启定时任务
    7. @EnableAsync // 2.开启多线程
    8. public class MultithreadScheduleTask
    9. {
    10. @Async
    11. @Scheduled(fixedDelay = 1000) //间隔1秒
    12. public void first() throws InterruptedException {
    13. System.out.println("第一个定时任务开始 : " + LocalDateTime.now().toLocalTime() + "\r\n线程 : " + Thread.currentThread().getName());
    14. System.out.println();
    15. Thread.sleep(1000 * 10);
    16. }
    17. @Async
    18. @Scheduled(fixedDelay = 2000)
    19. public void second() {
    20. System.out.println("第二个定时任务开始 : " + LocalDateTime.now().toLocalTime() + "\r\n线程 : " + Thread.currentThread().getName());
    21. System.out.println();
    22. }
    23. }

    注意:由于基于注解@Scheduled默认为单线程,开启多个任务时,任务的执行时机会受上一个任务执行时间的影响。所以这里使用 @Async 注解很关键。

    从控制台可以看出,第一个定时任务和第二个定时任务互不影响;

    并且,由于开启了多线程,第一个任务的执行时间也不受其本身执行时间的限制,所以需要注意可能会出现重复操作导致数据异常。

    2. 分布式定时任务技术选型

    上面提到了一些定时任务的解决方案都是在单机下执行的,当遇到一些复杂场景例如分布式下的分片和高可用的话,就需要用到分布式定时框架。

    通常情况下,一个定时任务要涉及到以下三个角色

    • 任务:首先肯定是要执行的任务,这个任务就是具体的业务逻辑比如定时发送文章。
    • 调度器:其次是调度中心,调度中心主要负责任务管理,会分配任务给执行器。
    • 执行器:最后就是执行器,执行器接收调度器分派的任务并执行。

    介绍一下常见的一些分布式定时任务框架

    QuartZxxl-jobSchedulerX 2.0PowerJob
    推荐度1423
    是否有前端页面NYYY
    定时类型CRONCRONCRON、固定频率、固定延迟、OpenAPICRON、固定频率、固定延迟、OpenAPI
    支持数据库

    关系型数据库

    (MySQL、Oracle...)

    MySQL人民币(不开源)任意 Spring Data Jpa支持的关系型数据库(MySQL、Oracle...)
    报警监控邮件短信邮件,提供接口允许开发者扩展
    指定调度类型不确定支持不确定不支持
    开发方式Bean里的方法上加注解略复杂:单个Bean实现PowerJob的指定接口。
    任务类型内置Java内置Java、GLUE Java、Shell、Python等脚本内置Java、外置Java(FatJar)、Shell、Python等脚本内置Java、外置Java(容器)、Shell、Python等脚本
    分布式任务静态分片MapReduce 动态分片MapReduce 动态分片
    在线任务治理不支持支持支持支持
    日志白屏化不支持支持不支持支持
    调度方式和性能

    基于数据库锁

    有性能瓶颈

    基于数据库锁

    有性能瓶颈

    不详无锁化设计,性能强劲无上限
    DGA工作流不支持不支持不支持不支持

    2.1 QuartZ

    2.1.1 核心元素

    Job:一个函数式接口,其中的execute方法就是我们需要具体实现的业务任务逻辑。

    JobDetail:用于绑定Job,并对Job进行描述,其中提供了很多描述性属性如:name 任务名称、group 任务组、description 任务描述、jobClass 任务类、jobDataMap 任务自定义参数等。

    Tigger:触发器,用于定义Job的执行时间、执行间隔、执行频率等。在Quartz中主要有四种类型的Trigger:SimpleTrigger、CronTrigger、DataIntervalTrigger和NthIncludedTrigger。

    Scheduler:调度器,用于实际协调和组织JobDetail与Trigger。Quartz提供了DirectSchedulerFactory和StdSchedulerFactory等工厂类,用于支持Scheduler相关对象的产生。

    2.1.2 核心元素关系

    Scheduler可看成是一个定时任务调度容器,里面可注入多组任务(JobDetail与Trigger),而每个JobDetail又绑定了一个Job实例。一个JobDetail可对应多个Trigger,一个Trigger只能对应一个JobDetail。

    2.1.3 Quartz线程模型

    Quartz中主要存在两类线程:即执行线程和调度线程。

    执行线程通常由一个线程池维护,主要作用是执行Trigger中即将开始的任务。

    调度线程又分为Regular Scheduler Thread(执行常规调度)和Misfire Scheduler Thread(执行错失的任务)。

    其中Regular Thread 轮询Trigger,如果有将要触发的Trigger,则从执行任务线程池中获取一个空闲线程,然后执行与该Trigger关联的job;

    Misfire Thraed则是扫描所有的trigger,查看是否有错失的,如果有的话,根据一定的策略进行处理。

    ClusterManager线程:Quartz集群部署时,则还存在集群线程(ClusterManager线程),主要作用是定时检测集群中各节点健康状态。若发现宕机节点,则将其任务交由其他健康节点继续执行。

    2.1.4 Quartz核心配置文件

    Quartz默认加载工程目录下的quartz.properties,如果工程目录下没有,就会去加载quartz.jar包下面的quartz.properties文件,也可自定义配置位置。

    配置属性大体可分为:

    • 调度器属性
    • 线程池属性
    • 作业存储设置
    • 插件配置

    以整合springboot为例,贴出核心配置:

    1. # 定时任务的表前缀
    2. org.quartz.jobStore.tablePrefix=qrtz_
    3. # 是否是集群的任务
    4. org.quartz.jobStore.isClustered=true
    5. # 检查集群的状态间隔
    6. org.quartz.jobStore.clusterCheckinInterval=5000
    7. # 如果当前的执行周期被错过 任务持有的时长超过此时长则认为任务过期,单位ms
    8. org.quartz.jobStore.misfireThreshold=6000
    9. # 事务的隔离级别 推荐使用默认级别 设置为true容易造成死锁和不可重复读的一些事务问题
    10. org.quartz.jobStore.txIsolationLevelSerializable=false
    11. # 任务存储方式它应当是org.quartz.spi.JobStore的子类
    12. org.quartz.jobStore.class=org.springframework.scheduling.quartz.LocalDataSourceJobStore
    13. # 保证待执行的任务是锁定的 避免集群任务被其他现场抢断
    14. org.quartz.jobStore.acquireTriggersWithinLock=true
    15. # 数据库系统的方言StdJDBCDelegate标准JDBC方言
    16. org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
    17. # 定时任务实例的id 默认自动
    18. org.quartz.scheduler.instanceId=AUTO
    19. # 定时任务的线程名,相同集群实例名称必须相同
    20. org.quartz.scheduler.instanceName=ClusterJMVCScheduler
    21. # 定时任务线程池
    22. org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
    23. # 线程池的线程总数 默认10
    24. org.quartz.threadPool.threadCount=10
    25. # 线程池的优先级
    26. org.quartz.threadPool.threadPriority=5
    27. # 自创建父线程
    28. org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true

    2.1.5 Misfire过失策略

    针对CronTrigger和SimpleTrigger过失策略分别如下:

    (1)CronTrigger

    • withMisfireHandlingInstructionDoNothing:不触发立即执行,等待下次Cron触发频率到达时刻开始按照Cron频率依次执行
    • withMisfireHandlingInstructionIgnoreMisfires:以错过的第一个频率时间立刻开始执行,重做错过的所有频率周期后,当下一次触发频率发生时间大于当前时间后,再按照正常的Cron频率依次执行
    • withMisfireHandlingInstructionFireAndProceed:以当前时间为触发频率立刻触发一次执行,然后按照Cron频率依次执行

    (2)SimpleTrigger

    • withMisfireHandlingInstructionFireNow:以当前时间为触发频率立即触发执行,执行至FinalTIme的剩余周期次数。以调度或恢复调度的时刻为基准的周期频率,FinalTime根据剩余次数和当前时间计算得到调整后的FinalTime会略大于根据starttime计算的到的FinalTime值。
    • withMisfireHandlingInstructionIgnoreMisfires:以错过的第一个频率时间立刻开始执行,重做错过的所有频率周期,当下一次触发频率发生时间大于当前时间以后,按照Interval的依次执行剩下的频率,共执行RepeatCount+1次
    • withMisfireHandlingInstructionNextWithExistingCount:不触发立即执行,等待下次触发频率周期时刻,执行至FinalTime的剩余周期次数。以startTime为基准计算周期频率,并得到FinalTime,即使中间出现pause,resume以后保持FinalTime时间不变
    • withMisfireHandlingInstructionNowWithExistingCount:以当前时间为触发频率立即触发执行,执行至FinalTIme的剩余周期次数。以调度或恢复调度的时刻为基准的周期频率,FinalTime根据剩余次数和当前时间计算得到调整后的FinalTime会略大于根据starttime计算的到的FinalTime值
    • withMisfireHandlingInstructionNextWithRemainingCount:不触发立即执行,等待下次触发频率周期时刻,执行至FinalTime的剩余周期次数。以startTime为基准计算周期频率,并得到FinalTime
    • 即使中间出现pause,resume以后保持FinalTime时间不变
    • withMisfireHandlingInstructionNowWithRemainingCount:以当前时间为触发频率立即触发执行,执行至FinalTIme的剩余周期次数。以调度或恢复调度的时刻为基准的周期频率,FinalTime根据剩余次数和当前时间计算得到调整后的FinalTime会略大于根据starttime计算的到的FinalTime值

    (3)核心策略枚举说明

    • MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY:这个不是忽略已经错失的触发的意思,而是说忽略MisFire策略。它会在资源合适的时候,重新触发所有的MisFire任务,并且不会影响现有的调度时间。比如,SimpleTrigger每15秒执行一次,而中间有5分钟时间它都MisFire了,一共错失了20个,5分钟后,假设资源充足了,并且任务允许并发,它会被一次性触发。这个属性是所有Trigger都适用。
    • MISFIRE_INSTRUCTION_FIRE_NOW:忽略已经MisFire的任务,并且立即执行调度。这通常只适用于只执行一次的任务。
    • MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT:将startTime设置当前时间,立即重新调度任务,包括MisFire的。
    • MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT:类似MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT,区别在于会忽略已经MisFire的任务。
    • MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_EXISTING_COUNT:在下一次调度时间点,重新开始调度任务,包括MisFire的。
    • MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT
    • 类似于MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_EXISTING_COUNT,区别在于会忽略已经MisFire的任务。

    (4)默认策略

    CronTrigger和SimpleTrigger默认采用MISFIRE_INSTRUCTION_SMART_POLICY大致意思是“把处理逻辑交给聪明的Quartz去决定”。基本策略是

    • 如果是只执行一次的调度,使用MISFIRE_INSTRUCTION_FIRE_NOW
    • 如果是无限次的调度(repeatCount是无限的),使用MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT
    • 否则,使用MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT

    2.1.6 Quartz启动流程

    以Quartz和spring整合为例,当spring容器启动时,就会装载相关的bean。SchedulerFactoryBean实现了InitializingBean接口,因此在初始化bean的时候,会执行afterPropertiesSet方法,该方法将会调用SchedulerFactory(DirectSchedulerFactory 或者 StdSchedulerFactory,通常用StdSchedulerFactory)创建Scheduler。

    SchedulerFactory在创建quartzScheduler的过程中,将会读取配置参数,初始化各个组件,关键组件如下:

    • ThreadPool:一般是使用SimpleThreadPool,SimpleThreadPool创建了一定数量的WorkerThread实例来使得Job能够在线程中进行处理。WorkerThread是定义在SimpleThreadPool类中的内部类,它实质上就是一个线程。在SimpleThreadPool中有三个list:workers-存放池中所有的线程引用,availWorkers-存放所有空闲的线程,busyWorkers-存放所有工作中的线程;线程池的配置参数如下所示:
    • org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
    • org.quartz.threadPool.threadCount=3
    • org.quartz.threadPool.threadPriority=5
    • JobStore:分为存储在内存的RAMJobStore和存储在数据库的JobStoreSupport(包括JobStoreTX和JobStoreCMT两种实现,JobStoreCMT是依赖于容器来进行事务的管理,而JobStoreTX是自己管理事务),若要使用集群要使用JobStoreSupport的方式;
    • QuartzSchedulerThread:用来进行任务调度的线程,在初始化的时候paused=true,halted=false,虽然线程开始运行了,但是paused=true,线程会一直等待,直到start方法将paused置为false;

    另外,SchedulerFactoryBean还实现了SmartLifeCycle接口,因此初始化完成后,会执行start()方法,该方法将主要会执行以下的几个动作:

    • 创建ClusterManager线程并启动线程:该线程用来进行集群故障检测和处理,将在下文详细讨论;
    • 创建MisfireHandler线程并启动线程:该线程用来进行misfire任务的处理,只有当QuartzSchedulerThread的paused=false,调度线程才真正开始调度

    2.1.7 Quartz持久化

    Quartz持久化即将trigger和job基于jdbc存入数据库。Quartz中有两种存储方式:RAMJobStore,JobStoreSupport,其中RAMJobStore是将trigger和job存储在内存中,而JobStoreSupport是基于jdbc将trigger和job存储到数据库中。RAMJobStore的存取速度非常快,但是由于其在系统被停止后所有的数据都会丢失,所以在集群应用中,必须使用JobStoreSupport。
    集成时,执行去Quartz官网下载对应数据库sql文件导入并开启Quartz持久化配置即可:

    2.1.8 Quartz集群

    Quartz集群是基于数据库实现,主要利用了数据库的悲观锁机制。一个Quartz集群中的每个节点是一个独立的Quartz应用,它又管理着其他的节点。这就意味着你必须对每个节点分别启动或停止。Quartz集群中,独立的Quartz节点并不与另一其的节点或是管理节点通信,而是通过相同的数据库表来感知到另一Quartz应用的。

    在大型分布式系统中,为了避免Quartz集群表和业务表之间互相影响,导致数据库性能和Quartz集群、业务系统稳定性,建议是Quartz独立出数据库或独立出定时任务系统。

    2.1.9 Quartz坑集盘点

    (1)Job无法注入spring容器其他bean

    Quartz中每次执行任务时,会由JobFactory重新创建一个新Job实例,此实例默认采用反射newInstance创建且并未交给spring管理,所以在实例化时也无法注入其他spring bean。

    可通过自定JobFactory方式解决,当然在与springboot整合时,QuartzAutoConfiguration自动配置类已经帮我们处理了。

    (2)集群环境下时间同步问题

    Quartz实际并不关心你是在相同还是不同的机器上运行节点。当集群放置在不同的机器上时,称之为水平集群。节点跑在同一台机器上时,称之为垂直集群。对于垂直集群,存在着单点故障的问题。这对高可用性的应用来说是无法接受的,因为一旦机器崩溃了,所有的节点也就被终止了。对于水平集群,存在着时间同步问题。

    节点用时间戳来通知其他实例它自己的最后检入时间。假如节点的时钟被设置为将来的时间,那么运行中的Scheduler将再也意识不到那个结点已经宕掉了。另一方面,如果某个节点的时钟被设置为过去的时间,也许另一节点就会认定那个节点已宕掉并试图接过它的Job重运行。最简单的同步计算机时钟的方式是使用某一个Internet时间服务器(Internet Time Server ITS)。

    (3)节点争抢Job问题

    因为Quartz使用了一个随机的负载均衡算法,Job以随机的方式由不同的实例执行。Quartz官网上提到当前,还不存在一个方法来指派(钉住) 一个 Job 到集群中特定的节点。

    (4)从集群获取Job列表问题

    当前,如果不直接进到数据库查询的话,还没有一个简单的方式来得到集群中所有正在执行的Job列表。请求一个Scheduler实例,将只能得到在那个实例上正运行Job的列表。Quartz官网建议可以通过写一些访问数据库JDBC代码来从相应的表中获取全部的Job信息。

     2.1.10 Quartz实战

    基于mysql数据库搭建Quartz集群,并整合springboot、mybatisplus实现一个轻量企业级定时任务框架。

    1. 要求项目启动,自动扫描加载定时job。
    2. 对job支持动态的增删改查功能。
    3. 对job的执行耗时和日志做记录。

    (1)自定义JOB注解,并设置启动装载所有job

    1. package com.zkc.quartzdemo.annotation;
    2. import java.lang.annotation.ElementType;
    3. import java.lang.annotation.Retention;
    4. import java.lang.annotation.RetentionPolicy;
    5. import java.lang.annotation.Target;
    6. /**
    7. * @author kczhang@wisedu.com
    8. * @version 1.0.0
    9. * @since 2021-04-15
    10. */
    11. @Target({ElementType.TYPE})
    12. @Retention(RetentionPolicy.RUNTIME)
    13. public @interface ScheduleAnn {
    14. /**
    15. * 定时任务的名称
    16. */
    17. String name() default "";
    18. /**
    19. * 定时任务的定时表达式
    20. */
    21. String cronExpression() default "";
    22. /**
    23. * 定时任务所属群组
    24. */
    25. String group() default "";
    26. /**
    27. * 当前定时任务的描述
    28. */
    29. String description() default "";
    30. }
    1. package com.zkc.quartzdemo.config;
    2. import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.Configuration;
    5. /**
    6. * @author kczhang@wisedu.com
    7. * @version 1.0.0
    8. * @since 2021-04-15
    9. */
    10. @Configuration
    11. public class QuartzConfig {
    12. @Bean(initMethod = "init")
    13. @ConditionalOnMissingBean
    14. public QuartzInit bootStarter() {
    15. return new QuartzInit();
    16. }
    17. }
    1. package com.zkc.quartzdemo.config;
    2. import com.zkc.quartzdemo.annotation.ScheduleAnn;
    3. import com.zkc.quartzdemo.dto.ScheduleJobVO;
    4. import com.zkc.quartzdemo.service.QuartzService;
    5. import lombok.RequiredArgsConstructor;
    6. import lombok.extern.slf4j.Slf4j;
    7. import org.quartz.Job;
    8. import org.reflections.Reflections;
    9. import org.springframework.beans.factory.annotation.Autowired;
    10. import org.springframework.beans.factory.annotation.Value;
    11. import java.util.List;
    12. import java.util.regex.Pattern;
    13. /**
    14. * @author kczhang@wisedu.com
    15. * @version 1.0.0
    16. * @since 2021-04-15
    17. */
    18. @Slf4j
    19. public class QuartzInit {
    20. @Value("${spring.quartz.package-scan:com.zkc}")
    21. private String packageScan;
    22. private final static Pattern expressionPattern = Pattern.compile("\\$\\{(.*)}");
    23. @Autowired
    24. private QuartzService quartzService;
    25. public void init() {
    26. List allJobClassNames = quartzService.getAllScheduleJobClassNames();
    27. new Reflections(packageScan).getTypesAnnotatedWith(ScheduleAnn.class).forEach(jobClass -> {
    28. String jobClassName = jobClass.getName();
    29. if (allJobClassNames.contains(jobClassName)) return;
    30. // 如果当前jobClass不是org.quartz.Job的子类则不做插入任务操作
    31. if (!Job.class.isAssignableFrom(jobClass)) {
    32. log.error("类[{}]未继承[org.quartz.Job]无法初始化为定时任务。", jobClassName);
    33. return;
    34. }
    35. ScheduleAnn annotate = jobClass.getAnnotation(ScheduleAnn.class);
    36. String name = annotate.name();
    37. String cronExpression = annotate.cronExpression();
    38. String group = annotate.group();
    39. String description = annotate.description();
    40. ScheduleJobVO scheduleJobVO = new ScheduleJobVO()
    41. .setClassName(jobClassName)
    42. .setName(name)
    43. .setCron(cronExpression)
    44. .setGroup(group)
    45. .setDescription(description);
    46. // 只创建数据库中不存在的定时任务 存在的则不做创建更新
    47. boolean existsFlag = quartzService.scheduleExists(name, group);
    48. if (!existsFlag) {
    49. quartzService.addJob(scheduleJobVO);
    50. quartzService.addInitJob(jobClassName);
    51. }
    52. });
    53. }
    54. }

     (2)定义调度servcie 与 controller

    1. package com.zkc.quartzdemo.service;
    2. import com.zkc.quartzdemo.dto.*;
    3. import java.util.List;
    4. /**
    5. * @author kczhang@wisedu.com
    6. * @version 1.0.0
    7. * @since 2021-04-15
    8. */
    9. public interface QuartzService {
    10. List getAllScheduleJobClassNames();
    11. boolean scheduleExists(String name, String group);
    12. void addInitJob(String jobClassName);
    13. boolean addJob(ScheduleJobVO scheduleJobVO);
    14. boolean addDynamicScheduleJob(ScheduleAddCO scheduleAddCO);
    15. List getAllJobGroups();
    16. List getJobDetails();
    17. boolean updateScheduleJob(ScheduleUpdateCO scheduleUpdateCO);
    18. boolean deleteDynamicScheduleById(String id);
    19. List getScheduleJobs(ScheduleQryCO scheduleQryCo);
    20. boolean changeScheduleJobStatus(ScheduleStatusCO statusCO);
    21. List getSystemAllJobs();
    22. }
    1. package com.zkc.quartzdemo.controller;
    2. import com.zkc.quartzdemo.common.MultiResponse;
    3. import com.zkc.quartzdemo.common.SingleResponse;
    4. import com.zkc.quartzdemo.dto.*;
    5. import com.zkc.quartzdemo.service.QuartzService;
    6. import lombok.RequiredArgsConstructor;
    7. import org.springframework.validation.annotation.Validated;
    8. import org.springframework.web.bind.annotation.*;
    9. import javax.validation.Valid;
    10. import javax.validation.constraints.NotEmpty;
    11. /**
    12. * @author kczhang@wisedu.com
    13. * @version 1.0.0
    14. * @since 2021-04-15
    15. */
    16. @RestController
    17. @RequiredArgsConstructor
    18. @RequestMapping("/admin/api/schedule_job")
    19. @Validated
    20. public class ScheduleController {
    21. private final QuartzService quartzService;
    22. /**
    23. * 添加一个新的定时任务
    24. *
    25. * @param scheduleAddCO 任务详情
    26. * @return 任务添加成功与否
    27. */
    28. @PostMapping(value = "/create")
    29. public SingleResponse addNewSchedule(@Valid @RequestBody ScheduleAddCO scheduleAddCO) {
    30. return SingleResponse.of(quartzService.addDynamicScheduleJob(scheduleAddCO));
    31. }
    32. /**
    33. * 获取任务的组集合
    34. *
    35. * @return 组s
    36. */
    37. @GetMapping(value = "/groups")
    38. public MultiResponse findAllScheduleJobGroups() {
    39. return MultiResponse.ofWithoutTotal(quartzService.getAllJobGroups());
    40. }
    41. /**
    42. * 更新定时任务
    43. *
    44. * @param scheduleUpdateC0 任务详情
    45. * @return 任务添加成功与否
    46. */
    47. @PostMapping(value = "/update")
    48. public SingleResponse updateScheduleJob(@Valid @RequestBody ScheduleUpdateCO scheduleUpdateC0) {
    49. return SingleResponse.of(quartzService.updateScheduleJob(scheduleUpdateC0));
    50. }
    51. /**
    52. * 删除定时任务
    53. *
    54. * @param id 任务的id
    55. */
    56. @GetMapping(value = "/remove")
    57. public SingleResponse deleteScheduleJob(@NotEmpty String id) {
    58. return SingleResponse.of(quartzService.deleteDynamicScheduleById(id));
    59. }
    60. /**
    61. * 获取定时任务列表
    62. *
    63. * @param scheduleQryCO 搜索条件
    64. * @return 定时任务列表
    65. */
    66. @GetMapping(value = "/list")
    67. public MultiResponse getScheduleJobs(ScheduleQryCO scheduleQryCO) {
    68. return MultiResponse.ofWithoutTotal(quartzService.getScheduleJobs(scheduleQryCO));
    69. }
    70. /**
    71. * 更新定时任务状态
    72. *
    73. * @param statusCO 任务状态
    74. * @return 定时任务状态更新成功与否
    75. */
    76. @PostMapping(value = "/updateScheduleJobStatus")
    77. public SingleResponse changeScheduleJobStatus(@Valid @RequestBody ScheduleStatusCO statusCO) {
    78. return SingleResponse.of(quartzService.changeScheduleJobStatus(statusCO));
    79. }
    80. /**
    81. * 任务集合
    82. *
    83. * @return 继承Job的SpringBean集合信息
    84. */
    85. @GetMapping(value = "/jobs")
    86. public MultiResponse getSystemAllJobs() {
    87. return MultiResponse.ofWithoutTotal(quartzService.getSystemAllJobs());
    88. }
    89. }

     (3)除了quartz官方持久化表,新增一个job初始化表

    DROP TABLE IF EXISTS `qrtz_auto_initialized`;
    CREATE TABLE `qrtz_auto_initialized` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `job_class_name` varchar(500) DEFAULT NULL,
      `init_time` datetime DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4;

    完整代码已分享到码云

    https://gitee.com/zhang_kaicheng/quartz-demo

    2.1.11 参考文章

    Quartz从入门到精通(最详细基础-进阶-实战)_quartz 学习-CSDN博客

     2.2 XXL-JOB

  • 相关阅读:
    如何使用插件扩展Vue的功能
    Jmeter接口测试
    服装行业如何做数字化转型?
    IB选课指南及热门专业选课建议
    LVS+Keepalived群集
    React源码分析3-render阶段(穿插scheduler和reconciler)
    Python 多进程间访问效率低,如何解决?
    基于Java的高校宿舍管理系统设计与实现(源码+lw+部署文档+讲解等)
    算法训练营day35, 二叉搜索树的范围和
    Node学习笔记之使用Express框架开发接口
  • 原文地址:https://blog.csdn.net/weixin_45734473/article/details/133647534