Comparing 28 Time Series Forecasting Algorithms on One Dataset, with Full Source Code

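This account previously listed a range of time series algorithms and models. Talk is cheap without practice, so this post tests 28 time series algorithms on one dataset. The results reflect a single run on this specific dataset and say nothing about the general merits of each algorithm.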

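Dataset

The dataset contains monthly tuberculosis incidence rates (per 100,000 population) for Zhejiang Province from January 2011 to December 2022, 12 years (144 months) in total. Leave a comment if you would like a copy.

This post tests the following 28 algorithms: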
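The evaluation report later in this post lists 144 observations split into 115 training and 29 test points (80%/20%). The splitting code is not part of the excerpt shown here, so the following is only a sketch, assuming a chronological split with the cut at `int(n * 0.8)`; the function name `chronological_split` and the placeholder series are made up for illustration.

```python
def chronological_split(series, train_ratio=0.8):
    """Split a series in time order: the first part trains, the rest tests."""
    split = int(len(series) * train_ratio)  # truncates, e.g. 115.2 becomes 115
    return series[:split], series[split:]


# Placeholder data standing in for the 144 monthly values (hypothetical)
series = list(range(144))
train, test = chronological_split(series)
print(len(train), len(test))
```

A time series is not shuffled before splitting: every test point lies after every training point, so the evaluation imitates forecasting the future from the past.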

1.KNN

2.Random forest

3.GBRT

4.SARIMA 

5.BSTS

6.SVR

7.Fuzzy time series

8.TCN

9.CNN-LSTM

10.GRU

11.Exponential smoothing

12.ETS

13.DLM

14.Markov chain

15.Transformer

16.ARMA

17.VARMA

18.AR

19.VAR

20.MA

21.VECM

22.ARCH

23.ARIMA

24.GARCH

25.Seq2Seq

26.GPR

27.LSTM

28.BiLSTM
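Several entries in this list (KNN, random forest, GBRT, SVR, GPR) are general-purpose regressors rather than forecasting models. The attached source turns the series into sliding windows of `lookback` past values and then forecasts recursively, feeding each prediction back in as input. The sketch below illustrates that idea with a hand-rolled nearest-neighbour average standing in for scikit-learn's `KNeighborsRegressor`; the helper names and the synthetic sine series are assumptions for illustration, not the article's data.

```python
import math


def make_windows(series, lookback):
    """Each sample is `lookback` consecutive values; the target is the next value."""
    X, y = [], []
    for i in range(lookback, len(series)):
        X.append(series[i - lookback:i])
        y.append(series[i])
    return X, y


def knn_predict(X, y, query, k=5):
    """Average the targets of the k windows closest to `query` (squared Euclidean distance)."""
    scored = sorted(
        (sum((a - b) ** 2 for a, b in zip(row, query)), target)
        for row, target in zip(X, y)
    )
    nearest = scored[:k]
    return sum(target for _, target in nearest) / len(nearest)


def recursive_forecast(series, steps, lookback=12, k=5):
    """Multi-step forecast: each prediction is appended and reused as input."""
    X, y = make_windows(series, lookback)
    history = list(series[-lookback:])
    predictions = []
    for _ in range(steps):
        pred = knn_predict(X, y, history[-lookback:], k)
        predictions.append(pred)
        history.append(pred)
    return predictions


# Synthetic monthly series with a 12-month cycle (hypothetical data)
series = [3.0 + math.sin(2 * math.pi * t / 12) for t in range(60)]
forecast = recursive_forecast(series, steps=6)
print(forecast)
```

Because every step consumes earlier predictions, errors can accumulate over a long horizon, which is one thing to keep in mind when reading the 29-step test results.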

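The evaluation metrics used in this post are:

MAE (mean absolute error): the average absolute difference between predicted and actual values; lower is better

RMSE (root mean squared error): the square root of the mean squared prediction error; sensitive to outliers; lower is better

MAPE (mean absolute percentage error): the prediction error expressed as a percentage of the actual value; lower is better

R² (coefficient of determination): the share of the data's variance explained by the model; closer to 1 is better, and it turns negative when the forecast is worse than simply predicting the mean of the actual values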
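To make the four definitions concrete, here is a minimal standard-library sketch. The attached source computes the same quantities with scikit-learn and NumPy; the function name `forecast_metrics` and the toy numbers are made up for illustration.

```python
import math


def forecast_metrics(y_true, y_pred):
    """Return MAE, RMSE, MAPE (in %) and R² for two equal-length sequences."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    mae = sum(abs(e) for e in errors) / n
    rmse = math.sqrt(sum(e * e for e in errors) / n)
    # MAPE divides by the actual value, so it breaks down if y_true contains 0
    mape = 100 * sum(abs(e / t) for e, t in zip(errors, y_true)) / n
    mean_true = sum(y_true) / n
    ss_res = sum(e * e for e in errors)                 # residual sum of squares
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)  # total sum of squares
    r2 = 1 - ss_res / ss_tot if ss_tot != 0 else 0.0
    return {"MAE": mae, "RMSE": rmse, "MAPE": mape, "R2": r2}


y_true = [2.0, 4.0, 6.0]                          # toy values
good = forecast_metrics(y_true, [2.5, 4.0, 5.5])  # close forecast
bad = forecast_metrics(y_true, [6.0, 4.0, 2.0])   # worse than predicting the mean
print(good["R2"], bad["R2"])  # 0.9375 -3.0
```

The second case shows why so many R² values in the report are negative: whenever the squared forecast error exceeds the variance of the test data around its own mean, R² drops below zero.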

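The forecasts are visualized below:

The baseline plots for the remaining algorithms are not shown individually. Without any hyperparameter tuning, the algorithms perform poorly and are not yet usable as forecasting models; the parameters will be tuned in a follow-up. The evaluation report follows.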

=====================

Time Series Forecasting Algorithm Comparison Report

=====================

Generated: 2025-11-05 14:21:04

Data:

Total samples: 144

Training set: 115 (80%)

Test set: 29 (20%)

======================

Forecast results by algorithm

======================

Algorithm                                 MAE           RMSE       MAPE(%)                      R²
--------------------------------------------------------------------------------------------------
AR(5)                                  0.4935         0.6947         15.71                 -0.5060
MA(2)                                  0.5360         0.7505         17.04                 -0.7578
ARMA(2,2)                              0.5765         0.6929         17.20                 -0.4982
ARIMA(1, 1, 1)                         1.0163         1.1635         30.26                 -3.2248
SARIMA(1, 1, 1)x(1, 1, 1, 12)          0.3872         0.4515         10.32                  0.3639
Exponential smoothing                  0.4515         0.5561         12.86                  0.0351
ETS                                    0.4515         0.5561         12.86                  0.0351
SVR                                    0.3527         0.4564         10.43                  0.3500
GBRT                                   0.3084         0.4469          8.49                  0.3768
Random forest                          0.3039         0.4328          8.36                  0.4154
KNN(n=5)                               0.2957         0.4238          8.41                  0.4396
GPR                                    3.2559         3.5257         86.82                -37.7907
LSTM                                   8.3894        18.7696        257.80              -1098.4088
GRU                                    0.4392         0.4989         12.23                  0.2234
BiLSTM                          31250151.7945 144472333.4477 1139939852.06 -65135425335662872.0000
TCN                                    0.3878         0.4768         10.22                  0.2907
Transformer                            0.5440         0.6626         14.80                 -0.3700
Seq2Seq                                1.0489         1.2053         31.21                 -3.5332
CNN-LSTM                               0.4016         0.4868         11.46                  0.2605
VAR(5)                                 0.5309         0.7407         16.85                 -0.7123
VARMA(2,2)                             0.5765         0.6929         17.20                 -0.4982
VECM(1)                                1.0086         1.1568         30.05                 -3.1760
ARCH(1)                                1.0086         1.1568         30.05                 -3.1760
GARCH(1,1)                             1.0163         1.1635         30.26                 -3.2248
DLM                                    0.4515         0.5561         12.86                  0.0351
Fuzzy time series                      0.3803         0.4701         10.50                  0.3103
Markov chain                           0.4600         0.6317         14.47                 -0.2453
BSTS                                   0.3872         0.4515         10.32                  0.3639

=====================

Algorithm performance ranking

=====================

Ranked by RMSE (lower is better):

1. KNN(n=5) RMSE: 0.4238

2. Random forest RMSE: 0.4328

3. GBRT RMSE: 0.4469

4. SARIMA(1, 1, 1)x(1, 1, 1, 12) RMSE: 0.4515

5. BSTS RMSE: 0.4515

6. SVR RMSE: 0.4564

7. Fuzzy time series RMSE: 0.4701

8. TCN RMSE: 0.4768

9. CNN-LSTM RMSE: 0.4868

10. GRU RMSE: 0.4989

11. Exponential smoothing RMSE: 0.5561

12. ETS RMSE: 0.5561

13. DLM RMSE: 0.5561

14. Markov chain RMSE: 0.6317

15. Transformer RMSE: 0.6626

16. ARMA(2,2) RMSE: 0.6929

17. VARMA(2,2) RMSE: 0.6929

18. AR(5) RMSE: 0.6947

19. VAR(5) RMSE: 0.7407

20. MA(2) RMSE: 0.7505

21. VECM(1) RMSE: 1.1568

22. ARCH(1) RMSE: 1.1568

23. ARIMA(1, 1, 1) RMSE: 1.1635

24. GARCH(1,1) RMSE: 1.1635

25. Seq2Seq RMSE: 1.2053

26. GPR RMSE: 3.5257

27. LSTM RMSE: 18.7696

28. BiLSTM RMSE: 144472333.4477

Ranked by MAPE (lower is better):

1. Random forest MAPE: 8.36%

2. KNN(n=5) MAPE: 8.41%

3. GBRT MAPE: 8.49%

4. TCN MAPE: 10.22%

5. SARIMA(1, 1, 1)x(1, 1, 1, 12) MAPE: 10.32%

6. BSTS MAPE: 10.32%

7. SVR MAPE: 10.43%

8. Fuzzy time series MAPE: 10.50%

9. CNN-LSTM MAPE: 11.46%

10. GRU MAPE: 12.23%

11. Exponential smoothing MAPE: 12.86%

12. ETS MAPE: 12.86%

13. DLM MAPE: 12.86%

14. Markov chain MAPE: 14.47%

15. Transformer MAPE: 14.80%

16. AR(5) MAPE: 15.71%

17. VAR(5) MAPE: 16.85%

18. MA(2) MAPE: 17.04%

19. ARMA(2,2) MAPE: 17.20%

20. VARMA(2,2) MAPE: 17.20%

21. VECM(1) MAPE: 30.05%

22. ARCH(1) MAPE: 30.05%

23. ARIMA(1, 1, 1) MAPE: 30.26%

24. GARCH(1,1) MAPE: 30.26%

25. Seq2Seq MAPE: 31.21%

26. GPR MAPE: 86.82%

27. LSTM MAPE: 257.80%

28. BiLSTM MAPE: 1139939852.06%

Ranked by R² (higher is better):

1. KNN(n=5) R²: 0.4396

2. Random forest R²: 0.4154

3. GBRT R²: 0.3768

4. SARIMA(1, 1, 1)x(1, 1, 1, 12) R²: 0.3639

5. BSTS R²: 0.3639

6. SVR R²: 0.3500

7. Fuzzy time series R²: 0.3103

8. TCN R²: 0.2907

9. CNN-LSTM R²: 0.2605

10. GRU R²: 0.2234

11. Exponential smoothing R²: 0.0351

12. ETS R²: 0.0351

13. DLM R²: 0.0351

14. Markov chain R²: -0.2453

15. Transformer R²: -0.3700

16. ARMA(2,2) R²: -0.4982

17. VARMA(2,2) R²: -0.4982

18. AR(5) R²: -0.5060

19. VAR(5) R²: -0.7123

20. MA(2) R²: -0.7578

21. VECM(1) R²: -3.1760

22. ARCH(1) R²: -3.1760

23. ARIMA(1, 1, 1) R²: -3.2248

24. GARCH(1,1) R²: -3.2248

25. Seq2Seq R²: -3.5332

26. GPR R²: -37.7907

27. LSTM R²: -1098.4088

28. BiLSTM R²: -65135425335662872.0000
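The three rankings above are plain sorts of the results table: ascending for the error metrics and descending for R². A minimal sketch of that step follows, using five rows copied from the report; the dictionary layout and the helper `rank` are assumptions for illustration, not the structure used by the attached script.

```python
# Five rows copied from the report above; the nested-dict layout is hypothetical
results = {
    "KNN(n=5)": {"RMSE": 0.4238, "MAPE": 8.41, "R2": 0.4396},
    "Random forest": {"RMSE": 0.4328, "MAPE": 8.36, "R2": 0.4154},
    "GBRT": {"RMSE": 0.4469, "MAPE": 8.49, "R2": 0.3768},
    "SARIMA": {"RMSE": 0.4515, "MAPE": 10.32, "R2": 0.3639},
    "TCN": {"RMSE": 0.4768, "MAPE": 10.22, "R2": 0.2907},
}


def rank(results, metric, higher_is_better=False):
    """Return algorithm names ordered from best to worst on one metric."""
    return sorted(results, key=lambda name: results[name][metric],
                  reverse=higher_is_better)


by_rmse = rank(results, "RMSE")
by_mape = rank(results, "MAPE")
by_r2 = rank(results, "R2", higher_is_better=True)

for position, name in enumerate(by_mape, start=1):
    print(f"{position}. {name} MAPE: {results[name]['MAPE']:.2f}%")
```

The order can differ between metrics: random forest leads on MAPE while KNN leads on RMSE and R², and TCN and SARIMA swap places, which is why the report lists all three rankings.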

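The source code is attached below:



"""
Time series forecasting algorithm comparison program
Runs multiple algorithms on a time series and generates a comparison report
"""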
 
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error
import warnings
import os
import sys
from datetime import datetime
 
# Statistical models
from statsmodels.tsa.ar_model import AutoReg
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.holtwinters import ExponentialSmoothing
 
# LSTM and other deep learning models
try:
    from tensorflow.keras.models import Sequential, Model
    from tensorflow.keras.layers import LSTM, Dense, GRU, Bidirectional, Conv1D, MaxPooling1D, Flatten, Dropout, Input, Attention, MultiHeadAttention, LayerNormalization
    from tensorflow.keras.callbacks import EarlyStopping
    TENSORFLOW_AVAILABLE = True
except ImportError:
    TENSORFLOW_AVAILABLE = False
    print("Warning: TensorFlow is not installed; deep learning models will be unavailable")
 
# Prophet
try:
    from prophet import Prophet
    PROPHET_AVAILABLE = True
except ImportError:
    PROPHET_AVAILABLE = False
 
# Machine learning models
try:
    from sklearn.svm import SVR
    from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
    from sklearn.neighbors import KNeighborsRegressor
    from sklearn.gaussian_process import GaussianProcessRegressor
    from sklearn.gaussian_process.kernels import RBF, ConstantKernel as C
    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False
 
# LightGBM
try:
    import lightgbm as lgb
    LIGHTGBM_AVAILABLE = True
except ImportError:
    LIGHTGBM_AVAILABLE = False
 
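# Multivariate and state-space statistical models
try:
    from statsmodels.tsa.vector_ar.var_model import VAR
    from statsmodels.tsa.statespace.varmax import VARMAX
    from statsmodels.tsa.vector_ar.vecm import VECM
    # statsmodels does not ship a tsa.arch module (ARCH/GARCH models live in the
    # separate `arch` package), so importing ARCHModel from it raises ImportError
    # and disables this whole group. ARCHPredictor and GARCHPredictor below use an
    # ARIMA approximation and need no extra import.
    from statsmodels.tsa.statespace.exponential_smoothing import ExponentialSmoothing as ETS
    STATSMODELS_VAR_AVAILABLE = True
except ImportError:
    STATSMODELS_VAR_AVAILABLE = False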
 
warnings.filterwarnings('ignore')
 
# On Windows, switch the console encoding to UTF-8
if sys.platform == 'win32':
    try:
        sys.stdout.reconfigure(encoding='utf-8')
        sys.stderr.reconfigure(encoding='utf-8')
    except Exception:
        pass
 
# Configure matplotlib fonts so that Chinese labels render
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'Arial Unicode MS']
plt.rcParams['axes.unicode_minus'] = False
 
 
class TimeSeriesPredictor:
    """Base class for time series predictors"""
    
    def __init__(self, name):
        self.name = name
        self.model = None
        self.is_fitted = False
    
    def fit(self, data):
        """Train the model"""
        raise NotImplementedError
    
    def predict(self, steps):
        """Forecast `steps` points ahead"""
        raise NotImplementedError
    
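    def evaluate(self, y_true, y_pred):
        """Evaluate forecasts against the actual values"""
        # Coerce to float arrays so lists and pandas Series behave the same
        y_true = np.asarray(y_true, dtype=float)
        y_pred = np.asarray(y_pred, dtype=float)
        mae = mean_absolute_error(y_true, y_pred)
        rmse = np.sqrt(mean_squared_error(y_true, y_pred))
        mape = mean_absolute_percentage_error(y_true, y_pred) * 100
        
        # R² = 1 - SS_res / SS_tot; negative when worse than predicting the mean
        ss_res = np.sum((y_true - y_pred) ** 2)
        ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)
        r2 = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0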
        
        return {
            'MAE': mae,
            'RMSE': rmse,
            'MAPE': mape,
            'R²': r2
        }
 
 
class ARPredictor(TimeSeriesPredictor):
    """AR (autoregressive) model"""
    
    def __init__(self, lags=5):
        super().__init__(f"AR({lags})")
        self.lags = lags
    
    def fit(self, data):
        try:
            self.model = AutoReg(data, lags=self.lags).fit()
            self.is_fitted = True
        except Exception as e:
            print(f"AR模型训练失败: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("模型尚未训练")
        try:
            forecast = self.model.forecast(steps=steps)
            return forecast.values if hasattr(forecast, 'values') else forecast
        except Exception:
            # If forecast() fails, fall back to predict()
            n = len(self.model.model.endog)
            predictions = self.model.predict(start=n, end=n+steps-1)
            return predictions.values if hasattr(predictions, 'values') else predictions
 
 
class MAPredictor(TimeSeriesPredictor):
    """MA (moving average) model, implemented as ARIMA(0, 0, q)"""
    
    def __init__(self, q=2):
        super().__init__(f"MA({q})")
        self.q = q
    
    def fit(self, data):
        try:
            self.model = ARIMA(data, order=(0, 0, self.q)).fit()
            self.is_fitted = True
        except Exception as e:
            print(f"MA模型训练失败: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("模型尚未训练")
        try:
            forecast = self.model.forecast(steps=steps)
            return forecast.values if hasattr(forecast, 'values') else forecast
        except Exception:
            n = len(self.model.model.endog)
            predictions = self.model.predict(start=n, end=n+steps-1)
            return predictions.values if hasattr(predictions, 'values') else predictions
 
 
class ARMAPredictor(TimeSeriesPredictor):
    """ARMA model"""
    
    def __init__(self, p=2, q=2):
        super().__init__(f"ARMA({p},{q})")
        self.p = p
        self.q = q
    
    def fit(self, data):
        try:
            self.model = ARIMA(data, order=(self.p, 0, self.q)).fit()
            self.is_fitted = True
        except Exception as e:
            print(f"ARMA模型训练失败: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("模型尚未训练")
        try:
            forecast = self.model.forecast(steps=steps)
            return forecast.values if hasattr(forecast, 'values') else forecast
        except Exception:
            n = len(self.model.model.endog)
            predictions = self.model.predict(start=n, end=n+steps-1)
            return predictions.values if hasattr(predictions, 'values') else predictions
 
 
class ARIMAPredictor(TimeSeriesPredictor):
    """ARIMA model"""
    
    def __init__(self, order=(1, 1, 1)):
        super().__init__(f"ARIMA{order}")
        self.order = order
    
    def fit(self, data):
        try:
            self.model = ARIMA(data, order=self.order).fit()
            self.is_fitted = True
        except Exception as e:
            print(f"ARIMA模型训练失败: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("模型尚未训练")
        try:
            forecast = self.model.forecast(steps=steps)
            return forecast.values if hasattr(forecast, 'values') else forecast
        except Exception:
            n = len(self.model.model.endog)
            predictions = self.model.predict(start=n, end=n+steps-1)
            return predictions.values if hasattr(predictions, 'values') else predictions
 
 
class SARIMAPredictor(TimeSeriesPredictor):
    """SARIMA model"""
    
    def __init__(self, order=(1, 1, 1), seasonal_order=(1, 1, 1, 12)):
        super().__init__(f"SARIMA{order}x{seasonal_order}")
        self.order = order
        self.seasonal_order = seasonal_order
    
    def fit(self, data):
        try:
            self.model = SARIMAX(data, order=self.order, 
                                seasonal_order=self.seasonal_order,
                                enforce_stationarity=False,
                                enforce_invertibility=False).fit(disp=False)
            self.is_fitted = True
        except Exception as e:
            print(f"SARIMA模型训练失败: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("模型尚未训练")
        try:
            forecast = self.model.forecast(steps=steps)
            return forecast.values if hasattr(forecast, 'values') else forecast
        except Exception:
            n = len(self.model.model.endog)
            predictions = self.model.predict(start=n, end=n+steps-1)
            return predictions.values if hasattr(predictions, 'values') else predictions
 
 
class ExponentialSmoothingPredictor(TimeSeriesPredictor):
    """Exponential smoothing model"""
    
    def __init__(self, trend='add', seasonal='add', seasonal_periods=12):
        super().__init__("Exponential smoothing")
        self.trend = trend
        self.seasonal = seasonal
        self.seasonal_periods = seasonal_periods
    
    def fit(self, data):
        try:
            self.model = ExponentialSmoothing(data, 
                                             trend=self.trend,
                                             seasonal=self.seasonal,
                                             seasonal_periods=self.seasonal_periods).fit()
            self.is_fitted = True
        except Exception as e:
            # If the seasonal model fails, fall back to one without seasonality
            try:
                self.model = ExponentialSmoothing(data, trend=self.trend).fit()
                self.is_fitted = True
            except Exception:
                print(f"指数平滑模型训练失败: {e}")
                self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("模型尚未训练")
        forecast = self.model.forecast(steps=steps)
        return forecast.values if hasattr(forecast, 'values') else forecast
 
 
class LSTMPredictor(TimeSeriesPredictor):
    """LSTM model"""
    
    def __init__(self, lookback=12, units=50):
        super().__init__("LSTM")
        self.lookback = lookback
        self.units = units
        self.scaler_mean = None
        self.scaler_std = None
    
    def _normalize(self, data):
        """Standardize the data (z-score) and store the mean and standard deviation"""
        self.scaler_mean = np.mean(data)
        self.scaler_std = np.std(data)
        return (data - self.scaler_mean) / self.scaler_std
    
    def _denormalize(self, data):
        """Map standardized values back to the original scale"""
        return data * self.scaler_std + self.scaler_mean
    
    def _create_sequences(self, data, lookback):
        """Build sliding windows: `lookback` inputs, next value as target"""
        X, y = [], []
        for i in range(lookback, len(data)):
            X.append(data[i-lookback:i])
            y.append(data[i])
        return np.array(X), np.array(y)
    
    def fit(self, data):
        if not TENSORFLOW_AVAILABLE:
            print("TensorFlow未安装,跳过LSTM模型")
            self.is_fitted = False
            return
        
        try:
            # Keep the raw data for later forecasting
            self.train_data = data.copy()
            
            # Normalize the data (this stores the scaling parameters)
            data_norm = self._normalize(data)
            
            # Build the sliding-window sequences
            if len(data_norm) < self.lookback + 1:
                print(f"数据长度不足,需要至少{self.lookback + 1}个样本")
                self.is_fitted = False
                return
            
            X, y = self._create_sequences(data_norm, self.lookback)
            
            # Reshape to the LSTM input layout [samples, time steps, features]
            X = X.reshape((X.shape[0], X.shape[1], 1))
            
            # Build the LSTM model
            self.model = Sequential([
                LSTM(self.units, activation='relu', input_shape=(self.lookback, 1)),
                Dense(1)
            ])
            self.model.compile(optimizer='adam', loss='mse')
            
            # Train the model
            early_stop = EarlyStopping(monitor='loss', patience=10, verbose=0)
            self.model.fit(X, y, epochs=100, batch_size=16, 
                          callbacks=[early_stop], verbose=0)
            
            self.is_fitted = True
        except Exception as e:
            print(f"LSTM模型训练失败: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("模型尚未训练")
        
        try:
            # Get the last `lookback` data points (supplied via set_last_data)
            if hasattr(self, 'last_data'):
                last_data = self.last_data
            else:
                raise ValueError("需要提供历史数据")
            
            predictions = []
            current_data = last_data.copy()
            
            for _ in range(steps):
                # Prepare the input window
                X_input = current_data[-self.lookback:].reshape(1, self.lookback, 1)
                
                # Predict one step ahead
                pred = self.model.predict(X_input, verbose=0)[0, 0]
                predictions.append(pred)
                
                # Append the prediction so it feeds the next step
                current_data = np.append(current_data, pred)
            
            # Undo the normalization
            predictions = self._denormalize(np.array(predictions))
            return predictions
        except Exception as e:
            print(f"LSTM预测失败: {e}")
            return np.full(steps, np.nan)
    
    def set_last_data(self, data):
        """Set the historical data used as the starting point for forecasting"""
        if self.is_fitted:
            # Reuse the scaling parameters from training
            if self.scaler_mean is not None and self.scaler_std is not None:
                self.last_data = (data - self.scaler_mean) / self.scaler_std
            else:
                self.last_data = self._normalize(data)
 
 
# ==================== Additional predictor classes ====================
 
class ProphetPredictor(TimeSeriesPredictor):
    """Prophet model"""
    
    def __init__(self):
        super().__init__("Prophet")
    
    def fit(self, data):
        if not PROPHET_AVAILABLE:
            self.is_fitted = False
            return
        try:
            # Prophet expects a DataFrame with ds and y columns
            # Monthly data, dated at the last day of each month
            df = pd.DataFrame({
                'ds': pd.date_range(start='2011-01-31', periods=len(data), freq='M'),
                'y': data
            })
            self.model = Prophet(yearly_seasonality=True, weekly_seasonality=False, 
                               daily_seasonality=False)
            self.model.fit(df)
            self.is_fitted = True
        except Exception as e:
            print(f"Prophet模型训练失败: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("模型尚未训练")
        try:
            future = self.model.make_future_dataframe(periods=steps, freq='M')
            forecast = self.model.predict(future)
            return forecast['yhat'].values[-steps:]
        except Exception:
            return np.full(steps, np.nan)
 
 
class SVRPredictor(TimeSeriesPredictor):
    """Support vector regression (SVR)"""
    
    def __init__(self, lookback=12):
        super().__init__("SVR")
        self.lookback = lookback
    
    def _create_features(self, data):
        X, y = [], []
        for i in range(self.lookback, len(data)):
            X.append(data[i-self.lookback:i])
            y.append(data[i])
        return np.array(X), np.array(y)
    
    def fit(self, data):
        if not SKLEARN_AVAILABLE:
            self.is_fitted = False
            return
        try:
            X, y = self._create_features(data)
            self.model = SVR(kernel='rbf', C=100, gamma='scale', epsilon=0.1)
            self.model.fit(X, y)
            self.train_data = data
            self.is_fitted = True
        except Exception as e:
            print(f"SVR模型训练失败: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("模型尚未训练")
        try:
            predictions = []
            current_data = self.train_data[-self.lookback:].copy()
            for _ in range(steps):
                X_input = current_data[-self.lookback:].reshape(1, -1)
                pred = self.model.predict(X_input)[0]
                predictions.append(pred)
                current_data = np.append(current_data, pred)
            return np.array(predictions)
        except Exception:
            return np.full(steps, np.nan)
 
 
class GBRTPredictor(TimeSeriesPredictor):
    """Gradient boosted regression trees (GBRT)"""
    
    def __init__(self, lookback=12):
        super().__init__("GBRT")
        self.lookback = lookback
    
    def _create_features(self, data):
        X, y = [], []
        for i in range(self.lookback, len(data)):
            X.append(data[i-self.lookback:i])
            y.append(data[i])
        return np.array(X), np.array(y)
    
    def fit(self, data):
        if not SKLEARN_AVAILABLE:
            self.is_fitted = False
            return
        try:
            X, y = self._create_features(data)
            self.model = GradientBoostingRegressor(n_estimators=100, max_depth=5, random_state=42)
            self.model.fit(X, y)
            self.train_data = data
            self.is_fitted = True
        except Exception as e:
            print(f"GBRT模型训练失败: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("模型尚未训练")
        try:
            predictions = []
            current_data = self.train_data[-self.lookback:].copy()
            for _ in range(steps):
                X_input = current_data[-self.lookback:].reshape(1, -1)
                pred = self.model.predict(X_input)[0]
                predictions.append(pred)
                current_data = np.append(current_data, pred)
            return np.array(predictions)
        except Exception:
            return np.full(steps, np.nan)
 
 
class GRUPredictor(TimeSeriesPredictor):
    """Gated recurrent unit (GRU)"""
    
    def __init__(self, lookback=12, units=50):
        super().__init__("GRU")
        self.lookback = lookback
        self.units = units
        self.scaler_mean = None
        self.scaler_std = None
    
    def _normalize(self, data):
        self.scaler_mean = np.mean(data)
        self.scaler_std = np.std(data)
        return (data - self.scaler_mean) / self.scaler_std
    
    def _denormalize(self, data):
        return data * self.scaler_std + self.scaler_mean
    
    def _create_sequences(self, data, lookback):
        X, y = [], []
        for i in range(lookback, len(data)):
            X.append(data[i-lookback:i])
            y.append(data[i])
        return np.array(X), np.array(y)
    
    def fit(self, data):
        if not TENSORFLOW_AVAILABLE:
            self.is_fitted = False
            return
        try:
            self.train_data = data.copy()
            data_norm = self._normalize(data)
            if len(data_norm) < self.lookback + 1:
                self.is_fitted = False
                return
            X, y = self._create_sequences(data_norm, self.lookback)
            X = X.reshape((X.shape[0], X.shape[1], 1))
            self.model = Sequential([
                GRU(self.units, activation='relu', input_shape=(self.lookback, 1)),
                Dense(1)
            ])
            self.model.compile(optimizer='adam', loss='mse')
            early_stop = EarlyStopping(monitor='loss', patience=10, verbose=0)
            self.model.fit(X, y, epochs=100, batch_size=16, callbacks=[early_stop], verbose=0)
            self.is_fitted = True
        except Exception as e:
            print(f"GRU模型训练失败: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("模型尚未训练")
        try:
            last_data = self._normalize(self.train_data)
            predictions = []
            current_data = last_data.copy()
            for _ in range(steps):
                X_input = current_data[-self.lookback:].reshape(1, self.lookback, 1)
                pred = self.model.predict(X_input, verbose=0)[0, 0]
                predictions.append(pred)
                current_data = np.append(current_data, pred)
            predictions = self._denormalize(np.array(predictions))
            return predictions
        except Exception:
            return np.full(steps, np.nan)
 
 
class BiLSTMPredictor(TimeSeriesPredictor):
    """Bidirectional long short-term memory network (BiLSTM)"""
    
    def __init__(self, lookback=12, units=50):
        super().__init__("BiLSTM")
        self.lookback = lookback
        self.units = units
        self.scaler_mean = None
        self.scaler_std = None
    
    def _normalize(self, data):
        self.scaler_mean = np.mean(data)
        self.scaler_std = np.std(data)
        return (data - self.scaler_mean) / self.scaler_std
    
    def _denormalize(self, data):
        return data * self.scaler_std + self.scaler_mean
    
    def _create_sequences(self, data, lookback):
        X, y = [], []
        for i in range(lookback, len(data)):
            X.append(data[i-lookback:i])
            y.append(data[i])
        return np.array(X), np.array(y)
    
    def fit(self, data):
        if not TENSORFLOW_AVAILABLE:
            self.is_fitted = False
            return
        try:
            self.train_data = data.copy()
            data_norm = self._normalize(data)
            if len(data_norm) < self.lookback + 1:
                self.is_fitted = False
                return
            X, y = self._create_sequences(data_norm, self.lookback)
            X = X.reshape((X.shape[0], X.shape[1], 1))
            self.model = Sequential([
                Bidirectional(LSTM(self.units, activation='relu'), input_shape=(self.lookback, 1)),
                Dense(1)
            ])
            self.model.compile(optimizer='adam', loss='mse')
            early_stop = EarlyStopping(monitor='loss', patience=10, verbose=0)
            self.model.fit(X, y, epochs=100, batch_size=16, callbacks=[early_stop], verbose=0)
            self.is_fitted = True
        except Exception as e:
            print(f"BiLSTM模型训练失败: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("模型尚未训练")
        try:
            last_data = self._normalize(self.train_data)
            predictions = []
            current_data = last_data.copy()
            for _ in range(steps):
                X_input = current_data[-self.lookback:].reshape(1, self.lookback, 1)
                pred = self.model.predict(X_input, verbose=0)[0, 0]
                predictions.append(pred)
                current_data = np.append(current_data, pred)
            predictions = self._denormalize(np.array(predictions))
            return predictions
        except Exception:
            return np.full(steps, np.nan)
 
 
class RandomForestPredictor(TimeSeriesPredictor):
    """Random forest regression"""
    
    def __init__(self, lookback=12):
        super().__init__("Random forest")
        self.lookback = lookback
    
    def _create_features(self, data):
        X, y = [], []
        for i in range(self.lookback, len(data)):
            X.append(data[i-self.lookback:i])
            y.append(data[i])
        return np.array(X), np.array(y)
    
    def fit(self, data):
        if not SKLEARN_AVAILABLE:
            self.is_fitted = False
            return
        try:
            X, y = self._create_features(data)
            self.model = RandomForestRegressor(n_estimators=100, max_depth=10, random_state=42)
            self.model.fit(X, y)
            self.train_data = data
            self.is_fitted = True
        except Exception as e:
            print(f"随机森林模型训练失败: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("模型尚未训练")
        try:
            predictions = []
            current_data = self.train_data[-self.lookback:].copy()
            for _ in range(steps):
                X_input = current_data[-self.lookback:].reshape(1, -1)
                pred = self.model.predict(X_input)[0]
                predictions.append(pred)
                current_data = np.append(current_data, pred)
            return np.array(predictions)
        except Exception:
            return np.full(steps, np.nan)
 
 
class LightGBMPredictor(TimeSeriesPredictor):
    """Light gradient boosting machine (LightGBM)"""
    
    def __init__(self, lookback=12):
        super().__init__("LightGBM")
        self.lookback = lookback
    
    def _create_features(self, data):
        X, y = [], []
        for i in range(self.lookback, len(data)):
            X.append(data[i-self.lookback:i])
            y.append(data[i])
        return np.array(X), np.array(y)
    
    def fit(self, data):
        if not LIGHTGBM_AVAILABLE:
            self.is_fitted = False
            return
        try:
            X, y = self._create_features(data)
            self.model = lgb.LGBMRegressor(n_estimators=100, max_depth=5, random_state=42, verbose=-1)
            self.model.fit(X, y)
            self.train_data = data
            self.is_fitted = True
        except Exception as e:
            print(f"LightGBM模型训练失败: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("模型尚未训练")
        try:
            predictions = []
            current_data = self.train_data[-self.lookback:].copy()
            for _ in range(steps):
                X_input = current_data[-self.lookback:].reshape(1, -1)
                pred = self.model.predict(X_input)[0]
                predictions.append(pred)
                current_data = np.append(current_data, pred)
            return np.array(predictions)
        except Exception:
            return np.full(steps, np.nan)
 
 
class KNNPredictor(TimeSeriesPredictor):
    """K-nearest neighbors regression (KNN)"""
    
    def __init__(self, lookback=12, n_neighbors=5):
        super().__init__(f"KNN(n={n_neighbors})")
        self.lookback = lookback
        self.n_neighbors = n_neighbors
    
    def _create_features(self, data):
        X, y = [], []
        for i in range(self.lookback, len(data)):
            X.append(data[i-self.lookback:i])
            y.append(data[i])
        return np.array(X), np.array(y)
    
    def fit(self, data):
        if not SKLEARN_AVAILABLE:
            self.is_fitted = False
            return
        try:
            X, y = self._create_features(data)
            self.model = KNeighborsRegressor(n_neighbors=self.n_neighbors)
            self.model.fit(X, y)
            self.train_data = data
            self.is_fitted = True
        except Exception as e:
            print(f"KNN模型训练失败: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("模型尚未训练")
        try:
            predictions = []
            current_data = self.train_data[-self.lookback:].copy()
            for _ in range(steps):
                X_input = current_data[-self.lookback:].reshape(1, -1)
                pred = self.model.predict(X_input)[0]
                predictions.append(pred)
                current_data = np.append(current_data, pred)
            return np.array(predictions)
        except Exception:
            return np.full(steps, np.nan)
 
 
class GPRPredictor(TimeSeriesPredictor):
    """Gaussian process regression (GPR)"""
    
    def __init__(self, lookback=12):
        super().__init__("GPR")
        self.lookback = lookback
    
    def _create_features(self, data):
        X, y = [], []
        for i in range(self.lookback, len(data)):
            X.append(data[i-self.lookback:i])
            y.append(data[i])
        return np.array(X), np.array(y)
    
    def fit(self, data):
        if not SKLEARN_AVAILABLE:
            self.is_fitted = False
            return
        try:
            X, y = self._create_features(data)
            # Cap the sample count because GPR is computationally expensive
            if len(X) > 100:
                X = X[-100:]
                y = y[-100:]
            kernel = C(1.0, (1e-3, 1e3)) * RBF(1.0, (1e-2, 1e2))
            self.model = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=10, random_state=42)
            self.model.fit(X, y)
            self.train_data = data
            self.is_fitted = True
        except Exception as e:
            print(f"GPR模型训练失败: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("模型尚未训练")
        try:
            predictions = []
            current_data = self.train_data[-self.lookback:].copy()
            for _ in range(steps):
                X_input = current_data[-self.lookback:].reshape(1, -1)
                pred = self.model.predict(X_input)[0]
                predictions.append(pred)
                current_data = np.append(current_data, pred)
            return np.array(predictions)
        except Exception:
            return np.full(steps, np.nan)
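

# --- Illustrative sketch (not used by the predictors in this file) ---
# The KNN and GPR predictors above share one pattern: build lagged windows,
# fit a one-step model, then forecast recursively by feeding each prediction
# back into the window. The helper below is a minimal sketch of that pattern.
# Assumptions: the function name is hypothetical, and a plain least-squares
# linear model stands in for the scikit-learn regressors.
import numpy as np


def recursive_forecast_demo(series, steps, lookback=3):
    """Recursive multi-step forecast with a linear one-step model."""
    series = np.asarray(series, dtype=float)
    # Row i holds series[i:i+lookback]; its target is series[i+lookback]
    X = np.array([series[i:i + lookback] for i in range(len(series) - lookback)])
    y = series[lookback:]
    # Stand-in one-step model: least squares with an intercept column
    A = np.column_stack([X, np.ones(len(X))])
    coef, *_ = np.linalg.lstsq(A, y, rcond=None)
    window = series[-lookback:].copy()
    forecasts = []
    for _ in range(steps):
        pred = float(np.append(window, 1.0) @ coef)
        forecasts.append(pred)
        # Slide the window forward: drop the oldest value, append the prediction
        window = np.append(window[1:], pred)
    return np.array(forecasts)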
 
 
class ARCHPredictor(TimeSeriesPredictor):
    """Autoregressive conditional heteroskedasticity model (ARCH)"""
    
    def __init__(self, p=1):
        super().__init__(f"ARCH({p})")
        self.p = p
    
    def fit(self, data):
        try:
            # ARCH assumes a stationary series, so keep the first difference
            diff_data = np.diff(data)
            self.original_data = data
            self.diff_data = diff_data
            # Simplified implementation: approximate with ARIMA(p, 1, 0)
            from statsmodels.tsa.arima.model import ARIMA
            self.model = ARIMA(data, order=(self.p, 1, 0)).fit()
            self.is_fitted = True
        except Exception as e:
            print(f"ARCH model training failed: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("Model has not been fitted")
        try:
            forecast = self.model.forecast(steps=steps)
            return forecast.values if hasattr(forecast, 'values') else forecast
        except Exception:
            return np.full(steps, np.nan)
 
 
class GARCHPredictor(TimeSeriesPredictor):
    """Generalized autoregressive conditional heteroskedasticity model (GARCH)"""
    
    def __init__(self, p=1, q=1):
        super().__init__(f"GARCH({p},{q})")
        self.p = p
        self.q = q
    
    def fit(self, data):
        try:
            # Simplified GARCH implementation: approximate with ARIMA
            from statsmodels.tsa.arima.model import ARIMA
            self.model = ARIMA(data, order=(max(self.p, self.q), 1, max(self.p, self.q))).fit()
            self.is_fitted = True
        except Exception as e:
            print(f"GARCH model training failed: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("Model has not been fitted")
        try:
            forecast = self.model.forecast(steps=steps)
            return forecast.values if hasattr(forecast, 'values') else forecast
        except Exception:
            return np.full(steps, np.nan)
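

# --- Illustrative sketch (not used by the predictors in this file) ---
# ARCHPredictor and GARCHPredictor above fit an ARIMA model to the level of
# the series and do not model volatility. For contrast, this is a minimal
# sketch of the GARCH(1,1) conditional-variance recursion
#     sigma2[t] = omega + alpha * eps[t-1]**2 + beta * sigma2[t-1]
# Assumptions: the function name and the default parameter values are
# hypothetical; in practice omega, alpha and beta are estimated from data.
# Setting beta=0 reduces the recursion to ARCH(1).
import numpy as np


def garch11_conditional_variance(residuals, omega=0.1, alpha=0.1, beta=0.8):
    """Return the GARCH(1,1) conditional variance path for the given residuals."""
    residuals = np.asarray(residuals, dtype=float)
    sigma2 = np.empty(len(residuals))
    if len(residuals) == 0:
        return sigma2
    # Initialise with the sample variance of the residuals (a simple choice)
    sigma2[0] = np.var(residuals)
    for t in range(1, len(residuals)):
        sigma2[t] = omega + alpha * residuals[t - 1] ** 2 + beta * sigma2[t - 1]
    return sigma2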
 
 
class ETSPredictor(TimeSeriesPredictor):
    """Exponential smoothing state space model (ETS)"""
    
    def __init__(self, trend='add', seasonal='add', seasonal_periods=12):
        super().__init__("ETS")
        self.trend = trend
        self.seasonal = seasonal
        self.seasonal_periods = seasonal_periods
    
    def fit(self, data):
        try:
            # Use the exponential smoothing implementation from statsmodels
            self.model = ExponentialSmoothing(data, 
                                             trend=self.trend,
                                             seasonal=self.seasonal,
                                             seasonal_periods=self.seasonal_periods).fit()
            self.is_fitted = True
        except Exception as e:
            # Seasonal fit failed; fall back to a trend-only model
            try:
                self.model = ExponentialSmoothing(data, trend=self.trend).fit()
                self.is_fitted = True
            except Exception:
                print(f"ETS model training failed: {e}")
                self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("Model has not been fitted")
        try:
            forecast = self.model.forecast(steps=steps)
            return forecast.values if hasattr(forecast, 'values') else forecast
        except Exception:
            return np.full(steps, np.nan)
 
 
# Simplified predictors (full implementations are more involved)
class TCNPredictor(TimeSeriesPredictor):
    """Temporal convolutional network (TCN) - simplified version"""
    
    def __init__(self, lookback=12, filters=64):
        super().__init__("TCN")
        self.lookback = lookback
        self.filters = filters
        self.scaler_mean = None
        self.scaler_std = None
    
    def _normalize(self, data):
        self.scaler_mean = np.mean(data)
        self.scaler_std = np.std(data)
        return (data - self.scaler_mean) / self.scaler_std
    
    def _denormalize(self, data):
        return data * self.scaler_std + self.scaler_mean
    
    def _create_sequences(self, data, lookback):
        X, y = [], []
        for i in range(lookback, len(data)):
            X.append(data[i-lookback:i])
            y.append(data[i])
        return np.array(X), np.array(y)
    
    def fit(self, data):
        if not TENSORFLOW_AVAILABLE:
            self.is_fitted = False
            return
        try:
            self.train_data = data.copy()
            data_norm = self._normalize(data)
            if len(data_norm) < self.lookback + 1:
                self.is_fitted = False
                return
            X, y = self._create_sequences(data_norm, self.lookback)
            X = X.reshape((X.shape[0], X.shape[1], 1))
            # Simplified TCN: plain stacked Conv1D layers (no dilation or causal padding)
            self.model = Sequential([
                Conv1D(self.filters, 3, activation='relu', padding='same', input_shape=(self.lookback, 1)),
                Conv1D(self.filters, 3, activation='relu', padding='same'),
                MaxPooling1D(2),
                Flatten(),
                Dense(50, activation='relu'),
                Dense(1)
            ])
            self.model.compile(optimizer='adam', loss='mse')
            early_stop = EarlyStopping(monitor='loss', patience=10, verbose=0)
            self.model.fit(X, y, epochs=100, batch_size=16, callbacks=[early_stop], verbose=0)
            self.is_fitted = True
        except Exception as e:
            print(f"TCN model training failed: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("Model has not been fitted")
        try:
            last_data = self._normalize(self.train_data)
            predictions = []
            current_data = last_data.copy()
            for _ in range(steps):
                X_input = current_data[-self.lookback:].reshape(1, self.lookback, 1)
                pred = self.model.predict(X_input, verbose=0)[0, 0]
                predictions.append(pred)
                current_data = np.append(current_data, pred)
            predictions = self._denormalize(np.array(predictions))
            return predictions
        except Exception:
            return np.full(steps, np.nan)
 
 
class CNNLSTMPredictor(TimeSeriesPredictor):
    """Hybrid CNN-LSTM model"""
    
    def __init__(self, lookback=12, cnn_filters=64, lstm_units=50):
        super().__init__("CNN-LSTM")
        self.lookback = lookback
        self.cnn_filters = cnn_filters
        self.lstm_units = lstm_units
        self.scaler_mean = None
        self.scaler_std = None
    
    def _normalize(self, data):
        self.scaler_mean = np.mean(data)
        self.scaler_std = np.std(data)
        return (data - self.scaler_mean) / self.scaler_std
    
    def _denormalize(self, data):
        return data * self.scaler_std + self.scaler_mean
    
    def _create_sequences(self, data, lookback):
        X, y = [], []
        for i in range(lookback, len(data)):
            X.append(data[i-lookback:i])
            y.append(data[i])
        return np.array(X), np.array(y)
    
    def fit(self, data):
        if not TENSORFLOW_AVAILABLE:
            self.is_fitted = False
            return
        try:
            self.train_data = data.copy()
            data_norm = self._normalize(data)
            if len(data_norm) < self.lookback + 1:
                self.is_fitted = False
                return
            X, y = self._create_sequences(data_norm, self.lookback)
            X = X.reshape((X.shape[0], X.shape[1], 1))
            self.model = Sequential([
                Conv1D(self.cnn_filters, 3, activation='relu', input_shape=(self.lookback, 1)),
                MaxPooling1D(2),
                LSTM(self.lstm_units, activation='relu'),
                Dense(1)
            ])
            self.model.compile(optimizer='adam', loss='mse')
            early_stop = EarlyStopping(monitor='loss', patience=10, verbose=0)
            self.model.fit(X, y, epochs=100, batch_size=16, callbacks=[early_stop], verbose=0)
            self.is_fitted = True
        except Exception as e:
            print(f"CNN-LSTM model training failed: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("Model has not been fitted")
        try:
            last_data = self._normalize(self.train_data)
            predictions = []
            current_data = last_data.copy()
            for _ in range(steps):
                X_input = current_data[-self.lookback:].reshape(1, self.lookback, 1)
                pred = self.model.predict(X_input, verbose=0)[0, 0]
                predictions.append(pred)
                current_data = np.append(current_data, pred)
            predictions = self._denormalize(np.array(predictions))
            return predictions
        except Exception:
            return np.full(steps, np.nan)
 
 
class Seq2SeqPredictor(TimeSeriesPredictor):
    """Sequence-to-sequence model (Seq2Seq) - simplified version"""
    
    def __init__(self, lookback=12, encoder_units=50, decoder_units=50):
        super().__init__("Seq2Seq")
        self.lookback = lookback
        self.encoder_units = encoder_units
        self.decoder_units = decoder_units
        self.scaler_mean = None
        self.scaler_std = None
    
    def _normalize(self, data):
        self.scaler_mean = np.mean(data)
        self.scaler_std = np.std(data)
        return (data - self.scaler_mean) / self.scaler_std
    
    def _denormalize(self, data):
        return data * self.scaler_std + self.scaler_mean
    
    def _create_sequences(self, data, lookback):
        X, y = [], []
        for i in range(lookback, len(data)):
            X.append(data[i-lookback:i])
            y.append(data[i])
        return np.array(X), np.array(y)
    
    def fit(self, data):
        if not TENSORFLOW_AVAILABLE:
            self.is_fitted = False
            return
        try:
            self.train_data = data.copy()
            data_norm = self._normalize(data)
            if len(data_norm) < self.lookback + 1:
                self.is_fitted = False
                return
            X, y = self._create_sequences(data_norm, self.lookback)
            X = X.reshape((X.shape[0], X.shape[1], 1))
            # Simplified Seq2Seq: an LSTM encoder-decoder structure
            encoder_input = Input(shape=(self.lookback, 1))
            encoder = LSTM(self.encoder_units, return_state=True)
            encoder_outputs, state_h, state_c = encoder(encoder_input)
            encoder_states = [state_h, state_c]
            
            decoder_input = Input(shape=(1, 1))
            decoder_lstm = LSTM(self.decoder_units, return_sequences=True, return_state=True)
            decoder_dense = Dense(1)
            
            decoder_outputs, _, _ = decoder_lstm(decoder_input, initial_state=encoder_states)
            decoder_outputs = decoder_dense(decoder_outputs)
            
            self.model = Model([encoder_input, decoder_input], decoder_outputs)
            self.model.compile(optimizer='adam', loss='mse')
            
            # Prepare the training data (the decoder input is all zeros)
            decoder_input_data = np.zeros((len(X), 1, 1))
            decoder_target_data = y.reshape(-1, 1, 1)
            
            early_stop = EarlyStopping(monitor='loss', patience=10, verbose=0)
            self.model.fit([X, decoder_input_data], decoder_target_data, 
                          epochs=100, batch_size=16, callbacks=[early_stop], verbose=0)
            
            # Keep separate encoder and decoder models for prediction
            self.encoder_model = Model(encoder_input, encoder_states)
            decoder_state_input_h = Input(shape=(self.decoder_units,))
            decoder_state_input_c = Input(shape=(self.decoder_units,))
            decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
            decoder_outputs, state_h, state_c = decoder_lstm(decoder_input, initial_state=decoder_states_inputs)
            decoder_states = [state_h, state_c]
            decoder_outputs = decoder_dense(decoder_outputs)
            self.decoder_model = Model([decoder_input] + decoder_states_inputs, 
                                      [decoder_outputs] + decoder_states)
            
            self.is_fitted = True
        except Exception as e:
            print(f"Seq2Seq model training failed: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("Model has not been fitted")
        try:
            last_data = self._normalize(self.train_data)
            X_input = last_data[-self.lookback:].reshape(1, self.lookback, 1)
            states_value = self.encoder_model.predict(X_input, verbose=0)
            
            target_seq = np.array([0.0]).reshape(1, 1, 1)
            predictions = []
            
            for _ in range(steps):
                output_tokens, h, c = self.decoder_model.predict([target_seq] + states_value, verbose=0)
                predictions.append(output_tokens[0, 0, 0])
                target_seq = output_tokens
                states_value = [h, c]
            
            predictions = self._denormalize(np.array(predictions))
            return predictions
        except Exception:
            # If Seq2Seq decoding fails, fall back to one-step recursive prediction
            try:
                last_data = self._normalize(self.train_data)
                predictions = []
                current_data = last_data.copy()
                for _ in range(steps):
                    X_input = current_data[-self.lookback:].reshape(1, self.lookback, 1)
                    pred = self.model.predict([X_input, np.zeros((1, 1, 1))], verbose=0)[0, 0, 0]
                    predictions.append(pred)
                    current_data = np.append(current_data, pred)
                predictions = self._denormalize(np.array(predictions))
                return predictions
            except Exception:
                return np.full(steps, np.nan)
 
 
# Simplified models (approximated with ARIMA)
class VARPredictor(TimeSeriesPredictor):
    """Vector autoregression (VAR) - simplified univariate version"""
    
    def __init__(self, maxlags=5):
        super().__init__(f"VAR({maxlags})")
        self.maxlags = maxlags
    
    def fit(self, data):
        try:
            # VAR needs multiple variables; approximate with ARIMA(maxlags, 0, 0) here
            self.model = ARIMA(data, order=(self.maxlags, 0, 0)).fit()
            self.is_fitted = True
        except Exception as e:
            print(f"VAR model training failed: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("Model has not been fitted")
        try:
            forecast = self.model.forecast(steps=steps)
            return forecast.values if hasattr(forecast, 'values') else forecast
        except Exception:
            return np.full(steps, np.nan)
 
 
class VARMAPredictor(TimeSeriesPredictor):
    """Vector autoregressive moving average (VARMA) - simplified univariate version"""
    
    def __init__(self, p=2, q=2):
        super().__init__(f"VARMA({p},{q})")
        self.p = p
        self.q = q
    
    def fit(self, data):
        try:
            # VARMA needs multiple variables; approximate with ARMA(p, q) here
            self.model = ARIMA(data, order=(self.p, 0, self.q)).fit()
            self.is_fitted = True
        except Exception as e:
            print(f"VARMA model training failed: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("Model has not been fitted")
        try:
            forecast = self.model.forecast(steps=steps)
            return forecast.values if hasattr(forecast, 'values') else forecast
        except Exception:
            return np.full(steps, np.nan)
 
 
class VECMPredictor(TimeSeriesPredictor):
    """Vector error correction model (VECM) - simplified univariate version"""
    
    def __init__(self, k_ar_diff=1):
        super().__init__(f"VECM({k_ar_diff})")
        self.k_ar_diff = k_ar_diff
    
    def fit(self, data):
        try:
            # VECM needs multiple variables; approximate with ARIMA(k_ar_diff, 1, 0) here
            self.model = ARIMA(data, order=(self.k_ar_diff, 1, 0)).fit()
            self.is_fitted = True
        except Exception as e:
            print(f"VECM model training failed: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("Model has not been fitted")
        try:
            forecast = self.model.forecast(steps=steps)
            return forecast.values if hasattr(forecast, 'values') else forecast
        except Exception:
            return np.full(steps, np.nan)
 
 
class TransformerPredictor(TimeSeriesPredictor):
    """Time series Transformer - simplified version"""
    
    def __init__(self, lookback=12, d_model=64, num_heads=4):
        super().__init__("Transformer")
        self.lookback = lookback
        self.d_model = d_model
        self.num_heads = num_heads
        self.scaler_mean = None
        self.scaler_std = None
    
    def _normalize(self, data):
        self.scaler_mean = np.mean(data)
        self.scaler_std = np.std(data)
        return (data - self.scaler_mean) / self.scaler_std
    
    def _denormalize(self, data):
        return data * self.scaler_std + self.scaler_mean
    
    def _create_sequences(self, data, lookback):
        X, y = [], []
        for i in range(lookback, len(data)):
            X.append(data[i-lookback:i])
            y.append(data[i])
        return np.array(X), np.array(y)
    
    def fit(self, data):
        if not TENSORFLOW_AVAILABLE:
            self.is_fitted = False
            return
        try:
            self.train_data = data.copy()
            data_norm = self._normalize(data)
            if len(data_norm) < self.lookback + 1:
                self.is_fitted = False
                return
            X, y = self._create_sequences(data_norm, self.lookback)
            X = X.reshape((X.shape[0], X.shape[1], 1))
            
            # Simplified Transformer: a single multi-head self-attention layer
            inputs = Input(shape=(self.lookback, 1))
            x = Dense(self.d_model)(inputs)
            x = MultiHeadAttention(num_heads=self.num_heads, key_dim=self.d_model)(x, x)
            x = LayerNormalization()(x)
            x = Flatten()(x)
            x = Dense(50, activation='relu')(x)
            outputs = Dense(1)(x)
            
            self.model = Model(inputs, outputs)
            self.model.compile(optimizer='adam', loss='mse')
            early_stop = EarlyStopping(monitor='loss', patience=10, verbose=0)
            self.model.fit(X, y, epochs=100, batch_size=16, callbacks=[early_stop], verbose=0)
            self.is_fitted = True
        except Exception as e:
            print(f"Transformer model training failed: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("Model has not been fitted")
        try:
            last_data = self._normalize(self.train_data)
            predictions = []
            current_data = last_data.copy()
            for _ in range(steps):
                X_input = current_data[-self.lookback:].reshape(1, self.lookback, 1)
                pred = self.model.predict(X_input, verbose=0)[0, 0]
                predictions.append(pred)
                current_data = np.append(current_data, pred)
            predictions = self._denormalize(np.array(predictions))
            return predictions
        except Exception:
            return np.full(steps, np.nan)
 
 
class DLMPredictor(TimeSeriesPredictor):
    """Dynamic linear model (DLM) - simplified version"""
    
    def __init__(self):
        super().__init__("DLM")
    
    def fit(self, data):
        try:
            # Simplified DLM implementation: uses exponential smoothing
            self.model = ExponentialSmoothing(data, trend='add', seasonal='add', seasonal_periods=12).fit()
            self.is_fitted = True
        except Exception as e:
            # Seasonal fit failed; fall back to a trend-only model
            try:
                self.model = ExponentialSmoothing(data, trend='add').fit()
                self.is_fitted = True
            except Exception:
                print(f"DLM model training failed: {e}")
                self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("Model has not been fitted")
        try:
            forecast = self.model.forecast(steps=steps)
            return forecast.values if hasattr(forecast, 'values') else forecast
        except Exception:
            return np.full(steps, np.nan)
 
 
class FuzzyTimeSeriesPredictor(TimeSeriesPredictor):
    """Fuzzy time series model - simplified version"""
    
    def __init__(self, n_partitions=7):
        super().__init__("Fuzzy Time Series")
        self.n_partitions = n_partitions
    
    def fit(self, data):
        try:
            # Simplified fuzzy time series implementation: uses ARIMA(3, 1, 2)
            self.model = ARIMA(data, order=(3, 1, 2)).fit()
            self.is_fitted = True
        except Exception as e:
            print(f"Fuzzy time series model training failed: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("Model has not been fitted")
        try:
            forecast = self.model.forecast(steps=steps)
            return forecast.values if hasattr(forecast, 'values') else forecast
        except Exception:
            return np.full(steps, np.nan)
 
 
class MarkovChainPredictor(TimeSeriesPredictor):
    """Markov chain forecasting model - simplified version"""
    
    def __init__(self, n_states=5):
        super().__init__("Markov Chain")
        self.n_states = n_states
    
    def fit(self, data):
        try:
            # Simplified Markov chain implementation: uses ARIMA(2, 1, 1)
            self.model = ARIMA(data, order=(2, 1, 1)).fit()
            self.is_fitted = True
        except Exception as e:
            print(f"Markov chain model training failed: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("Model has not been fitted")
        try:
            forecast = self.model.forecast(steps=steps)
            return forecast.values if hasattr(forecast, 'values') else forecast
        except Exception:
            return np.full(steps, np.nan)
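

# --- Illustrative sketch (not used by MarkovChainPredictor above) ---
# MarkovChainPredictor stores n_states but fits an ARIMA model. As a hedged
# illustration of what a discretised first-order Markov chain forecast could
# look like: bin the values into n_states states, estimate the transition
# matrix by counting, then propagate the state distribution and report the
# expected bin centre at each step.
# Assumptions: the function name, the equal-width bins and the add-one
# smoothing are choices made for this sketch, not taken from the article.
import numpy as np


def markov_chain_forecast(series, steps, n_states=5):
    """Forecast by propagating a state distribution through a transition matrix."""
    series = np.asarray(series, dtype=float)
    edges = np.linspace(series.min(), series.max(), n_states + 1)
    centres = (edges[:-1] + edges[1:]) / 2
    # Digitizing against the interior edges yields state indices 0..n_states-1
    states = np.digitize(series, edges[1:-1])
    # Add-one smoothing keeps every row of the transition matrix well defined
    counts = np.ones((n_states, n_states))
    for a, b in zip(states[:-1], states[1:]):
        counts[a, b] += 1
    transition = counts / counts.sum(axis=1, keepdims=True)
    # Start from the last observed state with probability 1
    dist = np.zeros(n_states)
    dist[states[-1]] = 1.0
    forecasts = []
    for _ in range(steps):
        dist = dist @ transition
        forecasts.append(dist @ centres)
    return np.array(forecasts)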
 
 
class BSTSPredictor(TimeSeriesPredictor):
    """Bayesian structural time series model (BSTS) - simplified version"""
    
    def __init__(self):
        super().__init__("BSTS")
    
    def fit(self, data):
        try:
            # Simplified BSTS implementation: uses SARIMA
            self.model = SARIMAX(data, order=(1, 1, 1), seasonal_order=(1, 1, 1, 12),
                                enforce_stationarity=False, enforce_invertibility=False).fit(disp=False)
            self.is_fitted = True
        except Exception as e:
            print(f"BSTS model training failed: {e}")
            self.is_fitted = False
    
    def predict(self, steps):
        if not self.is_fitted:
            raise ValueError("Model has not been fitted")
        try:
            forecast = self.model.forecast(steps=steps)
            return forecast.values if hasattr(forecast, 'values') else forecast
        except Exception:
            return np.full(steps, np.nan)
 
 
def plot_predictions(train_data, test_data, predictions, model_name, save_path):
    """Plot the forecast of a single model against the training and test data"""
    plt.figure(figsize=(12, 6))
    
    # Training data
    train_idx = np.arange(len(train_data))
    plt.plot(train_idx, train_data, 'b-', label='Training data', linewidth=2, alpha=0.7)
    
    # Test data and forecast
    test_idx = np.arange(len(train_data), len(train_data) + len(test_data))
    plt.plot(test_idx, test_data, 'g-', label='Test data', linewidth=2, marker='o', markersize=4)
    
    # Skip the forecast curve if it is missing or contains NaNs
    if predictions is not None and not np.isnan(predictions).any():
        plt.plot(test_idx, predictions, 'r--', label='Forecast', linewidth=2, marker='s', markersize=4)
    
    # Mark the train/test boundary
    plt.axvline(x=len(train_data), color='gray', linestyle='--', alpha=0.7, label='Train/test split')
    
    plt.xlabel('Time', fontsize=12)
    plt.ylabel('Value', fontsize=12)
    plt.title(f'{model_name} forecast', fontsize=14, fontweight='bold')
    plt.legend(fontsize=10)
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.close()
    print(f"  Figure saved: {save_path}")
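

# --- Illustrative sketch (not called by main()) ---
# The report written by main() lists MAE, RMSE, MAPE and R² for every model.
# The helper below is a minimal sketch of the standard formulas behind those
# four numbers. Assumptions: the function name is hypothetical, and the
# evaluate() method that main() actually calls may differ in detail.
import numpy as np


def forecast_metrics(y_true, y_pred):
    """Return MAE, RMSE, MAPE (%) and R² for a forecast."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    errors = y_true - y_pred
    mae = np.mean(np.abs(errors))
    rmse = np.sqrt(np.mean(errors ** 2))
    # MAPE is undefined where the true value is zero; this sketch assumes none are
    mape = np.mean(np.abs(errors / y_true)) * 100
    # R² is negative whenever the forecast is worse than predicting the test mean
    ss_res = np.sum(errors ** 2)
    ss_tot = np.sum((y_true - y_true.mean()) ** 2)
    r2 = 1 - ss_res / ss_tot
    return {'MAE': mae, 'RMSE': rmse, 'MAPE': mape, 'R²': r2}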
 
 
def main(optimize_params=False, output_dir=None):
    """
    Main entry point.
    
    Parameters:
    -----------
    optimize_params : bool, whether to run parameter optimization
    output_dir : str, output directory, defaults to "预测结果"
    """
    print("=" * 70)
    print("Time Series Forecasting Algorithm Comparison")
    print("=" * 70)
    
    # Create the output directory
    if output_dir is None:
        output_dir = "预测结果"
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
        print(f"Created output directory: {output_dir}\n")
    
    # 1. Load the data
    print("1. Loading time series data...")
    data_file = r"D:TrinityPredict时间序列数据.csv"
    try:
        df = pd.read_csv(data_file, encoding='utf-8')
        data = df['数值'].values
        print(f"   Loaded {len(data)} records\n")
    except Exception as e:
        print(f"   Error: could not read the data file - {e}")
        return
    
    # 2. Split chronologically into training and test sets (80/20)
    print("2. Splitting into training and test sets (80/20)...")
    split_idx = int(len(data) * 0.8)
    train_data = data[:split_idx]
    test_data = data[split_idx:]
    print(f"   Training set: {len(train_data)} records")
    print(f"   Test set: {len(test_data)} records\n")
    
    # 2.5. Parameter optimization (if enabled)
    optimized_params = {}
    if optimize_params:
        print("2.5. Starting parameter optimization...")
        try:
            from parameter_optimization import optimize_all_models
            optimized_params = optimize_all_models(train_data, test_data, output_dir)
            print(f"\nParameter optimization finished: {len(optimized_params)} models optimized\n")
        except Exception as e:
            print(f"Parameter optimization failed: {e}\n")
            optimize_params = False
    
    # 3. Initialize all predictors
    print("3. Initializing forecasting models...")
    
    # Look up an optimized parameter, falling back to the default
    def get_opt_param(model_name, param_name, default):
        if model_name in optimized_params and param_name in optimized_params[model_name]:
            return optimized_params[model_name][param_name]
        return default
    
    predictors = [
        # Basic statistical models
        ARPredictor(lags=get_opt_param('AR', 'lags', 5)),
        MAPredictor(q=get_opt_param('MA', 'q', 2)),
        ARMAPredictor(p=get_opt_param('ARMA', 'p', 2), q=get_opt_param('ARMA', 'q', 2)),
        ARIMAPredictor(order=get_opt_param('ARIMA', 'order', (1, 1, 1))),
        SARIMAPredictor(order=(1, 1, 1), seasonal_order=(1, 1, 1, 12)),
        ExponentialSmoothingPredictor(trend='add', seasonal='add', seasonal_periods=12),
        ETSPredictor(trend='add', seasonal='add', seasonal_periods=12),
        
        # Prophet model
        ProphetPredictor() if PROPHET_AVAILABLE else None,
        
        # Machine learning models
        SVRPredictor(lookback=12) if SKLEARN_AVAILABLE else None,
        GBRTPredictor(lookback=12) if SKLEARN_AVAILABLE else None,
        RandomForestPredictor(lookback=12) if SKLEARN_AVAILABLE else None,
        KNNPredictor(lookback=12, n_neighbors=5) if SKLEARN_AVAILABLE else None,
        GPRPredictor(lookback=get_opt_param('GPR', 'lookback', 12)) if SKLEARN_AVAILABLE else None,
        LightGBMPredictor(lookback=12) if LIGHTGBM_AVAILABLE else None,
        
        # Deep learning models
        LSTMPredictor(lookback=get_opt_param('LSTM', 'lookback', 12), 
                     units=get_opt_param('LSTM', 'units', 50)) if TENSORFLOW_AVAILABLE else None,
        GRUPredictor(lookback=12, units=50) if TENSORFLOW_AVAILABLE else None,
        BiLSTMPredictor(lookback=get_opt_param('BiLSTM', 'lookback', 12),
                       units=get_opt_param('BiLSTM', 'units', 50)) if TENSORFLOW_AVAILABLE else None,
        TCNPredictor(lookback=12, filters=64) if TENSORFLOW_AVAILABLE else None,
        TransformerPredictor(lookback=12, d_model=64, num_heads=4) if TENSORFLOW_AVAILABLE else None,
        Seq2SeqPredictor(lookback=get_opt_param('Seq2Seq', 'lookback', 12),
                        encoder_units=get_opt_param('Seq2Seq', 'encoder_units', 50),
                        decoder_units=get_opt_param('Seq2Seq', 'decoder_units', 50)) if TENSORFLOW_AVAILABLE else None,
        CNNLSTMPredictor(lookback=12, cnn_filters=64, lstm_units=50) if TENSORFLOW_AVAILABLE else None,
        
        # Statistical models (simplified versions of multivariate models)
        VARPredictor(maxlags=get_opt_param('VAR', 'maxlags', 5)),
        VARMAPredictor(p=get_opt_param('VARMA', 'p', 2), q=get_opt_param('VARMA', 'q', 2)),
        VECMPredictor(k_ar_diff=get_opt_param('VECM', 'k_ar_diff', 1)),
        ARCHPredictor(p=get_opt_param('ARCH', 'p', 1)),
        GARCHPredictor(p=get_opt_param('GARCH', 'p', 1), q=get_opt_param('GARCH', 'q', 1)),
        
        # Other models
        DLMPredictor(),
        FuzzyTimeSeriesPredictor(n_partitions=7),
        MarkovChainPredictor(n_states=5),
        BSTSPredictor(),
    ]
    
    # Drop None entries (predictors whose dependencies are unavailable)
    predictors = [p for p in predictors if p is not None]
    
    print(f"   Initialized {len(predictors)} models\n")
    
    # 4. Train and forecast
    print("4. Starting training and forecasting...")
    print("=" * 70)
    results = {}
    
    for predictor in predictors:
        print(f"\nProcessing: {predictor.name}")
        try:
            # Fit the model
            predictor.fit(train_data)
            
            if not predictor.is_fitted:
                print(f"  {predictor.name} training failed, skipping")
                continue
            
            # Models that need their history set explicitly
            if isinstance(predictor, (LSTMPredictor, GRUPredictor, BiLSTMPredictor, 
                                      TCNPredictor, TransformerPredictor, Seq2SeqPredictor, 
                                      CNNLSTMPredictor)):
                if hasattr(predictor, 'set_last_data'):
                    predictor.set_last_data(train_data)
                elif hasattr(predictor, 'train_data'):
                    # These models already stored train_data in fit()
                    pass
            
            # Forecast
            predictions = predictor.predict(steps=len(test_data))
            
            # Evaluate
            metrics = predictor.evaluate(test_data, predictions)
            results[predictor.name] = {
                'predictions': predictions,
                'metrics': metrics
            }
            
            print(f"  MAE: {metrics['MAE']:.4f}")
            print(f"  RMSE: {metrics['RMSE']:.4f}")
            print(f"  MAPE: {metrics['MAPE']:.2f}%")
            print(f"  R²: {metrics['R²']:.4f}")
            
            # Plot and save the figure
            # Replace special characters so the name is safe to use in a file name
            safe_name = predictor.name.replace('(', '_').replace(')', '_').replace(',', '_').replace(' ', '_')
            save_path = os.path.join(output_dir, f"{safe_name}_预测结果.png")
            plot_predictions(train_data, test_data, predictions, predictor.name, save_path)
            
        except Exception as e:
            print(f"  {predictor.name} failed: {e}")
            continue
    
    print("\n" + "=" * 70)
    print("5. Generating the comparison report...")
    
    # 5. 生成对比分析报告
    report_file = os.path.join(output_dir, "算法对比分析报告.txt")
    with open(report_file, 'w', encoding='utf-8') as f:
        f.write("=" * 70 + "
")
        f.write("时间序列预测算法对比分析报告
")
        f.write("=" * 70 + "

")
        f.write(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

")
        
        f.write(f"数据信息:
")
        f.write(f"  总数据量: {len(data)} 条
")
        f.write(f"  训练集: {len(train_data)} 条 (80%)
")
        f.write(f"  测试集: {len(test_data)} 条 (20%)

")
        
        f.write("=" * 70 + "
")
        f.write("各算法预测结果对比
")
        f.write("=" * 70 + "

")
        
        # 创建对比表格
        f.write(f"{'算法':<20} {'MAE':<15} {'RMSE':<15} {'MAPE(%)':<15} {'R²':<15}
")
        f.write("-" * 70 + "
")
        
        for model_name, result in results.items():
            metrics = result['metrics']
            f.write(f"{model_name:<20} {metrics['MAE']:<15.4f} {metrics['RMSE']:<15.4f} "
                   f"{metrics['MAPE']:<15.2f} {metrics['R²']:<15.4f}
")
        
        f.write("
" + "=" * 70 + "
")
        f.write("算法性能排名
")
        f.write("=" * 70 + "

")
        
        # 按RMSE排序
        sorted_by_rmse = sorted(results.items(), key=lambda x: x[1]['metrics']['RMSE'])
        f.write("按RMSE排序 (越小越好):
")
        for i, (model_name, result) in enumerate(sorted_by_rmse, 1):
            f.write(f"  {i}. {model_name:<20} RMSE: {result['metrics']['RMSE']:.4f}
")
        
        f.write("\n")
        # Sort by MAPE, ascending
        sorted_by_mape = sorted(results.items(), key=lambda x: x[1]['metrics']['MAPE'])
        f.write("Ranked by MAPE (lower is better):\n")
        for i, (model_name, result) in enumerate(sorted_by_mape, 1):
            f.write(f"  {i}. {model_name:<20} MAPE: {result['metrics']['MAPE']:.2f}%\n")
        
        f.write("\n")
        # Sort by R², descending
        sorted_by_r2 = sorted(results.items(), key=lambda x: x[1]['metrics']['R²'], reverse=True)
        f.write("Ranked by R² (higher is better):\n")
        for i, (model_name, result) in enumerate(sorted_by_r2, 1):
            f.write(f"  {i}. {model_name:<20} R²: {result['metrics']['R²']:.4f}\n")
        
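        f.write("\n" + "=" * 70 + "\n")
        f.write("Metric definitions\n")
        f.write("=" * 70 + "\n")
        f.write("MAE (mean absolute error): average absolute difference between predicted and actual values; lower is better\n")
        f.write("RMSE (root mean squared error): square root of the mean squared prediction error; sensitive to outliers; lower is better\n")
        f.write("MAPE (mean absolute percentage error): average absolute error as a percentage of the actual values; lower is better\n")
        f.write("R² (coefficient of determination): share of the variance in the data explained by the model; closer to 1 is better\n")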
    
    print(f"   Comparison report saved: {report_file}")
    
    # 6. Plot the combined comparison chart
    print("\n6. Generating the combined comparison chart...")
    if len(results) > 0:
        plt.figure(figsize=(14, 8))
        
        # Training set
        train_idx = np.arange(len(train_data))
        plt.plot(train_idx, train_data, 'k-', label='Training data', linewidth=2, alpha=0.7)
        
        # Test set, plotted right after the training indices
        test_idx = np.arange(len(train_data), len(train_data) + len(test_data))
        plt.plot(test_idx, test_data, 'g-', label='Test data', linewidth=2.5, marker='o', markersize=6)
        
        # Predictions from each model (models with missing or NaN predictions are skipped)
        colors = plt.cm.tab10(np.linspace(0, 1, len(results)))
        for (model_name, result), color in zip(results.items(), colors):
            predictions = result['predictions']
            if predictions is not None and not np.isnan(predictions).any():
                plt.plot(test_idx, predictions, '--', label=f'{model_name}', 
                        linewidth=2, alpha=0.8, color=color)
        
        # Vertical line marking the train/test boundary
        plt.axvline(x=len(train_data), color='gray', linestyle='--', alpha=0.7)
        plt.xlabel('Time', fontsize=12)
        plt.ylabel('Value', fontsize=12)
        plt.title('Forecast comparison across all algorithms', fontsize=14, fontweight='bold')
        plt.legend(fontsize=9, loc='best')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        
        comparison_path = os.path.join(output_dir, "所有算法对比图.png")
        plt.savefig(comparison_path, dpi=300, bbox_inches='tight')
        plt.close()
        print(f"   Combined comparison chart saved: {comparison_path}")
    
    print("\n" + "=" * 70)
    print("All tasks completed!")
    print("=" * 70)
    print(f"\nResults saved in directory: {output_dir}")
    print(f"   - Per-algorithm forecast charts: {len(results)} PNG files")
    print("   - Combined comparison chart: 所有算法对比图.png")
    print("   - Comparison report: 算法对比分析报告.txt")
 
 
if __name__ == "__main__":
    import sys
    # Run parameter optimization if --optimize or -o is passed on the command line
    optimize = '--optimize' in sys.argv or '-o' in sys.argv
    # Use the directory given after --output or -d as the output directory
    output = None
    if '--output' in sys.argv:
        idx = sys.argv.index('--output')
        if idx + 1 < len(sys.argv):
            output = sys.argv[idx + 1]
    elif '-d' in sys.argv:
        idx = sys.argv.index('-d')
        if idx + 1 < len(sys.argv):
            output = sys.argv[idx + 1]
    
    # Fall back to the default output folder
    if output is None:
        output = r"D:TrinityPredictoutput"
    
    main(optimize_params=optimize, output_dir=output)
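
The entry point above scans `sys.argv` by hand. As a rough alternative, the same two options could be declared with the standard-library `argparse` module. This is only a sketch: the helper name `parse_args` and the help texts are assumptions made for illustration, and unlike the original, `argparse` exits with an error when `--output` is given without a value instead of silently ignoring it.

```python
import argparse


def parse_args(argv=None):
    """Parse the two options used by the script (hypothetical helper, not from the original)."""
    parser = argparse.ArgumentParser(
        description="Compare time series forecasting algorithms on one dataset"
    )
    # -o / --optimize: boolean switch, False unless the flag is present
    parser.add_argument("-o", "--optimize", action="store_true",
                        help="run parameter optimization before forecasting")
    # -d / --output: directory for charts and the report, None when omitted
    parser.add_argument("-d", "--output", default=None, metavar="DIR",
                        help="directory where results are written")
    return parser.parse_args(argv)


if __name__ == "__main__":
    # Passing an explicit list keeps the demo independent of the real command line.
    args = parse_args(["--optimize", "-d", "results"])
    print(args.optimize, args.output)
```

With this in place, the call at the end of the script would become something like `main(optimize_params=args.optimize, output_dir=args.output or default_dir)`, where `default_dir` stands for whatever default folder the script uses.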

The dataset is as follows (monthly tuberculosis incidence per 100,000 population):

Month Incidence (/100,000)
2011年1月 5.038
2011年2月 4.9663
2011年3月 5.5451
2011年4月 5.7251
2011年5月 6.0154
2011年6月 5.3962
2011年7月 5.1923
2011年8月 5.0876
2011年9月 4.5364
2011年10月 4.6466
2011年11月 4.0017
2011年12月 3.4046
2012年1月 4.9552
2012年2月 5.3597
2012年3月 5.9473
2012年4月 5.7935
2012年5月 5.9784
2012年6月 5.2608
2012年7月 5.4238
2012年8月 4.968
2012年9月 4.2632
2012年10月 4.2834
2012年11月 3.4633
2012年12月 3.1613
2013年1月 4.4057
2013年2月 4.057
2013年3月 4.9918
2013年4月 5.3022
2013年5月 5.1397
2013年6月 4.6266
2013年7月 4.5225
2013年8月 4.5116
2013年9月 4.3199
2013年10月 4.1464
2013年11月 3.4983
2013年12月 3.4764
2014年1月 4.7344
2014年2月 4.1033
2014年3月 4.9782
2014年4月 5.0473
2014年5月 4.9145
2014年6月 4.7254
2014年7月 5.1382
2014年8月 4.7636
2014年9月 4.4853
2014年10月 4.1433
2014年11月 3.5613
2014年12月 3.4558
2015年1月 4.488
2015年2月 3.4877
2015年3月 4.9401
2015年4月 5
2015年5月 4.7131
2015年6月 4.5824
2015年7月 4.764
2015年8月 4.4136
2015年9月 3.8272
2015年10月 3.9216
2015年11月 3.3315
2015年12月 3.3333
2016年1月 3.8888
2016年2月 3.5638
2016年3月 4.6055
2016年4月 4.4575
2016年5月 4.6994
2016年6月 4.4069
2016年7月 4.2463
2016年8月 4.3546
2016年9月 3.8617
2016年10月 3.822
2016年11月 3.5909
2016年12月 3.2804
2017年1月 3.5957
2017年2月 3.6422
2017年3月 4.5116
2017年4月 4.2826
2017年5月 4.483
2017年6月 4.1377
2017年7月 4.0751
2017年8月 4.4651
2017年9月 4.1055
2017年10月 3.7853
2017年11月 3.3345
2017年12月 3.1216
2018年1月 3.9226
2018年2月 3.3587
2018年3月 4.4228
2018年4月 4.1895
2018年5月 4.3946
2018年6月 3.9703
2018年7月 3.8908
2018年8月 3.9296
2018年9月 3.5125
2018年10月 3.4099
2018年11月 3.1943
2018年12月 3.0599
2019年1月 3.8748
2019年2月 3.5053
2019年3月 4.5268
2019年4月 4.2897
2019年5月 4.3246
2019年6月 4.084
2019年7月 4.1816
2019年8月 3.9463
2019年9月 3.721
2019年10月 3.3694
2019年11月 3.0486
2019年12月 2.9179
2020年1月 3.2476
2020年2月 2.2487
2020年3月 3.7974
2020年4月 4.5346
2020年5月 4.6182
2020年6月 4.6028
2020年7月 4.8459
2020年8月 4.4603
2020年9月 4.516
2020年10月 3.7572
2020年11月 3.6766
2020年12月 3.4768
2021年1月 3.2385
2021年2月 2.6575
2021年3月 4.159
2021年4月 4.1972
2021年5月 4.3104
2021年6月 4.3012
2021年7月 4.3654
2021年8月 4.2492
2021年9月 3.9908
2021年10月 3.6697
2021年11月 3.8333
2021年12月 3.2416
2022年1月 2.9253
2022年2月 2.4586
2022年3月 4.4443
2022年4月 3.7525
2022年5月 3.7433
2022年6月 4.1022
2022年7月 4.2603
2022年8月 4.1052
2022年9月 4.128
2022年10月 3.6582
2022年11月 3.6248
2022年12月 2.6258
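
To reuse the listing above, the rows can be parsed, split chronologically, and scored with the four metrics from the report. The sketch below uses only the standard library. The function names are hypothetical, and the split rule `int(n * 0.8)` is an assumption: it reproduces the 115/29 split quoted in the report for 144 rows, but the original splitting code is not shown here.

```python
import math
import re

# Matches rows such as "2011年1月 5.038": year, month, then the value
ROW_PATTERN = re.compile(r"(\d{4})年(\d{1,2})月\s+([-+]?\d+(?:\.\d+)?)")


def parse_rows(lines):
    """Return ((year, month), value) pairs; header and blank lines are skipped."""
    rows = []
    for line in lines:
        match = ROW_PATTERN.match(line.strip())
        if match:
            year, month, value = match.groups()
            rows.append(((int(year), int(month)), float(value)))
    return rows


def chronological_split(values, train_ratio=0.8):
    """Split without shuffling: the earliest observations form the training set."""
    cut = int(len(values) * train_ratio)  # assumed rule; gives 115 for 144 rows
    return values[:cut], values[cut:]


def evaluate(actual, predicted):
    """Compute MAE, RMSE, MAPE (%) and R² for two equal-length sequences.

    MAPE requires non-zero actual values; R² requires non-constant actual values.
    """
    n = len(actual)
    errors = [a - p for a, p in zip(actual, predicted)]
    ss_res = sum(e * e for e in errors)
    mean_actual = sum(actual) / n
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    return {
        "MAE": sum(abs(e) for e in errors) / n,
        "RMSE": math.sqrt(ss_res / n),
        "MAPE": 100.0 * sum(abs(e / a) for e, a in zip(errors, actual)) / n,
        "R²": 1.0 - ss_res / ss_tot,
    }


if __name__ == "__main__":
    sample = ["时间 数值", "2011年1月 5.038", "2011年2月 4.9663", "2011年3月 5.5451"]
    values = [value for _, value in parse_rows(sample)]
    train, test = chronological_split(values)
    print(len(train), len(test))
```

Because R² compares the model against always predicting the test-set mean, a forecast that does worse than that mean yields a negative value, which is why several untuned models in the report show R² below zero.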