springboot + Quartz 实现定时任务管理
以下是结合策略模式和Quartz实现灵活定时任务的完整示例,包含核心代码和执行流程:
一、场景说明
假设有两类定时任务需求:
定时同步用户数据(每天凌晨 1 点执行)。定时清理日志文件(每周日凌晨 2 点执行)。
使用策略模式将任务逻辑与调度逻辑解耦,新增任务时只需实现策略接口,无需修改调度核心代码。
二、核心组件设计
策略接口:定义任务执行标准。策略实现类:不同业务任务的具体逻辑。任务实体类:存储任务配置(需序列化,用于持久化)。Quartz Job 实现类:调度入口,动态调用策略实现类。任务管理器:负责创建 / 删除任务,将任务信息存入 Quartz 数据库。
三、代码实现
引入相关包
<!-- 定时任务 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
若需持久化增加配置 -以mysql数据库为例
若不持久化,则任务存在内存中,重启系统后,则任务就丢失了,需要重新创建相应任务
# 文件上传配置
spring:
# 1. 数据源配置(Quartz 需连接数据库存储任务)
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/quartz_db?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true&characterEncoding=utf8
username: root # 替换为你的数据库用户名
password: 123456 # 替换为你的数据库密码
hikari:
maximum-pool-size: 10 # 连接池大小,建议与 Quartz 线程池匹配
# 2. Quartz 核心配置(持久化关键)
quartz:
job-store-type: JDBC # 启用数据库存储(必填,默认是MEMORY)
jdbc:
initialize-schema: NEVER # 首次启动后改为NEVER,避免重复建表数据丢失
schema: classpath:org/quartz/impl/jdbcjobstore/tables_mysql_innodb.sql # 可选:指定表结构脚本(首次启动用)
properties:
org:
quartz:
# 调度器配置
scheduler:
instanceName: MyScheduler # 调度器实例名(自定义)
instanceId: AUTO # 自动生成实例ID(集群环境确保唯一)
makeSchedulerThreadDaemon: true # 调度器线程设为守护线程
# 线程池配置
threadPool:
class: org.quartz.simpl.SimpleThreadPool
threadCount: 10 # 线程池大小(根据任务数量调整)
threadPriority: 5 # 线程优先级(1-10,默认5)
threadsInheritContextClassLoaderOfInitializingThread: true # 线程继承上下文类加载器
# 数据库存储配置(JDBCJobStore)
jobStore:
class: org.quartz.impl.jdbcjobstore.JobStoreTX # 事务型存储(支持事务)
driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate # MySQL 适配(其他数据库需换对应Delegate)
tablePrefix: QRTZ_ # 表前缀(默认即可,与初始化脚本一致)
isClustered: false # 非集群模式(集群部署设为true)
clusterCheckinInterval: 15000 # 集群节点心跳间隔(毫秒,集群模式需配置)
useProperties: false # 是否将JobDataMap存储为字符串(false则存储为序列化对象)
misfireThreshold: 60000 # 触发失火阈值(毫秒,超过此时间视为失火)
# 触发器配置(可选,全局默认策略)
triggers:
cron:
misfireInstruction: MISFIRE_INSTRUCTION_DO_NOTHING # Cron触发器默认失火策略:错过不执行
simple:
misfireInstruction: MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT # Simple触发器默认策略
1. 策略接口(TaskHandler.java)
/**
* 任务执行策略接口
*/
public interface TaskHandler {
// 执行任务的核心方法
String execute(ScheduledTask task);
}
2. 策略实现类(具体业务逻辑)
/**
* 策略1:同步用户数据
*/
@Component("userSyncHandler") // Bean名称用于动态调用
public class UserSyncHandler implements TaskHandler {
@Override
public String execute(ScheduledTask task) {
// 实际业务逻辑:如从数据库同步用户到缓存
String result = "用户数据同步完成:" + LocalDateTime.now();
System.out.println(result);
return result;
}
}
/**
* 策略2:清理日志文件
*/
@Component("logCleanHandler") // Bean名称用于动态调用
public class LogCleanHandler implements TaskHandler {
@Override
public String execute(ScheduledTask task) {
// 实际业务逻辑:如删除30天前的日志
String result = "日志清理完成:" + LocalDateTime.now();
System.out.println(result);
return result;
}
}
3. 任务实体类(需序列化,ScheduledTask.java)
import lombok.Data;
import java.io.Serializable;
/**
* 任务实体(持久化到Quartz数据库)
*/
@Data
public class ScheduledTask implements Serializable { // 必须序列化
private Long id;
private String taskName; // 任务名称
private String cronExpression; // 定时表达式(如0 0 1 * * ?)
private String handlerBeanName; // 策略实现类的Bean名称(如userSyncHandler)
// 其他参数:如任务状态、描述等
}
4. Quartz Job 实现类(调度入口,DynamicJob.java)
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;
import java.io.Serializable;
// 任务执行器:DynamicJob.java
// Job实现类(必须序列化,因为会被Quartz持久化)
// 作用于该类的所有实例
@DisallowConcurrentExecution // 避免同一任务并行执行问题
@Component
public class DynamicJob extends QuartzJobBean implements ApplicationContextAware, Serializable {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
DynamicJob.applicationContext = applicationContext;
}
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
ScheduledTask task = (ScheduledTask) context.getMergedJobDataMap().get("task");
if (task == null) {
throw new JobExecutionException("任务配置为空");
}
String handlerBeanName = task.getInvokeMethod();
if (handlerBeanName == null) {
throw new JobExecutionException("未配置策略Bean");
}
try {
TaskHandler handler = applicationContext.getBean(handlerBeanName, TaskHandler.class);
String result = handler.execute(task);
System.out.println("任务[" + task.getId() + "]执行成功:" + result);
} catch (Exception e) {
System.err.println("任务[" + task.getId() + "]执行失败:" + e.getMessage());
throw new JobExecutionException("任务执行异常", e);
}
}
}
5. 任务管理器(创建 / 删除任务,TaskManager.java)
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.List;
/**
* @ClassName QuartzSchedulerService
* @Description 定时任务服务实现类
**/
@Slf4j
@Transactional
@Service
public class QuartzSchedulerServiceImpl implements IQuartzSchedulerService {
@Autowired
private Scheduler scheduler;
@Autowired
private QrtzTriggersMapper qrtzTriggersMapper;
@Override
public void createOrUpdateTask(VerificationScheduledTaskInfo task) throws Exception {
JobKey jobKey = getJobKey(task);
TriggerKey triggerKey = getTriggerKey(task);
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("task", task);
// 构建JobDetail
JobDetail jobDetail = JobBuilder.newJob(DynamicJob.class)
.withIdentity(jobKey)
.usingJobData(jobDataMap)
.storeDurably()
.build();
// 构建Trigger(根据执行策略)
Trigger trigger;
if (task.getExecuteStrategy() == 2) { // 周期执行
String cron = buildCron(task);
trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
// 设置执行cron并配置错过后不立即执行,等下次执行时期
.withSchedule(CronScheduleBuilder.cronSchedule(cron))
.startAt(DateTimeUtils.toDate(task.getStartTime()))
.build();
} else if (task.getExecuteStrategy() == 1) { // 执行一次
trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.startAt(DateTimeUtils.toDate(task.getStartTime()))
.build();
} else { // 立即执行
trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.startNow()
.build();
}
// 调度任务
if (scheduler.checkExists(jobKey)) {
if (isComplete(jobKey.getName())) {
// rescheduleJob 只更新触发器,并不更新job的内容,故需要先删除再重建
// scheduler.rescheduleJob(triggerKey, trigger);
deleteTask(task);
} else {
throw new BusinessException("任务已被其他线程占用,请稍后重试");
}
}
scheduler.scheduleJob(jobDetail, trigger);
}
// 构建Cron表达式(支持小时、天、月)
private String buildCron(VerificationScheduledTaskInfo task) {
LocalDateTime startTime = task.getStartTime();
int hour = startTime.getHour();
int minute = startTime.getMinute();
int second = startTime.getSecond();
int dayOfMonth = startTime.getDayOfMonth();
switch (task.getPeriodUnit().toLowerCase()) {
case "minute":
// 分钟级:从 startTime 的 "时:分:秒" 开始,每 N 分钟执行一次
// 例:startTime为 10:05:30,periodValue=5 → 30 05/5 10 * * ? → 每天10点05分30秒开始,每5分钟执行一次
return String.format("%d %d/%d %d * * ?",
second, minute, task.getPeriodValue(), hour);
case "hour":
return String.format("%d %d %d/%d * * ?", second, minute, hour, task.getPeriodValue());
case "day":
return String.format("%d %d %d 1/%d * ?", second, minute, hour, task.getPeriodValue());
case "month":
return String.format("%d %d %d %d 1/%d ?", second, minute, hour, dayOfMonth, task.getPeriodValue());
default:
throw new IllegalArgumentException("不支持的周期单位:" + task.getPeriodUnit());
}
}
/**
* 暂停任务
*/
@Override
public void pauseTask(VerificationScheduledTaskInfo task) throws Exception {
JobKey jobKey = getJobKey(task);
if (isComplete(jobKey.getName())) {
scheduler.pauseJob(jobKey);
log.info("暂停任务:{}", jobKey.getName());
} else {
throw new BusinessException("任务已被其他线程占用,请稍后重试");
}
}
/**
* 恢复任务
*/
@Override
public void resumeTask(VerificationScheduledTaskInfo task) throws Exception {
JobKey jobKey = getJobKey(task);
if (isComplete(jobKey.getName())) {
scheduler.resumeJob(jobKey);
log.info("恢复任务:{}", jobKey.getName());
} else {
throw new BusinessException("任务已被其他线程占用,请稍后重试");
}
}
/**
* 删除任务
*/
@Override
public void deleteTask(VerificationScheduledTaskInfo task) throws Exception {
JobKey jobKey = getJobKey(task);
TriggerKey triggerKey = getTriggerKey(task);
if (isComplete(jobKey.getName())) {
scheduler.pauseTrigger(triggerKey);
scheduler.unscheduleJob(triggerKey);
scheduler.deleteJob(jobKey);
log.info("删除任务:{}", jobKey.getName());
} else {
throw new BusinessException("任务已被其他线程占用,请稍后重试");
}
}
/**
* @MethodName isComplete
* @Description //判断是否执行结束,而非正在进行中
* @Author xgw
* @Date 2025/11/5 16:09
* @Param
* @Return
**/
private Boolean isComplete(String jobName) {
try {
LambdaQueryWrapper<QrtzTriggers> queryWrapper = Wrappers.lambdaQuery();
queryWrapper.eq(QrtzTriggers::getJobName, jobName);
QrtzTriggers qrtzTrigger = qrtzTriggersMapper.selectOne(queryWrapper);
return !"ACQUIRED".equals(qrtzTrigger.getTriggerState());
} catch (Exception e) {
log.error(e.getMessage(), e);
return false;
}
}
@NotNull
private static TriggerKey getTriggerKey(VerificationScheduledTaskInfo task) {
TriggerKey triggerKey = new TriggerKey("trigger_" + task.getId(), "dynamic_trigger_group");
return triggerKey;
}
@NotNull
private static JobKey getJobKey(VerificationScheduledTaskInfo task) {
JobKey jobKey = new JobKey("task_" + task.getId(), "dynamic_task_group");
return jobKey;
}
}
6. 测试代码(创建任务)
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
/**
* 项目启动时创建测试任务
*/
@Component
public class TaskInitializer implements CommandLineRunner {
@Autowired
private TaskManager taskManager;
@Override
public void run(String... args) throws Exception {
// 创建用户同步任务(每天凌晨1点执行)
ScheduledTask userTask = new ScheduledTask();
userTask.setId(1L);
userTask.setTaskName("用户同步任务");
userTask.setCronExpression("0 0 1 * * ?");
userTask.setHandlerBeanName("userSyncHandler"); // 对应UserSyncHandler
taskManager.createTask(userTask);
// 创建日志清理任务(每周日凌晨2点执行)
ScheduledTask logTask = new ScheduledTask();
logTask.setId(2L);
logTask.setTaskName("日志清理任务");
logTask.setCronExpression("0 0 2 ? * SUN");
logTask.setHandlerBeanName("logCleanHandler"); // 对应LogCleanHandler
taskManager.createTask(logTask);
}
}
三、持久化验证
1、数据库表生成:
启动项目后,Quartz 会自动创建 QRTZ_JOB_DETAILS、QRTZ_TRIGGERS 等表。
2、任务信息存储:
调用 createTask 后,任务信息会被序列化并存入以下表:
QRTZ_JOB_DETAILS:存储 JobDetail 信息(含 ScheduledTask 实体的序列化数据)。QRTZ_CRON_TRIGGERS:存储 Cron 表达式和触发器配置。
3、重启恢复验证:
重启项目后,Quartz 会从数据库读取任务信息,自动恢复调度。观察日志,任务会按 cronExpression 定时执行,无需重新创建。
四、关键说明
1、序列化要求:
DynamicJob(Job 实现类)和 ScheduledTask(任务实体)必须实现 Serializable,否则无法持久化到数据库。策略实现类(如 UserSyncHandler)无需序列化,因为它们是 Spring 管理的 Bean,通过 BeanName 动态获取。
2、扩展性:新增任务时,只需:
实现 TaskHandler 接口(如 OrderSyncHandler)。创建 ScheduledTask 实体,指定 handlerBeanName 为新实现类的 Bean 名称。调用 taskManager.createTask() 即可,无需修改调度核心代码。
3、高可用:配合数据库主从备份,可确保任务信息不丢失,满足生产环境需求。
通过以上实现,策略模式的灵活性与 Quartz 的持久化能力相结合,既支持动态扩展任务类型,又能保证任务在重启后正常执行。
















暂无评论内容