Projects frequently need scheduled tasks, and many different scenarios share this requirement.
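Earlier we saw how to write scheduled tasks with Spring's @Scheduled annotation. That approach is inconvenient for publishing tasks dynamically, because the schedule is fixed in the code.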
@Slf4j
@Component
public class MyJob {
@Scheduled(cron = "0,15,30,45 * * * * ?")
public void hello(){
log.info("say hello");
}
}
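Given these needs, we want scheduled tasks that can be created and changed by hand at runtime. They should cover one-off jobs, such as releasing a phone for sale at a set time, as well as periodic jobs, and they must keep running as before after the system restarts.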
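This chapter is implemented with Spring Boot.
MySQL is used to persist the tasks, and Redis is used for high availability across multiple machines.
The code for this chapter is available at: https://gitee.com/lengcz/springboot-quratz01
Because of the difficulties above, we persist the scheduled tasks to a database so that they can be reloaded after the system restarts.
The following CREATE TABLE statements are written for MySQL.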
CREATE TABLE `myquartz_task` (
`tid` int NOT NULL AUTO_INCREMENT,
`task_code` varchar(50) NOT NULL COMMENT 'task code',
`task_name` varchar(100) NOT NULL COMMENT 'task name',
`job_class` varchar(255) NOT NULL COMMENT 'fully qualified class name',
`params` varchar(255) DEFAULT NULL COMMENT 'parameters',
`job_group` varchar(50) NOT NULL COMMENT 'group',
`cron` varchar(255) NOT NULL COMMENT 'cron expression',
`create_time` datetime NOT NULL COMMENT 'creation time',
`task_status` int NOT NULL COMMENT 'task status: 0 = inactive, 1 = active',
PRIMARY KEY (`tid`),
UNIQUE KEY `uk_taskCode` (`task_code`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='scheduled task plan table';
CREATE TABLE `myquartz_task_record` (
`rid` bigint NOT NULL AUTO_INCREMENT,
`task_code` varchar(50) DEFAULT NULL COMMENT 'task code',
`job_class` varchar(255) DEFAULT NULL COMMENT 'fully qualified class name',
`params` varchar(255) DEFAULT NULL COMMENT 'parameters',
`job_group` varchar(50) DEFAULT NULL COMMENT 'group',
`start_time` datetime DEFAULT NULL,
`end_time` datetime DEFAULT NULL COMMENT 'end time',
`exec_status` int DEFAULT NULL COMMENT 'execution status: 1 = OK, 0 = FAIL',
PRIMARY KEY (`rid`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='scheduled task run record table';
Creating, saving, canceling, deleting, and reloading scheduled tasks
package com.lcz.myquartz.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.lcz.myquartz.entity.EQTask;
import com.lcz.myquartz.entity.QTask;
import com.lcz.myquartz.mapper.QTaskMapper;
import com.lcz.myquartz.quartzjobbean.MyQuartzJobBean;
import com.lcz.myquartz.service.QTaskService;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Slf4j
@Service
public class QTaskServiceImpl implements QTaskService {
@Autowired
private QTaskMapper qTaskMapper;
@Autowired
private Scheduler scheduler;
public boolean saveAndStartTask(QTask qTask) throws Exception {
try {
// Reject job classes that do not extend MyQuartzJobBean
Class<?> jobClass = Class.forName(qTask.getJobClass());
if (!MyQuartzJobBean.class.isAssignableFrom(jobClass)) {
throw new RuntimeException(qTask.getJobClass() + " does not extend " + MyQuartzJobBean.class.getName());
}
int result = qTaskMapper.insert(qTask);
// Report success only if the row was inserted and the job was scheduled
return result > 0 && startTask(qTask);
} catch (Exception e) {
log.error("failed to save and start task: " + qTask.getTaskCode(), e);
}
return false;
}
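/**
* Schedule the task in Quartz, replacing any existing job with the same key.
*
* @param qTask the task to schedule
* @return true if the task was scheduled
*/
private boolean startTask(QTask qTask) throws Exception {
// Use the same name and group as withIdentity(...) below, otherwise the lookup misses the job
JobKey jobKey = JobKey.jobKey(qTask.getTaskCode(), qTask.getJobGroup());
boolean existTask = scheduler.checkExists(jobKey);
if (existTask) {
boolean flag = scheduler.deleteJob(jobKey);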
if (!flag) {
log.error("cancel task error:" + JSONObject.toJSONString(qTask));
return false;
}
}
// Parameters passed to the job
String params = qTask.getParams();
JobDataMap jobDataMap = null;
if (StringUtils.isNotBlank(params)) {
jobDataMap = JSON.parseObject(params, JobDataMap.class);
} else {
jobDataMap = new JobDataMap();
}
String className = qTask.getJobClass();
Class clz = Class.forName(className);
// System.out.println(clz);
JobDetail jobDetail = JobBuilder.newJob(clz).usingJobData(jobDataMap).withIdentity(qTask.getTaskCode(), qTask.getJobGroup()).storeDurably().build();
// Several cron expressions may be given, separated by semicolons
String[] crons = qTask.getCron().split(";");
Set<Trigger> triggers = new HashSet<>();
for (String cr : crons) {
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cr);
Trigger trigger = TriggerBuilder.newTrigger().forJob(jobDetail).withSchedule(cronScheduleBuilder).build();
triggers.add(trigger);
}
scheduler.scheduleJob(jobDetail, triggers, true);
return true;
}
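public boolean delTask(String taskCode) throws Exception {
if (taskCode == null) {
// Without this guard the wrapper would carry no condition and every row would be deleted
return false;
}
LambdaQueryWrapper<QTask> qw = new LambdaQueryWrapper<>();
qw.eq(QTask::getTaskCode, taskCode);
QTask qTask = qTaskMapper.selectOne(qw);
if (qTask == null) {
return false;
}
int result = qTaskMapper.delete(qw);
if (result > 0) {
// The job was registered under (taskCode, jobGroup), so delete it with the same key
scheduler.deleteJob(JobKey.jobKey(taskCode, qTask.getJobGroup()));
return true;
}
return false;
}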
public boolean cancelTask(String taskCode) throws Exception {
if (taskCode == null) {
return false;
}
LambdaQueryWrapper<QTask> qw = new LambdaQueryWrapper<>();
qw.eq(QTask::getTaskCode, taskCode);
QTask qTask = qTaskMapper.selectOne(qw);
if (qTask == null) {
return false; // no such task
}
qTask.setTaskStatus(EQTask.CANCEL.getId());
int result = qTaskMapper.updateById(qTask);
if (result > 0) {
// Remove the job using the same (name, group) key it was scheduled with
scheduler.deleteJob(JobKey.jobKey(taskCode, qTask.getJobGroup()));
return true;
}
return false;
}
@Override
public boolean resumeTask(String taskCode) throws Exception {
if (taskCode == null) {
return false;
}
LambdaQueryWrapper<QTask> qw = new LambdaQueryWrapper<>();
qw.eq(QTask::getTaskCode, taskCode);
QTask qTask = qTaskMapper.selectOne(qw);
if (qTask == null) {
return false; // no such task
}
qTask.setTaskStatus(EQTask.OK.getId());
int result = qTaskMapper.updateById(qTask);
if (result > 0) {
return startTask(qTask);
}
return false;
}
public List<QTask> query(Wrapper<QTask> queryWrapper) {
return qTaskMapper.selectList(queryWrapper);
}
public void reloadAllTasks() throws Exception {
LambdaQueryWrapper<QTask> qw = new LambdaQueryWrapper<>();
qw.eq(QTask::getTaskStatus, EQTask.OK.getId());
List<QTask> tasks = qTaskMapper.selectList(qw);
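int loaded = 0;
for (QTask task : tasks) {
try {
startTask(task);
loaded++;
} catch (Exception e) {
// For example a cron that cannot be parsed or will never fire again: skip it and keep loading the rest
log.error("skip task " + task.getTaskCode(), e);
}
}
log.info("loaded tasks: " + loaded + "/" + tasks.size());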
}
}
package com.lcz.myquartz.controller;
import com.alibaba.fastjson.JSONObject;
import com.lcz.myquartz.entity.QTask;
import com.lcz.myquartz.service.QTaskService;
import org.quartz.JobDataMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MyTaskDemoController {
@Autowired
private QTaskService qTaskService;
/**
* Create a task
* @param tid
* @return
* @throws Exception
*/
@RequestMapping("/createTask/{tid}")
public String createTask(@PathVariable("tid") Integer tid) throws Exception {
System.out.println("==============");
QTask t = new QTask();
t.setJobClass("com.lcz.myquartz.task.MyQuartzDemo01");// fully qualified class name of the job
// t.setJobClass("com.lcz.myquartz.task.MyClusterQuartzDemo02");// fully qualified class name of the job
t.setTaskName("test task");
t.setCron("0,15,30,45 * * * * ?");// cron expression
// t.setCron("0,15,30,45 * * * * ?;0,5,10 * * * * ?");// several cron expressions may also be set, separated by semicolons
// t.setTaskStatus(EQTask.OK.getId());// (optional, has a default) status: OK means active, CANCEL means canceled
// t.setJobGroup("default");// (optional, has a default) job group
t.setTaskCode("Demo" + tid);// taskCode
// t.setCreateTime(new Date());// (optional, has a default) creation time
JobDataMap jobDataMap = new JobDataMap();// parameters the job needs
jobDataMap.put("id", System.currentTimeMillis());
jobDataMap.put("name", "xiaowang" + tid);
t.setParams(JSONObject.toJSONString(jobDataMap));// store the parameters as JSON
boolean flag = qTaskService.saveAndStartTask(t);// save to the database and start the task
return "createTask,tid=" + tid + ",flag=" + flag;
}
/**
* Reload all tasks
* @return
* @throws Exception
*/
@RequestMapping("/reloadAllTask")
public String reloadAllTask() throws Exception {
qTaskService.reloadAllTasks();// reload all tasks from the database
return "reloadAllTask";
}
/**
* Delete a task
* @return
* @throws Exception
*/
@RequestMapping("/delTask/{taskCode}")
public String delTask(@PathVariable("taskCode") String taskCode) throws Exception {
qTaskService.delTask(taskCode);// cancel the task and delete it from the database
return "delTask," + taskCode;
}
/**
* Resume a task (restart it)
* @param taskCode
* @return
* @throws Exception
*/
@RequestMapping("/resumeTask/{taskCode}")
public String resumeTask(@PathVariable("taskCode") String taskCode) throws Exception {
qTaskService.resumeTask(taskCode);// resume and start the task
return "resumeTask," + taskCode;
}
/**
* Cancel a task
* @param taskCode
* @return
* @throws Exception
*/
@RequestMapping("/cancelTask/{taskCode}")
public String cancelTaskByTaskCode(@PathVariable("taskCode") String taskCode) throws Exception {
qTaskService.cancelTask(taskCode);// cancel the task and update its status
return "cancelTask," + taskCode;
}
}
Extend MyQuartzJobBean to implement your own scheduled task
package com.lcz.myquartz.task;
import com.lcz.myquartz.entity.MyQuartzState;
import com.lcz.myquartz.quartzjobbean.MyQuartzJobBean;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import java.util.concurrent.TimeUnit;
/**
* Must extend MyQuartzJobBean
*/
@Slf4j
public class MyQuartzDemo01 extends MyQuartzJobBean {
protected MyQuartzState executeJob(JobExecutionContext jobExecutionContext) throws JobExecutionException {
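log.info("scheduled task body... ");
// Print the parameters passed to the scheduled task
jobExecutionContext.getMergedJobDataMap().forEach((k,v)->{
System.out.println("key="+k+",value="+v);
});
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
// Restore the interrupt flag instead of swallowing the interruption
Thread.currentThread().interrupt();
}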
return MyQuartzState.OK;
}
}
Add and start a task. Note that taskCode must be unique.
@Test
public void testAddAndStartTask() throws Exception {
QTask t=new QTask();
t.setJobClass("com.lcz.myquartz.task.MyQuartzDemo01");
t.setCron("0,15,30,45 * * * * ?");
t.setJobGroup("default");
t.setTaskName("testabc");
t.setTaskCode("taskCode-"+System.currentTimeMillis());
JobDataMap jobDataMap=new JobDataMap();
jobDataMap.put("id",123456);
jobDataMap.put("name","xiaowang");
t.setParams(JSONObject.toJSONString(jobDataMap));// parameters to pass to the scheduled task
qTaskService.saveAndStartTask(t);
TimeUnit.HOURS.sleep(1);
}
Set the parameters when adding the task
@Test
public void testAddAndStartTask() throws Exception {
QTask t=new QTask();
// .....
JobDataMap jobDataMap=new JobDataMap();
jobDataMap.put("id",123456);
jobDataMap.put("name","xiaowang");
t.setParams(JSONObject.toJSONString(jobDataMap));// parameters to pass to the scheduled task
}
Your scheduled task then reads these parameters
@Slf4j
public class MyQuartzDemo01 extends MyQuartzJobBean {
protected MyQuartzState executeJob(JobExecutionContext jobExecutionContext) throws JobExecutionException {
// Read the parameters you passed in and run the task accordingly, e.g. opening a game server
jobExecutionContext.getMergedJobDataMap().forEach((k,v)->{
System.out.println("key="+k+",value="+v);
});
String name=jobExecutionContext.getMergedJobDataMap().getString("name");
System.out.println(name);
return MyQuartzState.OK;
}
}
/**
* Load all tasks after the application has started
*/
@Component
public class MyQuartzApplicationRunner implements ApplicationRunner {
@Autowired
private QTaskService qTaskService;
@Override
public void run(ApplicationArguments args) throws Exception {
qTaskService.reloadAllTasks();
}
}
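High availability based on Redis.
If a task runs on only one node and that node goes down or becomes unavailable, the service stops working.
With multiple nodes plus Redis, a node executes the task only after it has acquired the lock. As long as at least one of the nodes is alive, the scheduled task remains available.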
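To make the lock protocol concrete, here is a minimal sketch that uses an in-memory ConcurrentHashMap as a stand-in for Redis. The class InMemoryLock and its methods are hypothetical and not part of the project, and the expiry that the Redis version relies on is deliberately left out. It only illustrates the two rules that matter: the first caller wins the key, and only the holder of the matching token may release it.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory stand-in for a Redis "set if absent" lock (no TTL). */
final class InMemoryLock {
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    /** Returns a token if the lock was acquired, or null if someone else holds it. */
    String tryLock(String key) {
        String token = UUID.randomUUID().toString();
        // putIfAbsent is atomic: exactly one caller sees null and wins the lock
        return store.putIfAbsent(key, token) == null ? token : null;
    }

    /** Releases the lock only if the caller still owns it (token matches). */
    boolean unlock(String key, String token) {
        // remove(key, value) compares and deletes in one atomic step
        return token != null && store.remove(key, token);
    }

    public static void main(String[] args) {
        InMemoryLock lock = new InMemoryLock();
        String nodeA = lock.tryLock("MYQUARTZ-LOCK:Demo1-default");
        String nodeB = lock.tryLock("MYQUARTZ-LOCK:Demo1-default");
        System.out.println("node A acquired: " + (nodeA != null));
        System.out.println("node B acquired: " + (nodeB != null));
    }
}
```

With Redis, the same compare-and-delete is usually made atomic with a small Lua script, because a separate get followed by a delete leaves a short window in which the lock can expire and be taken by another node.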

Implementation of the high-availability model
package com.lcz.myquartz.quartzjobbean;
import com.lcz.myquartz.entity.MyQuartzState;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import java.time.Duration;
public abstract class MyClusterQuartzJobBean extends MyQuartzJobBean {
@Autowired
private RedisTemplate redisTemplate;
@Override
protected boolean before(JobExecutionContext jobExecutionContext) throws JobExecutionException {
String taskCode = jobExecutionContext.getJobDetail().getKey().getName();
String group = jobExecutionContext.getJobDetail().getKey().getGroup();
StringBuffer sb = new StringBuffer("MYQUARTZ-LOCK:");
sb.append(taskCode).append("-");
sb.append(group);
String value="LOCKED-"+jobExecutionContext.hashCode();
String key=sb.toString();
// System.out.println(value+"----before----");
Boolean lockFlag = redisTemplate.opsForValue().setIfAbsent(key, value, Duration.ofSeconds(60));// the lock expires after 60 seconds
if (null != lockFlag && lockFlag.booleanValue()) {
// System.out.println("-----lock acquired--------");
return true;
}
// System.out.println("-----lock not acquired--------");
return false;
}
@Override
protected void complete(JobExecutionContext jobExecutionContext, MyQuartzState myQuartzState, Exception e) throws Exception {
String taskCode = jobExecutionContext.getJobDetail().getKey().getName();
String group = jobExecutionContext.getJobDetail().getKey().getGroup();
StringBuffer sb = new StringBuffer("MYQUARTZ-LOCK:");
sb.append(taskCode).append("-");
sb.append(group);
String value="LOCKED-"+jobExecutionContext.hashCode();
String key=sb.toString();
// System.out.println(value+"----complete----");
if(value.equals(redisTemplate.opsForValue().get(key))){// best effort to avoid deleting a lock held by someone else
// Release the lock
redisTemplate.delete(key);
// System.out.println("-----lock released--------");
}
}
}
Your own task class only needs to extend MyClusterQuartzJobBean
package com.lcz.myquartz.task;
import com.lcz.myquartz.entity.MyQuartzState;
import com.lcz.myquartz.quartzjobbean.MyClusterQuartzJobBean;
import com.lcz.myquartz.quartzjobbean.MyQuartzJobBean;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import java.util.concurrent.TimeUnit;
/**
* Must extend MyClusterQuartzJobBean
*/
@Slf4j
public class MyClusterQuartzDemo02 extends MyClusterQuartzJobBean {
@Override
protected MyQuartzState executeJob(JobExecutionContext jobExecutionContext) throws JobExecutionException {
log.info("scheduled task body... ");
jobExecutionContext.getMergedJobDataMap().forEach((k,v)->{
System.out.println("key="+k+",value="+v);
});
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
// Restore the interrupt flag instead of swallowing the interruption
Thread.currentThread().interrupt();
}
return MyQuartzState.OK;
}
}
Test case
@Test
public void testClusterAddAndStartTask() throws Exception {
QTask t=new QTask();
t.setJobClass("com.lcz.myquartz.task.MyClusterQuartzDemo02");
t.setCron("0,15,30,45 * * * * ?");
t.setJobGroup("default");
t.setTaskName("testabc");
t.setTaskCode("taskCode-"+System.currentTimeMillis());
JobDataMap jobDataMap=new JobDataMap();
jobDataMap.put("id",123456);
jobDataMap.put("name","xiaowang2222");
t.setParams(JSONObject.toJSONString(jobDataMap));
qTaskService.saveAndStartTask(t);
// Keep the test alive so the scheduled job has time to fire
TimeUnit.HOURS.sleep(1);
}
If a single cron expression is not enough, you can pass several, separated by semicolons.
For example
@Test
public void testAddAndStartTask() throws Exception {
QTask t=new QTask();
t.setCron("0,15,30,45 * * * * ?;0,5,10 * * * * ?");
//......
}
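A small helper like the following could tolerate stray whitespace or a trailing semicolon when the expression list is split. CronLists.split is a hypothetical name, not part of the project; it only splits and trims and does not validate the expressions themselves.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for splitting a semicolon-separated list of cron expressions. */
final class CronLists {
    static List<String> split(String crons) {
        List<String> result = new ArrayList<>();
        if (crons == null) {
            return result;
        }
        for (String part : crons.split(";")) {
            String trimmed = part.trim();
            // Skip empty pieces produced by "a;;b" or a trailing ";"
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(split("0,15,30,45 * * * * ?; 0,5,10 * * * * ? ;"));
    }
}
```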
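We often need scheduled publishing, for example putting a product on sale at a set time, which means converting a Date into a cron expression.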
package com.lcz.myquartz.util;
import java.util.Calendar;
import java.util.Date;
/**
* Date helper class
*/
public class DateUtils {
/**
* Convert a date into a cron expression
*
* @param date
* @return
*/
public static String parseCron(Date date) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
StringBuffer sb = new StringBuffer();
sb.append(calendar.get(Calendar.SECOND)).append(" ");
sb.append(calendar.get(Calendar.MINUTE)).append(" ");
sb.append(calendar.get(Calendar.HOUR_OF_DAY)).append(" ");
sb.append(calendar.get(Calendar.DAY_OF_MONTH)).append(" ");
sb.append(calendar.get(Calendar.MONTH) + 1).append(" ");
sb.append("?").append(" ");
sb.append(calendar.get(Calendar.YEAR));
return sb.toString();
}
}
Usage example
Date date=new Date(System.currentTimeMillis()+5000);// the time to publish at (here: 5 seconds from now)
QTask qTask=new QTask();
qTask.setCron(DateUtils.parseCron(date));
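For code that already works with java.time, the same conversion can be sketched with LocalDateTime. CronDates.toCron is a hypothetical helper, not part of the project; it is meant to produce the same seven-field layout (second, minute, hour, day of month, month, ?, year) as DateUtils.parseCron.

```java
import java.time.LocalDateTime;

/** Hypothetical java.time counterpart of DateUtils.parseCron. */
final class CronDates {
    static String toCron(LocalDateTime t) {
        // Field order: second minute hour day-of-month month day-of-week year
        return t.getSecond() + " "
                + t.getMinute() + " "
                + t.getHour() + " "
                + t.getDayOfMonth() + " "
                + t.getMonthValue() + " "   // getMonthValue() is already 1-based
                + "? "
                + t.getYear();
    }

    public static void main(String[] args) {
        System.out.println(toCron(LocalDateTime.now().plusSeconds(5)));
    }
}
```

Unlike Calendar.MONTH, getMonthValue() needs no +1 adjustment, which removes one easy source of off-by-one errors.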
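The default status is OK (1), which means the task is active; CANCEL (0) means the task is inactive.
The system loads a task only when its status is OK.
Note: if the cron expression turns out to be invalid when the task starts (it cannot be parsed, or its time has already passed), the task is not loaded either.
To cancel a task, set its status to CANCEL.
To resume it, set the status back to OK.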
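The status values can be pictured as a small enum. The sketch below is an assumption based on how EQTask.OK.getId() and EQTask.CANCEL.getId() are used in the service; it is named TaskStatusSketch to avoid confusion with the project's own EQTask, which may look different, and fromId is a hypothetical addition.

```java
/** Hypothetical sketch of the task status values: CANCEL = 0, OK = 1. */
enum TaskStatusSketch {
    CANCEL(0),
    OK(1);

    private final int id;

    TaskStatusSketch(int id) {
        this.id = id;
    }

    int getId() {
        return id;
    }

    /** Maps the value stored in the task_status column back to a status. */
    static TaskStatusSketch fromId(int id) {
        for (TaskStatusSketch status : values()) {
            if (status.id == id) {
                return status;
            }
        }
        throw new IllegalArgumentException("unknown task status: " + id);
    }
}
```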




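Users may make mistakes when entering a cron expression, so its format needs to be validated. It can also help to preview the execution times once the expression has been entered, to check that they match what is needed. A cron expression can be converted into concrete times; for details see:
Converting a cron expression to times (Java)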
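Before an expression is handed to Quartz, a cheap first-pass check can reject obviously malformed input. The sketch below only counts whitespace-separated fields (Quartz expressions have six, or seven with the optional year); CronPrecheck is a hypothetical name, and real validation is better left to Quartz itself, for example CronExpression.isValidExpression.

```java
/** Hypothetical first-pass check: counts fields only, does not parse them. */
final class CronPrecheck {
    static boolean hasValidFieldCount(String cron) {
        if (cron == null || cron.trim().isEmpty()) {
            return false;
        }
        int fields = cron.trim().split("\\s+").length;
        // 6 fields: sec min hour day-of-month month day-of-week; 7th is the optional year
        return fields == 6 || fields == 7;
    }

    public static void main(String[] args) {
        System.out.println(hasValidFieldCount("0,15,30,45 * * * * ?"));
        System.out.println(hasValidFieldCount("* * * * *"));
    }
}
```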