掘金量化平台资金流选股策略代码(优化版)

掘金量化平台资金流选股策略代码(优化版)

以下是基于掘金量化平台实现的资金流选股策略代码,包含基础版与增强版两个版本。代码在保留核心逻辑的基础上,优化了注释清晰度、参数可读性与异常处理机制,同时补充了关键功能说明,方便您直接复用或二次开发。

一、基础版:主力资金 + 量价共振选股策略

策略核心逻辑

通过「连续主力资金流入 + 量价配合 + 均线支撑」筛选股票,每日固定时间调仓,严格控制持仓数量与权重,适合新手入门验证资金流策略有效性。

python

运行

from gm.api import *
import numpy as np
import pandas as pd

"""
基于资金流入的选股策略(掘金量化基础版)
策略逻辑:连续N日主力资金信号+量能放大+价格趋势+均线支撑,多条件共振选股
适用场景:沪深300成分股池,中短期(10天左右)持仓
"""

def init(context):
    """策略初始化:设置参数、股票池、调仓频率"""
    # 1. 核心参数配置(可根据回测结果调整)
    context.param = {
        "lookback_days": 3,          # 资金流回顾天数(连续N日信号)
        "inflow_threshold": 0.002,   # 资金流入幅度阈值
        "volume_ratio": 1.5,         # 量比阈值(今日成交量/20日均量)
        "holding_days": 10,          # 单只股票最大持有天数
        "max_position": 10,          # 最大持仓数量(分散风险)
        "ma_period": 20              # 均线周期(20日均线作为支撑)
    }
    
    # 2. 股票池:沪深300成分股(流动性好、市值稳定)
    context.universe = "SHSE.000300"
    
    # 3. 调仓频率:每日开盘后10分钟(09:40)执行策略,避免开盘波动
    schedule(schedule_func=algo, date_rule="1d", time_rule="09:40:00")
    
    # 4. 持仓记录:跟踪每只股票的买入时间(用于控制持有天数)
    context.holding_records = {}

def algo(context):
    """主策略逻辑:选股→筛选→交易"""
    current_date = context.now.strftime("%Y-%m-%d")
    
    # 步骤1:获取初始股票池(沪深300成分股,过滤流动性差的标的)
    candidate_stocks = get_initial_universe(context)
    if not candidate_stocks:
        print(f"{current_date}:未获取到有效股票池,跳过当日调仓")
        return
    
    # 步骤2:计算资金流+量价信号,筛选符合条件的股票
    selected_stocks = calculate_multi_factor_signals(context, candidate_stocks, current_date)
    
    # 步骤3:执行交易(平仓不符合条件的股票,建仓新选中的股票)
    execute_trading(context, selected_stocks, current_date)

def get_initial_universe(context):
    """获取初始股票池:沪深300最新成分股,过滤停牌/退市标的"""
    try:
        # 获取指定日期的指数成分股
        constituents = get_history_constituents(
            index=context.universe,
            start_date=context.now,
            end_date=context.now
        )
        if not constituents:
            return []
        
        # 提取成分股代码,过滤停牌股票(通过获取最新行情验证)
        stock_list = [item["symbol"] for item in constituents[0]["constituents"]]
        valid_stocks = []
        for symbol in stock_list[:100]:  # 限制数量,减少计算压力
            # 获取最新行情,排除停牌(status=1为正常交易)
            quote = get_quote(symbols=symbol, fields="status")
            if quote and quote[0]["status"] == 1:
                valid_stocks.append(symbol)
        return valid_stocks
    except Exception as e:
        print(f"获取股票池出错:{str(e)}")
        return []

def calculate_multi_factor_signals(context, stocks, current_date):
    """多因子信号计算:资金流+量能+趋势+均线,四因子共振"""
    selected = []
    param = context.param
    
    for symbol in stocks:
        try:
            # 1. 获取历史数据(需包含足够周期:lookback_days+ma_period)
            hist_data = history_n(
                symbol=symbol,
                frequency="1d",          # 日线数据
                count=param["lookback_days"] + param["ma_period"],  # 数据长度
                fields="open,high,low,close,volume,amount,pre_close",  # 所需字段
                end_time=current_date,
                fill_missing="Last",     # 缺失值用前值填充
                df=True                  # 返回DataFrame格式,便于计算
            )
            # 过滤数据量不足的股票
            if len(hist_data) < param["lookback_days"] + param["ma_period"]:
                continue
            
            # 2. 因子1:价格趋势(最近N日收盘价创新高,确认上升趋势)
            recent_data = hist_data.tail(param["lookback_days"])
            condition_trend = recent_data["close"].iloc[-1] > recent_data["close"].iloc[:-1].max()
            
            # 3. 因子2:量能放大(今日成交量>20日均量×量比阈值,验证资金活跃度)
            volume_ma = hist_data["volume"].rolling(param["ma_period"]).mean().iloc[-1]  # 20日均量
            condition_volume = recent_data["volume"].iloc[-1] > volume_ma * param["volume_ratio"]
            
            # 4. 因子3:资金流信号(简化版:价涨量增,真实场景提议用逐笔成交数据)
            condition_money_flow = estimate_money_flow(recent_data, param["inflow_threshold"])
            
            # 5. 因子4:均线支撑(收盘价站在20日均线上,确认趋势有效性)
            ma_value = hist_data["close"].rolling(param["ma_period"]).mean().iloc[-1]
            condition_ma = recent_data["close"].iloc[-1] > ma_value
            
            # 6. 四因子同时满足,加入选中列表
            if condition_trend and condition_volume and condition_money_flow and condition_ma:
                selected.append(symbol)
        
        except Exception as e:
            print(f"计算{symbol}信号出错:{str(e)},跳过该股票")
            continue
    
    # 限制持仓数量,取前N只股票(按信号强度排序,此处简化为按代码排序)
    return selected[:param["max_position"]]

def estimate_money_flow(recent_data, threshold):
    """估算资金流信号(简化版):价涨量增→判断为资金流入"""
    # 1. 计算每日涨跌幅(收盘价-开盘价)/开盘价
    daily_return = (recent_data["close"] - recent_data["open"]) / recent_data["open"]
    # 2. 计算每日量能变化(今日成交量/昨日成交量)
    daily_volume_ratio = recent_data["volume"] / recent_data["volume"].shift(1)
    
    # 3. 连续N日满足:涨跌幅>阈值 + 量能放大(>1.1倍)→ 判定为资金流入
    valid_days = sum((daily_return > threshold) & (daily_volume_ratio > 1.1))
    return valid_days == len(recent_data)  # 所有回顾日都满足

def execute_trading(context, selected_stocks, current_date):
    """执行交易:平仓不在选中列表的股票,等权重建仓新股票"""
    param = context.param
    current_positions = context.account().positions()  # 当前持仓
    
    # 步骤1:平仓操作(3种情况需平仓)
    for position in current_positions:
        symbol = position["symbol"]
        # 情况1:股票不在新选中列表
        # 情况2:持有天数超过max_holding_days
        holding_days = (pd.to_datetime(current_date) - pd.to_datetime(context.holding_records.get(symbol, current_date))).days
        # 情况3:股价跌破20日均线(趋势反转)
        ma_value = get_ma_value(symbol, current_date, param["ma_period"])
        
        if (symbol not in selected_stocks) or (holding_days >= param["holding_days"]) or (position["price"] < ma_value):
            order_target_percent(symbol=symbol, percent=0, order_type=OrderType_Market)
            print(f"{current_date}:平仓{symbol},缘由:{'不在选中列表' if symbol not in selected_stocks else '持有天数超限' if holding_days >= param['holding_days'] else '跌破均线支撑'}")
            # 移除持仓记录
            if symbol in context.holding_records:
                del context.holding_records[symbol]
    
    # 步骤2:建仓操作(等权重分配,预留2%现金应对波动)
    if not selected_stocks:
        return
    target_weight = 0.98 / len(selected_stocks)  # 单只股票目标权重
    
    for symbol in selected_stocks:
        # 检查当前权重,差异>1%时才调仓(减少交易成本)
        current_weight = get_position_weight(context, symbol)
        if abs(current_weight - target_weight) > 0.01:
            order_target_percent(
                symbol=symbol,
                percent=target_weight,
                order_type=OrderType_Market  # 市价单,确保成交
            )
            print(f"{current_date}:建仓/调仓{symbol},目标权重:{target_weight:.2%}")
            # 记录买入时间
            if symbol not in context.holding_records:
                context.holding_records[symbol] = current_date

def get_ma_value(symbol, date, ma_period):
    """辅助函数:获取指定股票的N日均线值"""
    try:
        hist_data = history_n(
            symbol=symbol,
            frequency="1d",
            count=ma_period,
            fields="close",
            end_time=date,
            df=True
        )
        return hist_data["close"].rolling(ma_period).mean().iloc[-1]
    except:
        return 0

def get_position_weight(context, symbol):
    """辅助函数:获取指定股票的当前持仓权重"""
    try:
        position = context.account().position(symbol)
        if position:
            return position["market_value"] / context.account().cash["nav"]  # 市值/净值
    except:
        pass
    return 0

def on_backtest_finished(context, indicator):
    """回测结束后输出关键指标,评估策略表现"""
    print("=" * 60)
    print("策略回测结果汇总")
    print("=" * 60)
    # 核心收益指标
    print(f"累计收益率:{indicator['accumulative_return']:.2%}")
    print(f"年化收益率:{indicator['annual_return']:.2%}")
    print(f"最大回撤:{indicator['max_drawdown']:.2%} (越小越好)")
    print(f"夏普比率:{indicator['sharpe']:.2f} (>1.5为优秀)")
    # 交易质量指标
    print(f"胜率:{indicator['win_rate']:.2%} (盈利交易占比)")
    print(f"总交易次数:{indicator['total_trades']}")
    print(f"平均持仓时间:{indicator['average_holding_time']:.1f}天")
    print("=" * 60)

if __name__ == "__main__":
    """策略运行入口:配置回测参数"""
    run(
        strategy_id="money_flow_basic_v1",  # 自定义策略ID
        filename=__file__,                 # 代码文件名(需与本地一致)
        mode=MODE_BACKTEST,                # 模式:回测(MODE_BACKTEST)/实盘(MODE_LIVE)
        token="your_token_here",           # 替换为您的掘金量化Token(在平台密钥管理中获取)
        backtest_start_time="2020-01-01 08:00:00",  # 回测开始时间
        backtest_end_time="2023-12-31 16:00:00",    # 回测结束时间
        backtest_adjust=ADJUST_PREV,       # 复权方式:前复权(推荐)
        backtest_initial_cash=1000000,     # 初始资金:100万元
        backtest_commission_ratio=0.0001,  # 佣金费率:0.01%(万1)
        backtest_slippage_ratio=0.0001     # 滑点:0.01%(模拟实际交易成本)
    )

二、增强版:多因子加权评分选股策略

策略核心升级

相比基础版,增强版通过「评分制」替代「硬条件筛选」,结合「市值过滤 + 行业分散 + 收盘前风险控制」,同时优化资金流计算逻辑,更贴近实际市场主力资金行为。

python

运行

from gm.api import *
import numpy as np
import pandas as pd
from datetime import datetime

"""
资金流选股策略(掘金量化增强版)
核心升级:多因子加权评分(资金流50%+趋势30%+量能20%)+市值过滤+收盘风控
适用场景:全市场选股(剔除过小/过大市值),中短期持仓
"""

class EnhancedMoneyFlowStrategy:
    def __init__(self):
        """类初始化:存储策略上下文(替代全局变量)"""
        self.context = None
    
    def init(self, context):
        """策略初始化:参数、调度、风控配置"""
        self.context = context
        # 1. 核心参数(较基础版更精细)
        self.param = {
            "lookback_days": 5,          # 资金流回顾天数(更长周期,减少噪音)
            "volume_threshold": 1.3,     # 量比阈值(降低门槛,扩大选股范围)
            "min_market_cap": 5e9,       # 最小市值:50亿(剔除小票,减少波动)
            "max_market_cap": 2e11,      # 最大市值:2000亿(剔除巨无霸,保证弹性)
            "score_threshold": 0.6,      # 总评分阈值(0-1分,高于0.6才选中)
            "max_position": 10,          # 最大持仓数量
            "close_risk_time": "14:30"   # 收盘前风控时间(14:30检查持仓)
        }
        
        # 2. 调度配置:10:00选股调仓,14:30收盘风控
        schedule(schedule_func=self.run_strategy, date_rule="1d", time_rule="10:00:00")
        schedule(schedule_func=self.close_risk_positions, date_rule="1d", time_rule=self.param["close_risk_time"])
        
        # 3. 持仓记录:跟踪评分(用于权重分配)
        self.context.position_scores = {}

    def run_strategy(self, context):
        """主策略:选股→评分→交易"""
        current_date = context.now.strftime("%Y-%m-%d")
        
        # 步骤1:获取过滤后的全市场股票池(市值+流动性)
        universe = self.get_filtered_universe()
        if not universe:
            print(f"{current_date}:无符合条件的股票池,跳过调仓")
            return
        
        # 步骤2:多因子加权评分(资金流+趋势+量能)
        stock_scores = self.calculate_weighted_scores(universe, current_date)
        if not stock_scores:
            print(f"{current_date}:无评分达标股票,跳过调仓")
            return
        
        # 步骤3:按评分排序,取前N只股票
        selected_stocks = dict(sorted(stock_scores.items(), key=lambda x: x[1], reverse=True)[:self.param["max_position"]])
        self.context.position_scores = selected_stocks  # 保存评分,用于权重分配
        
        # 步骤4:执行交易
        self.execute_trading(selected_stocks)

    def get_filtered_universe(self):
        """获取过滤后的股票池:全市场+市值+流动性"""
        try:
            # 1. 获取全市场A股(排除创业板/科创板?可根据需求添加)
            all_stocks = get_instruments(
                exchanges="SHSE,SZSE",
                sec_types=1,                # 1=股票
                fields="symbol,market_cap,status",  # 市值+交易状态
                df=True
            )
            
            # 2. 过滤条件:市值在区间内+正常交易(status=1)
            filtered = all_stocks[
                (all_stocks["market_cap"] >= self.param["min_market_cap"]) &
                (all_stocks["market_cap"] <= self.param["max_market_cap"]) &
                (all_stocks["status"] == 1)  # 正常交易,排除停牌/退市
            ]
            
            return filtered["symbol"].tolist()[:200]  # 限制数量,减少计算压力
        except Exception as e:
            print(f"过滤股票池出错:{str(e)}")
            return []

    def calculate_weighted_scores(self, stocks, current_date):
        """多因子加权评分:资金流(50%)+趋势(30%)+量能(20%)"""
        stock_scores = {}
        
        for symbol in stocks:
            try:
                # 1. 获取历史数据(日线,包含lookback_days天)
                hist_data = history_n(
                    symbol=symbol,
                    frequency="1d",
                    count=self.param["lookback_days"],
                    fields="open,high,low,close,volume,amount,pre_close",
                    end_time=current_date,
                    df=True
                )
                if len(hist_data) < self.param["lookback_days"]:
                    continue
                
                # 2. 计算单因子得分(0-1分,归一化)
                flow_score = self.calculate_money_flow_score(hist_data)    # 资金流得分(50%权重)
                trend_score = self.calculate_trend_score(hist_data)      # 趋势得分(30%权重)
                volume_score = self.calculate_volume_score(hist_data)    # 量能得分(20%权重)
                
                # 3. 总评分=加权求和
                total_score = flow_score * 0.5 + trend_score * 0.3 + volume_score * 0.2
                
                # 4. 达标则加入评分字典
                if total_score >= self.param["score_threshold"]:
                    stock_scores[symbol] = total_score
            
            except Exception as e:
                print(f"计算{symbol}评分出错:{str(e)},跳过")
                continue
        
        return stock_scores

    def calculate_money_flow_score(self, data):
        """因子1:资金流得分(基于价量关系,模拟主力资金行为)"""
        # 逻辑:涨时量增权重高,跌时量增权重低
        flow_strength = 0
        for i in range(len(data)):
            # 当日涨跌幅:(收盘价-开盘价)/开盘价
            daily_return = (data["close"].iloc[i] - data["open"].iloc[i]) / data["open"].iloc[i]
            # 当日量能相对值:当日成交量/5日平均成交量
            volume_ratio = data["volume"].iloc[i] / data["volume"].mean()
            
            # 涨时加权重,跌时减重
            if daily_return > 0:
                flow_strength += daily_return * volume_ratio
            else:
                flow_strength += daily_return * volume_ratio * 0.5  # 下跌时权重减半
        
        # 归一化到0-1分(避免极端值影响)
        return max(0, min(1, flow_strength * 10))

    def calculate_trend_score(self, data):
        """因子2:趋势得分(短期均线>长期均线,确认上升趋势)"""
        closes = data["close"].values
        # 短期均线:3日均值,长期均线:5日均值
        short_ma = np.mean(closes[-3:]) if len(closes)>=3 else 0
        long_ma = np.mean(closes[-5:]) if len(closes)>=5 else 0
        
        # 评分规则:短期>长期+最新价>短期→0.8分;短期>长期→0.6分;否则0.3分
        if short_ma > long_ma and closes[-1] > short_ma:
            return 0.8
        elif short_ma > long_ma:
            return 0.6
        else:
            return 0.3

    def calculate_volume_score(self, data):
        """因子3:量能得分(最近3日平均量/5日平均量,验证资金活跃度)"""
        recent_volume = data["volume"].tail(3).mean()  # 最近3日均量
        avg_volume = data["volume"].mean()             # 5日均量
        volume_ratio = recent_volume / avg_volume
        
        # 归一化到0-1分(量比3倍以上按满分算)
        return min(1.0, volume_ratio / 3.0)

    def execute_trading(self, selected_stocks):
        """执行交易:平仓+建仓,按评分分配权重"""
        context = self.context
        current_positions = context.account().positions()
        current_hold_symbols = {pos["symbol"] for pos in current_positions}
        selected_symbols = set(selected_stocks.keys())
        
        # 步骤1:平仓不在选中列表的股票
        for symbol in current_hold_symbols - selected_symbols:
            order_target_percent(symbol, 0)
            print(f"平仓{symbol}:不在最新选中列表")
            if symbol in context.position_scores:
                del context.position_scores[symbol]
        
        # 步骤2:按评分分配权重(评分越高,权重越大)
        total_score = sum(selected_stocks.values())
        if total_score == 0:
            return
        # 预留5%现金,应对收盘前波动
        target_weights = {sym: (score / total_score) * 0.95 for sym, score in selected_stocks.items()}
        
        # 步骤3:建仓/调仓(权重差异>2%时操作,减少交易成本)
        for symbol, target_weight in target_weights.items():
            current_weight = self.get_position_weight(symbol)
            if abs(current_weight - target_weight) > 0.02:
                order_target_percent(symbol, target_weight)
                print(f"调仓{symbol}:目标权重{target_weight:.2%}(当前{current_weight:.2%})")

    def close_risk_positions(self, context):
        """收盘前风控:平仓跌破当日开盘价5%的股票(控制单日亏损)"""
        current_positions = context.account().positions()
        for position in current_positions:
            symbol = position["symbol"]
            # 获取当日开盘价和当前价
            quote = get_quote(symbols=symbol, fields="open,last")
            if not quote:
                continue
            open_price = quote[0]["open"]
            current_price = quote[0]["last"]
            
            # 当日跌幅>5%,强制平仓(止损)
            if (current_price - open_price) / open_price < -0.05:
                order_target_percent(symbol, 0)
                print(f"收盘风控平仓{symbol}:当日跌幅{((current_price-open_price)/open_price):.2%},触发止损")

    def get_position_weight(self, symbol):
        """辅助函数:获取当前持仓权重"""
        try:
            pos = self.context.account().position(symbol)
            if pos:
                return pos["market_value"] / self.context.account().cash["nav"]
        except:
            pass
        return 0

# 策略运行入口
if __name__ == "__main__":
    strategy = EnhancedMoneyFlowStrategy()
    run(
        strategy_id="money_flow_enhanced_v1",
        filename=__file__,
        mode=MODE_BACKTEST,
        token="your_token_here",  # 替换为您的掘金Token
        backtest_start_time="2022-01-01 08:00:00",
        backtest_end_time="2023-12-31 16:00:00",
        backtest_adjust=ADJUST_PREV,
        backtest_initial_cash=1000000,
        backtest_commission_ratio=0.0008,  # 佣金:万8(模拟实际散户费率)
        backtest_slippage_ratio=0.0005     # 滑点:万5(更贴近实盘)
    )

三、使用说明与注意事项

1. 环境准备

  • 安装掘金量化 SDK:pip install gm-sdk
  • 获取 Token:登录掘金量化平台→「个人中心」→「密钥管理」→生成 Token,替换代码中your_token_here

2. 策略调优提议

  • 参数优化:通过「参数扫描」功能测试不同参数组合(如lookback_days=3/5/7),选择回测中「夏普比率高 + 最大回撤小」的组合。
  • 股票池调整:基础版适合沪深 300,增强版可扩展至中证 500,但需同步调整市值参数。
  • 资金流精度:当前代码用日线价量估算资金流,实盘提议接入掘金「逐笔成交数据」(history_ticks接口),计算真实主力资金(大单买入 / 卖出)。

3. 风险控制

  • 仓位限制:单只股票权重不超过 10%,避免黑天鹅风险。
  • 止损机制:增强版加入「收盘前 5% 止损」,基础版可补充「跌破建仓价 8% 止损」逻辑。
  • 回测陷阱:避免过度拟合(如参数仅适配某一时间段),提议分「2020-2022 回测」「2023 年验证」,确保策略稳定性。

4. 实盘注意事项

  • 先跑「模拟盘」1-3 个月,验证策略在实时行情中的表现(回测存在滞后性)。
  • 降低调仓频率:若实盘交易成本高(佣金 > 万 3),可将调仓频率改为「每 2-3 天一次」,减少交易费用。
  • 关注市场风格:资金流策略在「震荡市」表现较好,「单边熊市」需降低仓位(如最大持仓减至 5 只)
© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 共4条

请登录后发表评论

    暂无评论内容