
以下是基于掘金量化平台实现的资金流选股策略代码,包含基础版与增强版两个版本。代码在保留核心逻辑的基础上,优化了注释清晰度、参数可读性与异常处理机制,同时补充了关键功能说明,方便您直接复用或二次开发。
一、基础版:主力资金 + 量价共振选股策略
策略核心逻辑
通过「连续主力资金流入 + 量价配合 + 均线支撑」筛选股票,每日固定时间调仓,严格控制持仓数量与权重,适合新手入门验证资金流策略有效性。
python
运行
from gm.api import *
import numpy as np
import pandas as pd
"""
基于资金流入的选股策略(掘金量化基础版)
策略逻辑:连续N日主力资金信号+量能放大+价格趋势+均线支撑,多条件共振选股
适用场景:沪深300成分股池,中短期(10天左右)持仓
"""
def init(context):
"""策略初始化:设置参数、股票池、调仓频率"""
# 1. 核心参数配置(可根据回测结果调整)
context.param = {
"lookback_days": 3, # 资金流回顾天数(连续N日信号)
"inflow_threshold": 0.002, # 资金流入幅度阈值
"volume_ratio": 1.5, # 量比阈值(今日成交量/20日均量)
"holding_days": 10, # 单只股票最大持有天数
"max_position": 10, # 最大持仓数量(分散风险)
"ma_period": 20 # 均线周期(20日均线作为支撑)
}
# 2. 股票池:沪深300成分股(流动性好、市值稳定)
context.universe = "SHSE.000300"
# 3. 调仓频率:每日开盘后10分钟(09:40)执行策略,避免开盘波动
schedule(schedule_func=algo, date_rule="1d", time_rule="09:40:00")
# 4. 持仓记录:跟踪每只股票的买入时间(用于控制持有天数)
context.holding_records = {}
def algo(context):
"""主策略逻辑:选股→筛选→交易"""
current_date = context.now.strftime("%Y-%m-%d")
# 步骤1:获取初始股票池(沪深300成分股,过滤流动性差的标的)
candidate_stocks = get_initial_universe(context)
if not candidate_stocks:
print(f"{current_date}:未获取到有效股票池,跳过当日调仓")
return
# 步骤2:计算资金流+量价信号,筛选符合条件的股票
selected_stocks = calculate_multi_factor_signals(context, candidate_stocks, current_date)
# 步骤3:执行交易(平仓不符合条件的股票,建仓新选中的股票)
execute_trading(context, selected_stocks, current_date)
def get_initial_universe(context):
"""获取初始股票池:沪深300最新成分股,过滤停牌/退市标的"""
try:
# 获取指定日期的指数成分股
constituents = get_history_constituents(
index=context.universe,
start_date=context.now,
end_date=context.now
)
if not constituents:
return []
# 提取成分股代码,过滤停牌股票(通过获取最新行情验证)
stock_list = [item["symbol"] for item in constituents[0]["constituents"]]
valid_stocks = []
for symbol in stock_list[:100]: # 限制数量,减少计算压力
# 获取最新行情,排除停牌(status=1为正常交易)
quote = get_quote(symbols=symbol, fields="status")
if quote and quote[0]["status"] == 1:
valid_stocks.append(symbol)
return valid_stocks
except Exception as e:
print(f"获取股票池出错:{str(e)}")
return []
def calculate_multi_factor_signals(context, stocks, current_date):
"""多因子信号计算:资金流+量能+趋势+均线,四因子共振"""
selected = []
param = context.param
for symbol in stocks:
try:
# 1. 获取历史数据(需包含足够周期:lookback_days+ma_period)
hist_data = history_n(
symbol=symbol,
frequency="1d", # 日线数据
count=param["lookback_days"] + param["ma_period"], # 数据长度
fields="open,high,low,close,volume,amount,pre_close", # 所需字段
end_time=current_date,
fill_missing="Last", # 缺失值用前值填充
df=True # 返回DataFrame格式,便于计算
)
# 过滤数据量不足的股票
if len(hist_data) < param["lookback_days"] + param["ma_period"]:
continue
# 2. 因子1:价格趋势(最近N日收盘价创新高,确认上升趋势)
recent_data = hist_data.tail(param["lookback_days"])
condition_trend = recent_data["close"].iloc[-1] > recent_data["close"].iloc[:-1].max()
# 3. 因子2:量能放大(今日成交量>20日均量×量比阈值,验证资金活跃度)
volume_ma = hist_data["volume"].rolling(param["ma_period"]).mean().iloc[-1] # 20日均量
condition_volume = recent_data["volume"].iloc[-1] > volume_ma * param["volume_ratio"]
# 4. 因子3:资金流信号(简化版:价涨量增,真实场景提议用逐笔成交数据)
condition_money_flow = estimate_money_flow(recent_data, param["inflow_threshold"])
# 5. 因子4:均线支撑(收盘价站在20日均线上,确认趋势有效性)
ma_value = hist_data["close"].rolling(param["ma_period"]).mean().iloc[-1]
condition_ma = recent_data["close"].iloc[-1] > ma_value
# 6. 四因子同时满足,加入选中列表
if condition_trend and condition_volume and condition_money_flow and condition_ma:
selected.append(symbol)
except Exception as e:
print(f"计算{symbol}信号出错:{str(e)},跳过该股票")
continue
# 限制持仓数量,取前N只股票(按信号强度排序,此处简化为按代码排序)
return selected[:param["max_position"]]
def estimate_money_flow(recent_data, threshold):
"""估算资金流信号(简化版):价涨量增→判断为资金流入"""
# 1. 计算每日涨跌幅(收盘价-开盘价)/开盘价
daily_return = (recent_data["close"] - recent_data["open"]) / recent_data["open"]
# 2. 计算每日量能变化(今日成交量/昨日成交量)
daily_volume_ratio = recent_data["volume"] / recent_data["volume"].shift(1)
# 3. 连续N日满足:涨跌幅>阈值 + 量能放大(>1.1倍)→ 判定为资金流入
valid_days = sum((daily_return > threshold) & (daily_volume_ratio > 1.1))
return valid_days == len(recent_data) # 所有回顾日都满足
def execute_trading(context, selected_stocks, current_date):
"""执行交易:平仓不在选中列表的股票,等权重建仓新股票"""
param = context.param
current_positions = context.account().positions() # 当前持仓
# 步骤1:平仓操作(3种情况需平仓)
for position in current_positions:
symbol = position["symbol"]
# 情况1:股票不在新选中列表
# 情况2:持有天数超过max_holding_days
holding_days = (pd.to_datetime(current_date) - pd.to_datetime(context.holding_records.get(symbol, current_date))).days
# 情况3:股价跌破20日均线(趋势反转)
ma_value = get_ma_value(symbol, current_date, param["ma_period"])
if (symbol not in selected_stocks) or (holding_days >= param["holding_days"]) or (position["price"] < ma_value):
order_target_percent(symbol=symbol, percent=0, order_type=OrderType_Market)
print(f"{current_date}:平仓{symbol},缘由:{'不在选中列表' if symbol not in selected_stocks else '持有天数超限' if holding_days >= param['holding_days'] else '跌破均线支撑'}")
# 移除持仓记录
if symbol in context.holding_records:
del context.holding_records[symbol]
# 步骤2:建仓操作(等权重分配,预留2%现金应对波动)
if not selected_stocks:
return
target_weight = 0.98 / len(selected_stocks) # 单只股票目标权重
for symbol in selected_stocks:
# 检查当前权重,差异>1%时才调仓(减少交易成本)
current_weight = get_position_weight(context, symbol)
if abs(current_weight - target_weight) > 0.01:
order_target_percent(
symbol=symbol,
percent=target_weight,
order_type=OrderType_Market # 市价单,确保成交
)
print(f"{current_date}:建仓/调仓{symbol},目标权重:{target_weight:.2%}")
# 记录买入时间
if symbol not in context.holding_records:
context.holding_records[symbol] = current_date
def get_ma_value(symbol, date, ma_period):
"""辅助函数:获取指定股票的N日均线值"""
try:
hist_data = history_n(
symbol=symbol,
frequency="1d",
count=ma_period,
fields="close",
end_time=date,
df=True
)
return hist_data["close"].rolling(ma_period).mean().iloc[-1]
except:
return 0
def get_position_weight(context, symbol):
"""辅助函数:获取指定股票的当前持仓权重"""
try:
position = context.account().position(symbol)
if position:
return position["market_value"] / context.account().cash["nav"] # 市值/净值
except:
pass
return 0
def on_backtest_finished(context, indicator):
"""回测结束后输出关键指标,评估策略表现"""
print("=" * 60)
print("策略回测结果汇总")
print("=" * 60)
# 核心收益指标
print(f"累计收益率:{indicator['accumulative_return']:.2%}")
print(f"年化收益率:{indicator['annual_return']:.2%}")
print(f"最大回撤:{indicator['max_drawdown']:.2%} (越小越好)")
print(f"夏普比率:{indicator['sharpe']:.2f} (>1.5为优秀)")
# 交易质量指标
print(f"胜率:{indicator['win_rate']:.2%} (盈利交易占比)")
print(f"总交易次数:{indicator['total_trades']}")
print(f"平均持仓时间:{indicator['average_holding_time']:.1f}天")
print("=" * 60)
if __name__ == "__main__":
"""策略运行入口:配置回测参数"""
run(
strategy_id="money_flow_basic_v1", # 自定义策略ID
filename=__file__, # 代码文件名(需与本地一致)
mode=MODE_BACKTEST, # 模式:回测(MODE_BACKTEST)/实盘(MODE_LIVE)
token="your_token_here", # 替换为您的掘金量化Token(在平台密钥管理中获取)
backtest_start_time="2020-01-01 08:00:00", # 回测开始时间
backtest_end_time="2023-12-31 16:00:00", # 回测结束时间
backtest_adjust=ADJUST_PREV, # 复权方式:前复权(推荐)
backtest_initial_cash=1000000, # 初始资金:100万元
backtest_commission_ratio=0.0001, # 佣金费率:0.01%(万1)
backtest_slippage_ratio=0.0001 # 滑点:0.01%(模拟实际交易成本)
)
二、增强版:多因子加权评分选股策略
策略核心升级
相比基础版,增强版通过「评分制」替代「硬条件筛选」,结合「市值过滤 + 行业分散 + 收盘前风险控制」,同时优化资金流计算逻辑,更贴近实际市场主力资金行为。
python
运行
from gm.api import *
import numpy as np
import pandas as pd
from datetime import datetime
"""
资金流选股策略(掘金量化增强版)
核心升级:多因子加权评分(资金流50%+趋势30%+量能20%)+市值过滤+收盘风控
适用场景:全市场选股(剔除过小/过大市值),中短期持仓
"""
class EnhancedMoneyFlowStrategy:
def __init__(self):
"""类初始化:存储策略上下文(替代全局变量)"""
self.context = None
def init(self, context):
"""策略初始化:参数、调度、风控配置"""
self.context = context
# 1. 核心参数(较基础版更精细)
self.param = {
"lookback_days": 5, # 资金流回顾天数(更长周期,减少噪音)
"volume_threshold": 1.3, # 量比阈值(降低门槛,扩大选股范围)
"min_market_cap": 5e9, # 最小市值:50亿(剔除小票,减少波动)
"max_market_cap": 2e11, # 最大市值:2000亿(剔除巨无霸,保证弹性)
"score_threshold": 0.6, # 总评分阈值(0-1分,高于0.6才选中)
"max_position": 10, # 最大持仓数量
"close_risk_time": "14:30" # 收盘前风控时间(14:30检查持仓)
}
# 2. 调度配置:10:00选股调仓,14:30收盘风控
schedule(schedule_func=self.run_strategy, date_rule="1d", time_rule="10:00:00")
schedule(schedule_func=self.close_risk_positions, date_rule="1d", time_rule=self.param["close_risk_time"])
# 3. 持仓记录:跟踪评分(用于权重分配)
self.context.position_scores = {}
def run_strategy(self, context):
"""主策略:选股→评分→交易"""
current_date = context.now.strftime("%Y-%m-%d")
# 步骤1:获取过滤后的全市场股票池(市值+流动性)
universe = self.get_filtered_universe()
if not universe:
print(f"{current_date}:无符合条件的股票池,跳过调仓")
return
# 步骤2:多因子加权评分(资金流+趋势+量能)
stock_scores = self.calculate_weighted_scores(universe, current_date)
if not stock_scores:
print(f"{current_date}:无评分达标股票,跳过调仓")
return
# 步骤3:按评分排序,取前N只股票
selected_stocks = dict(sorted(stock_scores.items(), key=lambda x: x[1], reverse=True)[:self.param["max_position"]])
self.context.position_scores = selected_stocks # 保存评分,用于权重分配
# 步骤4:执行交易
self.execute_trading(selected_stocks)
def get_filtered_universe(self):
"""获取过滤后的股票池:全市场+市值+流动性"""
try:
# 1. 获取全市场A股(排除创业板/科创板?可根据需求添加)
all_stocks = get_instruments(
exchanges="SHSE,SZSE",
sec_types=1, # 1=股票
fields="symbol,market_cap,status", # 市值+交易状态
df=True
)
# 2. 过滤条件:市值在区间内+正常交易(status=1)
filtered = all_stocks[
(all_stocks["market_cap"] >= self.param["min_market_cap"]) &
(all_stocks["market_cap"] <= self.param["max_market_cap"]) &
(all_stocks["status"] == 1) # 正常交易,排除停牌/退市
]
return filtered["symbol"].tolist()[:200] # 限制数量,减少计算压力
except Exception as e:
print(f"过滤股票池出错:{str(e)}")
return []
def calculate_weighted_scores(self, stocks, current_date):
"""多因子加权评分:资金流(50%)+趋势(30%)+量能(20%)"""
stock_scores = {}
for symbol in stocks:
try:
# 1. 获取历史数据(日线,包含lookback_days天)
hist_data = history_n(
symbol=symbol,
frequency="1d",
count=self.param["lookback_days"],
fields="open,high,low,close,volume,amount,pre_close",
end_time=current_date,
df=True
)
if len(hist_data) < self.param["lookback_days"]:
continue
# 2. 计算单因子得分(0-1分,归一化)
flow_score = self.calculate_money_flow_score(hist_data) # 资金流得分(50%权重)
trend_score = self.calculate_trend_score(hist_data) # 趋势得分(30%权重)
volume_score = self.calculate_volume_score(hist_data) # 量能得分(20%权重)
# 3. 总评分=加权求和
total_score = flow_score * 0.5 + trend_score * 0.3 + volume_score * 0.2
# 4. 达标则加入评分字典
if total_score >= self.param["score_threshold"]:
stock_scores[symbol] = total_score
except Exception as e:
print(f"计算{symbol}评分出错:{str(e)},跳过")
continue
return stock_scores
def calculate_money_flow_score(self, data):
"""因子1:资金流得分(基于价量关系,模拟主力资金行为)"""
# 逻辑:涨时量增权重高,跌时量增权重低
flow_strength = 0
for i in range(len(data)):
# 当日涨跌幅:(收盘价-开盘价)/开盘价
daily_return = (data["close"].iloc[i] - data["open"].iloc[i]) / data["open"].iloc[i]
# 当日量能相对值:当日成交量/5日平均成交量
volume_ratio = data["volume"].iloc[i] / data["volume"].mean()
# 涨时加权重,跌时减重
if daily_return > 0:
flow_strength += daily_return * volume_ratio
else:
flow_strength += daily_return * volume_ratio * 0.5 # 下跌时权重减半
# 归一化到0-1分(避免极端值影响)
return max(0, min(1, flow_strength * 10))
def calculate_trend_score(self, data):
"""因子2:趋势得分(短期均线>长期均线,确认上升趋势)"""
closes = data["close"].values
# 短期均线:3日均值,长期均线:5日均值
short_ma = np.mean(closes[-3:]) if len(closes)>=3 else 0
long_ma = np.mean(closes[-5:]) if len(closes)>=5 else 0
# 评分规则:短期>长期+最新价>短期→0.8分;短期>长期→0.6分;否则0.3分
if short_ma > long_ma and closes[-1] > short_ma:
return 0.8
elif short_ma > long_ma:
return 0.6
else:
return 0.3
def calculate_volume_score(self, data):
"""因子3:量能得分(最近3日平均量/5日平均量,验证资金活跃度)"""
recent_volume = data["volume"].tail(3).mean() # 最近3日均量
avg_volume = data["volume"].mean() # 5日均量
volume_ratio = recent_volume / avg_volume
# 归一化到0-1分(量比3倍以上按满分算)
return min(1.0, volume_ratio / 3.0)
def execute_trading(self, selected_stocks):
"""执行交易:平仓+建仓,按评分分配权重"""
context = self.context
current_positions = context.account().positions()
current_hold_symbols = {pos["symbol"] for pos in current_positions}
selected_symbols = set(selected_stocks.keys())
# 步骤1:平仓不在选中列表的股票
for symbol in current_hold_symbols - selected_symbols:
order_target_percent(symbol, 0)
print(f"平仓{symbol}:不在最新选中列表")
if symbol in context.position_scores:
del context.position_scores[symbol]
# 步骤2:按评分分配权重(评分越高,权重越大)
total_score = sum(selected_stocks.values())
if total_score == 0:
return
# 预留5%现金,应对收盘前波动
target_weights = {sym: (score / total_score) * 0.95 for sym, score in selected_stocks.items()}
# 步骤3:建仓/调仓(权重差异>2%时操作,减少交易成本)
for symbol, target_weight in target_weights.items():
current_weight = self.get_position_weight(symbol)
if abs(current_weight - target_weight) > 0.02:
order_target_percent(symbol, target_weight)
print(f"调仓{symbol}:目标权重{target_weight:.2%}(当前{current_weight:.2%})")
def close_risk_positions(self, context):
"""收盘前风控:平仓跌破当日开盘价5%的股票(控制单日亏损)"""
current_positions = context.account().positions()
for position in current_positions:
symbol = position["symbol"]
# 获取当日开盘价和当前价
quote = get_quote(symbols=symbol, fields="open,last")
if not quote:
continue
open_price = quote[0]["open"]
current_price = quote[0]["last"]
# 当日跌幅>5%,强制平仓(止损)
if (current_price - open_price) / open_price < -0.05:
order_target_percent(symbol, 0)
print(f"收盘风控平仓{symbol}:当日跌幅{((current_price-open_price)/open_price):.2%},触发止损")
def get_position_weight(self, symbol):
"""辅助函数:获取当前持仓权重"""
try:
pos = self.context.account().position(symbol)
if pos:
return pos["market_value"] / self.context.account().cash["nav"]
except:
pass
return 0
# 策略运行入口
if __name__ == "__main__":
strategy = EnhancedMoneyFlowStrategy()
run(
strategy_id="money_flow_enhanced_v1",
filename=__file__,
mode=MODE_BACKTEST,
token="your_token_here", # 替换为您的掘金Token
backtest_start_time="2022-01-01 08:00:00",
backtest_end_time="2023-12-31 16:00:00",
backtest_adjust=ADJUST_PREV,
backtest_initial_cash=1000000,
backtest_commission_ratio=0.0008, # 佣金:万8(模拟实际散户费率)
backtest_slippage_ratio=0.0005 # 滑点:万5(更贴近实盘)
)
三、使用说明与注意事项
1. 环境准备
- 安装掘金量化 SDK:pip install gm-sdk
- 获取 Token:登录掘金量化平台→「个人中心」→「密钥管理」→生成 Token,替换代码中your_token_here
2. 策略调优提议
- 参数优化:通过「参数扫描」功能测试不同参数组合(如lookback_days=3/5/7),选择回测中「夏普比率高 + 最大回撤小」的组合。
- 股票池调整:基础版适合沪深 300,增强版可扩展至中证 500,但需同步调整市值参数。
- 资金流精度:当前代码用日线价量估算资金流,实盘提议接入掘金「逐笔成交数据」(history_ticks接口),计算真实主力资金(大单买入 / 卖出)。
3. 风险控制
- 仓位限制:单只股票权重不超过 10%,避免黑天鹅风险。
- 止损机制:增强版加入「收盘前 5% 止损」,基础版可补充「跌破建仓价 8% 止损」逻辑。
- 回测陷阱:避免过度拟合(如参数仅适配某一时间段),提议分「2020-2022 回测」「2023 年验证」,确保策略稳定性。
4. 实盘注意事项
- 先跑「模拟盘」1-3 个月,验证策略在实时行情中的表现(回测存在滞后性)。
- 降低调仓频率:若实盘交易成本高(佣金 > 万 3),可将调仓频率改为「每 2-3 天一次」,减少交易费用。
- 关注市场风格:资金流策略在「震荡市」表现较好,「单边熊市」需降低仓位(如最大持仓减至 5 只)
© 版权声明
文章版权归作者所有,未经允许请勿转载。如内容涉嫌侵权,请在本页底部进入<联系我们>进行举报投诉!
THE END















暂无评论内容