springboot + Quartz 实现定时任务管理

springboot + Quartz 实现定时任务管理

以下是结合策略模式和Quartz实现灵活定时任务的完整示例,包含核心代码和执行流程:

一、场景说明

假设有两类定时任务需求:

定时同步用户数据(每天凌晨 1 点执行)。定时清理日志文件(每周日凌晨 2 点执行)。

使用策略模式将任务逻辑与调度逻辑解耦,新增任务时只需实现策略接口,无需修改调度核心代码。

二、核心组件设计

策略接口:定义任务执行标准。策略实现类:不同业务任务的具体逻辑。任务实体类:存储任务配置(需序列化,用于持久化)。Quartz Job 实现类:调度入口,动态调用策略实现类。任务管理器:负责创建 / 删除任务,将任务信息存入 Quartz 数据库。

三、代码实现

引入相关包

        <!-- 定时任务 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>
若需持久化增加配置 -以mysql数据库为例

若不持久化,则任务存在内存中,重启系统后,则任务就丢失了,需要重新创建相应任务


# 文件上传配置
spring:
  # 1. 数据源配置(Quartz 需连接数据库存储任务)
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/quartz_db?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true&characterEncoding=utf8
    username: root  # 替换为你的数据库用户名
    password: 123456  # 替换为你的数据库密码
    hikari:
      maximum-pool-size: 10  # 连接池大小,建议与 Quartz 线程池匹配

  # 2. Quartz 核心配置(持久化关键)
  quartz:
    job-store-type: JDBC  # 启用数据库存储(必填,默认是MEMORY)
    jdbc:
      initialize-schema: NEVER  # 首次启动后改为NEVER,避免重复建表数据丢失
      schema: classpath:org/quartz/impl/jdbcjobstore/tables_mysql_innodb.sql  # 可选:指定表结构脚本(首次启动用)
    properties:
      org:
        quartz:
          # 调度器配置
          scheduler:
            instanceName: MyScheduler  # 调度器实例名(自定义)
            instanceId: AUTO  # 自动生成实例ID(集群环境确保唯一)
            makeSchedulerThreadDaemon: true  # 调度器线程设为守护线程

          # 线程池配置
          threadPool:
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 10  # 线程池大小(根据任务数量调整)
            threadPriority: 5  # 线程优先级(1-10,默认5)
            threadsInheritContextClassLoaderOfInitializingThread: true  # 线程继承上下文类加载器

          # 数据库存储配置(JDBCJobStore)
          jobStore:
            class: org.quartz.impl.jdbcjobstore.JobStoreTX  # 事务型存储(支持事务)
            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate  # MySQL 适配(其他数据库需换对应Delegate)
            tablePrefix: QRTZ_  # 表前缀(默认即可,与初始化脚本一致)
            isClustered: false  # 非集群模式(集群部署设为true)
            clusterCheckinInterval: 15000  # 集群节点心跳间隔(毫秒,集群模式需配置)
            useProperties: false  # 是否将JobDataMap存储为字符串(false则存储为序列化对象)
            misfireThreshold: 60000  # 触发失火阈值(毫秒,超过此时间视为失火)

          # 触发器配置(可选,全局默认策略)
          triggers:
            cron:
              misfireInstruction: MISFIRE_INSTRUCTION_DO_NOTHING  # Cron触发器默认失火策略:错过不执行
            simple:
              misfireInstruction: MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT  # Simple触发器默认策略
1. 策略接口(TaskHandler.java)

/**
 * 任务执行策略接口
 */
public interface TaskHandler {
    // 执行任务的核心方法
    String execute(ScheduledTask task);
}
2. 策略实现类(具体业务逻辑)

/**
 * 策略1:同步用户数据
 */
@Component("userSyncHandler") // Bean名称用于动态调用
public class UserSyncHandler implements TaskHandler {
    @Override
    public String execute(ScheduledTask task) {
        // 实际业务逻辑:如从数据库同步用户到缓存
        String result = "用户数据同步完成:" + LocalDateTime.now();
        System.out.println(result);
        return result;
    }
}

/**
 * 策略2:清理日志文件
 */
@Component("logCleanHandler") // Bean名称用于动态调用
public class LogCleanHandler implements TaskHandler {
    @Override
    public String execute(ScheduledTask task) {
        // 实际业务逻辑:如删除30天前的日志
        String result = "日志清理完成:" + LocalDateTime.now();
        System.out.println(result);
        return result;
    }
}
3. 任务实体类(需序列化,ScheduledTask.java)

import lombok.Data;
import java.io.Serializable;

/**
 * 任务实体(持久化到Quartz数据库)
 */
@Data
public class ScheduledTask implements Serializable { // 必须序列化
    private Long id;
    private String taskName; // 任务名称
    private String cronExpression; // 定时表达式(如0 0 1 * * ?)
    private String handlerBeanName; // 策略实现类的Bean名称(如userSyncHandler)
    // 其他参数:如任务状态、描述等
}
4. Quartz Job 实现类(调度入口,DynamicJob.java)

import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;

import java.io.Serializable;

// 任务执行器:DynamicJob.java
// Job实现类(必须序列化,因为会被Quartz持久化)
// 作用于该类的所有实例
@DisallowConcurrentExecution  // 避免同一任务并行执行问题
@Component
public class DynamicJob extends QuartzJobBean implements ApplicationContextAware, Serializable {
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        DynamicJob.applicationContext = applicationContext;
    }

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        ScheduledTask task = (ScheduledTask) context.getMergedJobDataMap().get("task");
        if (task == null) {
            throw new JobExecutionException("任务配置为空");
        }

        String handlerBeanName = task.getInvokeMethod();
        if (handlerBeanName == null) {
            throw new JobExecutionException("未配置策略Bean");
        }

        try {
            TaskHandler handler = applicationContext.getBean(handlerBeanName, TaskHandler.class);
            String result = handler.execute(task);
            System.out.println("任务[" + task.getId() + "]执行成功:" + result);
        } catch (Exception e) {
            System.err.println("任务[" + task.getId() + "]执行失败:" + e.getMessage());
            throw new JobExecutionException("任务执行异常", e);
        }
    }
}

5. 任务管理器(创建 / 删除任务,TaskManager.java)



import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;
import java.util.List;

/**
 * @ClassName QuartzSchedulerService
 * @Description 定时任务服务实现类
 **/
@Slf4j
@Transactional
@Service
public class QuartzSchedulerServiceImpl implements IQuartzSchedulerService {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private QrtzTriggersMapper qrtzTriggersMapper;

    @Override
    public void createOrUpdateTask(VerificationScheduledTaskInfo task) throws Exception {
        JobKey jobKey = getJobKey(task);
        TriggerKey triggerKey = getTriggerKey(task);
        JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put("task", task);
        // 构建JobDetail
        JobDetail jobDetail = JobBuilder.newJob(DynamicJob.class)
                .withIdentity(jobKey)
                .usingJobData(jobDataMap)
                .storeDurably()
                .build();

        // 构建Trigger(根据执行策略)
        Trigger trigger;
        if (task.getExecuteStrategy() == 2) { // 周期执行
            String cron = buildCron(task);
            trigger = TriggerBuilder.newTrigger()
                    .withIdentity(triggerKey)
                    // 设置执行cron并配置错过后不立即执行,等下次执行时期
                    .withSchedule(CronScheduleBuilder.cronSchedule(cron))
                    .startAt(DateTimeUtils.toDate(task.getStartTime()))
                    .build();
        } else if (task.getExecuteStrategy() == 1) { // 执行一次
            trigger = TriggerBuilder.newTrigger()
                    .withIdentity(triggerKey)
                    .startAt(DateTimeUtils.toDate(task.getStartTime()))
                    .build();
        } else { // 立即执行
            trigger = TriggerBuilder.newTrigger()
                    .withIdentity(triggerKey)
                    .startNow()
                    .build();
        }

        // 调度任务
        if (scheduler.checkExists(jobKey)) {
            if (isComplete(jobKey.getName())) {
                // rescheduleJob 只更新触发器,并不更新job的内容,故需要先删除再重建
//                scheduler.rescheduleJob(triggerKey, trigger);
                deleteTask(task);
            } else {
                throw new BusinessException("任务已被其他线程占用,请稍后重试");
            }
        }
        scheduler.scheduleJob(jobDetail, trigger);
    }

    // 构建Cron表达式(支持小时、天、月)
    private String buildCron(VerificationScheduledTaskInfo task) {
        LocalDateTime startTime = task.getStartTime();
        int hour = startTime.getHour();
        int minute = startTime.getMinute();
        int second = startTime.getSecond();
        int dayOfMonth = startTime.getDayOfMonth();

        switch (task.getPeriodUnit().toLowerCase()) {
            case "minute":
                // 分钟级:从 startTime 的 "时:分:秒" 开始,每 N 分钟执行一次
                // 例:startTime为 10:05:30,periodValue=5 → 30 05/5 10 * * ? → 每天10点05分30秒开始,每5分钟执行一次
                return String.format("%d %d/%d %d * * ?",
                        second, minute, task.getPeriodValue(), hour);
            case "hour":
                return String.format("%d %d %d/%d * * ?", second, minute, hour, task.getPeriodValue());
            case "day":
                return String.format("%d %d %d 1/%d * ?", second, minute, hour, task.getPeriodValue());
            case "month":
                return String.format("%d %d %d %d 1/%d ?", second, minute, hour, dayOfMonth, task.getPeriodValue());
            default:
                throw new IllegalArgumentException("不支持的周期单位:" + task.getPeriodUnit());
        }
    }


    /**
     * 暂停任务
     */
    @Override
    public void pauseTask(VerificationScheduledTaskInfo task) throws Exception {
        JobKey jobKey = getJobKey(task);
        if (isComplete(jobKey.getName())) {
            scheduler.pauseJob(jobKey);
            log.info("暂停任务:{}", jobKey.getName());
        } else {
            throw new BusinessException("任务已被其他线程占用,请稍后重试");
        }
    }

    /**
     * 恢复任务
     */
    @Override
    public void resumeTask(VerificationScheduledTaskInfo task) throws Exception {
        JobKey jobKey = getJobKey(task);
        if (isComplete(jobKey.getName())) {
            scheduler.resumeJob(jobKey);
            log.info("恢复任务:{}", jobKey.getName());
        } else {
            throw new BusinessException("任务已被其他线程占用,请稍后重试");
        }

    }

    /**
     * 删除任务
     */
    @Override
    public void deleteTask(VerificationScheduledTaskInfo task) throws Exception {
        JobKey jobKey = getJobKey(task);
        TriggerKey triggerKey = getTriggerKey(task);
        if (isComplete(jobKey.getName())) {
            scheduler.pauseTrigger(triggerKey);
            scheduler.unscheduleJob(triggerKey);
            scheduler.deleteJob(jobKey);
            log.info("删除任务:{}", jobKey.getName());
        } else {
            throw new BusinessException("任务已被其他线程占用,请稍后重试");
        }

    }

    /**
     * @MethodName isComplete
     * @Description //判断是否执行结束,而非正在进行中
     * @Author xgw
     * @Date 2025/11/5 16:09
     * @Param
     * @Return
     **/
    private Boolean isComplete(String jobName) {
        try {
            LambdaQueryWrapper<QrtzTriggers> queryWrapper = Wrappers.lambdaQuery();
            queryWrapper.eq(QrtzTriggers::getJobName, jobName);
            QrtzTriggers qrtzTrigger = qrtzTriggersMapper.selectOne(queryWrapper);
            return !"ACQUIRED".equals(qrtzTrigger.getTriggerState());
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            return false;
        }
    }


    @NotNull
    private static TriggerKey getTriggerKey(VerificationScheduledTaskInfo task) {
        TriggerKey triggerKey = new TriggerKey("trigger_" + task.getId(), "dynamic_trigger_group");
        return triggerKey;
    }

    @NotNull
    private static JobKey getJobKey(VerificationScheduledTaskInfo task) {
        JobKey jobKey = new JobKey("task_" + task.getId(), "dynamic_task_group");
        return jobKey;
    }

}


6. 测试代码(创建任务)

import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

/**
 * 项目启动时创建测试任务
 */
@Component
public class TaskInitializer implements CommandLineRunner {
    @Autowired
    private TaskManager taskManager;

    @Override
    public void run(String... args) throws Exception {
        // 创建用户同步任务(每天凌晨1点执行)
        ScheduledTask userTask = new ScheduledTask();
        userTask.setId(1L);
        userTask.setTaskName("用户同步任务");
        userTask.setCronExpression("0 0 1 * * ?");
        userTask.setHandlerBeanName("userSyncHandler"); // 对应UserSyncHandler
        taskManager.createTask(userTask);

        // 创建日志清理任务(每周日凌晨2点执行)
        ScheduledTask logTask = new ScheduledTask();
        logTask.setId(2L);
        logTask.setTaskName("日志清理任务");
        logTask.setCronExpression("0 0 2 ? * SUN");
        logTask.setHandlerBeanName("logCleanHandler"); // 对应LogCleanHandler
        taskManager.createTask(logTask);
    }
}
三、持久化验证

1、数据库表生成
启动项目后,Quartz 会自动创建 QRTZ_JOB_DETAILS、QRTZ_TRIGGERS 等表。
2、任务信息存储
调用 createTask 后,任务信息会被序列化并存入以下表:

QRTZ_JOB_DETAILS:存储 JobDetail 信息(含 ScheduledTask 实体的序列化数据)。QRTZ_CRON_TRIGGERS:存储 Cron 表达式和触发器配置。

3、重启恢复验证

重启项目后,Quartz 会从数据库读取任务信息,自动恢复调度。观察日志,任务会按 cronExpression 定时执行,无需重新创建。

四、关键说明

1、序列化要求

DynamicJob(Job 实现类)和 ScheduledTask(任务实体)必须实现 Serializable,否则无法持久化到数据库。策略实现类(如 UserSyncHandler)无需序列化,因为它们是 Spring 管理的 Bean,通过 BeanName 动态获取。

2、扩展性:新增任务时,只需:

实现 TaskHandler 接口(如 OrderSyncHandler)。创建 ScheduledTask 实体,指定 handlerBeanName 为新实现类的 Bean 名称。调用 taskManager.createTask() 即可,无需修改调度核心代码。

3、高可用:配合数据库主从备份,可确保任务信息不丢失,满足生产环境需求。
通过以上实现,策略模式的灵活性与 Quartz 的持久化能力相结合,既支持动态扩展任务类型,又能保证任务在重启后正常执行。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
子羽長安的头像 - 鹿快
评论 抢沙发

请登录后发表评论

    暂无评论内容