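---
name: "多槽并发回测"
description: "Flexible concurrent backtesting tool for the WorldQuant BRAIN platform: 1-8 slots × 2-10 alphas per slot. Supports automatic retry on F429 (exponential backoff plus recursive resubmission) so that no expression is silently dropped. Invoke when user needs batch alpha simulation, multi-slot backtesting, or running multiple alpha variants concurrently."
---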
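# Multi-Slot Concurrent Backtesting (多槽并发回测)
A flexible concurrent backtesting tool for the WorldQuant BRAIN platform that supports any combination of **1-8 slots × 2-10 alphas per slot**.
## Use Cases
- Small-scale iteration: 1-4 slots × 2-3 alphas, for quickly validating variants
- Medium scale: 4-8 slots × 5 alphas, for parameter sweeps
- Large-batch screening: 8 slots × 10 alphas, backtesting 80 expressions in one pass
- Any case where backtest results (IS statistics + checks) are needed quickly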
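## Workflow
### Step 1: Prepare SLOT_CONFIGS
Define 1-8 slot configurations. Each slot contains:
- `name`: slot description
- `expressions`: list of 2-10 FASTEXPR expressions
- `cm`: optional per-expression settings map keyed by expression index, in the form `{0: {d, decay, neut, trunc, mt}, 1: {…}, …}`
- `settings`: default settings, used for any expression that `cm` does not cover
**Key settings**:
- `d`: delay (0 or 1)
- `decay`: decay value (0-12)
- `neut`: neutralization (SUBINDUSTRY/INDUSTRY/SECTOR/MARKET/STATISTICAL/NONE, etc.)
- `trunc`: truncation (0.001-0.01)
- `mt`: maxTrade (ON/OFF)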
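To make the slot layout concrete, here is a minimal sketch of one `SLOT_CONFIGS` entry together with a small validator for the limits listed above. The expressions and setting values are placeholders, and `validate` and `REQUIRED` are hypothetical helpers that are not part of the template. Because the template's `sub_slot()` replaces the slot defaults with the `cm` entry rather than merging them, each `cm` entry here carries the full set of keys.

```python
# Placeholder defaults; pick values that suit the alpha being tested.
DEFAULT = {"d": 1, "decay": 4, "neut": "SUBINDUSTRY", "trunc": 0.01, "mt": "ON"}

SLOT_CONFIGS = [
    {
        "name": "slot 0: price-change variants",
        # Placeholder FASTEXPR expressions, for illustration only
        "expressions": ["rank(ts_delta(close, 5))", "rank(ts_delta(close, 10))"],
        # Per-expression override, keyed by index into `expressions`
        "cm": {1: {**DEFAULT, "decay": 8}},
        "settings": DEFAULT,
    },
]

REQUIRED = ("d", "decay", "neut", "trunc")  # keys sub_slot() reads without a fallback

def validate(slot_configs):
    """Return a list of problems; an empty list means the config looks usable."""
    problems = []
    if not 1 <= len(slot_configs) <= 8:
        problems.append(f"expected 1-8 slots, got {len(slot_configs)}")
    for si, sc in enumerate(slot_configs):
        exprs = sc.get("expressions", [])
        if not 2 <= len(exprs) <= 10:
            problems.append(f"slot {si}: expected 2-10 expressions, got {len(exprs)}")
        cm = sc.get("cm") or {}
        for i in range(len(exprs)):
            st = cm.get(i, sc.get("settings", {}))
            missing = [k for k in REQUIRED if k not in st]
            if missing:
                problems.append(f"slot {si} expr {i}: missing {missing}")
    return problems

print(validate(SLOT_CONFIGS))
```

Running the validator before submission catches a missing `d` or `neut` key locally instead of as a `KeyError` inside a worker thread.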
**Neutralization requirement**: Try every neutralization before settling on one. Prefer SUBINDUSTRY > INDUSTRY > SECTOR > MARKET, and use NONE only as a last resort.
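One way to follow this rule in SLOT_CONFIGS mode is to place the same expression in a slot once per neutralization, in priority order. The sketch below assumes the slot layout from Step 1; `neutralization_slot`, `NEUT_ORDER`, and the sample expression are hypothetical names used only for illustration.

```python
NEUT_ORDER = ["SUBINDUSTRY", "INDUSTRY", "SECTOR", "MARKET"]  # priority order from the text

def neutralization_slot(expr, base, order=NEUT_ORDER):
    """Build one slot that repeats `expr` once per neutralization in `order`."""
    return {
        "name": f"neut sweep: {expr[:40]}",
        "expressions": [expr] * len(order),
        # Each cm entry is a full settings dict with only `neut` changed
        "cm": {i: {**base, "neut": n} for i, n in enumerate(order)},
        "settings": base,
    }

base = {"d": 1, "decay": 4, "neut": "NONE", "trunc": 0.01, "mt": "ON"}
slot = neutralization_slot("rank(close)", base)  # placeholder expression
print([slot["cm"][i]["neut"] for i in range(len(slot["expressions"]))])
```

Four variants fit comfortably inside the 2-10 expressions allowed per slot, so one slot covers the whole priority list for a single expression.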
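**Batch mode**: When the number of expressions exceeds the capacity of a single batch (slots × expressions per slot), the expressions are submitted in successive batches automatically. For example, 9642 expressions in 8 slots × 10 mode take 121 batches (⌈9642 / 80⌉ = 121).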
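The batch arithmetic can be checked with a few lines. This sketch mirrors the ceiling division used in the template; `batch_plan` is a hypothetical helper name.

```python
import math

def batch_plan(n_exprs, num_slots=8, per_slot=10):
    """Return (capacity per batch, number of batches, size of the last batch)."""
    capacity = num_slots * per_slot
    n_batches = math.ceil(n_exprs / capacity)
    last = n_exprs - (n_batches - 1) * capacity if n_batches else 0
    return capacity, n_batches, last

print(batch_plan(9642))  # (80, 121, 42)
```

So the example run consists of 120 full batches of 80 expressions plus a final batch of 42.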
### Step 2: Generate the Python backtest script
Generate the script from the following template, save it to the working directory, and run it:
```python
#!/usr/bin/env python3
"""R{round}: {desc} - {num_slots} slots x {per_slot} alphas, concurrent backtest"""
import json, time, requests, base64, threading, csv, re, shutil, tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
BASE_URL = "https://api.worldquantbrain.com"
# === User-defined section ===
SLOT_CONFIGS = [
    # 1-8 slot configurations, 2-10 expressions per slot...
]
# Batch mode: load the expression list from a JSON file and split it into batches automatically
BATCH_MODE = False  # set to True to enable batch mode
BATCH_EXPR_FILE = ""  # path to the JSON expression file
BATCH_CSV_FILE = ""  # path to the field-info CSV
BATCH_OUT_DIR = ""  # output directory
BATCH_NUM_SLOTS = 8  # number of slots in batch mode
BATCH_PER_SLOT = 10  # expressions per slot in batch mode
BATCH_DEFAULT_SETTINGS = {
    "instrumentType": "EQUITY", "region": "USA", "universe": "TOP3000",
    "delay": 0, "decay": 0, "neutralization": "STATISTICAL",
    "truncation": 0.01, "pasteurization": "ON", "unitHandling": "VERIFY",
    "nanHandling": "ON", "visualization": False, "language": "FASTEXPR",
    "testPeriod": "P0Y0M", "maxTrade": "ON",
}
BATCH_FILTER = {"sharpe": 0.9, "fitness": 0.5}  # screening thresholds
BATCH_TAG = "first_round"  # tag applied to alphas that pass screening
BATCH_TAG_COLOR = "#FFFF00"  # tag color
# === Core framework (do not modify) ===
def lc():
    """Load the credentials config from the first config file that exists."""
    for cp in [Path(r"d:\VScode\MCP-Brain\learn-3\AIworker\user_config.json"),Path.home()/".brain_mcp_config.json"]:
        if cp.exists():
            with open(cp,'r',encoding='utf-8') as f: return json.load(f)
    return {}
def auth(s,e,p):
    """Authenticate session s with HTTP Basic credentials; True on 200/201."""
    r=s.post(f'{BASE_URL}/authentication',headers={'Authorization':f'Basic {base64.b64encode(f"{e}:{p}".encode()).decode()}'})
    return r.status_code in [200,201]
class SP:
    """Thread-safe pool of authenticated requests.Session objects."""
    def __init__(self,e,p,sz=10):
        self._p=[]; self._l=threading.Lock(); self._e=e; self._pw=p
        for _ in range(sz):
            s=requests.Session(); s.headers.update({'User-Agent':'Mozilla/5.0'})
            if auth(s,e,p): self._p.append(s)
        print(f" SP:{len(self._p)}/{sz}")
    def b(self):
        """Borrow a session; create and authenticate a new one if the pool is empty."""
        with self._l:
            if self._p: return self._p.pop()
        s=requests.Session(); s.headers.update({'User-Agent':'Mozilla/5.0'}); auth(s,self._e,self._pw); return s
    def r(self,s):
        """Return a session to the pool (at most 20 idle sessions are kept)."""
        with self._l:
            if len(self._p)<20: self._p.append(s)
def sub(s,exprs,si,settings=None,max_retries=5):
    """Batch mode: submit one multi-simulation; return its Location URL, or None once all retries fail."""
    md=[]
    for expr in exprs:
        st=settings or BATCH_DEFAULT_SETTINGS
        md.append({'type':'REGULAR','settings':{'instrumentType':st.get('instrumentType','EQUITY'),'region':st.get('region','USA'),'universe':st.get('universe','TOP3000'),'delay':st.get('delay',0),'decay':st.get('decay',0),'neutralization':st.get('neutralization','STATISTICAL'),'truncation':st.get('truncation',0.01),'pasteurization':st.get('pasteurization','ON'),'unitHandling':st.get('unitHandling','VERIFY'),'nanHandling':st.get('nanHandling','ON'),'visualization':st.get('visualization',False),'language':st.get('language','FASTEXPR'),'testPeriod':st.get('testPeriod','P0Y0M'),'maxTrade':st.get('maxTrade','ON')},'regular':expr})
    for attempt in range(max_retries+1):
        r=s.post(f"{BASE_URL}/simulations",json=md)
        if r.status_code==201:
            tag=f" (retry #{attempt})" if attempt>0 else ""
            print(f" S{si}: OK({len(md)}){tag}")
            return r.headers.get('Location','')
        if attempt<max_retries:
            wait=2**attempt*5  # exponential backoff: 5s, 10s, 20s, 40s, 80s
            print(f" S{si}: F{r.status_code} -> retry #{attempt+1}/{max_retries} in {wait}s")
            time.sleep(wait)
        else:
            print(f" S{si}: F{r.status_code} FAIL after {max_retries} retries {r.text[:100]}")
    return None
def sub_slot(s,sc,si,max_retries=5):
    """SLOT_CONFIGS mode: submit one slot, using cm[i] for expression i when present, else the slot defaults."""
    exprs=sc["expressions"]; cm=sc.get("cm"); ds=sc.get("settings",{}); md=[]
    for i,expr in enumerate(exprs):
        st=cm.get(i,ds) if cm else ds
        md.append({'type':'REGULAR','settings':{'instrumentType':'EQUITY','region':'USA','universe':'TOP3000','delay':st['d'],'decay':st['decay'],'neutralization':st['neut'],'truncation':st['trunc'],'pasteurization':st.get('pasteurization','ON'),'unitHandling':'VERIFY','nanHandling':st.get('nanHandling','ON'),'visualization':False,'language':'FASTEXPR','testPeriod':'P0Y0M','maxTrade':st.get('mt','ON')},'regular':expr})
    for attempt in range(max_retries+1):
        r=s.post(f"{BASE_URL}/simulations",json=md)
        if r.status_code==201:
            tag=f" (retry #{attempt})" if attempt>0 else ""
            print(f" S{si}: OK({len(md)}){tag}")
            return r.headers.get('Location','')
        if attempt<max_retries:
            wait=2**attempt*5  # exponential backoff: 5s, 10s, 20s, 40s, 80s
            print(f" S{si}: F{r.status_code} -> retry #{attempt+1}/{max_retries} in {wait}s")
            time.sleep(wait)
        else:
            print(f" S{si}: F{r.status_code} FAIL after {max_retries} retries")
    return None
def wfc(s,loc,si,to=600):
    """Poll the parent simulation at loc until its child IDs appear; return [] on timeout."""
    st=time.time()
    while time.time()-st<to:
        try:
            r=s.get(loc)
            if r.status_code==200:
                c=r.json().get('children',[])
                if c: return c
                time.sleep(float(r.headers.get("Retry-After",5)))
            else: time.sleep(5)
        except Exception: time.sleep(5)
    return []
def gad(s,aid):
    """Fetch alpha details and flatten IS statistics and check results; None on failure."""
    try:
        r=s.get(f"{BASE_URL}/alphas/{aid}")
        if r.status_code==200:
            a=r.json(); isd=a.get('is',{}) or {}; cs=isd.get('checks',[]); fl=[c for c in cs if c.get('result')=='FAIL']
            ck=lambda n: next((c for c in cs if c.get('name')==n),{})  # look up a check by name
            st=a.get('settings',{})
            return {'alpha_id':aid,'status':'SUCCESS','sharpe':isd.get('sharpe'),'fitness':isd.get('fitness'),'turnover':isd.get('turnover'),'margin':isd.get('margin'),
                    'sp':ck('LOW_SHARPE').get('result')!='FAIL','fp':ck('LOW_FITNESS').get('result')!='FAIL',
                    'lp':ck('IS_LADDER_SHARPE').get('result')=='PASS','su':ck('LOW_SUB_UNIVERSE_SHARPE').get('result')=='PASS',
                    'rv':ck('REVERSION_COMPONENT').get('result') or 'N/A','neut':st.get('neutralization','?'),'dec':st.get('decay',0),
                    'nfail':len(fl),'pc':ck('PROD_CORRELATION').get('value')}
    except Exception: pass
    return None
def pc(sp,cid,expr,si,ci):
    """Poll one child simulation until it finishes, then fetch its alpha; always returns a result dict."""
    s=sp.b()
    try:
        for _ in range(120):
            try:
                r=s.get(f"{BASE_URL}/simulations/{cid}")
                if r.status_code==200:
                    ad=r.json()
                    if ad.get('status') in ['ERROR','CANCELLED']: return {'expr':expr[:60],'status':'ERROR','err':ad.get('message','?'),'slot':si}
                    if float(r.headers.get("Retry-After",0))==0:  # no Retry-After: the simulation is done
                        aid=ad.get("alpha")
                        if aid:
                            res=gad(s,aid)
                            if res: res['slot']=si; res['expression']=expr; return res
                        return {'expr':expr[:60],'status':'ERROR','slot':si}
                    time.sleep(float(r.headers.get("Retry-After",5)))
                else: time.sleep(2)
            except Exception: time.sleep(2)
        return {'expr':expr[:60],'status':'TIMEOUT','slot':si}
    finally: sp.r(s)
def tag_alpha(s,aid,tag,color="#FFFF00"):
    """Attach a colored tag to an alpha; True on 200/201."""
    try:
        r=s.put(f"{BASE_URL}/alphas/{aid}/tags/{tag}",json={"color":color,"name":tag})
        return r.status_code in [200,201]
    except Exception: return False
def run_batch(sp,exprs,batch_label,settings=None):
    """Submit one batch across slots, collect results, and recursively resubmit slots whose submission failed."""
    if not exprs: return []
    ns=min(BATCH_NUM_SLOTS,(len(exprs)+BATCH_PER_SLOT-1)//BATCH_PER_SLOT)
    slots=[]
    for i in range(ns):
        s=i*BATCH_PER_SLOT; e=s+BATCH_PER_SLOT; b=exprs[s:e]
        if b: slots.append((i,b))
    ss=[sp.b() for _ in range(len(slots))]; sl={}
    with ThreadPoolExecutor(max_workers=len(slots)) as ex:
        ff={ex.submit(sub,ss[i],b,i,settings):i for i,b in slots}
        for f in as_completed(ff):
            loc=f.result()
            if loc: sl[ff[f]]=loc
    # Collect the expressions of every slot whose submission failed (slot index missing from sl)
    failed=[]
    for i,b in slots:
        if i not in sl:
            print(f" S{i}: expressions lost due to F429, will retry")
            failed.extend(b)
    sc={}
    # max(1,...) guards: ThreadPoolExecutor raises ValueError when max_workers is 0
    with ThreadPoolExecutor(max_workers=max(1,len(sl))) as ex:
        ff={ex.submit(wfc,ss[i],sl[i],i):i for i in sl}
        for f in as_completed(ff):
            i=ff[f]; c=f.result()
            if isinstance(c,list):
                bexprs=next((b for (s,b) in slots if s==i),[])
                sc[i]=[(c[j],bexprs[j]) for j in range(min(len(c),len(bexprs)))]
            else: sc[i]=[]
    for s in ss: sp.r(s)
    tc=sum(len(v) for v in sc.values()); ar=[]
    with ThreadPoolExecutor(max_workers=max(1,min(tc,18))) as ex:
        ff={}
        for si,cl in sc.items():
            for ci,(cid,expr) in enumerate(cl):
                ff[ex.submit(pc,sp,cid,expr,si,ci)]=(si,ci)
        dc=0
        for f in as_completed(ff):
            r=f.result(); ar.append(r); dc+=1
            if r.get('status')=='SUCCESS':
                aid=r.get('alpha_id','?'); s=r.get('sharpe',0) or 0
                t=r.get('turnover',0) or 0; m=r.get('margin',0) or 0
                print(f" [{dc}/{tc}] S{r.get('slot','?')}: {aid} S={s:.2f} F={r.get('fitness',0) or 0:.2f} T={t:.2%}")
            else: print(f" [{dc}/{tc}] S{r.get('slot','?')}: ERR {r.get('err','')[:60]}")
    if failed:
        print(f" >> Retrying {len(failed)} expressions whose submission failed with F429...")
        ar2=run_batch(sp,failed,batch_label+"_retry",settings)
        ar.extend(ar2)
    return ar
def main():
    cfg=lc(); cr=cfg.get("credentials",{})
    e,p=cr.get("email"),cr.get("password")
    if not e or not p: print("No creds!"); return
    if BATCH_MODE:
        print("="*70)
        print(f"Batch concurrent backtest: {BATCH_NUM_SLOTS} slots x {BATCH_PER_SLOT} alphas")
        print(f"Filter: |sharpe|>{BATCH_FILTER['sharpe']} |fitness|>{BATCH_FILTER['fitness']}")
        print("="*70)
        with open(BATCH_EXPR_FILE,'r',encoding='utf-8') as f: exprs=json.load(f)
        print(f" Loaded {len(exprs)} expressions")
        sp=SP(e,p,sz=max(BATCH_NUM_SLOTS+8,14))
        bsz=BATCH_NUM_SLOTS*BATCH_PER_SLOT; nb=(len(exprs)+bsz-1)//bsz
        all_results=[]
        for bi in range(nb):
            s=bi*bsz; e_=min(s+bsz,len(exprs)); batch=exprs[s:e_]
            label=f"Batch{bi+1}/{nb}"
            print(f"\n {label}: [{s}:{e_}] ({len(batch)} expressions)")
            br=run_batch(sp,batch,label)
            all_results.extend(br)
        # Screen successful alphas by absolute Sharpe and fitness, then tag the ones that pass
        succ=[r for r in all_results if r.get('status')=='SUCCESS']
        filt_s=BATCH_FILTER['sharpe']; filt_f=BATCH_FILTER['fitness']
        qual=[r for r in succ if (abs(r.get('sharpe',0) or 0)>filt_s and abs(r.get('fitness',0) or 0)>filt_f)]
        print(f"\n Screening: {len(qual)}/{len(succ)} passed (|S|>{filt_s},|F|>{filt_f})")
        ts=sp.b(); tagged=0
        for r in qual:
            if tag_alpha(ts,r.get('alpha_id'),BATCH_TAG,BATCH_TAG_COLOR): tagged+=1
        sp.r(ts)
        print(f" Tags: {tagged}/{len(qual)} tagged '{BATCH_TAG}'")
        csv_path=Path(BATCH_OUT_DIR)/"初步筛选结果.csv"
        with open(csv_path,'w',encoding='utf-8',newline='') as f:
            w=csv.writer(f); w.writerow(['alphaID','sharp_value','fitness_value'])
            for r in qual: w.writerow([r.get('alpha_id',''),r.get('sharpe',0),r.get('fitness',0)])
        print(f" Saved {csv_path}")
        out=Path(BATCH_OUT_DIR)/"round1_backtest_results.json"
        with open(out,'w',encoding='utf-8') as f: json.dump({'alpha_results':all_results},f,indent=2,default=str)
        print(f" Saved {out}")
        print(f"\nRound 1 complete! {len(qual)}/{len(succ)} passed screening")
    else:
        ns=len(SLOT_CONFIGS); ta=sum(len(sc['expressions']) for sc in SLOT_CONFIGS)
        if not ns: print("SLOT_CONFIGS is empty!"); return
        print(f"{ns} slots x {ta//ns} alphas = {ta} concurrent backtests")
        sp=SP(e,p,sz=max(ns+8,16))
        t0=time.time(); sl={}; ss=[sp.b() for _ in range(ns)]
        with ThreadPoolExecutor(max_workers=ns) as ex:
            ff={ex.submit(sub_slot,ss[i],SLOT_CONFIGS[i],i):i for i in range(ns)}
            for f in as_completed(ff):
                loc=f.result()
                if loc: sl[ff[f]]=loc
        print(f" Sub:{time.time()-t0:.1f}s")
        t0=time.time(); sc={}
        with ThreadPoolExecutor(max_workers=ns) as ex:
            ff={ex.submit(wfc,ss[i],sl[i],i):i for i in sl}
            for f in as_completed(ff):
                i=ff[f]; c=f.result()
                exprs=SLOT_CONFIGS[i]["expressions"]
                sc[i]=[(c[j],exprs[j]) for j in range(min(len(c),len(exprs)))]
        for s in ss: sp.r(s)
        print(f" Wait:{time.time()-t0:.1f}s")
        tc=sum(len(v) for v in sc.values()); t0=time.time(); ar=[]
        # max(1,...) guard: ThreadPoolExecutor raises ValueError when max_workers is 0
        with ThreadPoolExecutor(max_workers=max(1,min(tc,20))) as ex:
            ff={}
            for si,cl in sc.items():
                for ci,(cid,expr) in enumerate(cl):
                    ff[ex.submit(pc,sp,cid,expr,si,ci)]=(si,ci)
            dc=0
            for f in as_completed(ff):
                r=f.result(); ar.append(r); dc+=1
                if r.get('status')=='SUCCESS':
                    aid=r.get('alpha_id','?'); s=r.get('sharpe',0) or 0
                    t=r.get('turnover',0) or 0; m=r.get('margin',0) or 0
                    pc_v=r.get('pc')
                    pc_s=f" PC={pc_v:.4f}" if pc_v is not None else ""
                    print(f" [{dc}/{tc}] S{r.get('slot','?')}: {aid} S={s:.2f} T={t:.2%} M={m:.6f}{pc_s}")
        print(f" Poll:{time.time()-t0:.1f}s")
        succ=[r for r in ar if r.get('status')=='SUCCESS']
        print("\nSUMMARY(Margin↓)")
        for r in sorted(succ,key=lambda x:x.get('margin',0) or 0,reverse=True):
            s=r.get('sharpe',0) or 0; t=r.get('turnover',0) or 0; m=r.get('margin',0) or 0
            pc_v=r.get('pc'); slot=r.get('slot','?'); nf=r.get('nfail',0)
            sp_='PASS' if r.get('sp') else 'FAIL'; lp_='PASS' if r.get('lp') else 'FAIL'
            rv=r.get('rv','?'); dec=r.get('dec',0); ms='OK' if m>0.0015 else 'LOW'
            mk="***CAND***" if (nf==0 and m>0.0015 and rv!='WARNING') else ""
            pc_s=f" PC={pc_v:.4f}" if pc_v is not None else " PC=?"
            print(f" S{slot} {r.get('alpha_id','?')}: S={s:.2f}[{sp_}] T={t:.2%} M={m:.6f}({ms}) D={dec}{pc_s} {mk}")
        for r in ar:
            if r.get('status')!='SUCCESS': print(f" S{r.get('slot','?')} E:{r.get('err','?')}")
        out=Path(__file__).parent/"batch_results.json"
        with open(out,'w',encoding='utf-8') as f: json.dump({'alpha_results':ar},f,indent=2,default=str)
        # Prefer alphas that pass every check; fall back to all successes if none do
        topn=min(5,len(succ))
        clean=[r for r in succ if r.get('sp') and r.get('fp') and r.get('lp') and r.get('nfail',0)==0 and r.get('rv')!='WARNING']
        top=sorted(clean,key=lambda x:x.get('margin',0) or 0,reverse=True)[:topn]
        if not top: top=sorted(succ,key=lambda x:x.get('margin',0) or 0,reverse=True)[:topn]
        print(f"\n--- TOP {topn} for PC CHECK ---")
        for r in top:
            aid=r.get('alpha_id'); s=r.get('sharpe',0) or 0; m=r.get('margin',0) or 0
            print(f" {aid} (S={s:.2f} M={m:.6f})")
if __name__=="__main__": main()
```
### Step 3: Run the script
```bash
python "path/to/script.py"
```
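### Step 4: Analyze the results
The script output includes:
1. IS statistics for each alpha (Sharpe, Fitness, Turnover, Margin)
2. Check results (LOW_SHARPE, LOW_FITNESS, IS_LADDER_SHARPE, etc.)
3. A TOP N list of candidate alphas, to hand off to the "PC快速检测" (PC quick check) skill
4. In batch mode, the automatically generated `初步筛选结果.csv` (preliminary screening results) and `round1_backtest_results.json`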
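The saved JSON can also be re-ranked offline without rerunning any simulation. The sketch below assumes the `{"alpha_results": [...]}` layout that the template writes and reuses its 0.0015 margin threshold; `load_results` and `top_candidates` are hypothetical helper names, and the sample records are made up for illustration.

```python
import json

def load_results(path):
    """Read the result list from a file written by the template."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)["alpha_results"]

def top_candidates(results, n=5, min_margin=0.0015):
    """Keep successful alphas with no failed checks and enough margin, best margin first."""
    ok = [r for r in results if r.get("status") == "SUCCESS"]
    clean = [r for r in ok
             if r.get("nfail", 0) == 0 and (r.get("margin") or 0) > min_margin]
    return sorted(clean, key=lambda r: r.get("margin") or 0, reverse=True)[:n]

# Made-up records in the shape the template produces
sample = [
    {"alpha_id": "A", "status": "SUCCESS", "margin": 0.0020, "nfail": 0},
    {"alpha_id": "B", "status": "SUCCESS", "margin": 0.0030, "nfail": 1},
    {"alpha_id": "C", "status": "ERROR"},
    {"alpha_id": "D", "status": "SUCCESS", "margin": 0.0040, "nfail": 0},
]
print([r["alpha_id"] for r in top_candidates(sample)])  # ['D', 'A']
```

In practice, `top_candidates(load_results("batch_results.json"))` would give the same kind of shortlist that the script prints before the PC check.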
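## Two Usage Modes
### Mode A: SLOT_CONFIGS mode (small-scale iterative optimization)
- Set `BATCH_MODE = False`
- Define 1-8 slots in `SLOT_CONFIGS`, with 2-10 expressions per slot
- Each slot can use different settings (delay, decay, neutralization, etc.)
- Suitable for: alpha variant optimization, parameter sweeps
### Mode B: Batch mode (large-batch screening)
- Set `BATCH_MODE = True`
- Set `BATCH_EXPR_FILE` (path to the JSON expression file)
- Set `BATCH_NUM_SLOTS` and `BATCH_PER_SLOT`
- Batches are submitted automatically, and results are screened and tagged automatically
- Suitable for: bulk backtesting from an expression file, first-round screening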
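The template loads `BATCH_EXPR_FILE` with `json.load` and slices the result, so the file is expected to hold a flat JSON array of expression strings. A minimal sketch for producing such a file follows; `write_expression_file` is a hypothetical helper and the expressions are placeholders.

```python
import json
import tempfile
from pathlib import Path

def write_expression_file(expressions, path):
    """Write expressions as a JSON array of strings, dropping blanks and duplicates."""
    cleaned = (e.strip() for e in expressions)
    unique = list(dict.fromkeys(e for e in cleaned if e))  # keeps first-seen order
    Path(path).write_text(json.dumps(unique, ensure_ascii=False, indent=2), encoding="utf-8")
    return len(unique)

# Placeholder expressions, for illustration only
demo = ["rank(close)", " rank(close) ", "", "rank(volume)"]
out = Path(tempfile.mkdtemp()) / "expressions.json"
print(write_expression_file(demo, out))  # 2
```

Deduplicating before submission avoids spending simulation slots on identical expressions, which matters when a run spans more than a hundred batches.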
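## Notes
1. **Neutralization priority**: SUBINDUSTRY > INDUSTRY > SECTOR > MARKET > NONE
2. **Timeout circuit breaker**: a backtest round that runs longer than 15 minutes is treated as a zombie simulation
3. **Error diagnosis**: if a whole run fails, inspect the specific error messages
4. **Concurrency limit**: the platform caps the number of concurrent simulations; 8 slots is already the optimum
5. **SessionPool**: API connections are managed through a thread-safe session pool
6. **Batching in batch mode**: expressions beyond a single batch's capacity are split into batches automatically, with no pause between batches
7. **Windows long paths**: in batch mode, if the expression file path is too long, copy the file to a temporary directory first
8. **Automatic F429 retry**:
   - When `sub()` or `sub_slot()` hits F429 (CONCURRENT_SIMULATION_LIMIT_EXCEEDED), it retries with exponential backoff, waiting 5s→10s→20s→40s→80s (155 s in total), for at most 5 retries
   - In batch mode, if a slot still fails after 5 retries, its expressions are collected and **immediately resubmitted recursively** (`run_batch` calls itself)
   - Each recursive resubmission gets the same 5-retry exponential backoff. The template as written sets no limit on recursion depth, so a slot that keeps failing is resubmitted again rather than abandoned
   - This is the basis of the **zero omissions** goal: every expression rejected at submission is resubmitted until it is accepted
9. **Checkpoint and resume**: for very large runs (e.g. 9642 expressions), use a checkpoint mechanism (save `completed_indices` to a JSON file) so that a restarted script skips expressions that already finished and reruns only the failed or missing ones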
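The checkpoint idea from the last note can be sketched as follows. The template does not implement this; the file layout (`{"completed_indices": [...]}`) and the helper names `load_completed`, `save_completed`, and `pending` are assumptions made for illustration.

```python
import json
import tempfile
from pathlib import Path

def load_completed(path):
    """Return the set of finished expression indices, or an empty set on the first run."""
    p = Path(path)
    if not p.exists():
        return set()
    return set(json.loads(p.read_text(encoding="utf-8")).get("completed_indices", []))

def save_completed(path, completed):
    """Write the checkpoint via a temp file and rename, so a crash cannot truncate it."""
    tmp = Path(str(path) + ".tmp")
    tmp.write_text(json.dumps({"completed_indices": sorted(completed)}), encoding="utf-8")
    tmp.replace(path)

def pending(exprs, completed):
    """List (index, expression) pairs that still need a backtest."""
    return [(i, e) for i, e in enumerate(exprs) if i not in completed]

# Demo with placeholder expressions
exprs = ["expr_a", "expr_b", "expr_c"]
ckpt = Path(tempfile.mkdtemp()) / "checkpoint.json"
done = load_completed(ckpt)   # empty set on the first run
done.update({0, 2})           # pretend indices 0 and 2 finished
save_completed(ckpt, done)
print(pending(exprs, load_completed(ckpt)))  # [(1, 'expr_b')]
```

To wire this into batch mode, one option is to save the checkpoint after each batch and pass only the pending expressions to `run_batch`. Since the template's results carry the expression text rather than its index, the caller would need to keep an expression-to-index mapping.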
Appendix: screenshots of the implementation:


