Part 21: Scheduled Tasks: Cron Expressions and Time Zone Handling


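📋 Article Overview

Learning objectives:

- Understand Cron expression syntax and its advanced usage in depth
- Master the core concepts and best practices of APScheduler
- Handle complex time zone conversions
- Implement a complete schedule history system
- Master compensation mechanisms for missed runs
- Build a highly available distributed scheduling system
- Learn techniques for optimizing scheduling performance

Tech stack:

- APScheduler (task scheduling framework)
- Croniter (Cron expression parsing)
- pytz (time zone handling)
- PostgreSQL (schedule record storage)
- Redis (distributed locks)
- Celery (task execution)
- FastAPI (schedule management API)

Estimated reading time: 90 minutes

Prerequisites:

- Familiarity with asynchronous programming in Python
- Knowledge of the workflow execution engine (see Parts 4-5)
- Understanding of the Celery task queue (see Part 5)
- A solid grasp of time zone and time handling concepts
- Basic knowledge of distributed systems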


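🎯 Business Scenarios

Why do we need scheduled task scheduling?

In an enterprise workflow automation platform, scheduled task scheduling is one of the core features. It addresses the following key problems:

Scenario 1: Periodic business processes

Requirements:
- Generate the sales report every day at 2:00 AM
- Send the weekly report email every Monday at 9:00 AM
- Archive data on the 1st of every month
- Run financial settlement at the end of every quarter
- Produce the annual summary on December 31 each year

Challenges:
- How do we control execution time precisely?
- How do we handle users in different time zones?
- How do we avoid duplicate execution?
- How do we handle execution failures?
- How do we compensate for missed runs?

Solutions:
- Cron expressions to define times precisely
- A time-zone-aware scheduling system
- Distributed locks to prevent duplicates
- A retry mechanism for failures
- A smart compensation algorithm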

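Scenario 2: Complex time rules

Requirements:
- Run every 2 hours on weekdays
- Run on the last day of every month
- Run on the first Monday of every quarter
- Skip public holidays
- Adjust automatically for daylight saving time (DST)

Challenges:
- Cron expressions cannot express every rule
- Holiday detection is complicated
- DST transitions
- Leap years
- The last day of the month is not a fixed date

Solutions:
- Extended Cron expression syntax
- An integrated holiday calendar
- A time zone library that handles DST automatically
- Smart date calculation
- Custom scheduling rules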
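
Rules such as "last day of the month" or "first Monday of the quarter" are awkward to express in plain Cron syntax but straightforward to compute directly. The following is a minimal sketch using only the standard library; the function names `last_day_of_month` and `first_monday_of_quarter` are illustrative and are not part of the article's code base.

```python
import calendar
from datetime import date, timedelta


def last_day_of_month(year: int, month: int) -> date:
    """Return the last calendar day of a month; leap years are handled by calendar."""
    days_in_month = calendar.monthrange(year, month)[1]
    return date(year, month, days_in_month)


def first_monday_of_quarter(year: int, quarter: int) -> date:
    """Return the first Monday on or after the first day of the given quarter (1-4)."""
    if quarter not in (1, 2, 3, 4):
        raise ValueError("quarter must be between 1 and 4")
    first_day = date(year, 3 * (quarter - 1) + 1, 1)
    days_until_monday = (0 - first_day.weekday()) % 7  # Monday is weekday 0
    return first_day + timedelta(days=days_until_monday)


if __name__ == "__main__":
    print(last_day_of_month(2024, 2))
    print(first_monday_of_quarter(2024, 4))
```

A custom trigger could call helpers like these to decide whether a candidate run date qualifies, instead of trying to encode the rule in the expression itself.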

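Scenario 3: Global deployment

Requirements:
- US users: 9:00 AM every day (EST)
- China users: 9:00 AM every day (CST, China Standard Time)
- European users: 9:00 AM every day (CET)
- The same workflow runs in different time zones

Challenges:
- How do we store time zone information?
- How do we display local time?
- How do we convert between time zones?
- How do we handle daylight saving time?
- How do we avoid time zone confusion?

Solutions:
- Store UTC times in the database
- Per-user time zone configuration
- Automatic time zone conversion
- pytz to handle DST
- Explicit time zone identifiers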
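
To make the "same local time, different UTC instant" point concrete, here is a small sketch. It uses the standard-library `zoneinfo` module (Python 3.9+) rather than pytz so that it stays self-contained; this is a substitution for illustration, and the helper name `to_utc` is hypothetical. It also assumes the IANA time zone database is available on the machine.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def to_utc(naive_local: datetime, tz_name: str) -> datetime:
    """Interpret a naive wall-clock time in tz_name and return the UTC instant."""
    # With zoneinfo, attaching tzinfo directly is correct; pytz would need tz.localize()
    aware_local = naive_local.replace(tzinfo=ZoneInfo(tz_name))
    return aware_local.astimezone(timezone.utc)


if __name__ == "__main__":
    for tz_name in ("America/New_York", "Asia/Shanghai", "Europe/Berlin"):
        winter = to_utc(datetime(2024, 1, 15, 9, 0), tz_name)
        summer = to_utc(datetime(2024, 7, 15, 9, 0), tz_name)
        print(f"{tz_name}: winter {winter:%H:%M} UTC, summer {summer:%H:%M} UTC")
```

Because the UTC instant for "9:00 AM local" moves when DST starts or ends, a schedule is usually stored as the Cron expression plus the IANA zone name, and the UTC run time is recomputed for each occurrence rather than converted once.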

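Scenario 4: Highly available scheduling

Requirements:
- Multi-instance deployment
- No duplicate execution
- Automatic failover
- Execution status tracking
- Performance monitoring

Challenges:
- How do we stop multiple instances from scheduling the same run?
- How do we implement failover?
- How do we track execution history?
- How do we monitor scheduling performance?
- How do we handle long-running tasks?

Solutions:
- Redis distributed locks
- Heartbeat checks
- Execution history stored in the database
- Prometheus monitoring
- Task timeout control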
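
The duplicate-prevention idea can be sketched without a Redis server. The class below is an in-process stand-in that mimics the usual Redis pattern (acquire with `SET key token NX EX ttl`, release only if the stored token still matches, typically via a small Lua script). `InMemoryLockStore` and `lock_key` are hypothetical names for illustration, not the article's implementation.

```python
import time
import uuid
from typing import Dict, Optional, Tuple


class InMemoryLockStore:
    """In-process stand-in for Redis: key -> (owner token, expiry timestamp)."""

    def __init__(self) -> None:
        self._locks: Dict[str, Tuple[str, float]] = {}

    def acquire(self, key: str, ttl_seconds: float, now: Optional[float] = None) -> Optional[str]:
        """Return an owner token if the lock is free or expired, otherwise None."""
        now = time.monotonic() if now is None else now
        holder = self._locks.get(key)
        if holder is not None and holder[1] > now:
            return None  # another instance holds a live lock
        token = uuid.uuid4().hex
        self._locks[key] = (token, now + ttl_seconds)
        return token

    def release(self, key: str, token: str) -> bool:
        """Delete the lock only if the caller still owns it (compare-and-delete)."""
        holder = self._locks.get(key)
        if holder is None or holder[0] != token:
            return False
        del self._locks[key]
        return True


def lock_key(schedule_id: str, scheduled_time_iso: str) -> str:
    """One lock per (schedule, planned run time), so each occurrence runs at most once."""
    return f"schedule-lock:{schedule_id}:{scheduled_time_iso}"
```

Keying the lock on the planned run time rather than on the schedule alone means a slow run of one occurrence does not block the next occurrence, and the TTL ensures that a crashed instance cannot hold the lock forever.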

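Comparison of industry solutions

| Solution | Strengths | Weaknesses | Best suited for |
| --- | --- | --- | --- |
| Linux Cron | Simple, stable, low resource usage | Limited features, no GUI, hard to manage | Simple single-machine tasks |
| APScheduler | Feature-rich, easy to integrate, multiple trigger types | Mainly single-node; distribution must be built yourself | In-application scheduling in Python |
| Celery Beat | Deep Celery integration, distribution-friendly | Complex configuration, many dependencies | The Celery ecosystem |
| Airflow | Powerful DAGs, rich UI, enterprise-grade | Heavyweight, steep learning curve | Complex data pipelines |
| Quartz | Mature, stable, enterprise-grade, Java ecosystem | Java only, verbose configuration | Java enterprise applications |
| Cloud services | No operations burden, highly available, elastic | Higher cost, vendor lock-in | Cloud-native applications |

QuantumFlow uses a hybrid of APScheduler + Redis:

- Core scheduling: APScheduler (flexible, easy to use)
- Distributed locking: Redis (prevents duplicates)
- Task execution: Celery (asynchronous, distributed)
- History: PostgreSQL (persistent)
- Monitoring and alerting: Prometheus + Grafana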


🏗️ Architecture Design

Overall architecture diagram



graph TB
    subgraph "Scheduling layer"
        SCHEDULER[APScheduler scheduler]
        CRON_PARSER[Cron expression parser]
        TZ_CONVERTER[Time zone converter]
        TRIGGER_MGR[Trigger manager]
    end
    
    subgraph "Execution layer"
        CELERY[Celery task queue]
        WORKER1[Worker 1]
        WORKER2[Worker 2]
        WORKER3[Worker 3]
    end
    
    subgraph "Storage layer"
        subgraph "PostgreSQL"
            SCHEDULE_TABLE[(Schedule config table)]
            HISTORY_TABLE[(Execution history table)]
            MISSED_TABLE[(Missed runs table)]
        end
        
        REDIS[(Redis<br/>distributed lock)]
    end
    
    subgraph "Monitoring layer"
        PROMETHEUS[Prometheus]
        GRAFANA[Grafana]
        ALERTMANAGER[AlertManager]
    end
    
    subgraph "API layer"
        SCHEDULE_API[Schedule management API]
        HISTORY_API[History query API]
        MONITOR_API[Monitoring API]
    end
    
    SCHEDULER --> CRON_PARSER
    SCHEDULER --> TZ_CONVERTER
    SCHEDULER --> TRIGGER_MGR
    
    TRIGGER_MGR --> REDIS
    TRIGGER_MGR --> CELERY
    
    CELERY --> WORKER1
    CELERY --> WORKER2
    CELERY --> WORKER3
    
    SCHEDULER --> SCHEDULE_TABLE
    WORKER1 --> HISTORY_TABLE
    WORKER2 --> HISTORY_TABLE
    WORKER3 --> HISTORY_TABLE
    
    SCHEDULER --> MISSED_TABLE
    
    SCHEDULER --> PROMETHEUS
    PROMETHEUS --> GRAFANA
    PROMETHEUS --> ALERTMANAGER
    
    SCHEDULE_API --> SCHEDULER
    HISTORY_API --> HISTORY_TABLE
    MONITOR_API --> PROMETHEUS
    
    style SCHEDULER fill:#3B82F6
    style CRON_PARSER fill:#10B981
    style TZ_CONVERTER fill:#F59E0B
    style REDIS fill:#EF4444

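Core modules

1. APScheduler scheduler

- Manages all scheduled jobs
- Fires triggers
- Manages the job lifecycle
- Recovers from failures

2. Cron expression parser

- Standard Cron syntax support
- Extended syntax (L/W/#)
- Expression validation
- Next run time calculation

3. Time zone converter

- Conversion between UTC and local time zones
- Automatic DST handling
- Time zone database updates
- Time zone validation

4. Trigger manager

- Cron trigger
- Interval trigger
- Date trigger
- Custom triggers

5. Distributed lock

- Implemented on Redis
- Prevents duplicate execution
- Automatic renewal
- Deadlock detection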

Data flow diagram



sequenceDiagram
    participant User as User
    participant API as Scheduling API
    participant Scheduler as APScheduler
    participant Parser as Cron parser
    participant TZ as Time zone converter
    participant Redis as Redis lock
    participant Celery as Celery
    participant DB as Database
    
    User->>API: Create scheduled task
    API->>Parser: Parse Cron expression
    Parser->>Parser: Validate syntax
    Parser-->>API: Return parse result
    
    API->>TZ: Convert user time zone to UTC
    TZ-->>API: Return UTC time
    
    API->>DB: Save schedule configuration
    API->>Scheduler: Add scheduled job
    Scheduler->>Scheduler: Register trigger
    Scheduler-->>API: Return success
    API-->>User: Created successfully
    
    Note over Scheduler: Wait for trigger time
    
    Scheduler->>Scheduler: Trigger time reached
    Scheduler->>Redis: Try to acquire lock
    
    alt Lock acquired
        Redis-->>Scheduler: Lock acquired
        Scheduler->>DB: Record execution history (PENDING)
        Scheduler->>Celery: Submit task to queue
        Celery->>Celery: Execute workflow
        
        alt Execution succeeded
            Celery->>DB: Update history (SUCCESS)
            Celery->>Redis: Release lock
        else Execution failed
            Celery->>DB: Update history (FAILED)
            Celery->>Scheduler: Trigger retry (if configured)
            Celery->>Redis: Release lock
        end
    else Lock not acquired
        Redis-->>Scheduler: Lock already held
        Scheduler->>DB: Record skipped run
    end

💻 Code Implementation

1. Schedule configuration data model



# models/schedule.py
from sqlalchemy import Column, String, Integer, DateTime, Boolean, JSON, ForeignKey, Enum as SQLEnum, Text
from sqlalchemy.orm import relationship
from datetime import datetime
from enum import Enum
from database import Base
import uuid
 
class ScheduleStatus(str, Enum):
    """Schedule status"""
    ACTIVE = "active"  # Active
    PAUSED = "paused"  # Paused
    DISABLED = "disabled"  # Disabled
    ERROR = "error"  # Error

class TriggerType(str, Enum):
    """Trigger type"""
    CRON = "cron"  # Cron expression
    INTERVAL = "interval"  # Fixed interval
    DATE = "date"  # Specific date

class MisfirePolicy(str, Enum):
    """Policy for missed runs"""
    FIRE_ONCE = "fire_once"  # Run once immediately
    FIRE_ALL = "fire_all"  # Run every missed occurrence
    SKIP = "skip"  # Skip
    FIRE_LATEST = "fire_latest"  # Run only the most recent missed occurrence

class WorkflowSchedule(Base):
    """
    Workflow schedule configuration model
    
    Stores the timed schedule of a workflow; supports Cron, Interval and Date triggers
    """
    __tablename__ = "workflow_schedules"
    
    # Basic information
    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    workflow_id = Column(String(36), ForeignKey("workflows.id"), nullable=False, index=True)
    tenant_id = Column(String(36), ForeignKey("tenants.id"), nullable=False, index=True)
    
    # Schedule configuration
    name = Column(String(200), nullable=False)  # Schedule name
    description = Column(Text, nullable=True)  # Description
    status = Column(SQLEnum(ScheduleStatus), default=ScheduleStatus.ACTIVE, nullable=False, index=True)
    
    # Trigger configuration
    trigger_type = Column(SQLEnum(TriggerType), nullable=False)
    
    # Cron trigger configuration
    cron_expression = Column(String(100), nullable=True)  # Cron expression
    """
    Cron expression format:
    second minute hour day month day-of-week year (optional)
    
    Examples:
    - "0 0 2 * * ?" - every day at 2:00 AM
    - "0 0/30 * * * ?" - every 30 minutes
    - "0 0 9 ? * MON-FRI" - weekdays at 9:00 AM
    - "0 0 0 L * ?" - last day of every month
    """
    
    # Interval trigger configuration
    interval_value = Column(Integer, nullable=True)  # Interval value
    interval_unit = Column(String(20), nullable=True)  # Unit: seconds/minutes/hours/days/weeks
    
    # Date trigger configuration
    run_date = Column(DateTime, nullable=True)  # Specific run date
    
    # Time zone configuration
    timezone = Column(String(50), default="UTC", nullable=False)
    """
    Time zone identifier (IANA time zone database)
    
    Examples:
    - "UTC" - Coordinated Universal Time
    - "America/New_York" - US Eastern Time
    - "Asia/Shanghai" - China Standard Time
    - "Europe/London" - UK time
    """
    
    # Execution configuration
    max_instances = Column(Integer, default=1)  # Maximum concurrent instances
    misfire_grace_time = Column(Integer, default=60)  # Grace period for missed runs (seconds)
    misfire_policy = Column(SQLEnum(MisfirePolicy), default=MisfirePolicy.FIRE_ONCE)
    
    # Retry configuration
    retry_enabled = Column(Boolean, default=False)
    max_retries = Column(Integer, default=3)
    retry_delay = Column(Integer, default=60)  # Delay between retries (seconds)
    
    # Execution window
    start_date = Column(DateTime, nullable=True)  # Start date
    end_date = Column(DateTime, nullable=True)  # End date
    
    # Metadata
    created_by = Column(String(36), ForeignKey("users.id"), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    # Statistics
    total_runs = Column(Integer, default=0)  # Total number of runs
    successful_runs = Column(Integer, default=0)  # Successful runs
    failed_runs = Column(Integer, default=0)  # Failed runs
    last_run_at = Column(DateTime, nullable=True)  # Time of the last run
    next_run_at = Column(DateTime, nullable=True, index=True)  # Time of the next run
    
    # Internal APScheduler job ID
    apscheduler_job_id = Column(String(100), nullable=True, unique=True)
    
    # Extra configuration (JSON)
    extra_config = Column(JSON, nullable=True)
    """
    Example of extra configuration:
    {
        "holidays_skip": true,  # Skip public holidays
        "business_days_only": true,  # Business days only
        "notification_enabled": true,  # Enable notifications
        "notification_emails": ["admin@example.com"],
        "custom_params": {...}  # Custom parameters
    }
    """
    
    # Relationships
    workflow = relationship("Workflow", back_populates="schedules")
    created_by_user = relationship("User")
    execution_history = relationship("ScheduleExecution", back_populates="schedule")
    
    def __repr__(self):
        return f"<WorkflowSchedule {self.name} ({self.trigger_type})>"
    
    def to_dict(self):
        """Convert to a dictionary."""
        return {
            "id": self.id,
            "workflow_id": self.workflow_id,
            "name": self.name,
            "description": self.description,
            "status": self.status.value,
            "trigger_type": self.trigger_type.value,
            "cron_expression": self.cron_expression,
            "interval_value": self.interval_value,
            "interval_unit": self.interval_unit,
            "run_date": self.run_date.isoformat() if self.run_date else None,
            "timezone": self.timezone,
            "misfire_policy": self.misfire_policy.value,
            "retry_enabled": self.retry_enabled,
            "max_retries": self.max_retries,
            "total_runs": self.total_runs,
            "successful_runs": self.successful_runs,
            "failed_runs": self.failed_runs,
            "last_run_at": self.last_run_at.isoformat() if self.last_run_at else None,
            "next_run_at": self.next_run_at.isoformat() if self.next_run_at else None,
            "created_at": self.created_at.isoformat()
        }
 
 
class ExecutionStatus(str, Enum):
    """Execution status"""
    PENDING = "pending"  # Waiting to run
    RUNNING = "running"  # Running
    SUCCESS = "success"  # Succeeded
    FAILED = "failed"  # Failed
    SKIPPED = "skipped"  # Skipped
    TIMEOUT = "timeout"  # Timed out
    CANCELLED = "cancelled"  # Cancelled

class ScheduleExecution(Base):
    """
    Schedule execution history model
    
    Records every run of a scheduled job for tracking, auditing and analysis
    """
    __tablename__ = "schedule_executions"
    
    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    schedule_id = Column(String(36), ForeignKey("workflow_schedules.id"), nullable=False, index=True)
    workflow_id = Column(String(36), ForeignKey("workflows.id"), nullable=False, index=True)
    tenant_id = Column(String(36), ForeignKey("tenants.id"), nullable=False, index=True)
    
    # Execution information
    status = Column(SQLEnum(ExecutionStatus), default=ExecutionStatus.PENDING, nullable=False, index=True)
    scheduled_time = Column(DateTime, nullable=False, index=True)  # Planned run time
    actual_start_time = Column(DateTime, nullable=True)  # Actual start time
    actual_end_time = Column(DateTime, nullable=True)  # Actual end time
    duration = Column(Integer, nullable=True)  # Duration (seconds)
    
    # Execution result
    workflow_execution_id = Column(String(36), nullable=True)  # Workflow execution ID
    result = Column(JSON, nullable=True)  # Result payload
    error_message = Column(Text, nullable=True)  # Error message
    error_traceback = Column(Text, nullable=True)  # Error traceback
    
    # Retry information
    retry_count = Column(Integer, default=0)  # Number of retries
    is_retry = Column(Boolean, default=False)  # Whether this run is a retry
    parent_execution_id = Column(String(36), ForeignKey("schedule_executions.id"), nullable=True)
    
    # Misfire information
    is_misfire = Column(Boolean, default=False)  # Whether this run was a missed occurrence
    misfire_delay = Column(Integer, nullable=True)  # How late the run was (seconds)
    
    # Execution environment
    hostname = Column(String(100), nullable=True)  # Host that executed the run
    worker_id = Column(String(100), nullable=True)  # Worker ID
    
    # Metadata
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    
    # Relationships
    schedule = relationship("WorkflowSchedule", back_populates="execution_history")
    # Self-referential link: remote_side=[id] marks the many-to-one side (retry -> parent),
    # so the attribute declared here is the parent and the backref is the list of retries
    parent_execution = relationship("ScheduleExecution", remote_side=[id], backref="retry_executions")
    
    def __repr__(self):
        return f"<ScheduleExecution {self.id} ({self.status})>"
    
    def to_dict(self):
        """Convert to a dictionary."""
        return {
            "id": self.id,
            "schedule_id": self.schedule_id,
            "workflow_id": self.workflow_id,
            "status": self.status.value,
            "scheduled_time": self.scheduled_time.isoformat(),
            "actual_start_time": self.actual_start_time.isoformat() if self.actual_start_time else None,
            "actual_end_time": self.actual_end_time.isoformat() if self.actual_end_time else None,
            "duration": self.duration,
            "retry_count": self.retry_count,
            "is_misfire": self.is_misfire,
            "misfire_delay": self.misfire_delay,
            "error_message": self.error_message
        }
 
 
# Update the Workflow model
class Workflow(Base):
    """Workflow model (with scheduling support added)"""
    __tablename__ = "workflows"
    
    # ... existing fields
    
    # Scheduling-related fields
    is_scheduled = Column(Boolean, default=False)  # Whether scheduling is enabled
    active_schedule_count = Column(Integer, default=0)  # Number of active schedules
    
    # Relationships
    schedules = relationship("WorkflowSchedule", back_populates="workflow")
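
The `MisfirePolicy` values only become meaningful once something decides which missed occurrences to run. The sketch below shows one possible interpretation, stated as an assumption rather than the article's actual compensation logic: `FIRE_ALL` replays every missed time, `FIRE_LATEST` replays only the most recent one, `FIRE_ONCE` performs a single catch-up run stamped with the current time, and `SKIP` runs nothing. The enum is repeated here so the block is self-contained, and `select_compensation_runs` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import List


class MisfirePolicy(str, Enum):
    """Mirror of the enum in models/schedule.py."""
    FIRE_ONCE = "fire_once"
    FIRE_ALL = "fire_all"
    SKIP = "skip"
    FIRE_LATEST = "fire_latest"


def select_compensation_runs(
    missed: List[datetime],
    policy: MisfirePolicy,
    now: datetime,
) -> List[datetime]:
    """Return the run times to execute in order to compensate for missed occurrences."""
    if not missed or policy is MisfirePolicy.SKIP:
        return []
    ordered = sorted(missed)
    if policy is MisfirePolicy.FIRE_ALL:
        return ordered
    if policy is MisfirePolicy.FIRE_LATEST:
        return [ordered[-1]]
    # FIRE_ONCE (assumed semantics): one catch-up run, recorded at the current time
    return [now]


if __name__ == "__main__":
    now = datetime(2024, 1, 1, 6, 0, tzinfo=timezone.utc)
    missed = [now - timedelta(hours=h) for h in (3, 2, 1)]
    for policy in MisfirePolicy:
        print(policy.value, select_compensation_runs(missed, policy, now))
```

Each returned time could then be stored as the `scheduled_time` of a `ScheduleExecution` row with `is_misfire=True`, which keeps compensated runs distinguishable from regular ones in the history.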

2. Cron expression parser



# services/cron_parser.py
from typing import Optional, List, Dict, Tuple
from datetime import datetime, timedelta
from croniter import croniter
import pytz
import re
 
class CronParser:
    """
    Cron expression parser
    
    Supports standard Cron syntax and extended syntax
    """
    
    # Cron fields, in seconds-first order
    FIELDS = ["second", "minute", "hour", "day", "month", "day_of_week", "year"]
    
    # Allowed range of each field
    RANGES = {
        "second": (0, 59),
        "minute": (0, 59),
        "hour": (0, 23),
        "day": (1, 31),
        "month": (1, 12),
        "day_of_week": (0, 6),  # 0=Sunday
        "year": (1970, 2099)
    }
    
    # Month name mapping
    MONTH_NAMES = {
        "JAN": 1, "FEB": 2, "MAR": 3, "APR": 4,
        "MAY": 5, "JUN": 6, "JUL": 7, "AUG": 8,
        "SEP": 9, "OCT": 10, "NOV": 11, "DEC": 12
    }
    
    # Weekday name mapping
    DAY_NAMES = {
        "SUN": 0, "MON": 1, "TUE": 2, "WED": 3,
        "THU": 4, "FRI": 5, "SAT": 6
    }
    
    @staticmethod
    def _to_croniter_expr(expression: str) -> str:
        """
        Reorder a seconds-first expression into the layout croniter expects.
        
        The expressions in this article are seconds-first
        ("second minute hour day month day-of-week [year]"), while croniter reads
        "minute hour day month day-of-week" and, by default, treats an optional
        sixth field as seconds at the END. "?" is mapped to "*".
        """
        fields = ["*" if f == "?" else f for f in expression.strip().split()]
        if len(fields) == 5:
            return " ".join(fields)
        
        second, standard = fields[0], fields[1:6]
        year = fields[6] if len(fields) == 7 else "*"
        if second != "0" or year != "*":
            standard.append(second)
        if year != "*":
            # A year field requires a croniter version that accepts seven fields
            standard.append(year)
        return " ".join(standard)
    
    @staticmethod
    def validate(expression: str) -> Tuple[bool, Optional[str]]:
        """
        Validate a Cron expression
        
        Args:
            expression: Cron expression
            
        Returns:
            Tuple[bool, Optional[str]]: (is valid, error message)
        """
        try:
            # Strip leading and trailing whitespace
            expression = expression.strip()
            
            # Split into fields
            fields = expression.split()
            
            # Check the number of fields (5 to 7 are supported)
            if len(fields) < 5 or len(fields) > 7:
                return False, f"Invalid number of fields: {len(fields)} (expected 5-7)"
            
            # Fill in missing fields
            if len(fields) == 5:
                # Standard 5-field format: minute hour day month day-of-week
                fields = ["0"] + fields  # Add the seconds field
            
            if len(fields) == 6:
                # Add the year field
                fields.append("*")
            
            # Validate each field
            for field_name, field_value in zip(CronParser.FIELDS, fields):
                valid, error = CronParser._validate_field(field_name, field_value)
                if not valid:
                    return False, f"Field '{field_name}': {error}"
            
            # Cross-check with croniter, using its own field order
            try:
                croniter(CronParser._to_croniter_expr(expression))
            except Exception as e:
                return False, f"Croniter validation failed: {str(e)}"
            
            return True, None
        
        except Exception as e:
            return False, f"Validation error: {str(e)}"
    
    @staticmethod
    def _validate_field(field_name: str, field_value: str) -> Tuple[bool, Optional[str]]:
        """Validate a single field"""
        # Wildcard
        if field_value == "*":
            return True, None
        
        # Question mark (only for day and day_of_week)
        if field_value == "?":
            if field_name in ["day", "day_of_week"]:
                return True, None
            else:
                return False, "? is only allowed in day and day_of_week fields"
        
        # Allowed range
        min_val, max_val = CronParser.RANGES[field_name]
        
        # List (comma-separated); each item may itself be a range or a step
        if "," in field_value:
            values = field_value.split(",")
            for val in values:
                valid, error = CronParser._validate_field(field_name, val)
                if not valid:
                    return False, error
            return True, None
        
        # Range (hyphen)
        if "-" in field_value and "/" not in field_value:
            parts = field_value.split("-")
            if len(parts) != 2:
                return False, f"Invalid range: {field_value}"
            
            start, end = parts
            valid_start, error = CronParser._validate_single_value(field_name, start, min_val, max_val)
            if not valid_start:
                return False, error
            
            valid_end, error = CronParser._validate_single_value(field_name, end, min_val, max_val)
            if not valid_end:
                return False, error
            
            return True, None
        
        # Step (slash)
        if "/" in field_value:
            parts = field_value.split("/")
            if len(parts) != 2:
                return False, f"Invalid step: {field_value}"
            
            base, step = parts
            
            # Validate the base, which may be a single value or a range such as 10-40
            if base != "*":
                valid, error = CronParser._validate_field(field_name, base)
                if not valid:
                    return False, error
            
            # Validate the step
            try:
                step_int = int(step)
                if step_int <= 0:
                    return False, f"Step must be positive: {step}"
            except ValueError:
                return False, f"Invalid step value: {step}"
            
            return True, None
        
        # Single value
        return CronParser._validate_single_value(field_name, field_value, min_val, max_val)
    
    @staticmethod
    def _validate_single_value(
        field_name: str,
        value: str,
        min_val: int,
        max_val: int
    ) -> Tuple[bool, Optional[str]]:
        """Validate a single value"""
        # Special characters
        if value in ["L", "W", "LW"]:
            if field_name == "day":
                return True, None
            else:
                return False, f"{value} is only allowed in day field"
        
        # "#" syntax (the Nth given weekday of the month)
        if "#" in value:
            if field_name == "day_of_week":
                parts = value.split("#")
                if len(parts) == 2:
                    try:
                        day = int(parts[0])
                        nth = int(parts[1])
                        if 0 <= day <= 6 and 1 <= nth <= 5:
                            return True, None
                    except ValueError:
                        pass
            return False, f"Invalid # syntax: {value}"
        
        # Month names
        if field_name == "month" and value.upper() in CronParser.MONTH_NAMES:
            return True, None
        
        # Weekday names
        if field_name == "day_of_week" and value.upper() in CronParser.DAY_NAMES:
            return True, None
        
        # Numeric value
        try:
            int_value = int(value)
            if min_val <= int_value <= max_val:
                return True, None
            else:
                return False, f"Value {int_value} out of range [{min_val}, {max_val}]"
        except ValueError:
            return False, f"Invalid value: {value}"
    
    @staticmethod
    def get_next_run_time(
        expression: str,
        base_time: Optional[datetime] = None,
        timezone: str = "UTC"
    ) -> datetime:
        """
        Get the next run time
        
        Args:
            expression: Cron expression
            base_time: Reference time (defaults to the current time)
            timezone: Time zone
            
        Returns:
            datetime: Next run time
        """
        if base_time is None:
            base_time = datetime.now(pytz.timezone(timezone))
        
        # Make sure base_time carries time zone information
        if base_time.tzinfo is None:
            tz = pytz.timezone(timezone)
            base_time = tz.localize(base_time)
        
        # Compute with croniter, using its own field order
        cron = croniter(CronParser._to_croniter_expr(expression), base_time)
        next_time = cron.get_next(datetime)
        
        return next_time
    
    @staticmethod
    def get_next_n_run_times(
        expression: str,
        n: int,
        base_time: Optional[datetime] = None,
        timezone: str = "UTC"
    ) -> List[datetime]:
        """
        Get the next N run times
        
        Args:
            expression: Cron expression
            n: Number of run times to return
            base_time: Reference time
            timezone: Time zone
            
        Returns:
            List[datetime]: List of run times
        """
        if base_time is None:
            base_time = datetime.now(pytz.timezone(timezone))
        
        if base_time.tzinfo is None:
            tz = pytz.timezone(timezone)
            base_time = tz.localize(base_time)
        
        cron = croniter(CronParser._to_croniter_expr(expression), base_time)
        times = []
        
        for _ in range(n):
            next_time = cron.get_next(datetime)
            times.append(next_time)
        
        return times
    
    @staticmethod
    def describe(expression: str) -> str:
        """
        Generate a human-readable description of a Cron expression
        
        Args:
            expression: Cron expression
            
        Returns:
            str: Description text
        """
        fields = expression.split()
        
        # Fill in missing fields
        if len(fields) == 5:
            fields = ["0"] + fields
        if len(fields) == 6:
            fields.append("*")
        
        second, minute, hour, day, month, day_of_week, year = fields
        
        parts = []
        
        # Second
        if second != "0" and second != "*":
            parts.append(f"at second {second}")
        
        # Minute
        if minute == "*":
            parts.append("every minute")
        elif "/" in minute:
            _, step = minute.split("/")
            parts.append(f"every {step} minutes")
        else:
            parts.append(f"at minute {minute}")
        
        # Hour
        if hour == "*":
            parts.append("every hour")
        elif "/" in hour:
            _, step = hour.split("/")
            parts.append(f"every {step} hours")
        else:
            parts.append(f"at hour {hour}")
        
        # Day of month ("?" means "no specific value", so it is skipped like "*")
        if day in ("*", "?"):
            pass
        elif day == "L":
            parts.append("on the last day of the month")
        else:
            parts.append(f"on day {day} of the month")
        
        # Month
        if month != "*":
            parts.append(f"in month {month}")
        
        # Day of week
        if day_of_week != "*" and day_of_week != "?":
            day_labels = {
                "0": "Sunday", "1": "Monday", "2": "Tuesday", "3": "Wednesday",
                "4": "Thursday", "5": "Friday", "6": "Saturday",
                "SUN": "Sunday", "MON": "Monday", "TUE": "Tuesday", "WED": "Wednesday",
                "THU": "Thursday", "FRI": "Friday", "SAT": "Saturday"
            }
            
            if "-" in day_of_week:
                start, end = day_of_week.split("-")
                parts.append(f"{day_labels.get(start, start)} through {day_labels.get(end, end)}")
            else:
                parts.append(day_labels.get(day_of_week, day_of_week))
        
        return " ".join(parts)
    
    @staticmethod
    def parse_to_dict(expression: str) -> Dict[str, str]:
        """
        Parse a Cron expression into a dictionary
        
        Args:
            expression: Cron expression
            
        Returns:
            Dict[str, str]: Mapping of field name to field value
        """
        fields = expression.split()
        
        if len(fields) == 5:
            fields = ["0"] + fields
        if len(fields) == 6:
            fields.append("*")
        
        return {
            "second": fields[0],
            "minute": fields[1],
            "hour": fields[2],
            "day": fields[3],
            "month": fields[4],
            "day_of_week": fields[5],
            "year": fields[6]
        }
 
 
# Usage example
if __name__ == "__main__":
    parser = CronParser()
    
    # Validate expressions
    expressions = [
        "0 0 2 * * ?",  # Every day at 2:00 AM
        "0 0/30 * * * ?",  # Every 30 minutes
        "0 0 9 ? * MON-FRI",  # Weekdays at 9:00 AM
        "0 0 0 L * ?",  # Last day of every month
        "0 0 12 1 */3 ?",  # First day of every quarter at noon
    ]
    
    for expr in expressions:
        valid, error = parser.validate(expr)
        if valid:
            print(f"✓ {expr}")
            print(f"  Description: {parser.describe(expr)}")
            next_times = parser.get_next_n_run_times(expr, 3)
            print(f"  Next 3 runs:")
            for t in next_times:
                print(f"    - {t}")
        else:
            print(f"✗ {expr}")
            print(f"  Error: {error}")
        print()
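
To see what the list, range and step syntax checked by the validator actually means, it can help to expand a single numeric field into the concrete values it matches. The sketch below is a simplified illustration, not part of `CronParser`: `expand_field` is a hypothetical helper that handles only numeric syntax (`*`, `?`, `a-b`, `a/step`, `*/step`, `a-b/step` and comma lists) and deliberately ignores names such as `MON` and the `L`, `W` and `#` extensions.

```python
from typing import List


def expand_field(field: str, min_val: int, max_val: int) -> List[int]:
    """Expand one numeric cron field into the sorted list of values it matches."""
    values = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if step <= 0:
            raise ValueError(f"step must be positive: {part!r}")
        if base in ("*", "?"):
            start, end = min_val, max_val
        elif "-" in base:
            start_text, end_text = base.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = int(base)
            # "N/step" means "from N up to the maximum"; a bare "N" matches only N
            end = max_val if step_text else start
        if not (min_val <= start <= end <= max_val):
            raise ValueError(f"value out of range in {part!r}")
        values.update(range(start, end + 1, step))
    return sorted(values)


if __name__ == "__main__":
    print(expand_field("*/15", 0, 59))
    print(expand_field("0/30", 0, 59))
    print(expand_field("1-5", 0, 6))
```

Expanding fields this way is also a convenient basis for unit tests: two expressions that expand to the same value sets in every field describe the same schedule.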

3. Time zone conversion service



# services/timezone_service.py
from typing import Optional, List
from datetime import datetime, timedelta
import pytz
from pytz import timezone as pytz_timezone
 
class TimezoneService:
    """
    Time zone conversion service
    
    Handles time zone conversion, daylight saving time, time zone validation and more
    """
    
    @staticmethod
    def get_all_timezones() -> List[str]:
        """Return all available time zones"""
        return pytz.all_timezones
    
    @staticmethod
    def get_common_timezones() -> List[str]:
        """Return commonly used time zones"""
        return pytz.common_timezones
    
    @staticmethod
    def validate_timezone(tz_name: str) -> bool:
        """
        Validate a time zone name
        
        Args:
            tz_name: Time zone name
            
        Returns:
            bool: Whether the name is valid
        """
        try:
            pytz.timezone(tz_name)
            return True
        except pytz.UnknownTimeZoneError:
            return False
    
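    @staticmethod
    def convert_to_utc(
        dt: datetime,
        from_timezone: str
    ) -> datetime:
        """
        Convert a local time to UTC
        
        Args:
            dt: Local time. A naive datetime is interpreted in from_timezone;
                an aware datetime is converted as-is.
            from_timezone: Source time zone
            
        Returns:
            datetime: UTC time
        """
        if dt.tzinfo is not None:
            # An aware datetime already identifies an instant. Stripping its tzinfo
            # would silently reinterpret the wall-clock time in from_timezone.
            return dt.astimezone(pytz.UTC)
        
        # Attach the source time zone (pytz requires localize() for naive datetimes)
        tz = pytz_timezone(from_timezone)
        localized_dt = tz.localize(dt)
        
        # Convert to UTC
        utc_dt = localized_dt.astimezone(pytz.UTC)
        
        return utc_dt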
    
    @staticmethod
    def convert_from_utc(
        utc_dt: datetime,
        to_timezone: str
    ) -> datetime:
        """
        Convert a UTC time to local time
        
        Args:
            utc_dt: UTC time
            to_timezone: Target time zone
            
        Returns:
            datetime: Local time
        """
        # Treat a naive datetime as UTC
        if utc_dt.tzinfo is None:
            utc_dt = pytz.UTC.localize(utc_dt)
        
        # Convert to the target time zone
        tz = pytz_timezone(to_timezone)
        local_dt = utc_dt.astimezone(tz)
        
        return local_dt
    
    @staticmethod
    def convert_timezone(
        dt: datetime,
        from_timezone: str,
        to_timezone: str
    ) -> datetime:
        """
        Convert between two time zones
        
        Args:
            dt: Source time
            from_timezone: Source time zone
            to_timezone: Target time zone
            
        Returns:
            datetime: Time in the target time zone
        """
        # Convert to UTC first
        utc_dt = TimezoneService.convert_to_utc(dt, from_timezone)
        
        # Then convert to the target time zone
        target_dt = TimezoneService.convert_from_utc(utc_dt, to_timezone)
        
        return target_dt
    
    @staticmethod
    def get_timezone_offset(
        timezone_name: str,
        dt: Optional[datetime] = None
    ) -> timedelta:
        """
        Get the UTC offset of a time zone
        
        Args:
            timezone_name: Time zone name
            dt: Reference time (needed because the offset changes with DST)
            
        Returns:
            timedelta: Offset from UTC
        """
        if dt is None:
            # Use an aware "now"; a naive datetime.now() is the server's local wall
            # clock and would be misread as wall-clock time in the target zone
            dt = datetime.now(pytz.UTC)
        
        tz = pytz_timezone(timezone_name)
        
        # Express the reference time in the target time zone
        if dt.tzinfo is None:
            localized_dt = tz.localize(dt)
        else:
            localized_dt = dt.astimezone(tz)
        
        # Read the offset
        offset = localized_dt.utcoffset()
        
        return offset
    
    @staticmethod
    def is_dst(
        timezone_name: str,
        dt: Optional[datetime] = None
    ) -> bool:
        """
        Check whether daylight saving time is in effect
        
        Args:
            timezone_name: Time zone name
            dt: Reference time
            
        Returns:
            bool: Whether DST is in effect
        """
        if dt is None:
            dt = datetime.now(pytz.UTC)
        
        tz = pytz_timezone(timezone_name)
        
        if dt.tzinfo is None:
            localized_dt = tz.localize(dt)
        else:
            localized_dt = dt.astimezone(tz)
        
        # Check the DST offset
        dst_offset = localized_dt.dst()
        
        return dst_offset is not None and dst_offset != timedelta(0)
    
    @staticmethod
    def get_dst_transitions(
        timezone_name: str,
        year: int
    ) -> List[datetime]:
        """
        Get the DST transition dates for a given year
        
        The result has day granularity: each entry is the first local midnight
        whose DST state differs from the previous day's, not the exact
        transition instant.
        
        Args:
            timezone_name: Time zone name
            year: Year
            
        Returns:
            List[datetime]: List of transition dates
        """
        tz = pytz_timezone(timezone_name)
        transitions = []
        
        # Walk through every day of the year
        start_date = datetime(year, 1, 1)
        end_date = datetime(year, 12, 31)
        
        current_date = start_date
        prev_is_dst = None
        
        while current_date <= end_date:
            localized = tz.localize(current_date)
            is_dst = localized.dst() != timedelta(0)
            
            if prev_is_dst is not None and is_dst != prev_is_dst:
                transitions.append(current_date)
            
            prev_is_dst = is_dst
            current_date += timedelta(days=1)
        
        return transitions
    
    @staticmethod
    def format_timezone_display(
        timezone_name: str,
        dt: Optional[datetime] = None
    ) -> str:
        """
        Format a time zone for display
        
        Args:
            timezone_name: Time zone name
            dt: Reference time
            
        Returns:
            str: Formatted string, for example "UTC+08:00 (CST)"
        """
        if dt is None:
            dt = datetime.now(pytz.UTC)
        
        tz = pytz_timezone(timezone_name)
        
        if dt.tzinfo is None:
            localized_dt = tz.localize(dt)
        else:
            localized_dt = dt.astimezone(tz)
        
        # Split the offset into sign, hours and minutes. Working on the absolute
        # value avoids floor-division errors for negative offsets such as -03:30.
        offset = localized_dt.utcoffset()
        total_minutes = int(offset.total_seconds()) // 60
        sign = "+" if total_minutes >= 0 else "-"
        hours, minutes = divmod(abs(total_minutes), 60)
        
        # Format the offset
        offset_str = f"UTC{sign}{hours:02d}:{minutes:02d}"
        
        # Time zone abbreviation
        tz_abbr = localized_dt.strftime("%Z")
        
        return f"{offset_str} ({tz_abbr})"
 
 
# Usage example
if __name__ == "__main__":
    tz_service = TimezoneService()
    
    # Test time zone conversion
    print("=== Time zone conversion test ===")
    
    # Beijing time 2024-01-01 09:00:00
    beijing_time = datetime(2024, 1, 1, 9, 0, 0)
    print(f"Beijing time: {beijing_time}")
    
    # Convert to UTC
    utc_time = tz_service.convert_to_utc(beijing_time, "Asia/Shanghai")
    print(f"UTC time: {utc_time}")
    
    # Convert to New York time
    ny_time = tz_service.convert_from_utc(utc_time, "America/New_York")
    print(f"New York time: {ny_time}")
    
    # Direct conversion
    direct_ny_time = tz_service.convert_timezone(
        beijing_time,
        "Asia/Shanghai",
        "America/New_York"
    )
    print(f"Direct conversion: {direct_ny_time}")
    
    print("\n=== Time zone information ===")
    
    timezones = ["Asia/Shanghai", "America/New_York", "Europe/London"]
    
    for tz_name in timezones:
        offset = tz_service.get_timezone_offset(tz_name)
        is_dst = tz_service.is_dst(tz_name)
        display = tz_service.format_timezone_display(tz_name)
        
        print(f"{tz_name}:")
        print(f"  Offset: {offset}")
        print(f"  DST: {is_dst}")
        print(f"  Display: {display}")
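
DST transitions create two awkward kinds of local time: wall-clock times that never occur (the hour skipped in spring) and times that occur twice (the hour repeated in autumn). A scheduler has to decide what to do with a job planned for such a time. The sketch below classifies a naive local time using the standard-library `zoneinfo` module and the `fold` attribute; it is an illustration under the assumption that the IANA database is installed, and `classify_local_time` is a hypothetical helper. With pytz, a comparable check is calling `tz.localize(dt, is_dst=None)`, which raises `NonExistentTimeError` or `AmbiguousTimeError`.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def classify_local_time(naive_dt: datetime, tz_name: str) -> str:
    """Return "normal", "ambiguous" or "nonexistent" for a naive wall-clock time."""
    tz = ZoneInfo(tz_name)
    first = naive_dt.replace(tzinfo=tz, fold=0)   # earlier of two possible instants
    second = naive_dt.replace(tzinfo=tz, fold=1)  # later of two possible instants

    # A wall-clock time skipped by a DST jump does not survive a round trip via UTC
    round_trip = first.astimezone(timezone.utc).astimezone(tz)
    if round_trip.replace(tzinfo=None) != naive_dt:
        return "nonexistent"

    # A repeated wall-clock time has two different UTC offsets depending on fold
    if first.utcoffset() != second.utcoffset():
        return "ambiguous"
    return "normal"


if __name__ == "__main__":
    samples = [
        datetime(2024, 3, 10, 2, 30),  # skipped in New York (spring forward)
        datetime(2024, 11, 3, 1, 30),  # repeated in New York (fall back)
        datetime(2024, 6, 1, 9, 0),
    ]
    for sample in samples:
        print(sample, classify_local_time(sample, "America/New_York"))
```

A common policy is to run a "nonexistent" occurrence at the first valid instant after the gap and to run an "ambiguous" occurrence only once, but the right choice depends on the workflow.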

4. APScheduler integration service



# services/scheduler_service.py
from typing import Optional, Dict, Any, Callable
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, EVENT_JOB_MISSED
from sqlalchemy.orm import Session
from models.schedule import WorkflowSchedule, ScheduleExecution, ExecutionStatus, ScheduleStatus
from services.cron_parser import CronParser
from services.timezone_service import TimezoneService
from core.redis_client import RedisClient
from datetime import datetime, timedelta
import pytz
import logging
import socket
 
logger = logging.getLogger(__name__)
 
class SchedulerService:
    """
    APScheduler scheduling service.
    
    Manages the scheduling of all timed jobs.
    """
    
    def __init__(self, db: Session, redis_client: RedisClient):
        self.db = db
        self.redis = redis_client
        self.scheduler: Optional[BackgroundScheduler] = None
        self.hostname = socket.gethostname()
        
        # Scheduler configuration
        self.config = {
            "jobstores": {
                "default": SQLAlchemyJobStore(
                    url=self._get_db_url(),
                    tablename="apscheduler_jobs"
                )
            },
            "executors": {
                "default": ThreadPoolExecutor(20),
                "processpool": ProcessPoolExecutor(5)
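            },
            "job_defaults": {
                "coalesce": True,  # collapse a backlog of missed runs into one run
                "max_instances": 3,  # maximum concurrent instances per job
                "misfire_grace_time": 60  # seconds a run may start late before it counts as missed
            },
            "timezone": pytz.UTC  # the scheduler itself runs in UTC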
        }
    
    def _get_db_url(self) -> str:
        """Return the database URL."""
        # Read from application settings
        from core.config import settings
        return settings.DATABASE_URL
    
    def start(self):
        """Start the scheduler."""
        if self.scheduler is None:
            self.scheduler = BackgroundScheduler(**self.config)
            
            # Register event listeners
            self.scheduler.add_listener(
                self._job_executed_listener,
                EVENT_JOB_EXECUTED
            )
            self.scheduler.add_listener(
                self._job_error_listener,
                EVENT_JOB_ERROR
            )
            self.scheduler.add_listener(
                self._job_missed_listener,
                EVENT_JOB_MISSED
            )
            
            self.scheduler.start()
            logger.info("APScheduler started")
            
            # Load all active schedules
            self._load_schedules()
    
    def stop(self):
        """Stop the scheduler."""
        if self.scheduler is not None:
            self.scheduler.shutdown(wait=True)
            logger.info("APScheduler stopped")
    
    def _load_schedules(self):
        """Load all active schedules."""
        schedules = self.db.query(WorkflowSchedule).filter(
            WorkflowSchedule.status == ScheduleStatus.ACTIVE
        ).all()
        
        for schedule in schedules:
            try:
                self.add_schedule(schedule)
                logger.info(f"Loaded schedule: {schedule.name}")
            except Exception as e:
                logger.error(f"Failed to load schedule {schedule.name}: {e}")
    
    def add_schedule(self, schedule: WorkflowSchedule) -> str:
        """
        Add a scheduled job.
        
        Args:
            schedule: schedule configuration
            
        Returns:
            str: APScheduler job ID
        """
        # Build the trigger
        trigger = self._create_trigger(schedule)
        
        # Generate the job ID
        job_id = f"workflow_{schedule.workflow_id}_{schedule.id}"
        
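        # Register the job. NOTE: a persistent job store has to serialize the job,
        # and a bound method drags its instance (DB session, scheduler) along,
        # which may not pickle; a module-level function is a safer target.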
        self.scheduler.add_job(
            func=self._execute_workflow,
            trigger=trigger,
            args=[schedule.id],
            id=job_id,
            name=schedule.name,
            replace_existing=True,
            max_instances=schedule.max_instances,
            misfire_grace_time=schedule.misfire_grace_time
        )
        
        # Store the job ID on the schedule record
        schedule.apscheduler_job_id = job_id
        
        # Record the next run time (stored as naive UTC)
        next_run = self.scheduler.get_job(job_id).next_run_time
        if next_run:
            schedule.next_run_at = next_run.astimezone(pytz.UTC).replace(tzinfo=None)
        
        self.db.commit()
        
        logger.info(f"Added schedule: {schedule.name} (job_id: {job_id})")
        
        return job_id
    
    def _create_trigger(self, schedule: WorkflowSchedule):
        """Build the APScheduler trigger for a schedule."""
        tz = pytz.timezone(schedule.timezone)
        # trigger_type may be a TriggerType enum member or a plain string
        trigger_type = getattr(schedule.trigger_type, "value", schedule.trigger_type)
        
        if trigger_type == "cron":
            # Cron trigger (standard 5-field crontab expression)
            return CronTrigger.from_crontab(
                schedule.cron_expression,
                timezone=tz
            )
        
        elif trigger_type == "interval":
            # Interval trigger, e.g. {"hours": 2}
            kwargs = {schedule.interval_unit: schedule.interval_value}
            return IntervalTrigger(
                timezone=tz,
                start_date=schedule.start_date,
                end_date=schedule.end_date,
                **kwargs
            )
        
        elif trigger_type == "date":
            # One-off date trigger
            return DateTrigger(
                run_date=schedule.run_date,
                timezone=tz
            )
        
        else:
            raise ValueError(f"Unknown trigger type: {schedule.trigger_type}")
    
    def _execute_workflow(self, schedule_id: str):
        """
        Execute the workflow for a schedule.
        
        Args:
            schedule_id: schedule ID
        """
        # Load the schedule configuration
        schedule = self.db.query(WorkflowSchedule).filter(
            WorkflowSchedule.id == schedule_id
        ).first()
        
        if not schedule:
            logger.error(f"Schedule {schedule_id} not found")
            return
        
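        # Acquire a distributed lock so only one instance runs this schedule
        lock_key = f"schedule_lock:{schedule_id}"
        lock_timeout = 3900  # seconds; must outlast the 1-hour result wait below, or the lock expires mid-run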
        
        lock_acquired = self.redis.acquire_lock(
            lock_key,
            timeout=lock_timeout,
            blocking=False
        )
        
        if not lock_acquired:
            logger.warning(f"Failed to acquire lock for schedule {schedule.name}")
            
            # Record the skipped run
            execution = ScheduleExecution(
                schedule_id=schedule.id,
                workflow_id=schedule.workflow_id,
                tenant_id=schedule.tenant_id,
                status=ExecutionStatus.SKIPPED,
                scheduled_time=datetime.utcnow(),
                error_message="Failed to acquire lock (another instance is running)"
            )
            self.db.add(execution)
            self.db.commit()
            
            return
        
        execution = None  # lets the outer except block check whether a record exists
        try:
            # Create the execution record
            execution = ScheduleExecution(
                schedule_id=schedule.id,
                workflow_id=schedule.workflow_id,
                tenant_id=schedule.tenant_id,
                status=ExecutionStatus.PENDING,
                scheduled_time=datetime.utcnow(),
                hostname=self.hostname
            )
            self.db.add(execution)
            self.db.commit()
            
            # Mark the run as started
            execution.status = ExecutionStatus.RUNNING
            execution.actual_start_time = datetime.utcnow()
            self.db.commit()
            
            # Dispatch the workflow to Celery for asynchronous execution
            from tasks.workflow_tasks import execute_workflow_task
            
            result = execute_workflow_task.delay(
                workflow_id=schedule.workflow_id,
                tenant_id=schedule.tenant_id,
                triggered_by="schedule",
                schedule_execution_id=execution.id
            )
            
            # Wait for the result (or time out); this blocks the scheduler thread
            timeout = 3600  # 1-hour timeout, in seconds
            try:
                workflow_result = result.get(timeout=timeout)
                
                # Record the successful run
                execution.status = ExecutionStatus.SUCCESS
                execution.actual_end_time = datetime.utcnow()
                execution.duration = int(
                    (execution.actual_end_time - execution.actual_start_time).total_seconds()
                )
                execution.result = workflow_result
                execution.workflow_execution_id = workflow_result.get("execution_id")
                
                # Update schedule statistics
                schedule.total_runs += 1
                schedule.successful_runs += 1
                schedule.last_run_at = datetime.utcnow()
                
            except Exception as e:
                # The run failed or timed out
                execution.status = ExecutionStatus.FAILED
                execution.actual_end_time = datetime.utcnow()
                execution.duration = int(
                    (execution.actual_end_time - execution.actual_start_time).total_seconds()
                )
                execution.error_message = str(e)
                
                # Update schedule statistics
                schedule.total_runs += 1
                schedule.failed_runs += 1
                schedule.last_run_at = datetime.utcnow()
                
                # Schedule a retry if retries are enabled and not exhausted
                if schedule.retry_enabled and execution.retry_count < schedule.max_retries:
                    self._schedule_retry(schedule, execution)
            
            self.db.commit()
        
        except Exception as e:
            logger.error(f"Error executing schedule {schedule.name}: {e}")
            
            if execution:
                execution.status = ExecutionStatus.FAILED
                execution.error_message = str(e)
                self.db.commit()
        
        finally:
            # Release the lock
            self.redis.release_lock(lock_key)
    
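    def _schedule_retry(self, schedule: WorkflowSchedule, failed_execution: ScheduleExecution):
        """Schedule a retry.

        NOTE: the retry job calls _execute_workflow(schedule.id), which creates a
        fresh ScheduleExecution with its own retry_count, so the record created
        here is not the one that runs. To enforce max_retries, pass the retry
        record's ID through to the job and reuse that record.
        """
        retry_delay = schedule.retry_delay
        
        # Create the retry execution record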
        retry_execution = ScheduleExecution(
            schedule_id=schedule.id,
            workflow_id=schedule.workflow_id,
            tenant_id=schedule.tenant_id,
            status=ExecutionStatus.PENDING,
            scheduled_time=datetime.utcnow() + timedelta(seconds=retry_delay),
            is_retry=True,
            retry_count=failed_execution.retry_count + 1,
            parent_execution_id=failed_execution.id
        )
        self.db.add(retry_execution)
        self.db.commit()
        
        # Schedule the retry as a one-off APScheduler job
        self.scheduler.add_job(
            func=self._execute_workflow,
            trigger=DateTrigger(
                run_date=retry_execution.scheduled_time,
                timezone=pytz.UTC
            ),
            args=[schedule.id],
            id=f"retry_{retry_execution.id}",
            replace_existing=True
        )
        
        logger.info(
            f"Scheduled retry for {schedule.name} "
            f"(attempt {retry_execution.retry_count}/{schedule.max_retries})"
        )
    
    def remove_schedule(self, schedule_id: str):
        """Remove a scheduled job."""
        schedule = self.db.query(WorkflowSchedule).filter(
            WorkflowSchedule.id == schedule_id
        ).first()
        
        if schedule and schedule.apscheduler_job_id:
            try:
                self.scheduler.remove_job(schedule.apscheduler_job_id)
                logger.info(f"Removed schedule: {schedule.name}")
            except Exception as e:
                logger.error(f"Failed to remove schedule {schedule.name}: {e}")
    
    def pause_schedule(self, schedule_id: str):
        """Pause a scheduled job."""
        schedule = self.db.query(WorkflowSchedule).filter(
            WorkflowSchedule.id == schedule_id
        ).first()
        
        if schedule and schedule.apscheduler_job_id:
            try:
                self.scheduler.pause_job(schedule.apscheduler_job_id)
                schedule.status = ScheduleStatus.PAUSED
                self.db.commit()
                logger.info(f"Paused schedule: {schedule.name}")
            except Exception as e:
                logger.error(f"Failed to pause schedule {schedule.name}: {e}")
    
    def resume_schedule(self, schedule_id: str):
        """Resume a scheduled job."""
        schedule = self.db.query(WorkflowSchedule).filter(
            WorkflowSchedule.id == schedule_id
        ).first()
        
        if schedule and schedule.apscheduler_job_id:
            try:
                self.scheduler.resume_job(schedule.apscheduler_job_id)
                schedule.status = ScheduleStatus.ACTIVE
                self.db.commit()
                logger.info(f"Resumed schedule: {schedule.name}")
            except Exception as e:
                logger.error(f"Failed to resume schedule {schedule.name}: {e}")
    
    def _job_executed_listener(self, event):
        """Listener for successfully executed jobs."""
        logger.info(f"Job {event.job_id} executed successfully")
    
    def _job_error_listener(self, event):
        """Listener for jobs that raised an error."""
        logger.error(f"Job {event.job_id} failed: {event.exception}")
    
    def _job_missed_listener(self, event):
        """Listener for missed job executions."""
        logger.warning(f"Job {event.job_id} missed execution")
        
        # Look the schedule up by its stored job ID instead of splitting job_id
        # on "_", which breaks when workflow_id itself contains underscores.
        # Retry jobs ("retry_...") match no schedule and are ignored.
        schedule = self.db.query(WorkflowSchedule).filter(
            WorkflowSchedule.apscheduler_job_id == event.job_id
        ).first()
        
        if not schedule:
            return
        
        # Record the missed run at its originally scheduled time (naive UTC)
        scheduled_time = event.scheduled_run_time.astimezone(pytz.UTC).replace(tzinfo=None)
        execution = ScheduleExecution(
            schedule_id=schedule.id,
            workflow_id=schedule.workflow_id,
            tenant_id=schedule.tenant_id,
            status=ExecutionStatus.SKIPPED,
            scheduled_time=scheduled_time,
            is_misfire=True,
            error_message="Missed execution"
        )
        self.db.add(execution)
        self.db.commit()
        
        # Apply the schedule's misfire policy
        self._handle_misfire(schedule, execution)
    
    def _handle_misfire(self, schedule: WorkflowSchedule, missed_execution: ScheduleExecution):
        """Handle a missed execution according to the schedule's misfire policy."""
        # misfire_policy may be a MisfirePolicy enum member or a plain string
        policy = getattr(schedule.misfire_policy, "value", schedule.misfire_policy)
        
        if policy == "skip":
            # Skip: the missed run has already been recorded
            return
        
        if policy in ("fire_once", "fire_all", "fire_latest"):
            # Simplified: all three policies run the workflow once here.
            # fire_all would need the number of missed runs to replay each one.
            self._execute_workflow(schedule.id)
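
The distributed lock in `_execute_workflow` is only safe if a holder releases its own lock and nobody else's. If the lock's TTL elapses while the task is still running, another instance can acquire it, and an unconditional release would then delete that other instance's lock. A common remedy is to store a unique token as the lock value and delete the key only when the token still matches. The sketch below shows the idea against an in-memory stand-in (`InMemoryLockStore`, a hypothetical class written for this example) so it runs without a server. With redis-py, acquisition would typically be `set(key, token, nx=True, ex=ttl)`, with the compare-and-delete done atomically in a Lua script. How the article's `RedisClient.acquire_lock` works internally is not shown, so this is an illustration rather than a description of it.

```python
import time
import uuid
from typing import Dict, Optional, Tuple


class InMemoryLockStore:
    """Hypothetical stand-in mimicking Redis SET NX EX semantics for the demo."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[str, float]] = {}  # key -> (token, expires_at)

    def acquire(self, key: str, ttl: float, now: Optional[float] = None) -> Optional[str]:
        """Return a fresh token if the key is free or expired, else None."""
        now = time.monotonic() if now is None else now
        current = self._data.get(key)
        if current is not None and current[1] > now:
            return None  # still held by someone else
        token = uuid.uuid4().hex
        self._data[key] = (token, now + ttl)
        return token

    def release(self, key: str, token: str) -> bool:
        """Delete the key only if it still holds our token."""
        current = self._data.get(key)
        if current is None or current[0] != token:
            return False  # expired and possibly re-acquired by another holder
        del self._data[key]
        return True


store = InMemoryLockStore()

# Instance A takes the lock with a 5-second TTL at t=0
token_a = store.acquire("schedule_lock:42", ttl=5, now=0.0)

# Instance B is refused while the lock is live ...
blocked = store.acquire("schedule_lock:42", ttl=5, now=1.0)

# ... but succeeds once A's TTL has elapsed, even though A is still running
token_b = store.acquire("schedule_lock:42", ttl=5, now=6.0)

# A finishes late; its release must not remove B's lock
released_by_a = store.release("schedule_lock:42", token_a)
released_by_b = store.release("schedule_lock:42", token_b)

print(blocked, released_by_a, released_by_b)
```

The token check protects other holders, but it does not stop two instances from running at once after a TTL expiry, so the TTL still has to exceed the longest expected run or be renewed while the task is in progress.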


5. Schedule Management API



# api/schedules.py
from fastapi import APIRouter, Depends, HTTPException, status, Request, Query
from sqlalchemy.orm import Session
from database import get_db
from services.scheduler_service import SchedulerService
from services.cron_parser import CronParser
from services.timezone_service import TimezoneService
from core.tenant_context import TenantContext
from core.redis_client import get_redis_client
from models.schedule import WorkflowSchedule, ScheduleExecution, ScheduleStatus, TriggerType, MisfirePolicy
from pydantic import BaseModel, validator
from typing import Optional, List
from datetime import datetime
 
router = APIRouter(prefix="/api/schedules", tags=["schedules"])
 
class CreateScheduleRequest(BaseModel):
    """Request body for creating a schedule."""
    workflow_id: str
    name: str
    description: Optional[str] = None
    trigger_type: str  # cron/interval/date
    
    # Cron settings
    cron_expression: Optional[str] = None
    
    # Interval settings
    interval_value: Optional[int] = None
    interval_unit: Optional[str] = None  # seconds/minutes/hours/days/weeks
    
    # Date settings
    run_date: Optional[datetime] = None
    
    # Timezone (IANA name, e.g. "Asia/Shanghai")
    timezone: str = "UTC"
    
    # Execution settings
    max_instances: int = 1
    misfire_grace_time: int = 60
    misfire_policy: str = "fire_once"
    
    # Retry settings
    retry_enabled: bool = False
    max_retries: int = 3
    retry_delay: int = 60
    
    # Execution window
    start_date: Optional[datetime] = None
    end_date: Optional[datetime] = None
    
    # Extra settings
    extra_config: Optional[dict] = None
    
    @validator('trigger_type')
    def validate_trigger_type(cls, v):
        if v not in ['cron', 'interval', 'date']:
            raise ValueError('Invalid trigger_type')
        return v
    
    @validator('cron_expression')
    def validate_cron_expression(cls, v, values):
        if values.get('trigger_type') == 'cron':
            if not v:
                raise ValueError('cron_expression is required for cron trigger')
            
            # Validate the cron expression
            valid, error = CronParser.validate(v)
            if not valid:
                raise ValueError(f'Invalid cron expression: {error}')
        
        return v
    
    @validator('interval_value')
    def validate_interval_value(cls, v, values):
        if values.get('trigger_type') == 'interval':
            if not v or v <= 0:
                raise ValueError('interval_value must be positive')
        return v
    
    @validator('timezone')
    def validate_timezone(cls, v):
        if not TimezoneService.validate_timezone(v):
            raise ValueError(f'Invalid timezone: {v}')
        return v
 
 
class UpdateScheduleRequest(BaseModel):
    """Request body for updating a schedule."""
    name: Optional[str] = None
    description: Optional[str] = None
    status: Optional[str] = None
    cron_expression: Optional[str] = None
    interval_value: Optional[int] = None
    interval_unit: Optional[str] = None
    timezone: Optional[str] = None
    max_instances: Optional[int] = None
    misfire_grace_time: Optional[int] = None
    misfire_policy: Optional[str] = None
    retry_enabled: Optional[bool] = None
    max_retries: Optional[int] = None
    retry_delay: Optional[int] = None
 
 
@router.post("")
async def create_schedule(
    request: CreateScheduleRequest,
    req: Request,
    db: Session = Depends(get_db),
    redis_client = Depends(get_redis_client)
):
    """Create a schedule."""
    tenant = TenantContext.require()
    user_id = req.state.user.id
    
    # Build the schedule record
    schedule = WorkflowSchedule(
        workflow_id=request.workflow_id,
        tenant_id=tenant.id,
        name=request.name,
        description=request.description,
        status=ScheduleStatus.ACTIVE,
        trigger_type=TriggerType(request.trigger_type),
        cron_expression=request.cron_expression,
        interval_value=request.interval_value,
        interval_unit=request.interval_unit,
        run_date=request.run_date,
        timezone=request.timezone,
        max_instances=request.max_instances,
        misfire_grace_time=request.misfire_grace_time,
        misfire_policy=MisfirePolicy(request.misfire_policy),
        retry_enabled=request.retry_enabled,
        max_retries=request.max_retries,
        retry_delay=request.retry_delay,
        start_date=request.start_date,
        end_date=request.end_date,
        extra_config=request.extra_config,
        created_by=user_id
    )
    
    db.add(schedule)
    db.commit()
    db.refresh(schedule)
    
    # Register the schedule with the scheduler
    scheduler_service = SchedulerService(db, redis_client)
    try:
        job_id = scheduler_service.add_schedule(schedule)
        
        return {
            "success": True,
            "schedule": schedule.to_dict(),
            "job_id": job_id,
            "next_run_times": CronParser.get_next_n_run_times(
                request.cron_expression,
                5,
                timezone=request.timezone
            ) if request.trigger_type == "cron" else []
        }
    
    except Exception as e:
        db.delete(schedule)
        db.commit()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create schedule: {str(e)}"
        )
 
 
@router.get("")
async def list_schedules(
    workflow_id: Optional[str] = None,
    status_filter: Optional[str] = Query(None, alias="status"),
    limit: int = 50,
    offset: int = 0,
    db: Session = Depends(get_db)
):
    """List schedules."""
    tenant = TenantContext.require()
    
    query = db.query(WorkflowSchedule).filter(
        WorkflowSchedule.tenant_id == tenant.id
    )
    
    if workflow_id:
        query = query.filter(WorkflowSchedule.workflow_id == workflow_id)
    
    if status_filter:
        query = query.filter(WorkflowSchedule.status == status_filter)
    
    total = query.count()
    schedules = query.limit(limit).offset(offset).all()
    
    return {
        "schedules": [s.to_dict() for s in schedules],
        "total": total,
        "limit": limit,
        "offset": offset
    }
 
 
@router.get("/{schedule_id}")
async def get_schedule(
    schedule_id: str,
    db: Session = Depends(get_db)
):
    """Return the details of a schedule."""
    tenant = TenantContext.require()
    
    schedule = db.query(WorkflowSchedule).filter(
        WorkflowSchedule.id == schedule_id,
        WorkflowSchedule.tenant_id == tenant.id
    ).first()
    
    if not schedule:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Schedule not found"
        )
    
    # Compute the upcoming run times
    next_run_times = []
    if schedule.trigger_type == TriggerType.CRON:
        next_run_times = CronParser.get_next_n_run_times(
            schedule.cron_expression,
            10,
            timezone=schedule.timezone
        )
    
    return {
        "schedule": schedule.to_dict(),
        "next_run_times": [t.isoformat() for t in next_run_times]
    }
 
 
@router.put("/{schedule_id}")
async def update_schedule(
    schedule_id: str,
    request: UpdateScheduleRequest,
    db: Session = Depends(get_db),
    redis_client = Depends(get_redis_client)
):
    """Update a schedule."""
    tenant = TenantContext.require()
    
    schedule = db.query(WorkflowSchedule).filter(
        WorkflowSchedule.id == schedule_id,
        WorkflowSchedule.tenant_id == tenant.id
    ).first()
    
    if not schedule:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Schedule not found"
        )
    
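    # Apply only the fields the client sent. Note that these values are not
    # re-validated here (cron expression, timezone) as they are on creation.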
    update_data = request.dict(exclude_unset=True)
    
    for field, value in update_data.items():
        setattr(schedule, field, value)
    
    db.commit()
    db.refresh(schedule)
    
    # Re-register the schedule with the scheduler
    scheduler_service = SchedulerService(db, redis_client)
    scheduler_service.remove_schedule(schedule_id)
    
    if schedule.status == ScheduleStatus.ACTIVE:
        scheduler_service.add_schedule(schedule)
    
    return {
        "success": True,
        "schedule": schedule.to_dict()
    }
 
 
@router.delete("/{schedule_id}")
async def delete_schedule(
    schedule_id: str,
    db: Session = Depends(get_db),
    redis_client = Depends(get_redis_client)
):
    """Delete a schedule."""
    tenant = TenantContext.require()
    
    schedule = db.query(WorkflowSchedule).filter(
        WorkflowSchedule.id == schedule_id,
        WorkflowSchedule.tenant_id == tenant.id
    ).first()
    
    if not schedule:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Schedule not found"
        )
    
    # Remove the job from the scheduler
    scheduler_service = SchedulerService(db, redis_client)
    scheduler_service.remove_schedule(schedule_id)
    
    # Delete the database record
    db.delete(schedule)
    db.commit()
    
    return {"success": True, "message": "Schedule deleted"}
 
 
@router.post("/{schedule_id}/pause")
async def pause_schedule(
    schedule_id: str,
    db: Session = Depends(get_db),
    redis_client = Depends(get_redis_client)
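):
    """Pause a schedule."""
    tenant = TenantContext.require()
    
    # Verify the schedule belongs to the current tenant before pausing it
    schedule = db.query(WorkflowSchedule).filter(
        WorkflowSchedule.id == schedule_id,
        WorkflowSchedule.tenant_id == tenant.id
    ).first()
    
    if not schedule:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Schedule not found"
        )
    
    scheduler_service = SchedulerService(db, redis_client)
    scheduler_service.pause_schedule(schedule_id)
    
    return {"success": True, "message": "Schedule paused"}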
 
 
@router.post("/{schedule_id}/resume")
async def resume_schedule(
    schedule_id: str,
    db: Session = Depends(get_db),
    redis_client = Depends(get_redis_client)
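):
    """Resume a schedule."""
    tenant = TenantContext.require()
    
    # Verify the schedule belongs to the current tenant before resuming it
    schedule = db.query(WorkflowSchedule).filter(
        WorkflowSchedule.id == schedule_id,
        WorkflowSchedule.tenant_id == tenant.id
    ).first()
    
    if not schedule:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Schedule not found"
        )
    
    scheduler_service = SchedulerService(db, redis_client)
    scheduler_service.resume_schedule(schedule_id)
    
    return {"success": True, "message": "Schedule resumed"}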
 
 
@router.post("/{schedule_id}/trigger")
async def trigger_schedule_now(
    schedule_id: str,
    db: Session = Depends(get_db),
    redis_client = Depends(get_redis_client)
):
    """Trigger a schedule immediately."""
    tenant = TenantContext.require()
    
    schedule = db.query(WorkflowSchedule).filter(
        WorkflowSchedule.id == schedule_id,
        WorkflowSchedule.tenant_id == tenant.id
    ).first()
    
    if not schedule:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Schedule not found"
        )
    
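    # Run immediately. This call blocks until the workflow finishes or times out,
    # so the request stays open for the duration of the run.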
    scheduler_service = SchedulerService(db, redis_client)
    scheduler_service._execute_workflow(schedule_id)
    
    return {"success": True, "message": "Schedule triggered"}
 
 
@router.get("/{schedule_id}/executions")
async def get_schedule_executions(
    schedule_id: str,
    status_filter: Optional[str] = Query(None, alias="status"),
    limit: int = 50,
    offset: int = 0,
    db: Session = Depends(get_db)
):
    """Return the execution history of a schedule."""
    tenant = TenantContext.require()
    
    # Verify the schedule exists and belongs to the current tenant
    schedule = db.query(WorkflowSchedule).filter(
        WorkflowSchedule.id == schedule_id,
        WorkflowSchedule.tenant_id == tenant.id
    ).first()
    
    if not schedule:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Schedule not found"
        )
    
    # Query the execution history
    query = db.query(ScheduleExecution).filter(
        ScheduleExecution.schedule_id == schedule_id
    )
    
    if status_filter:
        query = query.filter(ScheduleExecution.status == status_filter)
    
    total = query.count()
    executions = query.order_by(
        ScheduleExecution.scheduled_time.desc()
    ).limit(limit).offset(offset).all()
    
    return {
        "executions": [e.to_dict() for e in executions],
        "total": total,
        "limit": limit,
        "offset": offset
    }
 
 
@router.get("/validate/cron")
async def validate_cron_expression(
    expression: str = Query(..., description="Cron expression")
):
    """Validate a cron expression."""
    valid, error = CronParser.validate(expression)
    
    if valid:
        # Build a human-readable description
        description = CronParser.describe(expression)
        
        # Compute the upcoming run times
        next_times = CronParser.get_next_n_run_times(expression, 5)
        
        return {
            "valid": True,
            "description": description,
            "next_run_times": [t.isoformat() for t in next_times]
        }
    else:
        return {
            "valid": False,
            "error": error
        }
 
 
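# NOTE: register this route before "/{schedule_id}". FastAPI matches routes in
# declaration order, so GET /timezones would otherwise be captured as
# schedule_id="timezones" and answered with 404.
@router.get("/timezones")
async def list_timezones():
    """List common timezones."""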
    common_timezones = TimezoneService.get_common_timezones()
    
    # Format the timezone information
    timezone_info = []
    for tz_name in common_timezones:
        display = TimezoneService.format_timezone_display(tz_name)
        timezone_info.append({
            "name": tz_name,
            "display": display
        })
    
    return {"timezones": timezone_info}
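
Each endpoint above constructs `SchedulerService(db, redis_client)` on the fly, but the service only creates its `BackgroundScheduler` inside `start()`. An instance that was never started still has `self.scheduler` set to `None`, so calls such as `add_schedule` cannot reach a running scheduler. One way to address this is to hand every request the same, already started instance. The sketch below illustrates the pattern with `functools.lru_cache` and a stub class; `StubSchedulerService` and `get_scheduler_service` are hypothetical names for this example. In FastAPI the provider could be injected with `Depends(get_scheduler_service)`.

```python
from functools import lru_cache


class StubSchedulerService:
    """Hypothetical stand-in for SchedulerService that counts start() calls."""

    start_calls = 0

    def __init__(self) -> None:
        self.scheduler = None  # mirrors the real service before start()

    def start(self) -> None:
        if self.scheduler is None:
            self.scheduler = object()  # placeholder for a BackgroundScheduler
            StubSchedulerService.start_calls += 1


@lru_cache(maxsize=1)
def get_scheduler_service() -> StubSchedulerService:
    """Create and start the service once per process; later calls reuse it."""
    service = StubSchedulerService()
    service.start()
    return service


first = get_scheduler_service()
second = get_scheduler_service()
print(first is second, StubSchedulerService.start_calls)
```

The cache is per process, so a deployment with several worker processes still runs several schedulers, which is why the distributed lock remains necessary.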

6. Misfire Compensation Mechanism
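
The four misfire policies differ only in which of the missed run times receive a compensation run. As a minimal sketch of that selection logic, kept separate from database access, consider the pure function below. `select_compensation_times` and its `latest_n` parameter are hypothetical names for this example; the policy strings follow the `misfire_policy` values accepted by the API.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def select_compensation_times(
    missed: List[datetime], policy: str, latest_n: int = 3
) -> List[datetime]:
    """Return the missed run times that should be re-run under a policy."""
    if not missed or policy == "skip":
        return []  # nothing to re-run; skipped runs are only recorded
    ordered = sorted(missed)
    if policy == "fire_once":
        return [ordered[-1]]  # one run standing in for all missed ones
    if policy == "fire_all":
        return ordered  # replay every missed run, oldest first
    if policy == "fire_latest":
        # Only the most recent N runs, still oldest first
        return ordered[max(0, len(ordered) - latest_n):]
    raise ValueError(f"Unknown misfire policy: {policy}")


# Five hourly runs were missed, starting at 2024-01-01 00:00 UTC
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
missed_times = [start + timedelta(hours=i) for i in range(5)]

for name in ("fire_once", "fire_all", "fire_latest", "skip"):
    chosen = select_compensation_times(missed_times, name)
    print(name, [t.strftime("%H:%M") for t in chosen])
```

Keeping the selection free of side effects makes each policy easy to unit test before it is wired to execution records. The sketch returns times oldest first, which is a choice made here for replay order.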



# services/misfire_handler.py
from typing import List, Optional, Dict, Any
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from models.schedule import WorkflowSchedule, ScheduleExecution, ExecutionStatus, MisfirePolicy
from services.cron_parser import CronParser
import pytz
import logging
 
logger = logging.getLogger(__name__)
 
class MisfireHandler:
    """
    Misfire compensation handler.
    
    Detects and handles missed schedule executions.
    """
    
    def __init__(self, db: Session):
        self.db = db
    
    def detect_misfires(self, schedule: WorkflowSchedule) -> List[datetime]:
        """
        Detect missed run times.
        
        Args:
            schedule: schedule configuration
            
        Returns:
            List[datetime]: missed run times
        """
        if not schedule.last_run_at:
            # First run: nothing can have been missed yet
            return []
        
        # Collect every run time that fell between the last run and now
        last_run = schedule.last_run_at
        now = datetime.utcnow()
        
        # Convert to the schedule's timezone (last_run_at is stored as naive UTC)
        tz = pytz.timezone(schedule.timezone)
        last_run_tz = pytz.UTC.localize(last_run).astimezone(tz)
        now_tz = pytz.UTC.localize(now).astimezone(tz)
        
        missed_times = []
        
        if schedule.trigger_type.value == "cron":
            # Walk forward through the cron expression's run times
            current_time = last_run_tz
            
            while current_time < now_tz:
                next_time = CronParser.get_next_run_time(
                    schedule.cron_expression,
                    current_time,
                    schedule.timezone
                )
                
                if next_time < now_tz:
                    # Count the run as missed only once the grace period has passed
                    grace_time = timedelta(seconds=schedule.misfire_grace_time)
                    if now_tz - next_time > grace_time:
                        missed_times.append(next_time)
                
                current_time = next_time
        
        elif schedule.trigger_type.value == "interval":
            # Interval trigger
            interval = timedelta(**{
                schedule.interval_unit: schedule.interval_value
            })
            
            current_time = last_run_tz
            
            while current_time < now_tz:
                current_time += interval
                
                if current_time < now_tz:
                    grace_time = timedelta(seconds=schedule.misfire_grace_time)
                    if now_tz - current_time > grace_time:
                        missed_times.append(current_time)
        
        return missed_times
    
    def handle_misfire(
        self,
        schedule: WorkflowSchedule,
        missed_times: List[datetime]
    ) -> List[ScheduleExecution]:
        """
        Handle missed executions.
        
        Args:
            schedule: schedule configuration
            missed_times: missed run times
            
        Returns:
            List[ScheduleExecution]: compensation execution records created
        """
        if not missed_times:
            return []
        
        compensations = []
        
        if schedule.misfire_policy == MisfirePolicy.FIRE_ONCE:
            # Run once, for the most recent missed time
            latest_time = max(missed_times)
            execution = self._create_compensation_execution(
                schedule,
                latest_time,
                len(missed_times)
            )
            compensations.append(execution)
        
        elif schedule.misfire_policy == MisfirePolicy.FIRE_ALL:
            # Run every missed execution
            for missed_time in missed_times:
                execution = self._create_compensation_execution(
                    schedule,
                    missed_time,
                    1
                )
                compensations.append(execution)
        
        elif schedule.misfire_policy == MisfirePolicy.FIRE_LATEST:
            # Run the latest N missed executions (3 by default)
            latest_n = 3
            latest_times = sorted(missed_times, reverse=True)[:latest_n]
            
            for missed_time in latest_times:
                execution = self._create_compensation_execution(
                    schedule,
                    missed_time,
                    1
                )
                compensations.append(execution)
        
        elif schedule.misfire_policy == MisfirePolicy.SKIP:
            # Skip: record the missed runs without executing them
            now = datetime.utcnow()
            for missed_time in missed_times:
                # Normalize to naive UTC so it can be subtracted from utcnow()
                if missed_time.tzinfo is not None:
                    missed_time = missed_time.astimezone(pytz.UTC).replace(tzinfo=None)
                execution = ScheduleExecution(
                    schedule_id=schedule.id,
                    workflow_id=schedule.workflow_id,
                    tenant_id=schedule.tenant_id,
                    status=ExecutionStatus.SKIPPED,
                    scheduled_time=missed_time,
                    is_misfire=True,
                    misfire_delay=int((now - missed_time).total_seconds()),
                    error_message=f"Skipped due to misfire policy: {schedule.misfire_policy.value}"
                )
                self.db.add(execution)
        
        self.db.commit()
        
        logger.info(
            f"Handled {len(missed_times)} misfires for schedule {schedule.name} "
            f"with policy {schedule.misfire_policy.value}, "
            f"created {len(compensations)} compensation executions"
        )
        
        return compensations
    
    def _create_compensation_execution(
        self,
        schedule: WorkflowSchedule,
        missed_time: datetime,
        batch_size: int
    ) -> ScheduleExecution:
        """Create a compensation execution record."""
        now = datetime.utcnow()
        
        execution = ScheduleExecution(
            schedule_id=schedule.id,
            workflow_id=schedule.workflow_id,
            tenant_id=schedule.tenant_id,
            status=ExecutionStatus.PENDING,
            scheduled_time=missed_time,
            is_misfire=True,
            misfire_delay=int((now - missed_time).total_seconds()),
            error_message=f"Compensation execution for {batch_size} missed run(s)"
        )
        
        self.db.add(execution)
        
        return execution
    
    def scan_all_schedules(self) -> Dict[str, Any]:
        """
        Scan all schedules and detect missed runs.
        
        Returns:
            Dict[str, Any]: Scan result statistics
        """
        # Fetch all active schedules
        schedules = self.db.query(WorkflowSchedule).filter(
            WorkflowSchedule.status == "active"
        ).all()
        
        total_scanned = 0
        total_misfires = 0
        total_compensations = 0
        
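        for schedule in schedules:
            try:
                # Detect missed runs
                missed_times = self.detect_misfires(schedule)
                
                if missed_times:
                    # Handle the missed runs according to the misfire policy
                    compensations = self.handle_misfire(schedule, missed_times)
                    
                    total_misfires += len(missed_times)
                    total_compensations += len(compensations)
                
                total_scanned += 1
            
            except Exception as e:
                logger.error(f"Error scanning schedule {schedule.name}: {e}")
                # Roll back so one failed schedule does not leave the shared
                # session unusable for the remaining schedules
                self.db.rollback()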
        
        result = {
            "total_scanned": total_scanned,
            "total_misfires": total_misfires,
            "total_compensations": total_compensations,
            "scanned_at": datetime.utcnow().isoformat()
        }
        
        logger.info(f"Misfire scan completed: {result}")
        
        return result
    
    def get_misfire_statistics(
        self,
        schedule_id: Optional[str] = None,
        days: int = 7
    ) -> Dict[str, Any]:
        """
        Get misfire statistics.
        
        Args:
            schedule_id: Schedule ID (optional)
            days: Number of days to cover
            
        Returns:
            Dict[str, Any]: Statistics
        """
        # Time range for the query
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(days=days)
        
        # Build the query
        query = self.db.query(ScheduleExecution).filter(
            ScheduleExecution.is_misfire.is_(True),
            ScheduleExecution.scheduled_time >= start_time,
            ScheduleExecution.scheduled_time <= end_time
        )
        
        if schedule_id:
            query = query.filter(ScheduleExecution.schedule_id == schedule_id)
        
        # Run the query
        executions = query.all()
        
        # Totals
        total_misfires = len(executions)
        
        # Group by status
        status_counts = {}
        for execution in executions:
            status = execution.status.value
            status_counts[status] = status_counts.get(status, 0) + 1
        
        # Average and maximum delay (a delay of 0 is valid, so test against None)
        delays = [e.misfire_delay for e in executions if e.misfire_delay is not None]
        avg_delay = sum(delays) / len(delays) if delays else 0
        max_delay = max(delays) if delays else 0
        
        # Group by schedule (separate name so the schedule_id argument is not shadowed)
        schedule_counts = {}
        for execution in executions:
            sid = execution.schedule_id
            schedule_counts[sid] = schedule_counts.get(sid, 0) + 1
        
        # Sort to find the schedules with the most misfires
        top_schedules = sorted(
            schedule_counts.items(),
            key=lambda x: x[1],
            reverse=True
        )[:5]
        
        return {
            "period": {
                "start": start_time.isoformat(),
                "end": end_time.isoformat(),
                "days": days
            },
            "total_misfires": total_misfires,
            "status_distribution": status_counts,
            "delay_statistics": {
                "average_seconds": int(avg_delay),
                "max_seconds": int(max_delay)
            },
            "top_problematic_schedules": [
                {
                    "schedule_id": schedule_id,
                    "misfire_count": count
                }
                for schedule_id, count in top_schedules
            ]
        }
 
 
# Periodic scan task (Celery)
from celery import shared_task
 
@shared_task
def scan_misfires_task():
    """
    Periodically scan for missed runs.
    
    Running it every 5 minutes is recommended.
    """
    from database import SessionLocal
    
    db = SessionLocal()
    try:
        handler = MisfireHandler(db)
        result = handler.scan_all_schedules()
        
        return result
    finally:
        db.close()
 
 
# Usage example
if __name__ == "__main__":
    from database import SessionLocal
    
    db = SessionLocal()
    try:
        # Create the handler
        handler = MisfireHandler(db)
        
        # Scan all schedules
        result = handler.scan_all_schedules()
        print(f"Scan result: {result}")
        
        # Fetch statistics
        stats = handler.get_misfire_statistics(days=7)
        print(f"Misfire statistics: {stats}")
    finally:
        # Always release the session, even if the scan raises
        db.close()
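
The policy branches in `handle_misfire` can be exercised in isolation with a small pure function. The following is a minimal sketch rather than the article's implementation: `select_compensation_times`, `misfire_delay_seconds`, and the local `MisfirePolicy` enum are hypothetical stand-ins, the enum values are assumed, and all timestamps are assumed to be timezone-aware UTC so that subtracting them never mixes naive and aware values.

```python
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import List


class MisfirePolicy(Enum):
    # Hypothetical stand-in for the article's enum; the values are assumed.
    FIRE_ONCE = "fire_once"
    FIRE_ALL = "fire_all"
    FIRE_LATEST = "fire_latest"
    SKIP = "skip"


def select_compensation_times(
    policy: MisfirePolicy,
    missed_times: List[datetime],
    latest_n: int = 3,
) -> List[datetime]:
    """Return the missed run times that should be re-run, oldest first."""
    ordered = sorted(missed_times)
    if not ordered or policy is MisfirePolicy.SKIP:
        return []
    if policy is MisfirePolicy.FIRE_ONCE:
        return ordered[-1:]
    if policy is MisfirePolicy.FIRE_LATEST:
        # Guard against latest_n <= 0, where ordered[-0:] would return everything.
        return ordered[-latest_n:] if latest_n > 0 else []
    return ordered  # FIRE_ALL


def misfire_delay_seconds(missed_time: datetime, now: datetime) -> int:
    """Seconds between the planned time and now; both must be timezone-aware."""
    return int((now - missed_time).total_seconds())


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
missed = [now - timedelta(hours=h) for h in (5, 4, 3, 2, 1)]
once = select_compensation_times(MisfirePolicy.FIRE_ONCE, missed)
latest = select_compensation_times(MisfirePolicy.FIRE_LATEST, missed)
```

Keeping the selection separate from record creation makes each policy easy to unit test without a database session.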

7. Schedule History Analysis



# services/schedule_analytics.py
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from sqlalchemy import func, and_, or_
from models.schedule import WorkflowSchedule, ScheduleExecution, ExecutionStatus
import pandas as pd
import logging
 
logger = logging.getLogger(__name__)
 
class ScheduleAnalytics:
    """
    Schedule analytics service.
    
    Provides statistics and analysis of schedule executions.
    """
    
    def __init__(self, db: Session):
        self.db = db
    
    def get_execution_summary(
        self,
        schedule_id: Optional[str] = None,
        days: int = 30
    ) -> Dict[str, Any]:
        """
        Get an execution summary.
        
        Args:
            schedule_id: Schedule ID (optional)
            days: Number of days to cover
            
        Returns:
            Dict[str, Any]: Execution summary
        """
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(days=days)
        
        # Build the query
        query = self.db.query(ScheduleExecution).filter(
            ScheduleExecution.scheduled_time >= start_time,
            ScheduleExecution.scheduled_time <= end_time
        )
        
        if schedule_id:
            query = query.filter(ScheduleExecution.schedule_id == schedule_id)
        
        executions = query.all()
        
        # Totals
        total_executions = len(executions)
        
        status_counts = {
            "success": 0,
            "failed": 0,
            "pending": 0,
            "running": 0,
            "skipped": 0,
            "timeout": 0,
            "cancelled": 0
        }
        
        total_duration = 0
        duration_count = 0
        
        for execution in executions:
            status_counts[execution.status.value] = status_counts.get(
                execution.status.value, 0
            ) + 1
            
            # A duration of 0 is valid, so test against None rather than truthiness
            if execution.duration is not None:
                total_duration += execution.duration
                duration_count += 1
        
        # Success rate
        success_rate = (
            status_counts["success"] / total_executions * 100
            if total_executions > 0 else 0
        )
        
        # Average execution duration
        avg_duration = (
            total_duration / duration_count
            if duration_count > 0 else 0
        )
        
        return {
            "period": {
                "start": start_time.isoformat(),
                "end": end_time.isoformat(),
                "days": days
            },
            "total_executions": total_executions,
            "status_distribution": status_counts,
            "success_rate": round(success_rate, 2),
            "average_duration_seconds": int(avg_duration)
        }
    
    def get_execution_trend(
        self,
        schedule_id: Optional[str] = None,
        days: int = 30,
        granularity: str = "day"  # hour/day/week
    ) -> Dict[str, Any]:
        """
        Get the execution trend.
        
        Args:
            schedule_id: Schedule ID
            days: Number of days to cover
            granularity: Bucket size (hour/day/week)
            
        Returns:
            Dict[str, Any]: Trend data
        """
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(days=days)
        
        # Build the query
        query = self.db.query(ScheduleExecution).filter(
            ScheduleExecution.scheduled_time >= start_time,
            ScheduleExecution.scheduled_time <= end_time
        )
        
        if schedule_id:
            query = query.filter(ScheduleExecution.schedule_id == schedule_id)
        
        executions = query.all()
        
        # Convert to a DataFrame
        data = []
        for execution in executions:
            data.append({
                "scheduled_time": execution.scheduled_time,
                "status": execution.status.value,
                "duration": execution.duration or 0
            })
        
        if not data:
            return {
                "period": {
                    "start": start_time.isoformat(),
                    "end": end_time.isoformat()
                },
                "granularity": granularity,
                "data_points": []
            }
        
        df = pd.DataFrame(data)
        df['scheduled_time'] = pd.to_datetime(df['scheduled_time'])
        
        # Use the scheduled time as the index
        df.set_index('scheduled_time', inplace=True)
        
        # Resample by granularity
        if granularity == "hour":
            # Lowercase 'h' is the hourly alias; uppercase 'H' is deprecated since pandas 2.2
            resampled = df.resample('h')
        elif granularity == "day":
            resampled = df.resample('D')
        elif granularity == "week":
            resampled = df.resample('W')
        else:
            raise ValueError(f"Invalid granularity: {granularity}")
        
        # Aggregate the data
        trend_data = []
        
        for timestamp, group in resampled:
            if len(group) == 0:
                continue
            
            total = len(group)
            success = len(group[group['status'] == 'success'])
            failed = len(group[group['status'] == 'failed'])
            avg_duration = group['duration'].mean()
            
            trend_data.append({
                "timestamp": timestamp.isoformat(),
                "total_executions": total,
                "successful": success,
                "failed": failed,
                "success_rate": round(success / total * 100, 2) if total > 0 else 0,
                "average_duration": int(avg_duration)
            })
        
        return {
            "period": {
                "start": start_time.isoformat(),
                "end": end_time.isoformat()
            },
            "granularity": granularity,
            "data_points": trend_data
        }
    
    def get_schedule_health_score(
        self,
        schedule_id: str,
        days: int = 7
    ) -> Dict[str, Any]:
        """
        Compute the schedule health score.
        
        Args:
            schedule_id: Schedule ID
            days: Number of days to cover
            
        Returns:
            Dict[str, Any]: Health score and details
        """
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(days=days)
        
        # Fetch the execution records
        executions = self.db.query(ScheduleExecution).filter(
            ScheduleExecution.schedule_id == schedule_id,
            ScheduleExecution.scheduled_time >= start_time,
            ScheduleExecution.scheduled_time <= end_time
        ).all()
        
        if not executions:
            return {
                "health_score": 0,
                "status": "unknown",
                "message": "No execution data available"
            }
        
        # Compute the individual metrics
        total = len(executions)
        success = len([e for e in executions if e.status == ExecutionStatus.SUCCESS])
        failed = len([e for e in executions if e.status == ExecutionStatus.FAILED])
        timeout = len([e for e in executions if e.status == ExecutionStatus.TIMEOUT])
        misfire = len([e for e in executions if e.is_misfire])
        
        # Success rate (40% weight)
        success_rate = success / total if total > 0 else 0
        success_score = success_rate * 40
        
        # Failure rate (30% weight, inverted: fewer failures score higher)
        failure_rate = failed / total if total > 0 else 0
        failure_score = (1 - failure_rate) * 30
        
        # Timeout rate (15% weight, inverted)
        timeout_rate = timeout / total if total > 0 else 0
        timeout_score = (1 - timeout_rate) * 15
        
        # Misfire rate (15% weight, inverted)
        misfire_rate = misfire / total if total > 0 else 0
        misfire_score = (1 - misfire_rate) * 15
        
        # Total score
        health_score = success_score + failure_score + timeout_score + misfire_score
        
        # Health status
        if health_score >= 90:
            status = "excellent"
            message = "Schedule is performing excellently"
        elif health_score >= 75:
            status = "good"
            message = "Schedule is performing well"
        elif health_score >= 60:
            status = "fair"
            message = "Schedule has some issues"
        elif health_score >= 40:
            status = "poor"
            message = "Schedule needs attention"
        else:
            status = "critical"
            message = "Schedule is in critical condition"
        
        return {
            "health_score": round(health_score, 2),
            "status": status,
            "message": message,
            "metrics": {
                "success_rate": round(success_rate * 100, 2),
                "failure_rate": round(failure_rate * 100, 2),
                "timeout_rate": round(timeout_rate * 100, 2),
                "misfire_rate": round(misfire_rate * 100, 2)
            },
            "period": {
                "start": start_time.isoformat(),
                "end": end_time.isoformat(),
                "days": days
            }
        }
    
    def get_top_schedules(
        self,
        metric: str = "executions",  # executions/success_rate/duration
        limit: int = 10,
        days: int = 30
    ) -> List[Dict[str, Any]]:
        """
        Get the top-ranked schedules.
        
        Args:
            metric: Metric to sort by
            limit: Number of schedules to return
            days: Number of days to cover
            
        Returns:
            List[Dict[str, Any]]: List of schedules
        """
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(days=days)
        
        # Fetch the execution records of all schedules
        executions = self.db.query(ScheduleExecution).filter(
            ScheduleExecution.scheduled_time >= start_time,
            ScheduleExecution.scheduled_time <= end_time
        ).all()
        
        # Aggregate per schedule
        schedule_stats = {}
        
        for execution in executions:
            schedule_id = execution.schedule_id
            
            if schedule_id not in schedule_stats:
                schedule_stats[schedule_id] = {
                    "schedule_id": schedule_id,
                    "total_executions": 0,
                    "successful": 0,
                    "failed": 0,
                    "total_duration": 0,
                    "duration_count": 0
                }
            
            stats = schedule_stats[schedule_id]
            stats["total_executions"] += 1
            
            if execution.status == ExecutionStatus.SUCCESS:
                stats["successful"] += 1
            elif execution.status == ExecutionStatus.FAILED:
                stats["failed"] += 1
            
            if execution.duration:
                stats["total_duration"] += execution.duration
                stats["duration_count"] += 1
        
        # Compute derived metrics
        for schedule_id, stats in schedule_stats.items():
            total = stats["total_executions"]
            stats["success_rate"] = (
                stats["successful"] / total * 100 if total > 0 else 0
            )
            stats["average_duration"] = (
                stats["total_duration"] / stats["duration_count"]
                if stats["duration_count"] > 0 else 0
            )
        
        # Sort
        if metric == "executions":
            sorted_schedules = sorted(
                schedule_stats.values(),
                key=lambda x: x["total_executions"],
                reverse=True
            )
        elif metric == "success_rate":
            sorted_schedules = sorted(
                schedule_stats.values(),
                key=lambda x: x["success_rate"],
                reverse=True
            )
        elif metric == "duration":
            sorted_schedules = sorted(
                schedule_stats.values(),
                key=lambda x: x["average_duration"],
                reverse=True
            )
        else:
            raise ValueError(f"Invalid metric: {metric}")
        
        # Fetch the schedule details
        top_schedules = []
        for stats in sorted_schedules[:limit]:
            schedule = self.db.query(WorkflowSchedule).filter(
                WorkflowSchedule.id == stats["schedule_id"]
            ).first()
            
            if schedule:
                top_schedules.append({
                    "schedule": schedule.to_dict(),
                    "statistics": {
                        "total_executions": stats["total_executions"],
                        "success_rate": round(stats["success_rate"], 2),
                        "average_duration": int(stats["average_duration"])
                    }
                })
        
        return top_schedules
    
    def get_execution_duration_distribution(
        self,
        schedule_id: Optional[str] = None,
        days: int = 30
    ) -> Dict[str, Any]:
        """
        Get the execution duration distribution.
        
        Args:
            schedule_id: Schedule ID
            days: Number of days to cover
            
        Returns:
            Dict[str, Any]: Duration distribution data
        """
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(days=days)
        
        # Build the query
        query = self.db.query(ScheduleExecution).filter(
            ScheduleExecution.scheduled_time >= start_time,
            ScheduleExecution.scheduled_time <= end_time,
            ScheduleExecution.duration.isnot(None)
        )
        
        if schedule_id:
            query = query.filter(ScheduleExecution.schedule_id == schedule_id)
        
        executions = query.all()
        
        if not executions:
            return {
                "period": {
                    "start": start_time.isoformat(),
                    "end": end_time.isoformat()
                },
                "distribution": []
            }
        
        # Extract the durations
        durations = [e.duration for e in executions]
        
        # Compute percentiles
        df = pd.DataFrame({"duration": durations})
        
        percentiles = [10, 25, 50, 75, 90, 95, 99]
        distribution = []
        
        for p in percentiles:
            value = df["duration"].quantile(p / 100)
            distribution.append({
                "percentile": p,
                "duration_seconds": int(value)
            })
        
        # Summary statistics. The sample standard deviation is NaN for a
        # single data point, so fall back to 0 instead of calling int(NaN).
        std = df["duration"].std()
        stats = {
            "min": int(df["duration"].min()),
            "max": int(df["duration"].max()),
            "mean": int(df["duration"].mean()),
            "median": int(df["duration"].median()),
            "std": int(std) if pd.notna(std) else 0
        }
        
        return {
            "period": {
                "start": start_time.isoformat(),
                "end": end_time.isoformat()
            },
            "statistics": stats,
            "distribution": distribution
        }
 
 
# API endpoints
from fastapi import APIRouter, Depends
from database import get_db
 
router = APIRouter(prefix="/api/schedules/analytics", tags=["schedule-analytics"])
 
@router.get("/summary")
async def get_execution_summary(
    schedule_id: Optional[str] = None,
    days: int = 30,
    db: Session = Depends(get_db)
):
    """Get the execution summary."""
    analytics = ScheduleAnalytics(db)
    return analytics.get_execution_summary(schedule_id, days)
 
 
@router.get("/trend")
async def get_execution_trend(
    schedule_id: Optional[str] = None,
    days: int = 30,
    granularity: str = "day",
    db: Session = Depends(get_db)
):
    """Get the execution trend."""
    analytics = ScheduleAnalytics(db)
    return analytics.get_execution_trend(schedule_id, days, granularity)
 
 
@router.get("/{schedule_id}/health")
async def get_schedule_health(
    schedule_id: str,
    days: int = 7,
    db: Session = Depends(get_db)
):
    """Get the schedule health score."""
    analytics = ScheduleAnalytics(db)
    return analytics.get_schedule_health_score(schedule_id, days)
 
 
@router.get("/top")
async def get_top_schedules(
    metric: str = "executions",
    limit: int = 10,
    days: int = 30,
    db: Session = Depends(get_db)
):
    """Get the top-ranked schedules."""
    analytics = ScheduleAnalytics(db)
    return analytics.get_top_schedules(metric, limit, days)
 
 
@router.get("/duration-distribution")
async def get_duration_distribution(
    schedule_id: Optional[str] = None,
    days: int = 30,
    db: Session = Depends(get_db)
):
    """Get the execution duration distribution."""
    analytics = ScheduleAnalytics(db)
    return analytics.get_execution_duration_distribution(schedule_id, days)
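
The weighting used by `get_schedule_health_score` is easier to reason about as a pure function. The sketch below mirrors the 40/30/15/15 weights and the status thresholds from the code above; the function name `health_score` and its plain integer arguments are assumptions made for illustration, not part of the service.

```python
from typing import Dict, Union


def health_score(
    total: int, success: int, failed: int, timeout: int, misfire: int
) -> Dict[str, Union[float, str]]:
    """Weighted health score: success 40%, and inverted failure 30%,
    timeout 15%, and misfire 15% rates."""
    if total <= 0:
        return {"health_score": 0.0, "status": "unknown"}

    score = (
        (success / total) * 40
        + (1 - failed / total) * 30
        + (1 - timeout / total) * 15
        + (1 - misfire / total) * 15
    )

    # Thresholds are checked from highest to lowest; anything below 40 is critical.
    thresholds = [(90, "excellent"), (75, "good"), (60, "fair"), (40, "poor")]
    status = next((name for floor, name in thresholds if score >= floor), "critical")
    return {"health_score": round(score, 2), "status": status}


# 8 of 10 runs succeeded, 1 failed, 1 timed out, 2 were misfires.
example = health_score(total=10, success=8, failed=1, timeout=1, misfire=2)

# 10 runs that are all still pending: no successes, but no failures either.
idle = health_score(total=10, success=0, failed=0, timeout=0, misfire=0)
```

One consequence of the inverted terms is that a window with no successes and no failures still earns the full 60 points from the three inverted metrics, so it may be worth excluding pending or running executions before scoring.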

8. Frontend Schedule Management UI



// frontend/src/components/ScheduleManager.tsx
import React, { useState, useEffect } from 'react';
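import { Calendar, Clock, Play, Pause, Trash2, Plus, TrendingUp } from 'lucide-react';
// Assumes CreateScheduleModal.tsx (shown later in this section) has a default export
import CreateScheduleModal from './CreateScheduleModal';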
 
interface Schedule {
  id: string;
  name: string;
  workflow_id: string;
  status: string;
  trigger_type: string;
  cron_expression?: string;
  timezone: string;
  next_run_at?: string;
  total_runs: number;
  successful_runs: number;
  failed_runs: number;
}
 
const ScheduleManager: React.FC = () => {
  const [schedules, setSchedules] = useState<Schedule[]>([]);
  const [loading, setLoading] = useState(true);
  const [showCreateModal, setShowCreateModal] = useState(false);
 
  useEffect(() => {
    loadSchedules();
  }, []);
 
  const loadSchedules = async () => {
    try {
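      const response = await fetch('/api/schedules');
      // fetch only rejects on network errors, so check the HTTP status explicitly
      if (!response.ok) throw new Error(`HTTP ${response.status}`);
      const data = await response.json();
      setSchedules(data.schedules ?? []);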
    } catch (error) {
      console.error('Failed to load schedules:', error);
    } finally {
      setLoading(false);
    }
  };
 
  const pauseSchedule = async (scheduleId: string) => {
    try {
      await fetch(`/api/schedules/${scheduleId}/pause`, { method: 'POST' });
      loadSchedules();
    } catch (error) {
      console.error('Failed to pause schedule:', error);
    }
  };
 
  const resumeSchedule = async (scheduleId: string) => {
    try {
      await fetch(`/api/schedules/${scheduleId}/resume`, { method: 'POST' });
      loadSchedules();
    } catch (error) {
      console.error('Failed to resume schedule:', error);
    }
  };
 
  const deleteSchedule = async (scheduleId: string) => {
    if (!confirm('Delete this schedule?')) return;
    
    try {
      await fetch(`/api/schedules/${scheduleId}`, { method: 'DELETE' });
      loadSchedules();
    } catch (error) {
      console.error('Failed to delete schedule:', error);
    }
  };
 
  const triggerNow = async (scheduleId: string) => {
    try {
      await fetch(`/api/schedules/${scheduleId}/trigger`, { method: 'POST' });
      alert('Schedule triggered');
    } catch (error) {
      console.error('Failed to trigger schedule:', error);
    }
  };
 
  const getStatusColor = (status: string) => {
    switch (status) {
      case 'active': return 'bg-green-100 text-green-800';
      case 'paused': return 'bg-yellow-100 text-yellow-800';
      case 'disabled': return 'bg-gray-100 text-gray-800';
      case 'error': return 'bg-red-100 text-red-800';
      default: return 'bg-gray-100 text-gray-800';
    }
  };
 
  const formatNextRun = (nextRun?: string) => {
    if (!nextRun) return 'Not scheduled';
    
    const date = new Date(nextRun);
    const now = new Date();
    const diff = date.getTime() - now.getTime();
    
    if (diff < 0) return 'Overdue';
    
    const minutes = Math.floor(diff / 60000);
    const hours = Math.floor(minutes / 60);
    const days = Math.floor(hours / 24);
    
    if (days > 0) return `in ${days} day(s)`;
    if (hours > 0) return `in ${hours} hour(s)`;
    if (minutes > 0) return `in ${minutes} minute(s)`;
    return 'Starting soon';
  };
 
  if (loading) {
    return (
      <div className="flex items-center justify-center h-64">
        <div className="animate-spin rounded-full h-12 w-12 border-b-2 border-blue-600"></div>
      </div>
    );
  }
 
  return (
    <div className="p-6">
      <div className="flex justify-between items-center mb-6">
        <h1 className="text-2xl font-bold text-gray-900">Schedule Management</h1>
        <button
          onClick={() => setShowCreateModal(true)}
          className="flex items-center gap-2 px-4 py-2 bg-blue-600 text-white rounded-lg hover:bg-blue-700"
        >
          <Plus className="w-5 h-5" />
          Create Schedule
        </button>
      </div>
 
      <div className="grid gap-4">
        {schedules.map(schedule => (
          <div key={schedule.id} className="bg-white rounded-lg shadow p-6">
            <div className="flex items-start justify-between">
              <div className="flex-1">
                <div className="flex items-center gap-3 mb-2">
                  <h3 className="text-lg font-semibold text-gray-900">
                    {schedule.name}
                  </h3>
                  <span className={`px-2 py-1 rounded-full text-xs font-medium ${getStatusColor(schedule.status)}`}>
                    {schedule.status}
                  </span>
                </div>
 
                <div className="grid grid-cols-2 gap-4 mt-4">
                  <div className="flex items-center gap-2 text-sm text-gray-600">
                    <Calendar className="w-4 h-4" />
                    <span>
                      {schedule.trigger_type === 'cron' && schedule.cron_expression}
                      {schedule.trigger_type === 'interval' && 'Interval trigger'}
                      {schedule.trigger_type === 'date' && 'Specific date'}
                    </span>
                  </div>
 
                  <div className="flex items-center gap-2 text-sm text-gray-600">
                    <Clock className="w-4 h-4" />
                    <span>Next run: {formatNextRun(schedule.next_run_at)}</span>
                  </div>
 
                  <div className="flex items-center gap-2 text-sm text-gray-600">
                    <TrendingUp className="w-4 h-4" />
                    <span>
                      Success rate: {schedule.total_runs > 0 
                        ? Math.round(schedule.successful_runs / schedule.total_runs * 100)
                        : 0}%
                    </span>
                  </div>
 
                  <div className="text-sm text-gray-600">
                    Total runs: {schedule.total_runs} | 
                    Succeeded: {schedule.successful_runs} | 
                    Failed: {schedule.failed_runs}
                  </div>
                </div>
              </div>
 
              <div className="flex gap-2">
                {schedule.status === 'active' ? (
                  <button
                    onClick={() => pauseSchedule(schedule.id)}
                    className="p-2 text-yellow-600 hover:bg-yellow-50 rounded"
                    title="Pause"
                  >
                    <Pause className="w-5 h-5" />
                  </button>
                ) : (
                  <button
                    onClick={() => resumeSchedule(schedule.id)}
                    className="p-2 text-green-600 hover:bg-green-50 rounded"
                    title="Resume"
                  >
                    <Play className="w-5 h-5" />
                  </button>
                )}
 
                <button
                  onClick={() => triggerNow(schedule.id)}
                  className="p-2 text-blue-600 hover:bg-blue-50 rounded"
                  title="Trigger now"
                >
                  <Play className="w-5 h-5" />
                </button>
 
                <button
                  onClick={() => deleteSchedule(schedule.id)}
                  className="p-2 text-red-600 hover:bg-red-50 rounded"
                  title="Delete"
                >
                  <Trash2 className="w-5 h-5" />
                </button>
              </div>
            </div>
          </div>
        ))}
 
        {schedules.length === 0 && (
          <div className="text-center py-12 text-gray-500">
            No schedules yet. Click "Create Schedule" to get started.
          </div>
        )}
      </div>
 
      {showCreateModal && (
        <CreateScheduleModal
          onClose={() => setShowCreateModal(false)}
          onSuccess={() => {
            setShowCreateModal(false);
            loadSchedules();
          }}
        />
      )}
    </div>
  );
};
 
export default ScheduleManager;


// frontend/src/components/CreateScheduleModal.tsx
import React, { useState } from 'react';
import { X } from 'lucide-react';
 
interface CreateScheduleModalProps {
  onClose: () => void;
  onSuccess: () => void;
}
 
const CreateScheduleModal: React.FC<CreateScheduleModalProps> = ({ onClose, onSuccess }) => {
  const [formData, setFormData] = useState({
    workflow_id: '',
    name: '',
    description: '',
    trigger_type: 'cron',
    cron_expression: '0 0 * * *',
    timezone: 'Asia/Shanghai',
    misfire_policy: 'fire_once',
    retry_enabled: false,
    max_retries: 3
  });
 
  const [cronValidation, setCronValidation] = useState<{
    valid: boolean;
    description?: string;
    error?: string;
  } | null>(null);
 
  const validateCron = async (expression: string) => {
    try {
      const response = await fetch(`/api/schedules/validate/cron?expression=${encodeURIComponent(expression)}`);
      const data = await response.json();
      setCronValidation(data);
    } catch (error) {
      console.error('Failed to validate cron:', error);
    }
  };
 
  const handleCronChange = (value: string) => {
    setFormData({ ...formData, cron_expression: value });
    if (value) {
      validateCron(value);
    }
  };
 
  const handleSubmit = async (e: React.FormEvent) => {
    e.preventDefault();
    
    try {
      const response = await fetch('/api/schedules', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(formData)
      });
 
      if (response.ok) {
        onSuccess();
      } else {
        const error = await response.json();
        alert(`Failed to create schedule: ${error.detail}`);
      }
    } catch (error) {
      console.error('Failed to create schedule:', error);
      alert('Failed to create schedule');
    }
  };
 
  return (
    <div className="fixed inset-0 bg-black bg-opacity-50 flex items-center justify-center z-50">
      <div className="bg-white rounded-lg shadow-xl max-w-2xl w-full max-h-screen overflow-y-auto">
        <div className="flex items-center justify-between p-6 border-b">
          <h2 className="text-xl font-bold text-gray-900">Create Schedule</h2>
          <button onClick={onClose} className="text-gray-400 hover:text-gray-600">
            <X className="w-6 h-6" />
          </button>
        </div>
 
        <form onSubmit={handleSubmit} className="p-6 space-y-6">
          <div>
            <label className="block text-sm font-medium text-gray-700 mb-2">
              Schedule name *
            </label>
            <input
              type="text"
              required
              value={formData.name}
              onChange={(e) => setFormData({ ...formData, name: e.target.value })}
              className="w-full px-3 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-blue-500"
              placeholder="例如:每日数据同步"
            />
          </div>
 
          <div>
            <label className="block text-sm font-medium text-gray-700 mb-2">
              描述
            </label>
            <textarea
              value={formData.description}
              onChange={(e) => setFormData({ ...formData, description: e.target.value })}
              className="w-full px-3 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-blue-500"
              rows={3}
              placeholder="调度任务的详细说明"
            />
          </div>
 
          <div>
            <label className="block text-sm font-medium text-gray-700 mb-2">
              触发类型 *
            </label>
            <select
              value={formData.trigger_type}
              onChange={(e) => setFormData({ ...formData, trigger_type: e.target.value })}
              className="w-full px-3 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-blue-500"
            >
              <option value="cron">Cron表达式</option>
              <option value="interval">间隔时间</option>
              <option value="date">指定日期</option>
            </select>
          </div>
 
          {formData.trigger_type === 'cron' && (
            <div>
              <label className="block text-sm font-medium text-gray-700 mb-2">
                Cron表达式 *
              </label>
              <input
                type="text"
                required
                value={formData.cron_expression}
                onChange={(e) => handleCronChange(e.target.value)}
                className="w-full px-3 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-blue-500 font-mono"
                placeholder="0 0 * * *"
              />
              
              {cronValidation && (
                <div className={`mt-2 p-3 rounded ${cronValidation.valid ? 'bg-green-50 text-green-800' : 'bg-red-50 text-red-800'}`}>
                  {cronValidation.valid ? (
                    <>
                      <p className="font-medium">✓ 表达式有效</p>
                      <p className="text-sm mt-1">{cronValidation.description}</p>
                      {cronValidation.next_run_times && (
                        <div className="mt-2 text-sm">
                          <p className="font-medium">接下来5次执行:</p>
                          <ul className="list-disc list-inside mt-1">
                            {cronValidation.next_run_times.slice(0, 5).map((time: string, i: number) => (
                              <li key={i}>{new Date(time).toLocaleString('zh-CN')}</li>
                            ))}
                          </ul>
                        </div>
                      )}
                    </>
                  ) : (
                    <p>✗ {cronValidation.error}</p>
                  )}
                </div>
              )}
 
              <div className="mt-2 text-xs text-gray-500">
                <p>常用示例:</p>
                <ul className="list-disc list-inside mt-1 space-y-1">
                  <li>0 0 * * * - 每天凌晨</li>
                  <li>0 */6 * * * - 每6小时</li>
                  <li>0 0 * * 1 - 每周一凌晨</li>
                  <li>0 0 1 * * - 每月1号凌晨</li>
                </ul>
              </div>
            </div>
          )}
 
          <div>
            <label className="block text-sm font-medium text-gray-700 mb-2">
              时区 *
            </label>
            <select
              value={formData.timezone}
              onChange={(e) => setFormData({ ...formData, timezone: e.target.value })}
              className="w-full px-3 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-blue-500"
            >
              <option value="UTC">UTC</option>
              <option value="Asia/Shanghai">Asia/Shanghai (中国标准时间)</option>
              <option value="America/New_York">America/New_York (美国东部时间)</option>
              <option value="Europe/London">Europe/London (英国时间)</option>
            </select>
          </div>
 
          <div>
            <label className="block text-sm font-medium text-gray-700 mb-2">
              错过执行策略
            </label>
            <select
              value={formData.misfire_policy}
              onChange={(e) => setFormData({ ...formData, misfire_policy: e.target.value })}
              className="w-full px-3 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-blue-500"
            >
              <option value="fire_once">立即执行一次</option>
              <option value="fire_all">执行所有错过的</option>
              <option value="fire_latest">执行最新的</option>
              <option value="skip">跳过</option>
            </select>
          </div>
 
          <div className="flex items-center gap-2">
            <input
              type="checkbox"
              id="retry_enabled"
              checked={formData.retry_enabled}
              onChange={(e) => setFormData({ ...formData, retry_enabled: e.target.checked })}
              className="w-4 h-4 text-blue-600 border-gray-300 rounded focus:ring-blue-500"
            />
            <label htmlFor="retry_enabled" className="text-sm font-medium text-gray-700">
              启用失败重试
            </label>
          </div>
 
          {formData.retry_enabled && (
            <div>
              <label className="block text-sm font-medium text-gray-700 mb-2">
                最大重试次数
              </label>
              <input
                type="number"
                min="1"
                max="10"
                value={formData.max_retries}
                onChange={(e) => setFormData({ ...formData, max_retries: parseInt(e.target.value, 10) || 1 })}
                className="w-full px-3 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-blue-500"
              />
            </div>
          )}
 
          <div className="flex gap-3 pt-4 border-t">
            <button
              type="button"
              onClick={onClose}
              className="flex-1 px-4 py-2 border border-gray-300 text-gray-700 rounded-lg hover:bg-gray-50"
            >
              取消
            </button>
            <button
              type="submit"
              className="flex-1 px-4 py-2 bg-blue-600 text-white rounded-lg hover:bg-blue-700"
            >
              创建
            </button>
          </div>
        </form>
      </div>
    </div>
  );
};
 
export default CreateScheduleModal;
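
The modal above only renders what the validation endpoint returns (`valid`, `error`, `description`, `next_run_times`); the server decides whether an expression is well formed. The backend's own check is not shown in this part of the article, so the following is only a minimal standard-library sketch of range checking for a standard five-field expression. The names `validate_cron_fields` and `_check_part` are hypothetical, and the sketch deliberately ignores month and weekday names (`MON-FRI`), wraparound ranges, and Quartz-style extensions.

```python
from typing import Optional, Tuple

# (name, min, max) for the five standard cron fields, in order.
FIELDS = [
    ("minute", 0, 59),
    ("hour", 0, 23),
    ("day of month", 1, 31),
    ("month", 1, 12),
    ("day of week", 0, 7),  # 0 and 7 both mean Sunday in most cron dialects
]


def _check_part(part: str, lo: int, hi: int) -> bool:
    """Check one comma-separated part: '*', 'n' or 'a-b', optionally with '/step'."""
    base, slash, step = part.partition("/")
    if slash and (not step.isdecimal() or int(step) == 0):
        return False
    if base == "*":
        return True
    start, dash, end = base.partition("-")
    if not start.isdecimal() or (dash and not end.isdecimal()):
        return False
    values = [int(start)] + ([int(end)] if dash else [])
    # Every bound must be in range, and a range must not run backwards.
    return all(lo <= v <= hi for v in values) and values == sorted(values)


def validate_cron_fields(expression: str) -> Tuple[bool, Optional[str]]:
    """Return (valid, error) for a five-field cron expression."""
    fields = expression.split()
    if len(fields) != 5:
        return False, f"expected 5 fields, got {len(fields)}"
    for (name, lo, hi), field in zip(FIELDS, fields):
        if not all(_check_part(p, lo, hi) for p in field.split(",")):
            return False, f"invalid {name} field: {field!r}"
    return True, None


print(validate_cron_fields("0 */6 * * *"))
print(validate_cron_fields("60 0 * * *"))
```

A production service would more likely delegate to a dedicated parser such as Croniter, which also computes the upcoming run times that the modal lists.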

9. Schedule Execution History View



// frontend/src/components/ScheduleExecutionHistory.tsx
import React, { useState, useEffect } from 'react';
import { CheckCircle, XCircle, Clock, AlertCircle, BarChart3 } from 'lucide-react';
 
interface Execution {
  id: string;
  scheduled_time: string;
  actual_start_time?: string;
  actual_end_time?: string;
  status: string;
  duration?: number;
  error_message?: string;
  is_misfire: boolean;
  retry_count: number;
}
 
interface ScheduleExecutionHistoryProps {
  scheduleId: string;
}
 
const ScheduleExecutionHistory: React.FC<ScheduleExecutionHistoryProps> = ({ scheduleId }) => {
  const [executions, setExecutions] = useState<Execution[]>([]);
  const [loading, setLoading] = useState(true);
  const [filter, setFilter] = useState<string>('all');
  const [analytics, setAnalytics] = useState<any>(null);
 
  useEffect(() => {
    loadExecutions();
    loadAnalytics();
  }, [scheduleId, filter]);
 
  const loadExecutions = async () => {
    try {
      const params = new URLSearchParams();
      if (filter !== 'all') params.append('status', filter);
      
      const response = await fetch(`/api/schedules/${scheduleId}/executions?${params}`);
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }
      const data = await response.json();
      // Fall back to an empty list so the table never calls map on undefined
      setExecutions(data.executions ?? []);
    } catch (error) {
      console.error('Failed to load executions:', error);
    } finally {
      setLoading(false);
    }
  };
 
  const loadAnalytics = async () => {
    try {
      const response = await fetch(`/api/schedules/analytics/${scheduleId}/health?days=7`);
      const data = await response.json();
      setAnalytics(data);
    } catch (error) {
      console.error('Failed to load analytics:', error);
    }
  };
 
  const getStatusIcon = (status: string) => {
    switch (status) {
      case 'success':
        return <CheckCircle className="w-5 h-5 text-green-600" />;
      case 'failed':
        return <XCircle className="w-5 h-5 text-red-600" />;
      case 'running':
        return <Clock className="w-5 h-5 text-blue-600 animate-spin" />;
      case 'pending':
        return <Clock className="w-5 h-5 text-gray-600" />;
      case 'skipped':
        return <AlertCircle className="w-5 h-5 text-yellow-600" />;
      default:
        return <AlertCircle className="w-5 h-5 text-gray-600" />;
    }
  };
 
  const getStatusColor = (status: string) => {
    switch (status) {
      case 'success': return 'bg-green-100 text-green-800';
      case 'failed': return 'bg-red-100 text-red-800';
      case 'running': return 'bg-blue-100 text-blue-800';
      case 'pending': return 'bg-gray-100 text-gray-800';
      case 'skipped': return 'bg-yellow-100 text-yellow-800';
      default: return 'bg-gray-100 text-gray-800';
    }
  };
 
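  const formatDuration = (seconds?: number) => {
    // Treat only null/undefined as missing; a 0-second run is a valid duration
    if (seconds == null) return '-';
    
    // Round first so fractional durations do not render as long decimals
    const total = Math.round(seconds);
    const hours = Math.floor(total / 3600);
    const minutes = Math.floor((total % 3600) / 60);
    const secs = total % 60;
    
    if (hours > 0) return `${hours}h ${minutes}m ${secs}s`;
    if (minutes > 0) return `${minutes}m ${secs}s`;
    return `${secs}s`;
  };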
 
  const getHealthColor = (score: number) => {
    if (score >= 90) return 'text-green-600';
    if (score >= 75) return 'text-blue-600';
    if (score >= 60) return 'text-yellow-600';
    if (score >= 40) return 'text-orange-600';
    return 'text-red-600';
  };
 
  if (loading) {
    return (
      <div className="flex items-center justify-center h-64">
        <div className="animate-spin rounded-full h-12 w-12 border-b-2 border-blue-600"></div>
      </div>
    );
  }
 
  return (
    <div className="space-y-6">
      {/* Health score card */}
      {analytics && (
        <div className="bg-white rounded-lg shadow p-6">
          <div className="flex items-center justify-between mb-4">
            <h3 className="text-lg font-semibold text-gray-900">调度健康度</h3>
            <BarChart3 className="w-5 h-5 text-gray-400" />
          </div>
 
          <div className="grid grid-cols-2 md:grid-cols-4 gap-4">
            <div className="text-center">
              <div className={`text-3xl font-bold ${getHealthColor(analytics.health_score)}`}>
                {analytics.health_score}
              </div>
              <div className="text-sm text-gray-600 mt-1">健康分数</div>
              <div className={`text-xs mt-1 px-2 py-1 rounded-full inline-block ${
                analytics.status === 'excellent' ? 'bg-green-100 text-green-800' :
                analytics.status === 'good' ? 'bg-blue-100 text-blue-800' :
                analytics.status === 'fair' ? 'bg-yellow-100 text-yellow-800' :
                'bg-red-100 text-red-800'
              }`}>
                {analytics.status}
              </div>
            </div>
 
            <div className="text-center">
              <div className="text-3xl font-bold text-green-600">
                {analytics.metrics.success_rate}%
              </div>
              <div className="text-sm text-gray-600 mt-1">成功率</div>
            </div>
 
            <div className="text-center">
              <div className="text-3xl font-bold text-red-600">
                {analytics.metrics.failure_rate}%
              </div>
              <div className="text-sm text-gray-600 mt-1">失败率</div>
            </div>
 
            <div className="text-center">
              <div className="text-3xl font-bold text-yellow-600">
                {analytics.metrics.misfire_rate}%
              </div>
              <div className="text-sm text-gray-600 mt-1">错过率</div>
            </div>
          </div>
        </div>
      )}
 
      {/* Status filter */}
      <div className="flex gap-2">
        {['all', 'success', 'failed', 'running', 'skipped'].map(status => (
          <button
            key={status}
            onClick={() => setFilter(status)}
            className={`px-4 py-2 rounded-lg text-sm font-medium transition-colors ${
              filter === status
                ? 'bg-blue-600 text-white'
                : 'bg-white text-gray-700 hover:bg-gray-50'
            }`}
          >
            {status === 'all' ? '全部' : status}
          </button>
        ))}
      </div>
 
      {/* Execution history table */}
      <div className="bg-white rounded-lg shadow overflow-hidden">
        <div className="overflow-x-auto">
          <table className="w-full">
            <thead className="bg-gray-50">
              <tr>
                <th className="px-6 py-3 text-left text-xs font-medium text-gray-500 uppercase tracking-wider">
                  状态
                </th>
                <th className="px-6 py-3 text-left text-xs font-medium text-gray-500 uppercase tracking-wider">
                  计划时间
                </th>
                <th className="px-6 py-3 text-left text-xs font-medium text-gray-500 uppercase tracking-wider">
                  实际时间
                </th>
                <th className="px-6 py-3 text-left text-xs font-medium text-gray-500 uppercase tracking-wider">
                  执行时长
                </th>
                <th className="px-6 py-3 text-left text-xs font-medium text-gray-500 uppercase tracking-wider">
                  重试次数
                </th>
                <th className="px-6 py-3 text-left text-xs font-medium text-gray-500 uppercase tracking-wider">
                  详情
                </th>
              </tr>
            </thead>
            <tbody className="bg-white divide-y divide-gray-200">
              {executions.map(execution => (
                <tr key={execution.id} className="hover:bg-gray-50">
                  <td className="px-6 py-4 whitespace-nowrap">
                    <div className="flex items-center gap-2">
                      {getStatusIcon(execution.status)}
                      <span className={`px-2 py-1 rounded-full text-xs font-medium ${getStatusColor(execution.status)}`}>
                        {execution.status}
                      </span>
                      {execution.is_misfire && (
                        <span className="px-2 py-1 bg-orange-100 text-orange-800 rounded-full text-xs font-medium">
                          错过
                        </span>
                      )}
                    </div>
                  </td>
                  <td className="px-6 py-4 whitespace-nowrap text-sm text-gray-900">
                    {new Date(execution.scheduled_time).toLocaleString('zh-CN')}
                  </td>
                  <td className="px-6 py-4 whitespace-nowrap text-sm text-gray-900">
                    {execution.actual_start_time 
                      ? new Date(execution.actual_start_time).toLocaleString('zh-CN')
                      : '-'
                    }
                  </td>
                  <td className="px-6 py-4 whitespace-nowrap text-sm text-gray-900">
                    {formatDuration(execution.duration)}
                  </td>
                  <td className="px-6 py-4 whitespace-nowrap text-sm text-gray-900">
                    {execution.retry_count > 0 ? (
                      <span className="text-yellow-600">{execution.retry_count}</span>
                    ) : (
                      '-'
                    )}
                  </td>
                  <td className="px-6 py-4 whitespace-nowrap text-sm">
                    {execution.error_message ? (
                      <button
                        onClick={() => alert(execution.error_message)}
                        className="text-red-600 hover:text-red-800"
                      >
                        查看错误
                      </button>
                    ) : (
                      '-'
                    )}
                  </td>
                </tr>
              ))}
            </tbody>
          </table>
 
          {executions.length === 0 && (
            <div className="text-center py-12 text-gray-500">
              暂无执行记录
            </div>
          )}
        </div>
      </div>
    </div>
  );
};
 
export default ScheduleExecutionHistory;
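
The health card above displays `success_rate`, `failure_rate` and `misfire_rate` as percentages, but the formula behind the analytics endpoint is not shown in this part of the article. As a rough sketch, assuming each rate is simply the share of executions in the window, rounded to one decimal place, the computation could look like this. `ExecutionRow` and `summarize_rates` are hypothetical names used for illustration only.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ExecutionRow:
    """Hypothetical slice of an execution record: only the fields used here."""
    status: str        # e.g. "success", "failed", "skipped"
    is_misfire: bool


def summarize_rates(rows: List[ExecutionRow]) -> Dict[str, float]:
    """Return percentage rates over the given rows (all zeros when empty)."""
    total = len(rows)
    if total == 0:
        return {"success_rate": 0.0, "failure_rate": 0.0, "misfire_rate": 0.0}

    def pct(count: int) -> float:
        return round(100.0 * count / total, 1)

    return {
        "success_rate": pct(sum(r.status == "success" for r in rows)),
        "failure_rate": pct(sum(r.status == "failed" for r in rows)),
        "misfire_rate": pct(sum(r.is_misfire for r in rows)),
    }


rows = (
    [ExecutionRow("success", False)] * 8
    + [ExecutionRow("failed", False)]
    + [ExecutionRow("skipped", True)]
)
print(summarize_rates(rows))
```

Returning zeros for an empty window avoids a division by zero and lets the front end render the card without a special case.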

10. Testing and Verification
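
The timezone tests in this section assert specific wall-clock conversions (Beijing 09:00 is 01:00 UTC and 20:00 on the previous day in New York). The arithmetic can be checked with the standard library alone. This sketch assumes fixed offsets as stand-ins for the IANA zones: UTC+8 for China, UTC-5 for New York standard time and UTC-4 for New York daylight time. Fixed offsets never switch for DST, so real scheduling code should use named zones through pytz as the article does.

```python
from datetime import datetime, timedelta, timezone

# Fixed offsets used only for illustration; they do not follow DST rules.
CST = timezone(timedelta(hours=8), "CST")    # China Standard Time
EST = timezone(timedelta(hours=-5), "EST")   # New York, standard time
EDT = timezone(timedelta(hours=-4), "EDT")   # New York, daylight time

beijing_9am = datetime(2024, 1, 1, 9, 0, tzinfo=CST)
as_utc = beijing_9am.astimezone(timezone.utc)
as_new_york = beijing_9am.astimezone(EST)

# The same 09:00 wall-clock time in New York maps to different UTC hours
# depending on whether daylight saving time is in effect.
winter_utc = datetime(2024, 1, 15, 9, 0, tzinfo=EST).astimezone(timezone.utc)
summer_utc = datetime(2024, 6, 15, 9, 0, tzinfo=EDT).astimezone(timezone.utc)

print(as_utc.isoformat())       # 2024-01-01T01:00:00+00:00
print(as_new_york.isoformat())  # 2023-12-31T20:00:00-05:00
print(winter_utc.hour, summer_utc.hour)  # 14 13
```

The last line is the reason a schedule should store its zone name rather than a fixed UTC hour: a "09:00 New York" job moves by one hour in UTC twice a year.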



# tests/test_scheduler.py
import pytest
from datetime import datetime, timedelta
import pytz
from services.cron_parser import CronParser
from services.timezone_service import TimezoneService
from services.scheduler_service import SchedulerService
from services.misfire_handler import MisfireHandler
from models.schedule import WorkflowSchedule, ScheduleExecution, ScheduleStatus, TriggerType
 
class TestCronParser:
    """Tests for the cron expression parser."""
    
    def test_validate_valid_expressions(self):
        """Valid cron expressions are accepted."""
        valid_expressions = [
            "0 0 * * *",    # every day at midnight
            "0 */2 * * *",  # every 2 hours
            "0 0 * * 1-5",  # weekdays at midnight
            "0 0 1 * *",    # the 1st of every month
            "0 0 1 1 *",    # January 1st every year
        ]
        
        for expr in valid_expressions:
            valid, error = CronParser.validate(expr)
            assert valid, f"Expression '{expr}' should be valid, got error: {error}"
    
    def test_validate_invalid_expressions(self):
        """Invalid cron expressions are rejected."""
        invalid_expressions = [
            "invalid",
            "60 0 * * *",  # minute out of range
            "0 25 * * *",  # hour out of range
            "0 0 32 * *",  # day of month out of range
            "0 0 * 13 *",  # month out of range
        ]
        
        for expr in invalid_expressions:
            valid, error = CronParser.validate(expr)
            assert not valid, f"Expression '{expr}' should be invalid"
    
    def test_get_next_run_time(self):
        """The next run time is computed correctly."""
        # Every day at 02:00, in the same five-field format used by the
        # validation tests (minute hour day month weekday)
        expr = "0 2 * * *"
        base_time = datetime(2024, 1, 1, 0, 0, 0)
        
        next_time = CronParser.get_next_run_time(expr, base_time, "UTC")
        
        assert next_time.hour == 2
        assert next_time.minute == 0
        assert next_time.second == 0
    
    def test_get_next_n_run_times(self):
        """The next N run times are computed correctly."""
        # Every hour, on the hour
        expr = "0 * * * *"
        base_time = datetime(2024, 1, 1, 0, 0, 0)
        
        next_times = CronParser.get_next_n_run_times(expr, 5, base_time, "UTC")
        
        assert len(next_times) == 5
        
        # Consecutive runs must be exactly one hour apart
        for i in range(1, 5):
            diff = next_times[i] - next_times[i-1]
            assert diff.total_seconds() == 3600
    
    def test_describe(self):
        """A human-readable description is generated."""
        # Weekdays at 09:00
        expr = "0 9 * * MON-FRI"
        description = CronParser.describe(expr)
        
        # The description should mention the hour, whatever its wording
        assert "9" in description
 
 
class TestTimezoneService:
    """Tests for the timezone service."""
    
    def test_validate_timezone(self):
        """Timezone names are validated."""
        assert TimezoneService.validate_timezone("UTC")
        assert TimezoneService.validate_timezone("Asia/Shanghai")
        assert TimezoneService.validate_timezone("America/New_York")
        assert not TimezoneService.validate_timezone("Invalid/Timezone")
    
    def test_convert_to_utc(self):
        """Local time converts to UTC."""
        # Beijing time 2024-01-01 09:00:00
        beijing_time = datetime(2024, 1, 1, 9, 0, 0)
        
        utc_time = TimezoneService.convert_to_utc(beijing_time, "Asia/Shanghai")
        
        # Beijing is 8 hours ahead of UTC, so 09:00 there is 01:00 UTC
        assert utc_time.hour == 1
        assert utc_time.day == 1
    
    def test_convert_from_utc(self):
        """UTC converts to local time."""
        # UTC 2024-01-01 01:00:00
        utc_time = datetime(2024, 1, 1, 1, 0, 0)
        utc_time = pytz.UTC.localize(utc_time)
        
        beijing_time = TimezoneService.convert_from_utc(utc_time, "Asia/Shanghai")
        
        # Beijing is 8 hours ahead of UTC
        assert beijing_time.hour == 9
    
    def test_timezone_conversion(self):
        """Time converts directly between two timezones."""
        # Beijing time 2024-01-01 09:00:00
        beijing_time = datetime(2024, 1, 1, 9, 0, 0)
        
        # Convert to New York time
        ny_time = TimezoneService.convert_timezone(
            beijing_time,
            "Asia/Shanghai",
            "America/New_York"
        )
        
        # Beijing is 13 hours ahead of New York during standard time,
        # so 09:00 in Beijing is 20:00 on the previous day in New York
        assert ny_time.hour == 20
        assert ny_time.day == 31
        assert ny_time.month == 12
        assert ny_time.year == 2023
    
    def test_is_dst(self):
        """Daylight saving time is detected."""
        # June falls within US daylight saving time
        summer_time = datetime(2024, 6, 1, 12, 0, 0)
        assert TimezoneService.is_dst("America/New_York", summer_time)
        
        # January falls within US standard time
        winter_time = datetime(2024, 1, 1, 12, 0, 0)
        assert not TimezoneService.is_dst("America/New_York", winter_time)
 
 
class TestSchedulerService:
    """Tests for the scheduler service."""
    
    @pytest.fixture
    def db_session(self):
        """Database session fixture."""
        from database import SessionLocal
        session = SessionLocal()
        yield session
        session.close()
    
    @pytest.fixture
    def redis_client(self):
        """Redis client fixture."""
        from core.redis_client import RedisClient
        return RedisClient()
    
    @pytest.fixture
    def sample_schedule(self, db_session):
        """Sample schedule fixture; the row is removed after the test."""
        schedule = WorkflowSchedule(
            workflow_id="test-workflow-1",
            tenant_id="test-tenant-1",
            name="Test schedule",
            status=ScheduleStatus.ACTIVE,
            trigger_type=TriggerType.CRON,
            cron_expression="0 0 * * *",
            timezone="UTC",
            created_by="test-user-1"
        )
        db_session.add(schedule)
        db_session.commit()
        db_session.refresh(schedule)
        
        yield schedule
        
        db_session.delete(schedule)
        db_session.commit()
    
    def test_add_schedule(self, db_session, redis_client, sample_schedule):
        """A schedule can be added to the scheduler."""
        scheduler = SchedulerService(db_session, redis_client)
        scheduler.start()
        
        try:
            job_id = scheduler.add_schedule(sample_schedule)
            
            assert job_id is not None
            assert sample_schedule.apscheduler_job_id == job_id
            assert sample_schedule.next_run_at is not None
            
            # The job must be registered with the underlying APScheduler instance
            job = scheduler.scheduler.get_job(job_id)
            assert job is not None
        
        finally:
            scheduler.stop()
    
    def test_pause_resume_schedule(self, db_session, redis_client, sample_schedule):
        """A schedule can be paused and resumed."""
        scheduler = SchedulerService(db_session, redis_client)
        scheduler.start()
        
        try:
            # Add the schedule
            scheduler.add_schedule(sample_schedule)
            
            # Pause it and check the persisted status
            scheduler.pause_schedule(sample_schedule.id)
            db_session.refresh(sample_schedule)
            assert sample_schedule.status == ScheduleStatus.PAUSED
            
            # Resume it and check the persisted status
            scheduler.resume_schedule(sample_schedule.id)
            db_session.refresh(sample_schedule)
            assert sample_schedule.status == ScheduleStatus.ACTIVE
        
        finally:
            scheduler.stop()
 
 
class TestMisfireHandler:
    """Tests for the misfire (missed execution) handler."""
    
    @pytest.fixture
    def db_session(self):
        """Database session fixture."""
        from database import SessionLocal
        session = SessionLocal()
        yield session
        session.close()
    
    @pytest.fixture
    def sample_schedule(self, db_session):
        """Sample daily schedule whose last run was 3 days ago."""
        schedule = WorkflowSchedule(
            workflow_id="test-workflow-1",
            tenant_id="test-tenant-1",
            name="Test schedule",
            status=ScheduleStatus.ACTIVE,
            trigger_type=TriggerType.CRON,
            cron_expression="0 0 * * *",  # every day at midnight
            timezone="UTC",
            last_run_at=datetime.utcnow() - timedelta(days=3),  # 3 days ago
            created_by="test-user-1"
        )
        db_session.add(schedule)
        db_session.commit()
        db_session.refresh(schedule)
        
        yield schedule
        
        db_session.delete(schedule)
        db_session.commit()
    
    def test_detect_misfires(self, db_session, sample_schedule):
        """Missed executions are detected."""
        handler = MisfireHandler(db_session)
        
        missed_times = handler.detect_misfires(sample_schedule)
        
        # A daily schedule idle for 3 days has missed about 3 runs; assert a
        # lower bound so the test tolerates boundary effects around midnight
        assert len(missed_times) >= 2
    
    def test_handle_misfire_fire_once(self, db_session, sample_schedule):
        """Misfire handling with the FIRE_ONCE policy."""
        from models.schedule import MisfirePolicy
        
        sample_schedule.misfire_policy = MisfirePolicy.FIRE_ONCE
        db_session.commit()
        
        handler = MisfireHandler(db_session)
        missed_times = handler.detect_misfires(sample_schedule)
        
        # Fail loudly instead of silently skipping the assertions below
        assert missed_times, "expected missed runs for a schedule idle for 3 days"
        
        compensations = handler.handle_misfire(sample_schedule, missed_times)
        
        # Only one compensating execution should be created
        assert len(compensations) == 1
        assert compensations[0].is_misfire is True
    
    def test_handle_misfire_skip(self, db_session, sample_schedule):
        """Misfire handling with the SKIP policy."""
        from models.schedule import MisfirePolicy
        
        sample_schedule.misfire_policy = MisfirePolicy.SKIP
        db_session.commit()
        
        handler = MisfireHandler(db_session)
        missed_times = handler.detect_misfires(sample_schedule)
        
        # Fail loudly instead of silently skipping the assertions below
        assert missed_times, "expected missed runs for a schedule idle for 3 days"
        
        compensations = handler.handle_misfire(sample_schedule, missed_times)
        
        # The skip policy creates no compensating executions
        assert len(compensations) == 0
        
        # It should still record one skipped execution per missed run
        skipped = db_session.query(ScheduleExecution).filter(
            ScheduleExecution.schedule_id == sample_schedule.id,
            ScheduleExecution.status == "skipped"
        ).all()
        
        assert len(skipped) == len(missed_times)
 
 
# Integration tests
class TestSchedulerIntegration:
    """Scheduler integration tests."""
    
    @pytest.fixture
    def setup_environment(self):
        """Set up the test environment."""
        from database import SessionLocal, engine, Base
        
        # Create the test tables
        Base.metadata.create_all(bind=engine)
        
        db = SessionLocal()
        
        yield db
        
        # Clean up
        db.close()
        Base.metadata.drop_all(bind=engine)
    
    def test_end_to_end_schedule_execution(self, setup_environment):
        """End-to-end schedule execution test."""
        db = setup_environment
        
        # Create a one-off schedule
        schedule = WorkflowSchedule(
            workflow_id="test-workflow-1",
            tenant_id="test-tenant-1",
            name="Integration test schedule",
            status=ScheduleStatus.ACTIVE,
            trigger_type=TriggerType.DATE,
            run_date=datetime.utcnow() + timedelta(seconds=5),  # run in 5 seconds
            timezone="UTC",
            created_by="test-user-1"
        )
        db.add(schedule)
        db.commit()
        db.refresh(schedule)
        
        # Start the scheduler
        from core.redis_client import RedisClient
        redis_client = RedisClient()
        scheduler = SchedulerService(db, redis_client)
        scheduler.start()
        
        try:
            # Register the schedule
            scheduler.add_schedule(schedule)
            
            # Wait long enough for the job to fire
            import time
            time.sleep(10)
            
            # Fetch execution records in a deterministic order; without
            # order_by the last row is not guaranteed to be the latest
            executions = db.query(ScheduleExecution).filter(
                ScheduleExecution.schedule_id == schedule.id
            ).order_by(ScheduleExecution.scheduled_time).all()
            
            assert len(executions) > 0
            
            # Check the status of the most recent execution
            latest_execution = executions[-1]
            assert latest_execution.status in ["success", "failed", "running"]
        
        finally:
            scheduler.stop()
            db.delete(schedule)
            db.commit()
 
 
# Performance tests
class TestSchedulerPerformance:
    """Scheduler performance tests."""
    
    def test_large_number_of_schedules(self):
        """Registering a large number of schedules stays fast."""
        from database import SessionLocal
        from core.redis_client import RedisClient
        
        db = SessionLocal()
        redis_client = RedisClient()
        scheduler = SchedulerService(db, redis_client)
        scheduler.start()
        
        try:
            # Create 1000 schedules
            schedules = []
            for i in range(1000):
                schedule = WorkflowSchedule(
                    workflow_id=f"test-workflow-{i}",
                    tenant_id="test-tenant-1",
                    name=f"Performance test schedule {i}",
                    status=ScheduleStatus.ACTIVE,
                    trigger_type=TriggerType.CRON,
                    cron_expression="0 0 * * *",
                    timezone="UTC",
                    created_by="test-user-1"
                )
                db.add(schedule)
                schedules.append(schedule)
            
            db.commit()
            
            # Measure how long registration takes; perf_counter is a
            # monotonic clock, so wall-clock adjustments cannot skew it
            import time
            start_time = time.perf_counter()
            
            for schedule in schedules:
                scheduler.add_schedule(schedule)
            
            duration = time.perf_counter() - start_time
            
            print(f"Adding 1000 schedules took {duration:.2f}s")
            assert duration < 30  # should finish within 30 seconds
            
            # Clean up
            for schedule in schedules:
                db.delete(schedule)
            db.commit()
        
        finally:
            scheduler.stop()
            db.close()
 
 
# Run the tests
if __name__ == "__main__":
    pytest.main([__file__, "-v"])
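
The misfire tests above rely on two ideas: enumerating the scheduled times that fell between the last run and now, and then choosing which of them to compensate. The real `MisfireHandler` is not shown in this part of the article, so the following is only a sketch under stated assumptions. It hard-codes a daily 00:00 UTC schedule instead of parsing a cron expression, and it assumes that `fire_once` and `fire_latest` each produce a single run while `fire_all` replays everything. `missed_daily_runs` and `select_compensations` are hypothetical names.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def missed_daily_runs(last_run_at: datetime, now: datetime) -> List[datetime]:
    """List the 00:00 UTC run times in the half-open window (last_run_at, now]."""
    # First midnight strictly after last_run_at.
    candidate = last_run_at.replace(hour=0, minute=0, second=0, microsecond=0)
    candidate += timedelta(days=1)

    missed = []
    while candidate <= now:
        missed.append(candidate)
        candidate += timedelta(days=1)
    return missed


def select_compensations(missed: List[datetime], policy: str) -> List[datetime]:
    """Pick which missed runs to execute under an assumed policy meaning."""
    if not missed or policy == "skip":
        return []
    if policy == "fire_all":
        return list(missed)
    # "fire_once" and "fire_latest" both yield a single run in this sketch.
    return [missed[-1]]


now = datetime(2024, 1, 4, 10, 30, tzinfo=timezone.utc)
missed = missed_daily_runs(now - timedelta(days=3), now)
print(len(missed))  # 3
print(select_compensations(missed, "fire_once"))
```

With a last run exactly three days before `now`, the window contains three midnights, which matches the count the detection test expects.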

11. Performance Optimization
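
The optimizer in this section derives a 95th-percentile execution time from `statistics.quantiles`. The sketch below isolates that calculation so its edge cases are easier to see. The helper name `p95` is hypothetical, and returning the single value when only one sample exists is an assumption made here, not something the article prescribes.

```python
import statistics
from typing import List, Optional


def p95(durations: List[float]) -> Optional[float]:
    """95th percentile of durations, or None when the list is empty."""
    if not durations:
        return None
    if len(durations) == 1:
        # Older Python versions raise StatisticsError when quantiles()
        # receives a single data point, so handle that case explicitly.
        return durations[0]
    # n=20 yields 19 cut points at 5% steps; the last one is the 95th percentile.
    return statistics.quantiles(durations, n=20)[-1]


print(p95([12.0]))
print(p95([float(x) for x in range(1, 101)]))
```

A schedule that has run only once in the analysis window is a realistic case right after creation, which is why the guard matters.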



# services/scheduler_optimizer.py
from typing import List, Dict, Any
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from sqlalchemy import func
from models.schedule import WorkflowSchedule, ScheduleExecution
import logging
 
logger = logging.getLogger(__name__)
 
class SchedulerOptimizer:
    """
    Scheduler performance optimizer.
    
    Provides performance recommendations and automatic optimization helpers.
    """
    
    def __init__(self, db: Session):
        self.db = db
    
    def analyze_schedule_distribution(self) -> Dict[str, Any]:
        """
        Analyze how schedules are distributed across the day.
        
        Detects whether schedules cluster around particular hours.
        """
        # Fetch all active cron schedules
        schedules = self.db.query(WorkflowSchedule).filter(
            WorkflowSchedule.status == "active",
            WorkflowSchedule.trigger_type == "cron"
        ).all()
        
        # Count schedules by the hour of their next run
        hour_distribution = {}
        
        for schedule in schedules:
            if schedule.next_run_at:
                hour = schedule.next_run_at.hour
                hour_distribution[hour] = hour_distribution.get(hour, 0) + 1
        
        # Find peak hours: those within 80% of the busiest hour
        max_count = max(hour_distribution.values()) if hour_distribution else 0
        peak_hours = [
            hour for hour, count in hour_distribution.items()
            if count >= max_count * 0.8
        ]
        
        # Standard deviation across the hours in use. Hours with no schedules
        # are absent from hour_distribution, so they do not affect this value.
        import statistics
        if len(hour_distribution) > 1:
            std_dev = statistics.stdev(hour_distribution.values())
        else:
            std_dev = 0
        
        return {
            "total_schedules": len(schedules),
            "hour_distribution": hour_distribution,
            "peak_hours": peak_hours,
            "max_concurrent": max_count,
            "std_deviation": std_dev,
            # Balanced when the deviation is below 30% of the busiest hour's count
            "is_balanced": std_dev < max_count * 0.3,
            "recommendations": self._generate_distribution_recommendations(
                hour_distribution,
                peak_hours,
                max_count
            )
        }
    
    def _generate_distribution_recommendations(
        self,
        distribution: Dict[int, int],
        peak_hours: List[int],
        max_count: int
    ) -> List[str]:
        """Generate recommendations for rebalancing the distribution."""
        recommendations = []
        
        if len(peak_hours) > 0 and max_count > 10:
            recommendations.append(
                f"Peak hours detected (hours {', '.join(map(str, peak_hours))}); "
                f"consider moving some schedules to other time slots"
            )
        
        # Find hours with no schedules at all
        all_hours = set(range(24))
        used_hours = set(distribution.keys())
        idle_hours = all_hours - used_hours
        
        if len(idle_hours) > 0:
            recommendations.append(
                f"These hours are unused and could absorb some schedules: {sorted(idle_hours)}"
            )
        
        return recommendations
    
    def optimize_execution_time(
        self,
        schedule_id: str,
        target_duration: int
    ) -> Dict[str, Any]:
        """
        优化执行时间
        
        分析历史执行数据,提供优化建议
        """
        # 获取最近30天的执行记录
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(days=30)
        
        executions = self.db.query(ScheduleExecution).filter(
            ScheduleExecution.schedule_id == schedule_id,
            ScheduleExecution.scheduled_time >= start_time,
            ScheduleExecution.duration.isnot(None)
        ).all()
        
        if not executions:
            return {
                "status": "insufficient_data",
                "message": "执行记录不足,无法提供优化建议"
            }
        
        # 统计执行时长
        durations = [e.duration for e in executions]
        avg_duration = sum(durations) / len(durations)
        max_duration = max(durations)
        min_duration = min(durations)
        
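        # Percentiles; statistics.quantiles() needs at least two data points
        import statistics
        median_duration = statistics.median(durations)
        if len(durations) >= 2:
            p95_duration = statistics.quantiles(durations, n=20)[18]  # 95th percentile
        else:
            p95_duration = durations[0]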
        
        recommendations = []
        
        # 如果平均时长超过目标
        if avg_duration > target_duration:
            recommendations.append({
                "type": "duration_optimization",
                "severity": "high",
                "message": f"平均执行时长({avg_duration:.0f}秒)超过目标({target_duration}秒)",
                "suggestions": [
                    "检查工作流是否有性能瓶颈",
                    "考虑拆分为多个较小的工作流",
                    "优化数据库查询",
                    "增加并行处理"
                ]
            })
        
        # 如果执行时长波动大
        if max_duration > median_duration * 2:
            recommendations.append({
                "type": "duration_variance",
                "severity": "medium",
                "message": f"执行时长波动较大(最大{max_duration:.0f}秒,中位数{median_duration:.0f}秒)",
                "suggestions": [
                    "检查是否有数据量波动",
                    "考虑添加超时控制",
                    "监控资源使用情况"
                ]
            })
        
        return {
            "status": "analyzed",
            "statistics": {
                "average_duration": avg_duration,
                "median_duration": median_duration,
                "min_duration": min_duration,
                "max_duration": max_duration,
                "p95_duration": p95_duration,
                "sample_size": len(executions)
            },
            "recommendations": recommendations
        }
    
    def detect_resource_conflicts(self) -> List[Dict[str, Any]]:
        """
        检测资源冲突
        
        识别可能同时运行并竞争资源的调度
        """
        # 获取所有激活的调度
        schedules = self.db.query(WorkflowSchedule).filter(
            WorkflowSchedule.status == "active"
        ).all()
        
        conflicts = []
        
        # 按下次执行时间分组
        time_groups = {}
        for schedule in schedules:
            if schedule.next_run_at:
                # 按5分钟窗口分组
                window = schedule.next_run_at.replace(
                    minute=(schedule.next_run_at.minute // 5) * 5,
                    second=0,
                    microsecond=0
                )
                
                if window not in time_groups:
                    time_groups[window] = []
                
                time_groups[window].append(schedule)
        
        # 检测冲突
        for window, group_schedules in time_groups.items():
            if len(group_schedules) > 5:  # 超过5个调度在同一窗口
                conflicts.append({
                    "time_window": window.isoformat(),
                    "concurrent_schedules": len(group_schedules),
                    "schedules": [
                        {
                            "id": s.id,
                            "name": s.name,
                            "workflow_id": s.workflow_id
                        }
                        for s in group_schedules
                    ],
                    "severity": "high" if len(group_schedules) > 10 else "medium",
                    "recommendation": "考虑调整部分调度的执行时间,避免资源竞争"
                })
        
        return conflicts
    
    def suggest_schedule_consolidation(self) -> List[Dict[str, Any]]:
        """
        建议调度合并
        
        识别可以合并的相似调度
        """
        # 获取所有调度
        schedules = self.db.query(WorkflowSchedule).all()
        
        # 按工作流和Cron表达式分组
        groups = {}
        
        for schedule in schedules:
            if schedule.trigger_type.value == "cron":
                key = (schedule.workflow_id, schedule.cron_expression)
                
                if key not in groups:
                    groups[key] = []
                
                groups[key].append(schedule)
        
        # 找出可以合并的组
        suggestions = []
        
        for (workflow_id, cron_expr), group_schedules in groups.items():
            if len(group_schedules) > 1:
                suggestions.append({
                    "workflow_id": workflow_id,
                    "cron_expression": cron_expr,
                    "duplicate_count": len(group_schedules),
                    "schedules": [
                        {
                            "id": s.id,
                            "name": s.name,
                            "status": s.status.value
                        }
                        for s in group_schedules
                    ],
                    "recommendation": "这些调度使用相同的工作流和时间表达式,考虑合并为一个调度"
                })
        
        return suggestions
    
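    def generate_optimization_report(self) -> Dict[str, Any]:
        """
        Generate the full optimization report.
        """
        distribution = self.analyze_schedule_distribution()
        conflicts = self.detect_resource_conflicts()
        consolidation = self.suggest_schedule_consolidation()

        # Conflicts carry their own severity; consolidation suggestions have none,
        # so this summary counts them as low priority
        high_priority = sum(1 for c in conflicts if c["severity"] == "high")
        medium_priority = sum(1 for c in conflicts if c["severity"] == "medium")
        low_priority = len(consolidation)

        return {
            "generated_at": datetime.utcnow().isoformat(),
            "distribution_analysis": distribution,
            "resource_conflicts": conflicts,
            "consolidation_suggestions": consolidation,
            "summary": {
                "total_issues": high_priority + medium_priority + low_priority,
                "high_priority": high_priority,
                "medium_priority": medium_priority,
                "low_priority": low_priority
            }
        }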
 
 
# API端点
from fastapi import APIRouter, Depends
from database import get_db
 
router = APIRouter(prefix="/api/schedules/optimizer", tags=["schedule-optimizer"])
 
@router.get("/distribution")
async def analyze_distribution(db: Session = Depends(get_db)):
    """分析调度分布"""
    optimizer = SchedulerOptimizer(db)
    return optimizer.analyze_schedule_distribution()
 
 
@router.get("/{schedule_id}/execution-time")
async def optimize_execution_time(
    schedule_id: str,
    target_duration: int = 300,
    db: Session = Depends(get_db)
):
    """优化执行时间"""
    optimizer = SchedulerOptimizer(db)
    return optimizer.optimize_execution_time(schedule_id, target_duration)
 
 
@router.get("/conflicts")
async def detect_conflicts(db: Session = Depends(get_db)):
    """检测资源冲突"""
    optimizer = SchedulerOptimizer(db)
    return optimizer.detect_resource_conflicts()
 
 
@router.get("/consolidation")
async def suggest_consolidation(db: Session = Depends(get_db)):
    """建议调度合并"""
    optimizer = SchedulerOptimizer(db)
    return optimizer.suggest_schedule_consolidation()
 
 
@router.get("/report")
async def get_optimization_report(db: Session = Depends(get_db)):
    """获取优化报告"""
    optimizer = SchedulerOptimizer(db)
    return optimizer.generate_optimization_report()
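
The distribution analysis above recommends moving schedules away from peak hours. One way to do that without hand-tuning each schedule is a deterministic per-schedule offset. The following is a minimal sketch, assuming schedule IDs are strings; `spread_minute`, `spread_hourly_cron` and `window_minutes` are hypothetical names that are not part of the code above.

```python
import hashlib


def spread_minute(schedule_id: str, window_minutes: int = 60) -> int:
    """Map a schedule ID to a stable minute offset in [0, window_minutes)."""
    if window_minutes <= 0:
        raise ValueError("window_minutes must be positive")
    # SHA-256 serves only as a stable, well-mixed hash here; Python's built-in
    # hash() is randomized per process and would not be reproducible
    digest = hashlib.sha256(schedule_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % window_minutes


def spread_hourly_cron(schedule_id: str) -> str:
    """Build an hourly five-field cron expression with a per-schedule minute."""
    return f"{spread_minute(schedule_id)} * * * *"


if __name__ == "__main__":
    for sid in ("report-daily", "archive-monthly", "sync-crm"):
        print(sid, "->", spread_hourly_cron(sid))
```

Because the offset depends only on the ID, a schedule keeps the same minute across restarts, while different schedules tend to land on different minutes instead of all firing at minute 0.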

12. Monitoring and Alerting



# services/schedule_monitor.py
from typing import Dict, Any, List
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from models.schedule import WorkflowSchedule, ScheduleExecution, ExecutionStatus
from prometheus_client import Counter, Histogram, Gauge
import logging
 
logger = logging.getLogger(__name__)
 
# Prometheus指标
schedule_executions_total = Counter(
    'schedule_executions_total',
    'Total number of schedule executions',
    ['schedule_id', 'status']
)
 
schedule_execution_duration = Histogram(
    'schedule_execution_duration_seconds',
    'Schedule execution duration in seconds',
    ['schedule_id']
)
 
schedule_misfires_total = Counter(
    'schedule_misfires_total',
    'Total number of schedule misfires',
    ['schedule_id']
)
 
active_schedules = Gauge(
    'active_schedules',
    'Number of active schedules'
)
 
class ScheduleMonitor:
    """
    调度监控服务
    
    监控调度执行状态,发送告警
    """
    
    def __init__(self, db: Session):
        self.db = db
    
    def record_execution(self, execution: ScheduleExecution):
        """记录执行指标"""
        # 更新Prometheus指标
        schedule_executions_total.labels(
            schedule_id=execution.schedule_id,
            status=execution.status.value
        ).inc()
        
        if execution.duration is not None:  # a 0-second run is still a valid observation
            schedule_execution_duration.labels(
                schedule_id=execution.schedule_id
            ).observe(execution.duration)
        
        if execution.is_misfire:
            schedule_misfires_total.labels(
                schedule_id=execution.schedule_id
            ).inc()
    
    def update_active_schedules_count(self):
        """更新激活调度数量"""
        count = self.db.query(WorkflowSchedule).filter(
            WorkflowSchedule.status == "active"
        ).count()
        
        active_schedules.set(count)
    
    def check_schedule_health(
        self,
        schedule_id: str,
        threshold: Dict[str, float]
    ) -> Dict[str, Any]:
        """
        检查调度健康状态
        
        Args:
            schedule_id: 调度ID
            threshold: 阈值配置
                - success_rate_min: 最小成功率(默认0.9)
                - max_duration: 最大执行时长(秒,默认3600)
                - max_misfire_rate: 最大错过率(默认0.1)
        """
        # 获取最近24小时的执行记录
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(hours=24)
        
        executions = self.db.query(ScheduleExecution).filter(
            ScheduleExecution.schedule_id == schedule_id,
            ScheduleExecution.scheduled_time >= start_time
        ).all()
        
        if not executions:
            return {
                "healthy": True,
                "message": "No recent executions",
                "alerts": []
            }
        
        # 计算指标
        total = len(executions)
        success = len([e for e in executions if e.status == ExecutionStatus.SUCCESS])
        failed = len([e for e in executions if e.status == ExecutionStatus.FAILED])
        misfire = len([e for e in executions if e.is_misfire])
        
        success_rate = success / total if total > 0 else 0
        misfire_rate = misfire / total if total > 0 else 0
        
        durations = [e.duration for e in executions if e.duration is not None]
        avg_duration = sum(durations) / len(durations) if durations else 0
        max_duration_actual = max(durations) if durations else 0
        
        # 检查阈值
        alerts = []
        
        # 成功率检查
        success_rate_min = threshold.get('success_rate_min', 0.9)
        if success_rate < success_rate_min:
            alerts.append({
                "type": "low_success_rate",
                "severity": "high",
                "message": f"成功率过低: {success_rate:.2%} < {success_rate_min:.2%}",
                "value": success_rate,
                "threshold": success_rate_min
            })
        
        # 执行时长检查
        max_duration_threshold = threshold.get('max_duration', 3600)
        if max_duration_actual > max_duration_threshold:
            alerts.append({
                "type": "long_execution",
                "severity": "medium",
                "message": f"执行时长过长: {max_duration_actual}秒 > {max_duration_threshold}秒",
                "value": max_duration_actual,
                "threshold": max_duration_threshold
            })
        
        # 错过执行率检查
        max_misfire_rate = threshold.get('max_misfire_rate', 0.1)
        if misfire_rate > max_misfire_rate:
            alerts.append({
                "type": "high_misfire_rate",
                "severity": "medium",
                "message": f"错过执行率过高: {misfire_rate:.2%} > {max_misfire_rate:.2%}",
                "value": misfire_rate,
                "threshold": max_misfire_rate
            })
        
        return {
            "healthy": len(alerts) == 0,
            "metrics": {
                "total_executions": total,
                "success_rate": success_rate,
                "misfire_rate": misfire_rate,
                "average_duration": avg_duration,
                "max_duration": max_duration_actual
            },
            "alerts": alerts
        }
    
    def send_alert(self, alert: Dict[str, Any]):
        """
        发送告警
        
        集成告警系统(邮件、Slack、钉钉等)
        """
        # 这里是告警发送的示例实现
        logger.warning(f"Schedule Alert: {alert}")
        
        # 实际实现中,可以集成:
        # - 邮件通知
        # - Slack/钉钉/企业微信
        # - PagerDuty
        # - 自定义Webhook
        
        # 示例:发送邮件
        # from services.notification_service import NotificationService
        # notification_service = NotificationService()
        # notification_service.send_email(
        #     to=["admin@example.com"],
        #     subject=f"Schedule Alert: {alert['type']}",
        #     body=alert['message']
        # )
    
    def monitor_all_schedules(self) -> Dict[str, Any]:
        """
        监控所有调度
        
        定期执行的监控任务
        """
        schedules = self.db.query(WorkflowSchedule).filter(
            WorkflowSchedule.status == "active"
        ).all()
        
        total_alerts = []
        unhealthy_count = 0
        
        for schedule in schedules:
            health = self.check_schedule_health(
                schedule.id,
                threshold={
                    'success_rate_min': 0.9,
                    'max_duration': 3600,
                    'max_misfire_rate': 0.1
                }
            )
            
            if not health['healthy']:
                unhealthy_count += 1
                
                for alert in health['alerts']:
                    # 添加调度信息
                    alert['schedule_id'] = schedule.id
                    alert['schedule_name'] = schedule.name
                    
                    total_alerts.append(alert)
                    
                    # 发送告警
                    self.send_alert(alert)
        
        # 更新指标
        self.update_active_schedules_count()
        
        return {
            "monitored_at": datetime.utcnow().isoformat(),
            "total_schedules": len(schedules),
            "unhealthy_schedules": unhealthy_count,
            "total_alerts": len(total_alerts),
            "alerts": total_alerts
        }
 
 
# Celery定时任务
from celery import shared_task
 
@shared_task
def monitor_schedules_task():
    """
    定时监控任务
    
    建议每5分钟执行一次
    """
    from database import SessionLocal
    
    db = SessionLocal()
    try:
        monitor = ScheduleMonitor(db)
        result = monitor.monitor_all_schedules()
        
        logger.info(f"Schedule monitoring completed: {result}")
        
        return result
    finally:
        db.close()
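
Because the monitoring task runs every few minutes, `monitor_all_schedules` calls `send_alert` again on every run for as long as a schedule stays unhealthy. A cooldown per (schedule, alert type) pair is one common way to limit the noise. Below is a minimal in-memory sketch; `AlertThrottle` and its parameters are hypothetical, and a deployment with several workers would need shared state (for example Redis) instead of a local dict.

```python
import time
from typing import Callable, Dict, Tuple


class AlertThrottle:
    """Suppress repeats of the same alert within a cooldown window."""

    def __init__(self, cooldown_seconds: float = 1800,
                 clock: Callable[[], float] = time.monotonic):
        self.cooldown_seconds = cooldown_seconds
        self._clock = clock  # injectable so the behaviour can be tested
        self._last_sent: Dict[Tuple[str, str], float] = {}

    def should_send(self, schedule_id: str, alert_type: str) -> bool:
        """Return True if this alert was not sent during the cooldown window."""
        key = (schedule_id, alert_type)
        now = self._clock()
        last = self._last_sent.get(key)
        if last is not None and now - last < self.cooldown_seconds:
            return False
        self._last_sent[key] = now
        return True


if __name__ == "__main__":
    throttle = AlertThrottle(cooldown_seconds=1800)
    alert = {"schedule_id": "sched-1", "type": "low_success_rate"}
    for attempt in range(3):
        if throttle.should_send(alert["schedule_id"], alert["type"]):
            print(f"attempt {attempt}: alert sent")
        else:
            print(f"attempt {attempt}: suppressed")
```

In `monitor_all_schedules`, the check would sit directly in front of the `self.send_alert(alert)` call, keyed by `alert['schedule_id']` and `alert['type']`.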

13.1 Advanced Cron Expression Usage

Extended syntax in detail


# services/advanced_cron.py
from typing import List, Optional
from datetime import datetime, timedelta
import calendar
 
class AdvancedCronParser:
    """
    高级Cron表达式解析器
    
    支持扩展语法和复杂场景
    """
    
    @staticmethod
    def parse_last_day_of_month(year: int, month: int) -> int:
        """
        解析月末日期(L字符)
        
        示例:
        - "0 0 L * *" - 每月最后一天
        - "0 0 L-3 * *" - 每月倒数第3天
        """
        last_day = calendar.monthrange(year, month)[1]
        return last_day
    
    @staticmethod
    def parse_weekday_of_month(year: int, month: int, weekday: int, nth: int) -> Optional[int]:
        """
        解析每月第N个星期X(#字符)
        
        示例:
        - "0 0 ? * 1#1" - 每月第一个周一
        - "0 0 ? * 5#3" - 每月第三个周五
        
        Args:
            year: 年份
            month: 月份
            weekday: 星期(0=周日,1=周一,...,6=周六)
            nth: 第几个(1-5)
            
        Returns:
            日期,如果不存在则返回None
        """
        # 获取月份第一天是星期几
        first_day = datetime(year, month, 1)
        first_weekday = first_day.weekday()
        
        # 调整weekday(Cron中0=周日,Python中0=周一)
        if weekday == 0:
            target_weekday = 6  # 周日
        else:
            target_weekday = weekday - 1
        
        # 计算第一个目标星期几的日期
        days_until_target = (target_weekday - first_weekday) % 7
        first_occurrence = 1 + days_until_target
        
        # 计算第N个目标星期几
        target_day = first_occurrence + (nth - 1) * 7
        
        # 检查是否在当月范围内
        last_day = calendar.monthrange(year, month)[1]
        if target_day <= last_day:
            return target_day
        else:
            return None
    
    @staticmethod
    def parse_nearest_weekday(year: int, month: int, day: int) -> int:
        """
        解析最近的工作日(W字符)
        
        示例:
        - "0 0 15W * *" - 每月15号最近的工作日
        
        如果15号是周六,则调整到14号(周五)
        如果15号是周日,则调整到16号(周一)
        """
        target_date = datetime(year, month, day)
        weekday = target_date.weekday()
        
        # 0-4是周一到周五(工作日)
        if weekday < 5:
            return day
        
        # 5是周六
        if weekday == 5:
            # 往前调整到周五
            adjusted_day = day - 1
            if adjusted_day < 1:
                # 如果是1号,则往后调整
                return day + 2
            return adjusted_day
        
        # 6是周日
        if weekday == 6:
            # 往后调整到周一
            last_day = calendar.monthrange(year, month)[1]
            adjusted_day = day + 1
            if adjusted_day > last_day:
                # 如果超出月末,则往前调整
                return day - 2
            return adjusted_day
    
    @staticmethod
    def parse_last_weekday_of_month(year: int, month: int, weekday: int) -> int:
        """
        解析每月最后一个星期X(L字符配合星期)
        
        示例:
        - "0 0 ? * 5L" - 每月最后一个周五
        """
        last_day = calendar.monthrange(year, month)[1]
        last_date = datetime(year, month, last_day)
        last_weekday = last_date.weekday()
        
        # 调整weekday
        if weekday == 0:
            target_weekday = 6
        else:
            target_weekday = weekday - 1
        
        # 计算最后一个目标星期几
        days_back = (last_weekday - target_weekday) % 7
        target_day = last_day - days_back
        
        return target_day
    
    @staticmethod
    def generate_business_days_cron(
        hour: int = 9,
        minute: int = 0,
        exclude_holidays: bool = True
    ) -> str:
        """
        生成工作日Cron表达式
        
        Args:
            hour: 小时
            minute: 分钟
            exclude_holidays: 是否排除节假日
            
        Returns:
            Cron表达式
        """
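        # Base weekday expression in the standard five-field format
        # (minute hour day-of-month month day-of-week), as used by the
        # other expressions in this class
        cron = f"{minute} {hour} * * MON-FRI"
        
        # Holidays cannot be expressed in cron: exclude_holidays has to be
        # evaluated by the executor at run time, so only the base expression is returned
        return cron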
    
    @staticmethod
    def generate_quarterly_cron(
        day: int = 1,
        hour: int = 0,
        minute: int = 0
    ) -> str:
        """
        生成季度Cron表达式
        
        Args:
            day: 每季度第几天
            hour: 小时
            minute: 分钟
            
        Returns:
            Cron表达式
        """
        # Day `day` of January, April, July and October (five-field format)
        return f"{minute} {hour} {day} 1,4,7,10 *"
    
    @staticmethod
    def generate_yearly_cron(
        month: int = 1,
        day: int = 1,
        hour: int = 0,
        minute: int = 0
    ) -> str:
        """
        生成年度Cron表达式
        
        Args:
            month: 月份
            day: 日期
            hour: 小时
            minute: 分钟
            
        Returns:
            Cron表达式
        """
        return f"{minute} {hour} {day} {month} *"
 
 
# Usage example
if __name__ == "__main__":
    parser = AdvancedCronParser()
    
    print("=== Advanced cron expression examples ===\n")
    
    # Last day of the month
    print("1. Last day of each month:")
    last_day = parser.parse_last_day_of_month(2024, 2)
    print(f"   Last day of February 2024: {last_day}")
    
    # N-th weekday of the month
    print("\n2. Third Friday of each month:")
    third_friday = parser.parse_weekday_of_month(2024, 1, 5, 3)
    print(f"   Third Friday of January 2024: {third_friday}")
    
    # Nearest weekday
    print("\n3. Weekday nearest to the 15th:")
    nearest = parser.parse_nearest_weekday(2024, 6, 15)
    print(f"   Weekday nearest to 15 June 2024: {nearest}")
    
    # Last given weekday of the month
    print("\n4. Last Friday of each month:")
    last_friday = parser.parse_last_weekday_of_month(2024, 1, 5)
    print(f"   Last Friday of January 2024: {last_friday}")
    
    # Predefined expressions
    print("\n5. Predefined cron expressions:")
    print(f"   Weekdays at 09:00: {parser.generate_business_days_cron(9, 0)}")
    print(f"   First day of each quarter: {parser.generate_quarterly_cron()}")
    print(f"   1 January each year: {parser.generate_yearly_cron()}")
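
`generate_business_days_cron` leaves holiday exclusion to the executor. A minimal sketch of such a run-time check follows; `is_business_day`, `should_execute` and the `holidays` set are hypothetical, and the dates in the example are placeholders rather than an official holiday calendar.

```python
from datetime import date
from typing import Set


def is_business_day(day: date, holidays: Set[date]) -> bool:
    """Return True for Monday-to-Friday dates that are not in `holidays`."""
    # date.weekday(): Monday is 0 and Sunday is 6
    return day.weekday() < 5 and day not in holidays


def should_execute(run_date: date, holidays: Set[date],
                   exclude_holidays: bool = True) -> bool:
    """Decide at trigger time whether a MON-FRI schedule should actually run."""
    if not exclude_holidays:
        return run_date.weekday() < 5
    return is_business_day(run_date, holidays)


if __name__ == "__main__":
    # Placeholder holiday list, for illustration only
    holidays = {date(2024, 1, 1), date(2024, 12, 25)}
    for d in (date(2024, 1, 1), date(2024, 1, 2), date(2024, 1, 6)):
        print(d.isoformat(), should_execute(d, holidays))
```

The cron trigger still fires on every weekday; the executor calls the check first and records a skipped run when it returns `False`, which keeps the schedule history complete.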

13.2 Time Zone Handling in Depth

Automatic daylight saving time (DST) handling


# services/dst_handler.py
from datetime import datetime, timedelta
import pytz
from typing import List, Tuple, Optional
 
class DSTHandler:
    """
    夏令时处理器
    
    处理夏令时切换带来的各种问题
    """
    
    @staticmethod
    def get_dst_transitions(timezone_name: str, year: int) -> List[Tuple[datetime, str]]:
        """
        Return the DST transition days of the given year
        
        Returns:
            List[Tuple[datetime, str]]: [(transition day at midnight, transition type)]
            Transition type: "spring_forward" or "fall_back"
        """
        tz = pytz.timezone(timezone_name)
        transitions = []
        
        # Compare the UTC offset at local midnight on consecutive days
        start_date = datetime(year, 1, 1)
        end_date = datetime(year, 12, 31)
        
        current_date = start_date
        prev_offset = None
        
        while current_date <= end_date:
            localized = tz.localize(current_date)
            current_offset = localized.utcoffset()
            
            if prev_offset is not None and current_offset != prev_offset:
                if current_offset > prev_offset:
                    # Offset increased = spring forward (one hour is skipped)
                    transition_type = "spring_forward"
                else:
                    # Offset decreased = fall back (one hour is repeated)
                    transition_type = "fall_back"
                
                # The offset changed between the previous midnight and this one,
                # so the switch happened during the previous calendar day
                transitions.append((current_date - timedelta(days=1), transition_type))
            
            prev_offset = current_offset
            current_date += timedelta(days=1)
        
        return transitions
    
    @staticmethod
    def handle_ambiguous_time(
        dt: datetime,
        timezone_name: str,
        is_dst: Optional[bool] = None
    ) -> datetime:
        """
        Localize a naive local time, resolving ambiguous and non-existent times
        
        Args:
            dt: naive local time
            timezone_name: time zone name
            is_dst: which occurrence to pick when the time is ambiguous
                (True = daylight time, False or None = standard time)
            
        Returns:
            The localized (timezone-aware) time
        """
        tz = pytz.timezone(timezone_name)
        
        try:
            # pytz only raises for ambiguous or non-existent times when
            # is_dst=None; with True/False it silently picks an offset
            return tz.localize(dt, is_dst=None)
        
        except pytz.exceptions.AmbiguousTimeError:
            # Repeated hour at fall back: use the caller's choice,
            # defaulting to standard time
            return tz.localize(dt, is_dst=bool(is_dst))
        
        except pytz.exceptions.NonExistentTimeError:
            # Skipped hour at spring forward: move the time one hour later
            adjusted_dt = dt + timedelta(hours=1)
            return tz.localize(adjusted_dt)
    
    @staticmethod
    def adjust_schedule_for_dst(
        schedule_time: datetime,
        timezone_name: str
    ) -> datetime:
        """
        为夏令时调整调度时间
        
        确保调度时间在DST转换时正确处理
        """
        tz = pytz.timezone(timezone_name)
        
        # 检查是否在DST转换日
        transitions = DSTHandler.get_dst_transitions(timezone_name, schedule_time.year)
        
        for transition_date, transition_type in transitions:
            # 检查调度时间是否在转换窗口内
            if transition_date.date() == schedule_time.date():
                if transition_type == "spring_forward":
                    # 春季向前:2:00-3:00不存在
                    # 如果调度在这个时间段,向后调整
                    if 2 <= schedule_time.hour < 3:
                        schedule_time = schedule_time.replace(hour=3)
                
                elif transition_type == "fall_back":
                    # 秋季向后:1:00-2:00重复
                    # 使用第一次出现(夏令时)
                    if 1 <= schedule_time.hour < 2:
                        schedule_time = DSTHandler.handle_ambiguous_time(
                            schedule_time,
                            timezone_name,
                            is_dst=True
                        )
        
        return schedule_time
    
    @staticmethod
    def get_safe_schedule_times(timezone_name: str) -> List[int]:
        """
        获取安全的调度时间(避开DST转换)
        
        Returns:
            安全的小时列表
        """
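        # In most regions DST switches happen between 01:00 and 03:00 local time:
        # US zones skip 02:00-03:00 and repeat 01:00-02:00, central European
        # zones skip and repeat 02:00-03:00.
        # This is a fixed heuristic; timezone_name is not consulted.
        unsafe_hours = [1, 2, 3]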
        
        safe_hours = [h for h in range(24) if h not in unsafe_hours]
        
        return safe_hours
    
    @staticmethod
    def calculate_next_run_with_dst(
        cron_expression: str,
        base_time: datetime,
        timezone_name: str
    ) -> datetime:
        """
        计算考虑DST的下次执行时间
        """
        from services.cron_parser import CronParser
        
        # 计算基础的下次执行时间
        next_time = CronParser.get_next_run_time(
            cron_expression,
            base_time,
            timezone_name
        )
        
        # 调整DST
        adjusted_time = DSTHandler.adjust_schedule_for_dst(
            next_time,
            timezone_name
        )
        
        return adjusted_time
 
 
# Usage example
if __name__ == "__main__":
    handler = DSTHandler()
    
    print("=== DST handling examples ===\n")
    
    # 1. DST transitions
    print("1. DST transitions for US Eastern time in 2024:")
    transitions = handler.get_dst_transitions("America/New_York", 2024)
    for transition_date, transition_type in transitions:
        print(f"   {transition_date.date()}: {transition_type}")
    
    # 2. Ambiguous time
    print("\n2. Resolving the ambiguous hour at fall back:")
    # 01:30 on 3 November 2024 occurs twice
    ambiguous_time = datetime(2024, 11, 3, 1, 30, 0)
    
    # Pick the daylight-time occurrence
    dst_time = handler.handle_ambiguous_time(
        ambiguous_time,
        "America/New_York",
        is_dst=True
    )
    print(f"   Daylight time: {dst_time}")
    
    # Pick the standard-time occurrence
    std_time = handler.handle_ambiguous_time(
        ambiguous_time,
        "America/New_York",
        is_dst=False
    )
    print(f"   Standard time: {std_time}")
    
    # 3. Safe schedule hours
    print("\n3. Safe schedule hours (avoiding DST switches):")
    safe_hours = handler.get_safe_schedule_times("America/New_York")
    print(f"   Recommended hours: {safe_hours}")
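
The day-by-day scan in `get_dst_transitions` can be cross-checked against a known rule. The sketch below hard-codes the United States rule in force since 2007 (DST starts on the second Sunday of March and ends on the first Sunday of November). It is only a sanity check for zones such as `America/New_York`, not a replacement for the time zone database, and the function names are hypothetical.

```python
import calendar
from datetime import date
from typing import Tuple


def nth_sunday(year: int, month: int, n: int) -> date:
    """Return the n-th Sunday (1-based) of the given month."""
    first_weekday, _ = calendar.monthrange(year, month)  # Monday is 0
    first_sunday = 1 + (calendar.SUNDAY - first_weekday) % 7
    return date(year, month, first_sunday + (n - 1) * 7)


def us_dst_transition_dates(year: int) -> Tuple[date, date]:
    """Return (spring_forward, fall_back) dates under the US rule used since 2007."""
    if year < 2007:
        raise ValueError("the rule coded here applies from 2007 onwards")
    return nth_sunday(year, 3, 2), nth_sunday(year, 11, 1)


if __name__ == "__main__":
    spring, fall = us_dst_transition_dates(2024)
    print(f"spring_forward: {spring}, fall_back: {fall}")
```

For 2024 the rule gives 10 March and 3 November, which are the calendar days on which the Eastern time switches occur.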

13.3 Consistency Guarantees for Distributed Scheduling

An advanced distributed lock implementation


# services/distributed_lock.py
from typing import Optional, Callable
import redis
import time
import uuid
import logging
from contextlib import contextmanager
 
logger = logging.getLogger(__name__)
 
class DistributedLock:
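    """
    Distributed lock (Redis-based)
    
    Features:
    - Owner-checked release via a unique identifier
    - Expiry (TTL), so a crashed holder cannot block others forever
    - Optional automatic renewal
    - Fair variant available as FairLock
    
    The lock is not reentrant: a second acquire() on the same key waits
    like any other client.
    """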
    
    def __init__(
        self,
        redis_client: redis.Redis,
        lock_name: str,
        timeout: int = 300,
        auto_renewal: bool = True,
        blocking: bool = True,
        blocking_timeout: Optional[int] = None
    ):
        """
        Args:
            redis_client: Redis客户端
            lock_name: 锁名称
            timeout: 锁超时时间(秒)
            auto_renewal: 是否自动续期
            blocking: 是否阻塞等待
            blocking_timeout: 阻塞超时时间(秒)
        """
        self.redis = redis_client
        self.lock_name = f"lock:{lock_name}"
        self.timeout = timeout
        self.auto_renewal = auto_renewal
        self.blocking = blocking
        self.blocking_timeout = blocking_timeout
        
        # 锁标识符(用于验证锁的所有权)
        self.identifier = str(uuid.uuid4())
        
        # 续期线程
        self._renewal_thread = None
        self._stop_renewal = False
    
    def acquire(self) -> bool:
        """
        获取锁
        
        Returns:
            是否成功获取
        """
        end_time = None
        if self.blocking and self.blocking_timeout:
            end_time = time.time() + self.blocking_timeout
        
        while True:
            # 尝试设置锁
            acquired = self.redis.set(
                self.lock_name,
                self.identifier,
                nx=True,  # 只在键不存在时设置
                ex=self.timeout  # 设置过期时间
            )
            
            if acquired:
                logger.debug(f"Lock acquired: {self.lock_name}")
                
                # 启动自动续期
                if self.auto_renewal:
                    self._start_auto_renewal()
                
                return True
            
            # 如果不阻塞,直接返回
            if not self.blocking:
                return False
            
            # 检查是否超时
            if end_time and time.time() > end_time:
                return False
            
            # 等待一小段时间后重试
            time.sleep(0.1)
    
    def release(self) -> bool:
        """
        释放锁
        
        Returns:
            是否成功释放
        """
        # 停止自动续期
        if self.auto_renewal:
            self._stop_auto_renewal()
        
        # 使用Lua脚本确保原子性
        # 只有锁的所有者才能释放
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        
        released = self.redis.eval(
            lua_script,
            1,
            self.lock_name,
            self.identifier
        )
        
        if released:
            logger.debug(f"Lock released: {self.lock_name}")
            return True
        else:
            logger.warning(f"Failed to release lock (not owner): {self.lock_name}")
            return False
    
    def extend(self, additional_time: int) -> bool:
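        """
        Reset the lock's time-to-live
        
        Args:
            additional_time: new TTL in seconds, counted from now
                (EXPIRE replaces the remaining TTL, it does not add to it)
            
        Returns:
            Whether the TTL was updated
        """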
        # 使用Lua脚本确保原子性
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("expire", KEYS[1], ARGV[2])
        else
            return 0
        end
        """
        
        extended = self.redis.eval(
            lua_script,
            1,
            self.lock_name,
            self.identifier,
            additional_time
        )
        
        return bool(extended)
    
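    def _start_auto_renewal(self):
        """Start the auto-renewal thread"""
        import threading
        
        # One Event per renewal thread, so a stopped thread can never be
        # revived by a later acquire() on the same object
        stop_event = threading.Event()
        self._renewal_stop_event = stop_event
        self._stop_renewal = False
        
        def renewal_worker():
            # wait() returns True as soon as the event is set, False on timeout,
            # so the thread wakes up immediately when the lock is released
            while not stop_event.wait(self.timeout / 2):
                # Reset the TTL to the original timeout; stop if the lock is no longer ours
                if not self.extend(self.timeout):
                    logger.warning(f"Lock renewal failed (lock lost): {self.lock_name}")
                    break
        
        self._renewal_thread = threading.Thread(
            target=renewal_worker,
            daemon=True
        )
        self._renewal_thread.start()
    
    def _stop_auto_renewal(self):
        """Stop auto-renewal"""
        self._stop_renewal = True
        
        stop_event = getattr(self, "_renewal_stop_event", None)
        if stop_event is not None:
            stop_event.set()
        
        if self._renewal_thread:
            self._renewal_thread.join(timeout=1)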
    
    @contextmanager
    def __call__(self):
        """上下文管理器"""
        acquired = self.acquire()
        
        if not acquired:
            raise RuntimeError(f"Failed to acquire lock: {self.lock_name}")
        
        try:
            yield self
        finally:
            self.release()
 
 
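class FairLock(DistributedLock):
    """
    Fair lock
    
    Grants the lock in the order in which it was requested (FIFO).
    Caveat: a waiter that crashes stays in the queue until it is removed.
    """
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queue_name = f"queue:{self.lock_name}"
    
    def acquire(self) -> bool:
        """Acquire the lock in FIFO order."""
        # Join the tail of the wait queue
        self.redis.rpush(self.queue_name, self.identifier)
        
        end_time = None
        if self.blocking and self.blocking_timeout:
            end_time = time.time() + self.blocking_timeout
        
        while True:
            # Check whether we are at the head of the queue
            first_in_queue = self.redis.lindex(self.queue_name, 0)
            if isinstance(first_in_queue, bytes):
                first_in_queue = first_in_queue.decode()
            
            if first_in_queue == self.identifier:
                # Our turn: try to take the underlying lock
                acquired = super().acquire()
                
                if acquired:
                    # Leave the queue
                    self.redis.lpop(self.queue_name)
                    return True
            
            # Non-blocking callers give up after one attempt,
            # blocking callers give up once the timeout has passed
            timed_out = end_time is not None and time.time() > end_time
            if not self.blocking or timed_out:
                self.redis.lrem(self.queue_name, 1, self.identifier)
                return False
            
            time.sleep(0.1)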
 
 
# Usage example
if __name__ == "__main__":
    import threading
    
    import redis
    
    # Connect to Redis
    r = redis.Redis(host='localhost', port=6379, db=0)
    
    print("=== Distributed lock examples ===\n")
    
    # 1. Basic usage
    print("1. Basic usage:")
    lock = DistributedLock(r, "test_lock", timeout=10)
    
    if lock.acquire():
        print("   Lock acquired")
        time.sleep(2)
        lock.release()
        print("   Lock released")
    
    # 2. Context manager
    print("\n2. Context manager:")
    with DistributedLock(r, "test_lock_2", timeout=10)():
        print("   Inside the lock-protected block")
        time.sleep(1)
    print("   Lock released automatically")
    
    # 3. Auto-renewal
    print("\n3. Auto-renewal:")
    lock = DistributedLock(
        r,
        "test_lock_3",
        timeout=5,
        auto_renewal=True
    )
    
    if lock.acquire():
        print("   Lock acquired (5-second timeout, auto-renewal on)")
        time.sleep(10)  # Sleep for 10 seconds, longer than the timeout
        print("   Still holding the lock after 10 seconds (auto-renewal worked)")
        lock.release()
    
    # 4. Fair lock
    print("\n4. Fair lock:")
    
    def worker(worker_id: int):
        fair_lock = FairLock(
            r,
            "fair_lock",
            timeout=5,
            blocking=True,
            blocking_timeout=30
        )
        
        print(f"   Worker {worker_id} requests the lock")
        
        if fair_lock.acquire():
            print(f"   Worker {worker_id} acquired the lock")
            time.sleep(1)
            fair_lock.release()
            print(f"   Worker {worker_id} released the lock")
    
    threads = []
    for i in range(5):
        t = threading.Thread(target=worker, args=(i,))
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()
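
The first-come, first-served ordering that FairLock aims for can be illustrated without Redis. The sketch below is an in-process ticket lock, not the implementation above: `TicketLock`, `take_ticket`, and `run_demo` are hypothetical names introduced here, and a single process with threads stands in for the Redis queue.

```python
import threading
from typing import List


class TicketLock:
    """In-process FIFO lock: callers take a ticket and wait for their turn."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._next_ticket = 0   # next ticket number to hand out
        self._now_serving = 0   # ticket currently allowed to hold the lock

    def take_ticket(self) -> int:
        with self._cond:
            ticket = self._next_ticket
            self._next_ticket += 1
            return ticket

    def acquire(self, ticket: int) -> None:
        with self._cond:
            while self._now_serving != ticket:
                self._cond.wait()

    def release(self) -> None:
        with self._cond:
            self._now_serving += 1
            self._cond.notify_all()


def run_demo(n_workers: int = 5) -> List[int]:
    """Return the order in which workers held the lock."""
    lock = TicketLock()
    order: List[int] = []
    # Tickets are issued up front, so the request order is 0, 1, 2, ...
    tickets = [lock.take_ticket() for _ in range(n_workers)]

    def worker(worker_id: int) -> None:
        lock.acquire(tickets[worker_id])
        order.append(worker_id)  # runs while this worker's ticket is being served
        lock.release()

    # Threads are started in reverse to show that start order does not matter
    threads = [
        threading.Thread(target=worker, args=(i,))
        for i in reversed(range(n_workers))
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return order


if __name__ == "__main__":
    print(run_demo())
```

The ticket counter plays the role of the Redis list: a caller's position is fixed when it requests the lock, not when it happens to poll.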

13.4 Scheduling Performance Optimization

Batch scheduling optimization


# services/batch_scheduler.py
from typing import List, Dict, Any
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from models.schedule import WorkflowSchedule
import logging
 
logger = logging.getLogger(__name__)
 
class BatchScheduler:
    """
    Batch scheduling optimizer
    
    Merges schedules that fire close together to reduce system overhead
    """
    
    def __init__(self, db: Session):
        self.db = db
        self.batch_window = timedelta(minutes=5)  # Batch window
    
    def group_schedules_by_time_window(
        self,
        schedules: List[WorkflowSchedule]
    ) -> Dict[datetime, List[WorkflowSchedule]]:
        """
        Group schedules by time window
        
        Schedules whose next run times fall in the same window share a batch
        """
        groups = {}
        
        # Derive the bucket size from batch_window instead of hard-coding 5;
        # this assumes a window shorter than one hour
        window_minutes = max(1, int(self.batch_window.total_seconds() // 60))
        
        for schedule in schedules:
            if not schedule.next_run_at:
                continue
            
            # Round the run time down to the start of its window
            window_start = schedule.next_run_at.replace(
                minute=(schedule.next_run_at.minute // window_minutes) * window_minutes,
                second=0,
                microsecond=0
            )
            
            if window_start not in groups:
                groups[window_start] = []
            
            groups[window_start].append(schedule)
        
        return groups
    
    def optimize_batch_execution(
        self,
        schedules: List[WorkflowSchedule]
    ) -> List[Dict[str, Any]]:
        """
        Build an optimized batch execution plan
        
        Returns:
            The execution plan, ordered by window start
        """
        # Group by time window
        groups = self.group_schedules_by_time_window(schedules)
        
        execution_plan = []
        
        for window_start, group_schedules in sorted(groups.items()):
            # Within a window, group schedules by the workflow they trigger
            workflow_groups = {}
            
            for schedule in group_schedules:
                workflow_id = schedule.workflow_id
                
                if workflow_id not in workflow_groups:
                    workflow_groups[workflow_id] = []
                
                workflow_groups[workflow_id].append(schedule)
            
            # Create one plan entry per workflow
            for workflow_id, wf_schedules in workflow_groups.items():
                if len(wf_schedules) > 1:
                    # Several schedules target the same workflow and can be merged
                    execution_plan.append({
                        "type": "batch",
                        "window_start": window_start,
                        "workflow_id": workflow_id,
                        "schedules": [s.id for s in wf_schedules],
                        "count": len(wf_schedules),
                        "optimization": "merged_execution"
                    })
                else:
                    # A single schedule runs as usual
                    execution_plan.append({
                        "type": "single",
                        "window_start": window_start,
                        "workflow_id": workflow_id,
                        "schedule_id": wf_schedules[0].id,
                        "optimization": "none"
                    })
        
        return execution_plan
    
    def calculate_resource_requirements(
        self,
        execution_plan: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Estimate resource requirements
        
        Gives a rough estimate of what the batch execution plan needs
        """
        total_tasks = 0
        peak_concurrent = 0
        
        # Count tasks per time window
        window_tasks = {}
        
        for plan in execution_plan:
            window = plan["window_start"]
            
            if window not in window_tasks:
                window_tasks[window] = 0
            
            if plan["type"] == "batch":
                window_tasks[window] += plan["count"]
                total_tasks += plan["count"]
            else:
                window_tasks[window] += 1
                total_tasks += 1
        
        # The busiest window defines peak concurrency
        if window_tasks:
            peak_concurrent = max(window_tasks.values())
        
        window_minutes = int(self.batch_window.total_seconds() // 60)
        
        return {
            "total_tasks": total_tasks,
            "peak_concurrent": peak_concurrent,
            "time_windows": len(window_tasks),
            "recommended_workers": max(4, peak_concurrent // 2),
            # In minutes: one batch window per occupied time window
            "estimated_duration": len(window_tasks) * window_minutes
        }
 
 
# Usage example
if __name__ == "__main__":
    from database import SessionLocal
    
    db = SessionLocal()
    
    # Create the batch scheduler
    batch_scheduler = BatchScheduler(db)
    
    # Fetch all active schedules
    schedules = db.query(WorkflowSchedule).filter(
        WorkflowSchedule.status == "active"
    ).all()
    
    print(f"Total schedules: {len(schedules)}")
    
    # Build the execution plan
    execution_plan = batch_scheduler.optimize_batch_execution(schedules)
    
    print("\nExecution plan:")
    for plan in execution_plan[:10]:  # Show only the first 10 entries
        print(f"  {plan}")
    
    # Estimate resource requirements
    resources = batch_scheduler.calculate_resource_requirements(execution_plan)
    
    print("\nResource requirements:")
    print(f"  Total tasks: {resources['total_tasks']}")
    print(f"  Peak concurrency: {resources['peak_concurrent']}")
    print(f"  Recommended workers: {resources['recommended_workers']}")
    
    db.close()
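
The window bucketing used by the batch scheduler can also be tried in isolation. The following is a minimal sketch that works on plain `(schedule_id, next_run_at)` tuples instead of `WorkflowSchedule` rows; `floor_to_window` and `group_by_window` are hypothetical helpers, and timestamps are assumed to be naive UTC or all in one time zone. Unlike the `replace(minute=...)` approach, flooring from the Unix epoch also works for windows that do not divide an hour evenly.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Tuple


def floor_to_window(ts: datetime, window: timedelta) -> datetime:
    """Round ts down to the start of its window, counted from the Unix epoch."""
    epoch = datetime(1970, 1, 1, tzinfo=ts.tzinfo)
    return ts - ((ts - epoch) % window)


def group_by_window(
    runs: List[Tuple[str, datetime]],
    window: timedelta,
) -> Dict[datetime, List[str]]:
    """Group schedule IDs by the window their next run time falls into."""
    groups: Dict[datetime, List[str]] = defaultdict(list)
    for schedule_id, next_run_at in runs:
        groups[floor_to_window(next_run_at, window)].append(schedule_id)
    return dict(groups)


if __name__ == "__main__":
    sample_runs = [
        ("a", datetime(2024, 1, 1, 2, 1)),
        ("b", datetime(2024, 1, 1, 2, 4)),
        ("c", datetime(2024, 1, 1, 2, 5)),
    ]
    grouped = group_by_window(sample_runs, timedelta(minutes=5))
    for start, ids in sorted(grouped.items()):
        print(start.isoformat(), ids)
```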

📚 Best Practices

14.1 Cron Expression Design Principles



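# Best-practice examples
from typing import Dict, List
 
class CronBestPractices:
    """Best practices for Cron expressions"""
    
    @staticmethod
    def get_recommendations() -> Dict[str, List[str]]:
        return {
            "Readability": [
                "Add a meaningful comment that states what each Cron expression means",
                "Split a complex expression into several simple ones",
                "Use named constants instead of magic strings",
                "Example: DAILY_2AM = '0 0 2 * * ?' # every day at 02:00"
            ],
            
            "Performance": [
                "Avoid piling schedules onto peak moments such as the top of the hour",
                "Spread load with a per-schedule offset, e.g. '0 7 2 * * ?' rather than '0 0 2 * * ?', "
                "or use the jitter option of APScheduler's CronTrigger; "
                "note that '0 5-10 2 * * ?' fires every minute from 02:05 to 02:10, not once at a random minute",
                "Do not schedule long-running tasks too frequently",
                "Consider an Interval trigger instead of a high-frequency Cron"
            ],
            
            "Reliability": [
                "Set a sensible misfire_grace_time",
                "Enable retries to absorb transient failures",
                "Avoid overly complex expressions",
                "Verify expressions regularly"
            ],
            
            "Time zones": [
                "Always specify the time zone explicitly",
                "Avoid scheduling inside the DST transition window (typically 02:00-03:00 local time)",
                "Use UTC as the baseline in cross-time-zone systems",
                "Store the user's time zone for display"
            ],
            
            "Maintainability": [
                "Document the business purpose of each schedule",
                "Keep schedule configuration under version control",
                "Review and remove unused schedules regularly",
                "Monitor schedule execution"
            ]
        }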
 
 
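# Recommended Cron expression templates
# These are Quartz-style 6-field expressions (second minute hour day month weekday);
# confirm that the parser in use accepts `?` and `L` before relying on them
CRON_TEMPLATES = {
    "Daily": {
        "Early morning": "0 0 2 * * ?",  # 02:00 every day (off-peak)
        "Weekdays": "0 0 9 ? * MON-FRI",  # 09:00 on weekdays
        "Weekends": "0 0 10 ? * SAT,SUN",  # 10:00 on weekends
    },
    
    "Weekly": {
        "Monday": "0 0 8 ? * MON",  # 08:00 every Monday
        "Friday": "0 0 17 ? * FRI",  # 17:00 every Friday
    },
    
    "Monthly": {
        "Start of month": "0 0 1 1 * ?",  # 01:00 on the 1st of each month
        "End of month": "0 0 23 L * ?",  # 23:00 on the last day of each month
        "Mid-month": "0 0 12 15 * ?",  # 12:00 on the 15th of each month
    },
    
    "High frequency": {
        "Every 5 minutes": "0 */5 * * * ?",
        "Every 15 minutes": "0 */15 * * * ?",
        "Hourly": "0 0 * * * ?",
    },
    
    "Special": {
        "First day of quarter": "0 0 0 1 1,4,7,10 ?",  # First day of each quarter
        "First day of year": "0 0 0 1 1 ?",  # January 1st every year
    }
}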
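
One way to follow the advice about not piling schedules onto the same minute is to derive a fixed offset from each schedule's identifier. This is a minimal sketch under stated assumptions, not part of the system described in this article: `stable_offset_minute` and `staggered_daily_cron` are hypothetical helpers, and the output uses the same 6-field layout as the templates above.

```python
import hashlib


def stable_offset_minute(schedule_id: str, spread_minutes: int = 10) -> int:
    """Map a schedule ID to a fixed minute offset in [0, spread_minutes)."""
    digest = hashlib.sha256(schedule_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % spread_minutes


def staggered_daily_cron(
    schedule_id: str,
    hour: int = 2,
    spread_minutes: int = 10,
) -> str:
    """Build a daily 6-field expression whose minute depends on the schedule ID."""
    minute = stable_offset_minute(schedule_id, spread_minutes)
    return f"0 {minute} {hour} * * ?"


if __name__ == "__main__":
    for sid in ("sales-report", "weekly-digest", "archive-job"):
        print(sid, "->", staggered_daily_cron(sid))
```

Because the offset comes from a hash rather than a random number, a schedule keeps the same minute across restarts, which keeps its execution history easy to read.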

14.2 Scheduler Design Patterns



# design_patterns/scheduler_patterns.py
 
class SchedulerDesignPatterns:
    """Design patterns for scheduling systems"""
    
    @staticmethod
    def chain_of_responsibility_pattern():
        """
        Chain of Responsibility
        
        Used for layered validation and processing before a schedule is accepted
        """
        return """
        class ScheduleHandler:
            def __init__(self):
                self.next_handler = None
            
            def set_next(self, handler):
                self.next_handler = handler
                return handler
            
            def can_handle(self, schedule):
                return True
            
            def process(self, schedule):
                raise NotImplementedError
            
            def handle(self, schedule):
                # Process at this layer
                if self.can_handle(schedule):
                    self.process(schedule)
                
                # Pass on to the next layer
                if self.next_handler:
                    self.next_handler.handle(schedule)
        
        class ValidationHandler(ScheduleHandler):
            def process(self, schedule):
                # Validate the schedule configuration
                validate_cron_expression(schedule.cron_expression)
                validate_timezone(schedule.timezone)
        
        class AuthorizationHandler(ScheduleHandler):
            def process(self, schedule):
                # Check permissions
                check_user_permission(schedule.created_by, schedule.workflow_id)
        
        class ResourceCheckHandler(ScheduleHandler):
            def process(self, schedule):
                # Check resources
                check_resource_availability()
        
        # Usage
        validation = ValidationHandler()
        authorization = AuthorizationHandler()
        resource_check = ResourceCheckHandler()
        
        validation.set_next(authorization).set_next(resource_check)
        
        validation.handle(schedule)
        """
    
    @staticmethod
    def strategy_pattern():
        """
        Strategy
        
        Used to select between misfire handling policies
        """
        return """
        class MisfireStrategy:
            def handle(self, schedule, missed_times):
                raise NotImplementedError
        
        class FireOnceStrategy(MisfireStrategy):
            def handle(self, schedule, missed_times):
                # Run once, for the most recent missed time (nothing if none were missed)
                return [max(missed_times)] if missed_times else []
        
        class FireAllStrategy(MisfireStrategy):
            def handle(self, schedule, missed_times):
                # Run every missed execution
                return missed_times
        
        class SkipStrategy(MisfireStrategy):
            def handle(self, schedule, missed_times):
                # Skip all of them
                return []
        
        # Usage
        strategies = {
            'fire_once': FireOnceStrategy(),
            'fire_all': FireAllStrategy(),
            'skip': SkipStrategy()
        }
        
        strategy = strategies[schedule.misfire_policy]
        compensations = strategy.handle(schedule, missed_times)
        """
    
    @staticmethod
    def observer_pattern():
        """
        Observer
        
        Used to broadcast schedule events
        """
        return """
        class ScheduleEventObserver:
            def update(self, event):
                raise NotImplementedError
        
        class LoggingObserver(ScheduleEventObserver):
            def update(self, event):
                logger.info(f"Schedule event: {event}")
        
        class MetricsObserver(ScheduleEventObserver):
            def update(self, event):
                # Update Prometheus metrics
                update_metrics(event)
        
        class NotificationObserver(ScheduleEventObserver):
            def update(self, event):
                if event.type == 'failure':
                    send_alert(event)
        
        class ScheduleSubject:
            def __init__(self):
                self.observers = []
            
            def attach(self, observer):
                self.observers.append(observer)
            
            def notify(self, event):
                for observer in self.observers:
                    observer.update(event)
        
        # Usage
        scheduler = ScheduleSubject()
        scheduler.attach(LoggingObserver())
        scheduler.attach(MetricsObserver())
        scheduler.attach(NotificationObserver())
        
        scheduler.notify(ScheduleEvent('execution_complete', schedule))
        """
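
The patterns above are returned as source text, so they cannot be run directly. As an illustration of the misfire strategy idea, here is a small runnable sketch. It makes two simplifications that are assumptions of this example: schedules fire at a fixed interval rather than from a Cron expression, and each policy is a plain function instead of a class. `missed_run_times`, `MISFIRE_POLICIES`, and `plan_compensation` are hypothetical names.

```python
from datetime import datetime, timedelta
from typing import Callable, Dict, List


def missed_run_times(
    last_run_at: datetime,
    now: datetime,
    interval: timedelta,
) -> List[datetime]:
    """List the run times after last_run_at that were due at or before now."""
    missed = []
    due = last_run_at + interval
    while due <= now:
        missed.append(due)
        due += interval
    return missed


# Each policy maps the (ascending) missed times to the runs that should be compensated
MISFIRE_POLICIES: Dict[str, Callable[[List[datetime]], List[datetime]]] = {
    "fire_once": lambda missed: missed[-1:],   # only the most recent, if any
    "fire_all": lambda missed: list(missed),   # every missed run
    "skip": lambda missed: [],                 # nothing
}


def plan_compensation(
    policy: str,
    last_run_at: datetime,
    now: datetime,
    interval: timedelta,
) -> List[datetime]:
    """Return the run times to compensate under the given policy."""
    missed = missed_run_times(last_run_at, now, interval)
    return MISFIRE_POLICIES[policy](missed)


if __name__ == "__main__":
    last = datetime(2024, 1, 1, 0, 0)
    current = datetime(2024, 1, 1, 3, 30)
    for name in MISFIRE_POLICIES:
        print(name, plan_compensation(name, last, current, timedelta(hours=1)))
```

Slicing with `missed[-1:]` returns an empty list when nothing was missed, which avoids the error that `max()` raises on an empty sequence.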

14.3 Common Problems and Solutions



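# troubleshooting/common_issues.py
 
class SchedulerTroubleshooting:
    """Troubleshooting guide for common scheduler problems"""
    
    @staticmethod
    def get_common_issues():
        return {
            "Issue 1: Schedule does not run as expected": {
                "Symptoms": [
                    "The schedule is active but never runs",
                    "next_run_at has passed but nothing was triggered"
                ],
                "Possible causes": [
                    "The scheduler is not running",
                    "The Cron expression is wrong",
                    "The time zone is misconfigured",
                    "Database connection problems",
                    "The distributed lock is held by another instance"
                ],
                "Diagnostic steps": [
                    "1. Check the scheduler state: scheduler.running",
                    "2. Validate the Cron expression: CronParser.validate(expression)",
                    "3. Check the time zone conversion: compare UTC and local time",
                    "4. Read the scheduler logs",
                    "5. Inspect the Redis lock state"
                ],
                "Solutions": [
                    "Restart the scheduler",
                    "Correct the Cron expression",
                    "Update the time zone configuration",
                    "Clear stale distributed locks"
                ]
            },
            
            "Issue 2: Duplicate execution": {
                "Symptoms": [
                    "The same schedule runs more than once",
                    "The execution history contains duplicate records"
                ],
                "Possible causes": [
                    "Several scheduler instances run without a distributed lock",
                    "The distributed lock expired or failed",
                    "The schedule is configured twice",
                    "The misfire policy is wrong"
                ],
                "Diagnostic steps": [
                    "1. Count the running scheduler instances",
                    "2. Verify the distributed lock configuration",
                    "3. Query for duplicate schedule configurations",
                    "4. Check the misfire_policy setting"
                ],
                "Solutions": [
                    "Make sure every instance uses the distributed lock",
                    "Increase the lock timeout",
                    "Delete duplicate schedule configurations",
                    "Change the misfire policy to fire_once"
                ]
            },
            
            "Issue 3: Time zone confusion": {
                "Symptoms": [
                    "Execution time differs from what was expected",
                    "Times are wrong after a DST switch"
                ],
                "Possible causes": [
                    "No time zone was specified",
                    "The time zone is misconfigured",
                    "DST is handled incorrectly",
                    "The database time zone setting is wrong"
                ],
                "Diagnostic steps": [
                    "1. Check the schedule.timezone setting",
                    "2. Verify that the pytz time zone data is current",
                    "3. Check the database time zone setting",
                    "4. Read the DST transition logs"
                ],
                "Solutions": [
                    "Set the time zone explicitly",
                    "Use IANA time zone identifiers",
                    "Avoid the DST transition window",
                    "Store all timestamps in UTC"
                ]
            },
            
            "Issue 4: Performance problems": {
                "Symptoms": [
                    "Schedules fire late",
                    "High system resource usage",
                    "Slow database queries"
                ],
                "Possible causes": [
                    "Too many schedules",
                    "Execution times are clustered",
                    "Missing database indexes",
                    "Too few workers"
                ],
                "Diagnostic steps": [
                    "1. Count the active schedules",
                    "2. Analyze how schedule times are distributed",
                    "3. Check the database slow-query log",
                    "4. Monitor worker load"
                ],
                "Solutions": [
                    "Spread out schedule times",
                    "Add database indexes",
                    "Add workers",
                    "Use batch scheduling"
                ]
            },
            
            "Issue 5: Missed executions are not compensated": {
                "Symptoms": [
                    "Missed schedules do not run after a system restart",
                    "Misfire records exist but no compensation ran"
                ],
                "Possible causes": [
                    "misfire_grace_time is too short",
                    "misfire_policy is set to skip",
                    "The compensation task is not running"
                ],
                "Diagnostic steps": [
                    "1. Check the misfire_grace_time setting",
                    "2. Verify the misfire_policy configuration",
                    "3. Read the misfire scan logs",
                    "4. Check the compensation task's execution records"
                ],
                "Solutions": [
                    "Increase misfire_grace_time",
                    "Change misfire_policy to fire_once",
                    "Trigger a compensation scan manually",
                    "Start the periodic compensation task"
                ]
            }
        }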
    
    @staticmethod
    def generate_health_check_script():
        """Generate the health check script"""
        # The trailing backslash keeps the shebang on the script's first line;
        # "\\n" keeps the newline escapes intact inside the generated file
        return """\
#!/usr/bin/env python3
# schedule_health_check.py

import sys
from database import SessionLocal
from models.schedule import WorkflowSchedule, ScheduleExecution
from datetime import datetime, timedelta

def check_scheduler_health():
    db = SessionLocal()
    
    issues = []
    
    # 1. Count active schedules
    active_count = db.query(WorkflowSchedule).filter(
        WorkflowSchedule.status == 'active'
    ).count()
    
    print(f"✓ Active schedules: {active_count}")
    
    # 2. Look for next_run_at values that are more than an hour overdue
    now = datetime.utcnow()
    overdue = db.query(WorkflowSchedule).filter(
        WorkflowSchedule.status == 'active',
        WorkflowSchedule.next_run_at < now - timedelta(hours=1)
    ).all()
    
    if overdue:
        issues.append(f"✗ {len(overdue)} schedules have overdue next_run_at")
        for s in overdue:
            print(f"  - {s.name}: {s.next_run_at}")
    else:
        print("✓ No overdue schedules")
    
    # 3. Count failures in the last 24 hours
    recent_failures = db.query(ScheduleExecution).filter(
        ScheduleExecution.status == 'failed',
        ScheduleExecution.scheduled_time > now - timedelta(hours=24)
    ).count()
    
    if recent_failures > 10:
        issues.append(f"✗ High failure rate: {recent_failures} failures in 24h")
    else:
        print(f"✓ Recent failures: {recent_failures}")
    
    # 4. Count misfires in the last 24 hours
    recent_misfires = db.query(ScheduleExecution).filter(
        ScheduleExecution.is_misfire.is_(True),
        ScheduleExecution.scheduled_time > now - timedelta(hours=24)
    ).count()
    
    if recent_misfires > 5:
        issues.append(f"✗ High misfire rate: {recent_misfires} misfires in 24h")
    else:
        print(f"✓ Recent misfires: {recent_misfires}")
    
    db.close()
    
    # Summary
    if issues:
        print("\\n❌ Health check FAILED:")
        for issue in issues:
            print(f"  {issue}")
        sys.exit(1)
    else:
        print("\\n✅ Health check PASSED")
        sys.exit(0)

if __name__ == "__main__":
    check_scheduler_health()
"""
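
Several of the diagnostic steps above start with checking the Cron expression itself. The sketch below is a rough pre-check using only the standard library, assuming the 5 to 7 field layouts described in this article. `check_cron` is a hypothetical helper, not the `CronParser.validate` mentioned above: it only checks the field count and the range of plain numeric values, and leaves names, steps, and special characters such as `L`, `W`, and `#` to a real parser.

```python
from typing import List

FIELD_LAYOUTS = {
    5: ["minute", "hour", "day", "month", "weekday"],
    6: ["second", "minute", "hour", "day", "month", "weekday"],
    7: ["second", "minute", "hour", "day", "month", "weekday", "year"],
}

# Accepted range for plain numeric values in each field
FIELD_RANGES = {
    "second": (0, 59),
    "minute": (0, 59),
    "hour": (0, 23),
    "day": (1, 31),
    "month": (1, 12),
    "weekday": (0, 7),
    "year": (1970, 2099),
}


def check_cron(expression: str) -> List[str]:
    """Return a list of problems found; an empty list means none were detected."""
    fields = expression.split()
    layout = FIELD_LAYOUTS.get(len(fields))
    if layout is None:
        return [f"expected 5 to 7 fields, got {len(fields)}"]

    problems = []
    for name, value in zip(layout, fields):
        low, high = FIELD_RANGES[name]
        for item in value.split(","):
            base = item.split("/")[0]  # ignore the step part of "a/b"
            for part in base.split("-"):
                # Only plain numbers are range-checked; names and wildcards pass through
                if part.isdecimal() and not low <= int(part) <= high:
                    problems.append(f"{name}: {part} is outside {low}-{high}")
    return problems


if __name__ == "__main__":
    for expr in ("0 0 2 * * ?", "0 61 2 * * ?", "0 0"):
        print(repr(expr), check_cron(expr))
```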

📖 References and Summary

15.1 References



REFERENCES = {
    "Official documentation": [
        {
            "name": "APScheduler Documentation",
            "url": "https://apscheduler.readthedocs.io/",
            "description": "Official APScheduler documentation, with the full API reference and user guide"
        },
        {
            "name": "Croniter Documentation",
            "url": "https://github.com/kiorky/croniter",
            "description": "Documentation for the croniter library, used to parse Cron expressions"
        },
        {
            "name": "pytz Documentation",
            "url": "https://pythonhosted.org/pytz/",
            "description": "Documentation for the pytz time zone library"
        },
        {
            "name": "Celery Documentation",
            "url": "https://docs.celeryproject.org/",
            "description": "Documentation for the Celery distributed task queue"
        }
    ],
    
    "Technical articles": [
        {
            "title": "Understanding Cron Expressions",
            "author": "Various",
            "url": "https://crontab.guru/",
            "description": "Online tool for testing and learning Cron expressions"
        },
        {
            "title": "Distributed Locks with Redis",
            "author": "Redis Labs",
            "url": "https://redis.io/topics/distlock",
            "description": "The official Redis guide to implementing distributed locks"
        },
        {
            "title": "Handling Daylight Saving Time",
            "author": "Python Software Foundation",
            "url": "https://docs.python.org/3/library/datetime.html",
            "description": "Python's official datetime documentation, which covers time zone and DST handling"
        }
    ],
    
    "Open-source projects": [
        {
            "name": "Airflow",
            "url": "https://github.com/apache/airflow",
            "description": "Apache Airflow - enterprise-grade workflow scheduling platform"
        },
        {
            "name": "Prefect",
            "url": "https://github.com/PrefectHQ/prefect",
            "description": "Prefect - modern workflow orchestration tool"
        },
        {
            "name": "Temporal",
            "url": "https://github.com/temporalio/temporal",
            "description": "Temporal - microservice orchestration platform"
        }
    ],
    
    "Related standards": [
        {
            "name": "IANA Time Zone Database",
            "url": "https://www.iana.org/time-zones",
            "description": "The IANA time zone database, the authoritative source of time zone identifiers"
        },
        {
            "name": "ISO 8601",
            "url": "https://www.iso.org/iso-8601-date-and-time-format.html",
            "description": "The ISO 8601 date and time format standard"
        }
    ]
}

15.2 Knowledge Summary



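KNOWLEDGE_SUMMARY = {
    "Core concepts": {
        "Cron expressions": {
            "Definition": "A string expression that defines a recurring time rule",
            "Format": "second minute hour day month weekday [year]",
            "Special characters": {
                "*": "any value",
                "?": "no specific value (day and weekday fields only)",
                "-": "range",
                ",": "list",
                "/": "step",
                "L": "last (last day of month / last given weekday)",
                "W": "nearest weekday",
                "#": "Nth given weekday of the month"
            },
            "Key points": [
                "5 to 7 fields are supported",
                "The day and weekday fields are mutually exclusive (use ? in one of them)",
                "The extended syntax adds more expressive rules"
            ]
        },
        
        "Time zone handling": {
            "Core principles": [
                "Store UTC in the database",
                "Show local time in the user interface",
                "Have the scheduler compute trigger times in the user's time zone"
            ],
            "Daylight saving time": {
                "Spring forward": "One hour is skipped (for example, 02:00-03:00 does not exist in US zones)",
                "Fall back": "One hour repeats (for example, 01:00-02:00 occurs twice in US zones)",
                "Strategy": "Resolve offsets with pytz (localize/normalize) and avoid scheduling inside the transition window"
            },
            "Best practices": [
                "Use IANA time zone identifiers",
                "Specify the time zone explicitly",
                "Avoid the DST transition window (typically 02:00-03:00 local time)",
                "Test cross-time-zone scenarios"
            ]
        },
        
        "APScheduler": {
            "Trigger types": {
                "CronTrigger": "Cron-based",
                "IntervalTrigger": "fixed interval",
                "DateTrigger": "one specific date and time"
            },
            "Job stores": {
                "MemoryJobStore": "in memory (not persistent)",
                "SQLAlchemyJobStore": "in a database (persistent)"
            },
            "Executors": {
                "ThreadPoolExecutor": "thread pool",
                "ProcessPoolExecutor": "process pool"
            },
            "Key settings": [
                "coalesce: merge missed executions into one",
                "max_instances: maximum concurrent instances of a job",
                "misfire_grace_time: how late a missed execution may still run"
            ]
        },
        
        "Misfire compensation": {
            "Detection": "Compare last_run_at with the current time and compute the run times that were due in between",
            "Policies": {
                "fire_once": "run once (the most recent)",
                "fire_all": "run every missed execution",
                "fire_latest": "run the most recent N",
                "skip": "skip"
            },
            "Implementation": [
                "Scan schedules periodically",
                "Compute the missed run times",
                "Create compensating executions according to the policy",
                "Record the compensation history"
            ]
        },
        
        "Distributed scheduling": {
            "Challenges": [
                "Preventing duplicate execution",
                "Failover",
                "Load balancing",
                "Consistency guarantees"
            ],
            "Solutions": {
                "Distributed lock": "implemented with Redis, prevents duplicates",
                "Heartbeat checks": "monitor scheduler health",
                "Task queue": "asynchronous execution with Celery",
                "Database": "persists schedule configuration and history"
            }
        }
    },
    
    "Technical points": {
        "Performance optimization": [
            "Spread out schedule times to avoid clustering",
            "Merge executions with batch scheduling",
            "Add database indexes",
            "Add workers",
            "Monitor and alert"
        ],
        
        "Reliability": [
            "Enable retries",
            "Set sensible timeouts",
            "Implement failover",
            "Keep detailed logs",
            "Run regular health checks"
        ],
        
        "Security": [
            "Permission checks",
            "Input validation",
            "SQL injection protection",
            "Distributed lock protection",
            "Audit logs"
        ]
    },
    
    "Field experience": {
        "Pitfalls to avoid": [
            "Do not cluster schedules at the top of the hour",
            "Do not ignore time zone configuration",
            "Do not schedule inside the DST transition window",
            "Do not use overly complex Cron expressions",
            "Do not forget to set a misfire policy"
        ],
        
        "Recommended practices": [
            "Document the purpose of every schedule",
            "Keep schedule configuration under version control",
            "Monitor schedule execution",
            "Review and optimize regularly",
            "Test edge cases"
        ]
    }
}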
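
The two DST cases in the summary (a skipped hour and a repeated hour) can be detected programmatically. This article uses pytz; the sketch below swaps in the standard-library `zoneinfo` module (Python 3.9+) to stay dependency-free, and assumes the IANA time zone database is available on the system. `classify_local_time` is a hypothetical helper.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def classify_local_time(naive: datetime, tz_name: str) -> str:
    """Classify a wall-clock time as 'nonexistent', 'ambiguous', or 'normal'."""
    tz = ZoneInfo(tz_name)
    local = naive.replace(tzinfo=tz)

    # A time inside the spring-forward gap does not survive a UTC round trip
    round_trip = local.astimezone(timezone.utc).astimezone(tz)
    if round_trip.replace(tzinfo=None) != naive:
        return "nonexistent"

    # A repeated time has two different UTC offsets, selected by the fold attribute
    if local.utcoffset() != local.replace(fold=1).utcoffset():
        return "ambiguous"

    return "normal"


if __name__ == "__main__":
    samples = [
        datetime(2024, 3, 10, 2, 30),   # inside the US spring-forward gap
        datetime(2024, 11, 3, 1, 30),   # inside the US fall-back overlap
        datetime(2024, 6, 1, 9, 0),     # ordinary time
    ]
    for sample in samples:
        print(sample.isoformat(), classify_local_time(sample, "America/New_York"))
```

A scheduler could run a check like this when a schedule is created and warn the user if the chosen wall-clock time falls into either case on any transition date.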

15.3 Suggested Learning Path



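LEARNING_PATH = {
    "Beginner": {
        "Goal": "Master basic scheduled tasks",
        "Topics": [
            "Understand basic Cron expression syntax",
            "Learn basic APScheduler usage",
            "Learn the basics of time zone conversion",
            "Implement a simple scheduled task"
        ],
        "Practice projects": [
            "Daily data backup task",
            "Scheduled report emails",
            "Periodic cleanup of expired data"
        ],
        "Estimated time": "1-2 weeks"
    },
    
    "Intermediate": {
        "Goal": "Build a production-grade scheduling system",
        "Topics": [
            "Master the extended Cron syntax",
            "Understand DST handling",
            "Implement misfire compensation",
            "Integrate distributed locks",
            "Add monitoring and alerting"
        ],
        "Practice projects": [
            "Multi-time-zone scheduling system",
            "Workflow scheduling with retries",
            "Analysis of schedule execution history"
        ],
        "Estimated time": "2-3 weeks"
    },
    
    "Advanced": {
        "Goal": "Optimize and extend the scheduling system",
        "Topics": [
            "Performance optimization strategies",
            "Batch scheduling optimization",
            "Custom triggers",
            "High-availability architecture design",
            "Schedule analysis and forecasting"
        ],
        "Practice projects": [
            "Distributed scheduling cluster",
            "Intelligent schedule optimizer",
            "Schedule visualization platform"
        ],
        "Estimated time": "3-4 weeks"
    }
}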

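15.4 Summary

This article covered the design and implementation of a scheduled-task system, from basic Cron expressions through to distributed scheduling.

Core takeaways:

- Cron expressions: the standard and extended syntax, enough to express complex time rules
- APScheduler integration: how to build a production-grade scheduling system on APScheduler
- Time zone handling: best practices for time zone conversion and daylight saving time
- Misfire compensation: a complete mechanism for detecting and compensating missed executions
- Distributed scheduling: Redis distributed locks to prevent duplicate execution
- Performance optimization: batch scheduling, resource estimation, and related techniques
- Monitoring and alerting: monitoring and health checks for the scheduler

Key points:

- Always specify the time zone explicitly and store timestamps in UTC
- Avoid peak times and DST transition windows
- Use distributed locks to guarantee consistency
- Implement thorough misfire compensation
- Set up monitoring and alerting
- Document schedule configuration and keep it under version control

Hopefully this article helps you build a stable, efficient, and reliable scheduling system.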

