[Code With SOLO] Using SOLO to build a skill for local multi-slot concurrent backtesting on the WorldQuant platform

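---

name: "多槽并发回测"

description: "Flexible concurrent backtesting tool for the WorldQuant BRAIN platform: 1-8 slots × 2-10 alphas. Supports automatic F429 retry (exponential backoff + recursion) so that no expression is left unbacktested. Invoke when user needs batch alpha simulation, multi-slot backtesting, or running multiple alpha variants concurrently."

---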

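# Multi-Slot Concurrent Backtest (多槽并发回测)

A flexible concurrent backtesting tool for the WorldQuant BRAIN platform that supports any combination of **1-8 slots × 2-10 alphas**.

## Use cases

- Small-scale iteration: 1-4 slots × 2-3 alphas, for quickly validating variants

- Medium scale: 4-8 slots × 5 alphas, for parameter sweeps

- Large-batch screening: 8 slots × 10 alphas, backtesting 80 expressions in one pass

- Any case where backtest results (IS statistics + checks) are needed quickly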

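## Workflow

### Step 1: Prepare SLOT_CONFIGS

The user defines 1-8 slot configurations. Each slot contains:

- `name`: a description of the slot

- `expressions`: a list of 2-10 FASTEXPR expressions

- `cm`: an optional per-expression settings map keyed by expression index, in the form `{0: {d, decay, neut, trunc, mt}, 1: {…}, …}`

- `settings`: default settings, used when `cm` is absent or has no entry for an expression's index

**Key settings parameters**:

- `d`: delay (0 or 1)

- `decay`: decay value (0-12)

- `neut`: neutralization (SUBINDUSTRY/INDUSTRY/SECTOR/MARKET/STATISTICAL/NONE, etc.)

- `trunc`: truncation value (0.001-0.01)

- `mt`: maxTrade (ON/OFF)

**Neutralization requirement**: every neutralization must be tried at least once. Try them in the order SUBINDUSTRY > INDUSTRY > SECTOR > MARKET, and use NONE only as a last resort.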
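As a minimal sketch of how a slot could sweep the neutralizations in that priority order, the helper below builds one `SLOT_CONFIGS` entry whose `cm` assigns a different `neut` to each copy of the same expression. The helper name `make_neut_sweep_slot`, the default values, and the sample expression are assumptions for illustration only; the dictionary keys follow the format described above.

```python
# Hypothetical helper (not part of the original template): one slot that
# tests a single expression under each neutralization, in priority order.
NEUT_PRIORITY = ["SUBINDUSTRY", "INDUSTRY", "SECTOR", "MARKET"]

def make_neut_sweep_slot(expression, d=1, decay=4, trunc=0.01, mt="ON"):
    """Return a SLOT_CONFIGS entry with one cm entry per neutralization."""
    cm = {
        i: {"d": d, "decay": decay, "neut": neut, "trunc": trunc, "mt": mt}
        for i, neut in enumerate(NEUT_PRIORITY)
    }
    return {
        "name": f"neut sweep: {expression[:30]}",
        "expressions": [expression] * len(NEUT_PRIORITY),
        "cm": cm,
        "settings": cm[0],  # fallback for any index missing from cm
    }

# Illustrative expression only; replace with your own FASTEXPR expression.
SLOT_CONFIGS = [make_neut_sweep_slot("rank(-ts_delta(close, 5))")]
print([SLOT_CONFIGS[0]["cm"][i]["neut"] for i in range(4)])
```

Each slot built this way holds four expressions, which stays inside the 2-10 per-slot range.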

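**Batch backtest mode**: when the number of expressions exceeds the capacity of a single batch (slots × expressions per slot), submissions are split into batches automatically. For example, 9,642 expressions in 8-slot × 10 mode (80 per batch) need 121 batches.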
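The batch arithmetic can be checked with a short sketch. The function names `plan_batches` and `split_batches` are hypothetical; the template itself computes the same values inline with integer ceiling division.

```python
import math

def plan_batches(n_exprs, num_slots=8, per_slot=10):
    """Return (batch_size, n_batches) for a given number of expressions."""
    batch_size = num_slots * per_slot
    return batch_size, math.ceil(n_exprs / batch_size)

def split_batches(exprs, num_slots=8, per_slot=10):
    """Yield consecutive slices of exprs, one slice per batch."""
    batch_size = num_slots * per_slot
    for start in range(0, len(exprs), batch_size):
        yield exprs[start:start + batch_size]

print(plan_batches(9642))  # (80, 121)
batches = list(split_batches(list(range(9642))))
print(len(batches), len(batches[-1]))  # 121 42
```

The final batch is partial (42 expressions), so it occupies fewer than 8 full slots.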

### Step 2: Generate the Python backtest script

Generate the script from the following template, save it to the working directory, and run it:

```python

#!/usr/bin/env python3
"""R{round}: {desc} - {num_slots} slots x {per_slot} alphas, concurrent backtest"""
import json, time, requests, base64, threading, csv, re, shutil, tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

BASE_URL = "https://api.worldquantbrain.com"

# === User-defined section ===
SLOT_CONFIGS = [
    # 1-8 slot configs, 2-10 expressions per slot...
]

# Batch mode: load the expression list from a JSON file and split it into batches automatically
BATCH_MODE = False        # set to True to enable batch mode
BATCH_EXPR_FILE = ""      # path to the JSON expression file
BATCH_CSV_FILE = ""       # path to the field-info CSV
BATCH_OUT_DIR = ""        # output directory
BATCH_NUM_SLOTS = 8       # number of slots in batch mode
BATCH_PER_SLOT = 10       # expressions per slot in batch mode
BATCH_DEFAULT_SETTINGS = {
    "instrumentType": "EQUITY", "region": "USA", "universe": "TOP3000",
    "delay": 0, "decay": 0, "neutralization": "STATISTICAL",
    "truncation": 0.01, "pasteurization": "ON", "unitHandling": "VERIFY",
    "nanHandling": "ON", "visualization": False, "language": "FASTEXPR",
    "testPeriod": "P0Y0M", "maxTrade": "ON",
}
BATCH_FILTER = {"sharpe": 0.9, "fitness": 0.5}  # filter thresholds
BATCH_TAG = "first_round"                       # tag name for alphas that pass the filter
BATCH_TAG_COLOR = "#FFFF00"                     # tag color

# === Core framework (do not modify) ===
def lc():
    """Load the config from the first candidate file that exists; {} if none does."""
    for cp in [Path(r"d:\VScode\MCP-Brain\learn-3\AIworker\user_config.json"),Path.home()/".brain_mcp_config.json"]:
        if cp.exists():
            with open(cp,'r',encoding='utf-8') as f: return json.load(f)
    return {}

def auth(s,e,p):
    """Authenticate session s with HTTP Basic credentials; True on 200/201."""
    r=s.post(f'{BASE_URL}/authentication',headers={'Authorization':f'Basic {base64.b64encode(f"{e}:{p}".encode()).decode()}'})
    return r.status_code in [200,201]

class SP:
    """Thread-safe pool of authenticated requests.Session objects."""
    def __init__(self,e,p,sz=10):
        self._p=[]; self._l=threading.Lock(); self._e=e; self._pw=p
        for _ in range(sz):
            s=requests.Session(); s.headers.update({'User-Agent':'Mozilla/5.0'})
            if auth(s,e,p): self._p.append(s)
        print(f"  SP:{len(self._p)}/{sz}")

    def b(self):
        """Borrow a session; if the pool is empty, create and authenticate a new one."""
        with self._l:
            if self._p: return self._p.pop()
        s=requests.Session(); s.headers.update({'User-Agent':'Mozilla/5.0'}); auth(s,self._e,self._pw); return s

    def r(self,s):
        """Return a session to the pool (at most 20 sessions are kept)."""
        with self._l:
            if len(self._p)<20: self._p.append(s)

def sub(s,exprs,si,settings=None,max_retries=5):
    """Batch mode: submit one slot as a multi-simulation; return its progress URL, or None on failure."""
    md=[]
    for expr in exprs:
        st=settings or BATCH_DEFAULT_SETTINGS
        md.append({'type':'REGULAR','settings':{
            'instrumentType':st.get('instrumentType','EQUITY'),'region':st.get('region','USA'),
            'universe':st.get('universe','TOP3000'),'delay':st.get('delay',0),'decay':st.get('decay',0),
            'neutralization':st.get('neutralization','STATISTICAL'),'truncation':st.get('truncation',0.01),
            'pasteurization':st.get('pasteurization','ON'),'unitHandling':st.get('unitHandling','VERIFY'),
            'nanHandling':st.get('nanHandling','ON'),'visualization':st.get('visualization',False),
            'language':st.get('language','FASTEXPR'),'testPeriod':st.get('testPeriod','P0Y0M'),
            'maxTrade':st.get('maxTrade','ON')},'regular':expr})
    # One initial attempt plus up to max_retries retries; any non-201 response is retried.
    for attempt in range(max_retries+1):
        r=s.post(f"{BASE_URL}/simulations",json=md)
        if r.status_code==201:
            tag=f" (retry #{attempt})" if attempt>0 else ""
            print(f"  S{si}: OK({len(md)}){tag}")
            return r.headers.get('Location','')
        if attempt<max_retries:
            wait=2**attempt*5  # exponential backoff: 5s, 10s, 20s, 40s, 80s
            print(f"  S{si}: F{r.status_code} -> retry #{attempt+1}/{max_retries} in {wait}s")
            time.sleep(wait)
        else:
            print(f"  S{si}: F{r.status_code} FAIL after {max_retries} retries {r.text[:100]}")
            return None

def sub_slot(s,sc,si,max_retries=5):
    """SLOT_CONFIGS mode: submit one slot, using cm[i] for expression i when present, else the slot defaults."""
    exprs=sc["expressions"]; cm=sc.get("cm"); ds=sc.get("settings",{}); md=[]
    for i,expr in enumerate(exprs):
        st=cm.get(i,ds) if cm else ds
        md.append({'type':'REGULAR','settings':{
            'instrumentType':'EQUITY','region':'USA','universe':'TOP3000',
            'delay':st['d'],'decay':st['decay'],'neutralization':st['neut'],'truncation':st['trunc'],
            'pasteurization':st.get('pasteurization','ON'),'unitHandling':'VERIFY',
            'nanHandling':st.get('nanHandling','ON'),'visualization':False,'language':'FASTEXPR',
            'testPeriod':'P0Y0M','maxTrade':st.get('mt','ON')},'regular':expr})
    # Same retry policy as sub(): one initial attempt plus up to max_retries retries.
    for attempt in range(max_retries+1):
        r=s.post(f"{BASE_URL}/simulations",json=md)
        if r.status_code==201:
            tag=f" (retry #{attempt})" if attempt>0 else ""
            print(f"  S{si}: OK({len(md)}){tag}")
            return r.headers.get('Location','')
        if attempt<max_retries:
            wait=2**attempt*5  # exponential backoff: 5s, 10s, 20s, 40s, 80s
            print(f"  S{si}: F{r.status_code} -> retry #{attempt+1}/{max_retries} in {wait}s")
            time.sleep(wait)
        else:
            print(f"  S{si}: F{r.status_code} FAIL after {max_retries} retries")
            return None

def wfc(s,loc,si,to=600):
    """Poll the multi-simulation URL until it lists child simulation IDs; return [] after `to` seconds."""
    st=time.time()
    while time.time()-st<to:
        try:
            r=s.get(loc)
            if r.status_code==200:
                c=r.json().get('children',[])
                if c: return c
                time.sleep(float(r.headers.get("Retry-After",5)))
            else: time.sleep(5)
        except Exception: time.sleep(5)
    return []

def gad(s,aid):
    """Fetch alpha details and flatten IS statistics and check results; None on any failure."""
    try:
        r=s.get(f"{BASE_URL}/alphas/{aid}")
        if r.status_code==200:
            a=r.json(); isd=a.get('is',{}) or {}; cs=isd.get('checks',[]); fl=[c for c in cs if c.get('result')=='FAIL']
            pc_check=next((c for c in cs if c.get('name')=='PROD_CORRELATION'),{})
            pc_val=pc_check.get('value')
            rv_check=next((c for c in cs if c.get('name')=='REVERSION_COMPONENT'),{})
            return {
                'alpha_id':aid,'status':'SUCCESS',
                'sharpe':isd.get('sharpe'),'fitness':isd.get('fitness'),
                'turnover':isd.get('turnover'),'margin':isd.get('margin'),
                # sp/fp are True unless the check explicitly FAILs; lp/su are True only on an explicit PASS
                'sp':next((c for c in cs if c.get('name')=='LOW_SHARPE'),{}).get('result')!='FAIL',
                'fp':next((c for c in cs if c.get('name')=='LOW_FITNESS'),{}).get('result')!='FAIL',
                'lp':next((c for c in cs if c.get('name')=='IS_LADDER_SHARPE'),{}).get('result')=='PASS',
                'su':next((c for c in cs if c.get('name')=='LOW_SUB_UNIVERSE_SHARPE'),{}).get('result')=='PASS',
                'rv':rv_check.get('result') or 'N/A',
                'neut':a.get('settings',{}).get('neutralization','?'),
                'dec':a.get('settings',{}).get('decay',0),
                'nfail':len(fl),'pc':pc_val,
            }
    except Exception: pass
    return None

def pc(sp,cid,expr,si,ci):
    """Poll one child simulation (up to 120 polls) and return its alpha details or an ERROR/TIMEOUT record."""
    s=sp.b()
    try:
        for _ in range(120):
            try:
                r=s.get(f"{BASE_URL}/simulations/{cid}")
                if r.status_code==200:
                    ad=r.json()
                    if ad.get('status') in ['ERROR','CANCELLED']: return {'expr':expr[:60],'status':'ERROR','err':ad.get('message','?'),'slot':si}
                    # No Retry-After header (or 0) means the simulation has finished.
                    if float(r.headers.get("Retry-After",0))==0:
                        aid=ad.get("alpha")
                        if aid:
                            res=gad(s,aid)
                            if res: res['slot']=si; res['expression']=expr; return res
                        return {'expr':expr[:60],'status':'ERROR','slot':si}
                    time.sleep(float(r.headers.get("Retry-After",5)))
                else: time.sleep(2)
            except Exception: time.sleep(2)
        return {'expr':expr[:60],'status':'TIMEOUT','slot':si}
    finally: sp.r(s)

def tag_alpha(s,aid,tag,color="#FFFF00"):
    """Attach a colored tag to an alpha; True on 200/201, False on any error."""
    try:
        r=s.put(f"{BASE_URL}/alphas/{aid}/tags/{tag}",json={"color":color,"name":tag})
        return r.status_code in [200,201]
    except Exception: return False

def run_batch(sp,exprs,batch_label,settings=None):
    """Submit one batch across slots, collect results, and recursively retry slots whose submission failed."""
    if not exprs: return []  # nothing to do; also avoids ThreadPoolExecutor(max_workers=0)
    ns=min(BATCH_NUM_SLOTS,(len(exprs)+BATCH_PER_SLOT-1)//BATCH_PER_SLOT)
    slots=[]
    for i in range(ns):
        s=i*BATCH_PER_SLOT; e=s+BATCH_PER_SLOT; b=exprs[s:e]
        if b: slots.append((i,b))
    ss=[sp.b() for _ in range(len(slots))]; sl={}
    with ThreadPoolExecutor(max_workers=len(slots)) as ex:
        ff={ex.submit(sub,ss[i],b,i,settings):i for i,b in slots}
        for f in as_completed(ff):
            loc=f.result()
            if loc: sl[ff[f]]=loc
    # track which expressions failed (slot not in sl)
    sl_slot_indices=set(sl.keys())
    failed=[]
    for i,b in slots:
        if i not in sl_slot_indices:
            print(f"  S{i}: expressions lost due to F429, will retry")
            failed.extend(b)
    sc={}
    # max(1, ...) guards against max_workers=0 when every slot failed to submit
    with ThreadPoolExecutor(max_workers=max(1,len(sl))) as ex:
        ff={ex.submit(wfc,ss[i],sl[i],i):i for i in sl}
        for f in as_completed(ff):
            i=ff[f]; c=f.result()
            if isinstance(c,list):
                bexprs=next((b for (s,b) in slots if s==i),[])
                sc[i]=[(c[j],bexprs[j]) for j in range(min(len(c),len(bexprs)))]
            else: sc[i]=[]
    for s in ss: sp.r(s)
    tc=sum(len(v) for v in sc.values()); ar=[]
    with ThreadPoolExecutor(max_workers=max(1,min(tc,18))) as ex:
        ff={}
        for si,cl in sc.items():
            for ci,(cid,expr) in enumerate(cl):
                ff[ex.submit(pc,sp,cid,expr,si,ci)]=(si,ci)
        dc=0
        for f in as_completed(ff):
            r=f.result(); ar.append(r); dc+=1
            if r.get('status')=='SUCCESS':
                aid=r.get('alpha_id','?'); s=r.get('sharpe',0) or 0
                t=r.get('turnover',0) or 0; m=r.get('margin',0) or 0
                print(f"    [{dc}/{tc}] S{r.get('slot','?')}: {aid} S={s:.2f} F={r.get('fitness',0) or 0:.2f} T={t:.2%}")
            else: print(f"    [{dc}/{tc}] S{r.get('slot','?')}: ERR {(r.get('err') or '')[:60]}")
    if failed:
        # Recursive retry; note there is no cap on recursion depth.
        print(f"  >> Retrying {len(failed)} expressions that failed with F429...")
        ar2=run_batch(sp,failed,batch_label+"_retry",settings)
        ar.extend(ar2)
    return ar

def main():
    cfg=lc(); cr=cfg.get("credentials",{})
    e,p=cr.get("email"),cr.get("password")
    if not e or not p: print("No creds!"); return

    if BATCH_MODE:
        print("="*70)
        print(f"Batch concurrent backtest: {BATCH_NUM_SLOTS} slots x {BATCH_PER_SLOT} alphas")
        print(f"Filter: |sharpe|>{BATCH_FILTER['sharpe']} |fitness|>{BATCH_FILTER['fitness']}")
        print("="*70)
        with open(BATCH_EXPR_FILE,'r',encoding='utf-8') as f: exprs=json.load(f)
        print(f"  Loaded {len(exprs)} expressions")
        sp=SP(e,p,sz=max(BATCH_NUM_SLOTS+8,14))
        bsz=BATCH_NUM_SLOTS*BATCH_PER_SLOT; nb=(len(exprs)+bsz-1)//bsz
        all_results=[]
        for bi in range(nb):
            s=bi*bsz; e_=min(s+bsz,len(exprs)); batch=exprs[s:e_]
            label=f"Batch{bi+1}/{nb}"
            print(f"\n  {label}: [{s}:{e_}]({len(batch)} expressions)")
            br=run_batch(sp,batch,label)
            all_results.extend(br)
        succ=[r for r in all_results if r.get('status')=='SUCCESS']
        filt_s=BATCH_FILTER['sharpe']; filt_f=BATCH_FILTER['fitness']
        qual=[r for r in succ if (abs(r.get('sharpe',0) or 0)>filt_s and abs(r.get('fitness',0) or 0)>filt_f)]
        print(f"\n  Filter result: {len(qual)}/{len(succ)} passed (|S|>{filt_s},|F|>{filt_f})")
        ts=sp.b(); tagged=0
        for r in qual:
            if tag_alpha(ts,r.get('alpha_id'),BATCH_TAG,BATCH_TAG_COLOR): tagged+=1
        sp.r(ts)
        print(f"  Tags: {tagged}/{len(qual)} tagged '{BATCH_TAG}'")
        csv_path=Path(BATCH_OUT_DIR)/"初步筛选结果.csv"
        with open(csv_path,'w',encoding='utf-8',newline='') as f:
            w=csv.writer(f); w.writerow(['alphaID','sharp_value','fitness_value'])
            for r in qual: w.writerow([r.get('alpha_id',''),r.get('sharpe',0),r.get('fitness',0)])
        print(f"  Saved {csv_path}")
        out=Path(BATCH_OUT_DIR)/"round1_backtest_results.json"
        with open(out,'w',encoding='utf-8') as f: json.dump({'alpha_results':all_results},f,indent=2,default=str)
        print(f"  Saved {out}")
        print(f"\nRound 1 complete! {len(qual)}/{len(succ)} passed the filter")
    else:
        ns=len(SLOT_CONFIGS); ta=sum(len(sc['expressions']) for sc in SLOT_CONFIGS)
        if ns==0: print("SLOT_CONFIGS is empty!"); return  # avoids ThreadPoolExecutor(max_workers=0)
        print(f"{ns} slots x {ta//ns if ns else 0} alphas = {ta} concurrent backtests")
        sp=SP(e,p,sz=max(ns+8,16))
        # Phase 1: submit every slot in parallel
        t0=time.time(); sl={}; ss=[sp.b() for _ in range(ns)]
        with ThreadPoolExecutor(max_workers=ns) as ex:
            ff={ex.submit(sub_slot,ss[i],SLOT_CONFIGS[i],i):i for i in range(ns)}
            for f in as_completed(ff):
                loc=f.result()
                if loc: sl[ff[f]]=loc
        print(f"  Sub:{time.time()-t0:.1f}s")
        # Phase 2: wait for each slot's child simulation IDs
        t0=time.time(); sc={}
        with ThreadPoolExecutor(max_workers=ns) as ex:
            ff={ex.submit(wfc,ss[i],sl[i],i):i for i in sl}
            for f in as_completed(ff):
                i=ff[f]; c=f.result()
                exprs=SLOT_CONFIGS[i]["expressions"]
                sc[i]=[(c[j],exprs[j]) for j in range(min(len(c),len(exprs)))]
        for s in ss: sp.r(s)
        print(f"  Wait:{time.time()-t0:.1f}s")
        # Phase 3: poll every child simulation and fetch alpha details
        tc=sum(len(v) for v in sc.values()); t0=time.time(); ar=[]
        with ThreadPoolExecutor(max_workers=max(1,min(tc,20))) as ex:
            ff={}
            for si,cl in sc.items():
                for ci,(cid,expr) in enumerate(cl):
                    ff[ex.submit(pc,sp,cid,expr,si,ci)]=(si,ci)
            dc=0
            for f in as_completed(ff):
                r=f.result(); ar.append(r); dc+=1
                if r.get('status')=='SUCCESS':
                    aid=r.get('alpha_id','?'); s=r.get('sharpe',0) or 0
                    t=r.get('turnover',0) or 0; m=r.get('margin',0) or 0
                    pc_v=r.get('pc')
                    pc_s=f" PC={pc_v:.4f}" if pc_v is not None else ""
                    print(f"  [{dc}/{tc}] S{r.get('slot','?')}: {aid} S={s:.2f} T={t:.2%} M={m:.6f}{pc_s}")
        print(f"  Poll:{time.time()-t0:.1f}s")
        succ=[r for r in ar if r.get('status')=='SUCCESS']
        print("\nSUMMARY(Margin↓)")
        for r in sorted(succ,key=lambda x:x.get('margin',0) or 0,reverse=True):
            s=r.get('sharpe',0) or 0; t=r.get('turnover',0) or 0; m=r.get('margin',0) or 0
            pc_v=r.get('pc'); slot=r.get('slot','?'); nf=r.get('nfail',0)
            sp_='PASS' if r.get('sp') else 'FAIL'; lp_='PASS' if r.get('lp') else 'FAIL'
            rv=r.get('rv','?'); dec=r.get('dec',0); ms='OK' if m>0.0015 else 'LOW'
            mk="***CAND***" if (nf==0 and m>0.0015 and rv!='WARNING') else ""
            pc_s=f" PC={pc_v:.4f}" if pc_v is not None else " PC=?"
            print(f" S{slot} {r.get('alpha_id','?')}: S={s:.2f}[{sp_}] T={t:.2%} M={m:.6f}({ms}) D={dec}{pc_s} {mk}")
        for r in ar:
            if r.get('status')!='SUCCESS': print(f" S{r.get('slot','?')} E:{r.get('err','?')}")
        out=Path(__file__).parent/"batch_results.json"
        with open(out,'w',encoding='utf-8') as f: json.dump({'alpha_results':ar},f,indent=2,default=str)
        # Candidates for the PC check: prefer alphas with no failed checks, else fall back to all successes
        topn=min(5,len(succ))
        clean=[r for r in succ if r.get('sp') and r.get('fp') and r.get('lp') and r.get('nfail',0)==0 and r.get('rv')!='WARNING']
        top=sorted(clean,key=lambda x:x.get('margin',0) or 0,reverse=True)[:topn]
        if not top: top=sorted(succ,key=lambda x:x.get('margin',0) or 0,reverse=True)[:topn]
        print(f"\n--- TOP {topn} for PC CHECK ---")
        for r in top:
            aid=r.get('alpha_id'); s=r.get('sharpe',0) or 0; m=r.get('margin',0) or 0
            print(f"  {aid} (S={s:.2f} M={m:.6f})")

if __name__=="__main__": main()

```
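The retry loops in `sub()` and `sub_slot()` wait `2**attempt*5` seconds before each retry. The sketch below only reproduces that schedule so the worst-case blocking time per submission call is visible; `backoff_schedule` is a hypothetical helper, not part of the template.

```python
def backoff_schedule(max_retries=5, base=5):
    """Waits in seconds before retry #1..#max_retries (wait = 2**attempt * base)."""
    return [2 ** attempt * base for attempt in range(max_retries)]

waits = backoff_schedule()
print(waits)       # [5, 10, 20, 40, 80]
print(sum(waits))  # 155 seconds of sleeping before the call gives up
```

With the defaults, one call makes at most 6 POST requests (the initial attempt plus 5 retries) and sleeps for at most 155 seconds in total.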

### Step 3: Run the script

```bash

python "path/to/script.py"

```

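### Step 4: Analyze the results

The script output includes:

1. IS statistics for each alpha (Sharpe, Fitness, Turnover, Margin)

2. Check results (LOW_SHARPE, LOW_FITNESS, IS_LADDER_SHARPE, etc.)

3. A TOP N list of candidate alphas → hand these to the "PC快速检测" (PC quick check) skill for checking

4. In batch mode, `初步筛选结果.csv` and `round1_backtest_results.json` are generated automatically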
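For re-screening a saved results file with different thresholds, a sketch along these lines may help. It assumes the `{"alpha_results": [...]}` layout that the template writes; the function names and the sample records are made up purely to show the record shape.

```python
import json

def qualified(results, min_sharpe=0.9, min_fitness=0.5):
    """Keep SUCCESS records whose |sharpe| and |fitness| exceed the thresholds."""
    return [
        r for r in results
        if r.get("status") == "SUCCESS"
        and abs(r.get("sharpe") or 0) > min_sharpe
        and abs(r.get("fitness") or 0) > min_fitness
    ]

def load_results(path):
    """Read the alpha_results list from a results JSON file."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)["alpha_results"]

# Invented sample records, for illustration only (not real backtest output).
sample = [
    {"alpha_id": "A1", "status": "SUCCESS", "sharpe": 1.4, "fitness": 0.8},
    {"alpha_id": "A2", "status": "SUCCESS", "sharpe": -1.2, "fitness": -0.7},
    {"alpha_id": "A3", "status": "SUCCESS", "sharpe": 0.5, "fitness": 0.9},
    {"expr": "rank(close)", "status": "TIMEOUT", "slot": 0},
]
print([r["alpha_id"] for r in qualified(sample)])  # ['A1', 'A2']
```

As in the template, the filter uses absolute values, so a strongly negative alpha such as `A2` also passes.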

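## Two usage modes

### Mode A: SLOT_CONFIGS mode (small-scale iterative optimization)

- Set `BATCH_MODE = False`

- Define 1-8 slots in `SLOT_CONFIGS`, each with 2-10 expressions

- Each slot can have its own settings (delay, decay, neutralization, etc.)

- Suitable for: alpha variant optimization, parameter sweeps

### Mode B: Batch mode (large-batch screening)

- Set `BATCH_MODE = True`

- Set `BATCH_EXPR_FILE` (path to the JSON expression file)

- Set `BATCH_NUM_SLOTS` and `BATCH_PER_SLOT`

- Batches are submitted, filtered, and tagged automatically

- Suitable for: batch backtesting from an expression file, first-round screening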
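The template reads `BATCH_EXPR_FILE` with `json.load` and slices the result, so the file is expected to hold a JSON array of expression strings. A minimal sketch for producing such a file follows; the helper name, the expression template, and the field names are illustrative assumptions.

```python
import itertools
import json
import tempfile
from pathlib import Path

def build_expressions(fields, windows, template="rank(ts_delta({field}, {window}))"):
    """Expand a template over every (field, window) combination."""
    return [template.format(field=f, window=w)
            for f, w in itertools.product(fields, windows)]

exprs = build_expressions(["close", "volume"], [5, 20])

# Written to a temporary directory here; point BATCH_EXPR_FILE at the real path.
out = Path(tempfile.mkdtemp()) / "expressions.json"
out.write_text(json.dumps(exprs, ensure_ascii=False, indent=2), encoding="utf-8")
print(len(exprs), exprs[0])
```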

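## Notes

1. **Neutralization priority**: SUBINDUSTRY > INDUSTRY > SECTOR > MARKET > NONE

2. **Timeout circuit breaker**: a single backtest round that runs longer than 15 minutes is treated as a zombie simulation

3. **Error diagnosis**: if the whole run fails, inspect the specific error messages

4. **Concurrency limit**: the platform limits the number of concurrent simulations; 8 slots is already optimal

5. **SessionPool**: API connections are managed through a thread-safe Session pool (class `SP` in the template)

6. **Batch splitting**: when the expressions exceed single-batch capacity they are split into batches automatically, with no pause between batches

7. **Windows long paths**: in batch mode, if the expression file path is too long, copy the file to a temporary directory first

8. **Automatic F429 retry**:

   - When `sub()` and `sub_slot()` hit F429 (CONCURRENT_SIMULATION_LIMIT_EXCEEDED), they retry automatically with exponential backoff, waiting 5s→10s→20s→40s→80s, for at most 5 retries. As written, any non-201 response triggers the same retry, not only 429

   - If a slot still fails after 5 retries, its expressions are collected and **retried recursively right away** (by calling `run_batch` itself)

   - Each recursive retry gets the same backoff protection (one initial attempt plus up to 5 retries). `run_batch` as written places no cap on recursion depth, so a slot that keeps failing is retried again rather than abandoned after a fixed number of attempts

   - This is what provides **zero omissions** for submission failures. Slots that time out while waiting for child simulation IDs (`wfc` returns an empty list) are not resubmitted

9. **Resumable runs**: for very large batch runs (e.g. 9,642 expressions), use a checkpoint mechanism (save `completed_indices` to a JSON file) so that a restarted script skips the expressions already completed and only reruns failed or missing ones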
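The template does not implement the checkpoint itself. One possible shape, assuming a JSON file with a single `completed_indices` key as suggested above, is sketched below; the function names and the file name are hypothetical.

```python
import json
import tempfile
from pathlib import Path

def load_checkpoint(path):
    """Return the set of completed expression indices (empty on first run)."""
    p = Path(path)
    if not p.exists():
        return set()
    return set(json.loads(p.read_text(encoding="utf-8"))["completed_indices"])

def save_checkpoint(path, completed):
    """Write to a temp file, then rename, so an interrupted write cannot corrupt the checkpoint."""
    p = Path(path)
    tmp = p.with_suffix(p.suffix + ".tmp")
    tmp.write_text(json.dumps({"completed_indices": sorted(completed)}), encoding="utf-8")
    tmp.replace(p)

def pending(exprs, completed):
    """Return (index, expression) pairs that still need a backtest."""
    return [(i, e) for i, e in enumerate(exprs) if i not in completed]

ckpt = Path(tempfile.mkdtemp()) / "checkpoint.json"
exprs = ["expr_a", "expr_b", "expr_c"]
done = load_checkpoint(ckpt)   # empty set on the first run
done.update({0, 2})            # pretend indices 0 and 2 finished
save_checkpoint(ckpt, done)
print(pending(exprs, load_checkpoint(ckpt)))  # [(1, 'expr_b')]
```

Saving after each batch rather than once at the end keeps the amount of repeated work after a crash to at most one batch.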

Appendix: screenshots of the implementation: