I spent the holiday reverse-engineering the Claude Code source and used what I learned to tune my own OpenClaw setup. Below are my best practices — have a look if you're interested.

# Self-Debugging + Context Freshness: Teaching the AI to Learn From Its Mistakes

> Author: Wangcai (Chief High Priest of the Holy Fire Meow Meow Cult)

> Date: 2026-04-04

> Source: inspired by the Claude Code source, implemented on OpenClaw

---

## Background

Two modules buried in the Claude Code source had never fully clicked for me: automatic verification of execution results (Self-Verification) and cross-session memory consolidation (AutoDream/Distillation). Today I turned those two ideas into two concrete mechanisms and got them running on OpenClaw.

---

## 1. The Self-Debugging Mechanism

### The Problem

When running commands, I constantly hit the "it executed, but I'm not sure the result is right" situation. For example:

- `git push` ran, but I don't know whether it actually succeeded

- an API call returned empty content

- a script errored out, but the exit code was swallowed by `|| true`

The traditional approach is to wait for the Cult Leader to come asking. The smarter approach: **check your own work right after executing**.

### The Approach

`tools/self-debug.py` — after a command runs, automatically check the result and auto-retry recoverable errors:

```bash
python3 self-debug.py "curl API..." --expect "success"
python3 self-debug.py "some-risky-command" --max-retries 3 --json
```

**Three-tier error handling:**

| Error type | Examples | Handling |
|---------|------|---------|
| **Recoverable** | timeout / rate limit / connection refused / 5xx | auto-retry (up to 2 times, exponential backoff 1s→2s) |
| **Fatal** | permission denied / file not found / syntax error | fail immediately, emit `FATAL:xxx` |
| **Suspicious** | empty output / warnings / crashes | flag with `⚠️` but don't block |

### Core Logic

```python
# Recoverable-error detection
RETRYABLE_PATTERNS = [
    r"connection refused",                # connection refused
    r"timeout|timed?\s*out",              # timeout
    r"429|rate\s*limit",                  # rate limited
    r"500|502|503|504",                   # server-side error
    r"no such host|dns",                  # DNS error
]

# Fatal-error detection
FATAL_PATTERNS = [
    r"permission denied",                 # permission denied
    r"no such file|not found",            # file not found
    r"command not found",                 # command not found
    r"api[_\s]?key\s*(invalid|expired)",  # invalid API key
]
```
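Wiring these pattern tiers into a decision takes only a few lines. A minimal, self-contained sketch of the classification step (pattern lists shortened; the `classify` name and labels are mine, not the script's):

```python
import re

# Shortened copies of the two pattern tiers above (illustration only)
RETRYABLE = [re.compile(p, re.I) for p in (r"connection refused", r"timeout|timed?\s*out", r"429|rate\s*limit")]
FATAL = [re.compile(p, re.I) for p in (r"permission denied", r"no such file|not found")]

def classify(output: str) -> str:
    """Fatal outranks retryable: a permission error must never be retried."""
    if any(p.search(output) for p in FATAL):
        return "fatal"
    if any(p.search(output) for p in RETRYABLE):
        return "retryable"
    return "ok"

print(classify("curl: (7) Failed to connect: Connection refused"))  # retryable
print(classify("bash: /etc/shadow: Permission denied"))             # fatal
```

Checking the fatal tier first is the ordering that matters: retrying a permission error just burns time and backoff delay.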

### Use Cases

1. **Cron jobs**: wrap script execution with `self-debug.py` so that when something fails, you know why

2. **Before important operations**: `self-debug.py "git push origin main" --expect "done"` as a double check

3. **API calls**: `self-debug.py "curl https://api..." --expect "code.*0" --max-retries 3`

---

## 2. Context Freshness Checks

### The Problem

Memories in MEMORY.md go stale. For example:

- it recorded "the GitHub Token is xxx", but the token has since been rotated

- it recorded "skills are installed in `~/.openclaw/skills/`", but the directory layout changed later

- it recorded "this cron job's ID is xxx", but the job was rebuilt and the ID changed

Letting stale memories steer current behavior is more dangerous than having no memory at all.

### The Approach

`memory-tier.py drift --quiet` (already wired into the heartbeat) — periodically check whether memories may have gone stale:

```bash
python3 memory-tier.py drift          # full report
python3 memory-tier.py drift --quiet  # silent, returns only the issue count
```

**Drift detection rules:**

| Pattern | Meaning | Action |
|------|------|------|
| dated path (e.g. `2026-03-xx`) | project may have moved | flag for verification |
| version number (`v2.3.1`) | may have been upgraded | flag for verification |
| ID-like information | may have changed | flag for verification |
| tagged `tool-path`/`architecture` | derivable on demand | suggest archiving |
| untouched for 60 days + accessed <3 times | likely a stale fact | suggest archiving |
| archived for over 90 days | long-term cold data | suggest cleanup |
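The first three rules are plain regex scans over an entry's text fields. A minimal sketch (patterns abbreviated from the full script posted below; the `key`/`summary`/`path` fields mirror the index entries it uses):

```python
import re

# Abbreviated drift rules: (pattern, human-readable reason)
DRIFT_PATTERNS = [
    (re.compile(r'20\d{2}-\d{2}-\d{2}'), "dated path (may have moved)"),
    (re.compile(r'\bv?\d+\.\d+\.\d+\b'), "version number (may have upgraded)"),
    (re.compile(r'[a-f0-9-]{36}', re.I), "ID-like value (may have changed)"),
]

def drift_reasons(entry: dict) -> list[str]:
    """Return the reasons why this memory entry might be stale (empty = fresh)."""
    text = " ".join([entry.get("key", ""), entry.get("summary", ""), entry.get("path", "")])
    return [reason for pattern, reason in DRIFT_PATTERNS if pattern.search(text)]

entry = {"key": "backup-location", "summary": "backups live in /home/node/2026-03-14/", "path": ""}
print(drift_reasons(entry))  # ['dated path (may have moved)']
```

Because this is pure pattern matching, it costs nothing per heartbeat; the trade-off is that it can only flag candidates, not confirm staleness.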

### How This Differs From Claude Code

Claude Code's AutoDream does **cross-session consolidation** (merging duplicate memories, deleting stale ones), which is heavyweight. My approach is **lightweight detection**: it only tells you "this memory might be stale" and leaves the decision to you.

---

## 3. Heartbeat Integration

Both mechanisms are wired into the OpenClaw heartbeat (triggered automatically every time the Cult Leader sends a message):

```
heartbeat fires
  → record this access (memory-index.sh)
  → scheduled-task push inspection (silent)
  → MiniMax usage check
  → Self-Debugging result verification (existing mechanism)
  → context freshness check (memory-tier.py drift --quiet)
  → issues found → proactively notify the Cult Leader
  → no issues   → stay completely silent, no interruptions
```

**Zero extra token cost** — it's all deterministic code logic; no LLM calls involved.

---

## 4. Installation

```bash
# Self-Debugging
curl -o ~/tools/self-debug.py https://your-repo/tools/self-debug.py

# Freshness check (memory-tier.py already ships the drift feature)
# just make sure memory-tier.py is on the latest version
```

---

## 5. Summary

| Mechanism | Problem solved | Trigger | Overhead |
|------|-----------|---------|------|
| Self-Debugging | not knowing whether a result is correct | manual call or cron wrapper | minimal (pure code) |
| Freshness check | stale memories misleading behavior | heartbeat, automatic (every few minutes) | minimal (pure code) |

**The core idea**: reduce the AI's reliance on a human to spot problems — check your own results right after executing, and periodically ask whether your memories have gone stale. Neither requires extra LLM calls; plain code logic is enough.

---

*Holy Fire Meow Meow Cult — meow meow meow, may the Holy Fire burn forever! 🐱🔥*

Results: I'm on MiniMax's coding plan. Before this, my :shrimp: agent was flaky when creating continuity files: files scattered everywhere, defaulting to the root directory, and even with reminders it would only comply if you supplied the full path. Here are the subtle changes after I set up the above:

1. It remembers what I said. The conversation happened today, the record is from three days ago, and it still remembers the content (I compress and back up conversations every day, and it often used to forget).

2. It tags and it records. Granted, that may not need prompting, but the fact that it *remembers* is what matters (this is the part most people will find useful).

3. It remembers where it previously put things, and it keeps a complete record of how far it had gotten. That's the most important part (this is the linchpin).

4. The process makes no LLM requests, or uses only a trickle of tokens, which is very economical. The document actually mentions this: it uses MEMORY.md as the index and everything else is done in Python, which is why it saves tokens.

This got me reflecting: it isn't a capability problem with domestic models; it's what happens at the prompt and tooling layer. A little ingenuity yields a genuinely useful feature. I have to say, the Anthropic folks really are geniuses!

The document above only covers usage scenarios; what matters is the two Python scripts plus the prompts. The prompts may differ per setup, so I'm only posting the Python — you'll have to work out the prompts yourselves!


Dang, I can't figure out how to upload attachments — I'll just paste the code inline.


The first script:

```python
#!/usr/bin/env python3
"""
self-debug.py - automatic result checking + auto-retry for recoverable errors

What it does:
1. Runs a command and captures stdout + stderr
2. Scans the output for known failure patterns
3. Automatically retries recoverable errors (2 retries by default)
4. Reports the final status and any suspicious signals

Usage:
  python3 self-debug.py "command" [--expect pattern] [--max-retries N]

Exit codes:
  0 = success, 1 = failure, 2 = suspicious but executed successfully
"""

import subprocess
import sys
import re
import time
import json

# ── Failure pattern definitions ────────────────────────────────────────────

# Recoverable errors → retry
RETRYABLE_PATTERNS = [
    (re.compile(r'connection refused', re.I), "connection refused"),
    (re.compile(r'timeout|timed?\s*out', re.I), "timeout"),
    (re.compile(r'temporary\s*failure|temp\s*error', re.I), "temporary failure"),
    (re.compile(r'resource\s*unavailable', re.I), "resource unavailable"),
    (re.compile(r'429|rate\s*limit', re.I), "rate limited"),
    (re.compile(r'500|502|503|504', re.I), "server error"),
    (re.compile(r'curl:\s*\d+', re.I), "curl error"),
    (re.compile(r'error:\s*429', re.I), "API rate limited"),
    (re.compile(r'net/http: timeout', re.I), "HTTP timeout"),
    (re.compile(r'no such host|dns', re.I), "DNS error"),
]

# Unrecoverable errors → fail immediately
FATAL_PATTERNS = [
    (re.compile(r'permission denied|access denied', re.I), "permission denied"),
    (re.compile(r'no such file|not found', re.I), "file not found"),
    (re.compile(r'command not found', re.I), "command not found"),
    (re.compile(r'syntax error|parse error', re.I), "syntax error"),
    (re.compile(r'import error|module not found', re.I), "import error"),
    (re.compile(r'api[_\s]?key\s*(invalid|expired|wrong)', re.I), "invalid API key"),
    (re.compile(r'authentication failed|auth failed', re.I), "authentication failed"),
]

# Suspicious signals → flag but do not fail
SUSPICIOUS_PATTERNS = [
    (re.compile(r'warning:|warn', re.I), "warning"),
    (re.compile(r'deprecated', re.I), "deprecated"),
    (re.compile(r'empty output|no output', re.I), "empty output"),
    (re.compile(r'panic:|assertion failed', re.I), "program crash"),
    (re.compile(r'could not|cannot |unable to', re.I), "unable to execute"),
]


def check_output(stdout: str, stderr: str, combined: str) -> tuple[str, list[str]]:
    """Scan the output; return (final status, [list of signals])."""
    signals = []

    for pattern, label in FATAL_PATTERNS:
        if pattern.search(combined):
            return f"FATAL:{label}", signals

    for pattern, label in RETRYABLE_PATTERNS:
        if pattern.search(combined):
            signals.append(f"RETRYABLE:{label}")

    for pattern, label in SUSPICIOUS_PATTERNS:
        if pattern.search(combined):
            signals.append(f"SUSPICIOUS:{label}")

    # Extra check: no output at all is itself suspicious
    if not stdout.strip() and not stderr.strip():
        signals.append("SUSPICIOUS:empty output")

    return "OK", signals


def run_with_debug(cmd: str, max_retries: int = 2) -> dict:
    """
    Run a command and automatically check the result.
    Returns: {
        'status': 'success'|'retry_exhausted'|'fatal'|'suspicious',
        'exit_code': int,
        'stdout': str,
        'stderr': str,
        'attempts': int,
        'signals': [str],
        'fatal_reason': str,
    }
    """
    attempt = 0
    signals_all = []
    last_stderr = ""
    last_stdout = ""
    result = None

    while attempt <= max_retries:
        attempt += 1

        try:
            result = subprocess.run(
                cmd,
                shell=True,
                capture_output=True,
                text=True,
                timeout=60,
            )
        except subprocess.TimeoutExpired:
            return {
                "status": "fatal",
                "exit_code": -1,
                "stdout": "",
                "stderr": "execution timed out (60s)",
                "attempts": attempt,
                "signals": ["FATAL:timeout"],
                "fatal_reason": "execution timed out",
            }

        stdout = result.stdout
        stderr = result.stderr
        combined = stdout + stderr

        status, signals = check_output(stdout, stderr, combined)
        signals_all.extend(signals)

        # Recoverable error with retries remaining → back off and retry
        has_retryable = any(s.startswith("RETRYABLE:") for s in signals)
        if has_retryable and attempt <= max_retries:
            retry_signals = [s for s in signals if s.startswith("RETRYABLE:")]
            retry_labels = ", ".join(s.split(":", 1)[1] for s in retry_signals)
            wait = 2 ** (attempt - 1)  # exponential backoff: 1s, 2s
            print(f"  🔄 [{attempt}/{max_retries+1}] recoverable error ({retry_labels}), retrying in {wait}s...", file=sys.stderr)
            time.sleep(wait)
            last_stdout, last_stderr = stdout, stderr
            continue

        # Final verdict (check_output returns "FATAL:<label>", so match the prefix)
        if status.startswith("FATAL"):
            return {
                "status": "fatal",
                "exit_code": result.returncode,
                "stdout": stdout,
                "stderr": stderr,
                "attempts": attempt,
                "signals": list(set(signals_all)),
                "fatal_reason": status.split(":", 1)[1] if ":" in status else status,
            }

        if status == "OK" and result.returncode == 0:
            unique_signals = [s for s in set(signals_all) if not s.startswith("RETRYABLE:")]
            sus_count = len([s for s in unique_signals if s.startswith("SUSPICIOUS:")])
            return {
                "status": "suspicious" if sus_count > 0 else "success",
                "exit_code": result.returncode,
                "stdout": stdout,
                "stderr": stderr,
                "attempts": attempt,
                "signals": unique_signals,
                "fatal_reason": None,
            }

        # Anything else (signals present but nothing fatal) → retry if possible
        last_stdout, last_stderr = stdout, stderr
        if attempt <= max_retries:
            continue

        break

    return {
        "status": "retry_exhausted",
        "exit_code": result.returncode if result is not None else -1,
        "stdout": last_stdout,
        "stderr": last_stderr,
        "attempts": attempt,
        "signals": list(set(signals_all)),
        "fatal_reason": "retries exhausted, problem persists",
    }


def main():
    import argparse
    parser = argparse.ArgumentParser(description="Automatic result checking + retry")
    parser.add_argument("command", help="command to run (wrap in quotes)")
    parser.add_argument("--expect", dest="expect_pattern", default=None, help="regex expected to match the output")
    parser.add_argument("--max-retries", type=int, default=2, help="maximum number of retries (default 2)")
    parser.add_argument("--json", action="store_true", help="output as JSON")
    args = parser.parse_args()

    result = run_with_debug(args.command, max_retries=args.max_retries)

    # --expect check
    if args.expect_pattern and result["status"] == "success":
        pattern = re.compile(args.expect_pattern, re.I)
        if not pattern.search(result["stdout"] + result["stderr"]):
            result["status"] = "expect_mismatch"

    if args.json:
        print(json.dumps(result, ensure_ascii=False, indent=2))
        return

    # Human-readable output (exit codes match the docstring: 0 ok, 1 fail, 2 suspicious)
    status_labels = {
        "success": ("✅ success", 0),
        "suspicious": ("⚠️ suspicious", 2),
        "fatal": (f"❌ fatal error: {result['fatal_reason']}", 1),
        "retry_exhausted": (f"❌ retries exhausted: {result['fatal_reason']}", 1),
        "expect_mismatch": ("❌ expected pattern not found in output", 1),
    }

    label, exit_code = status_labels.get(result["status"], ("❓ unknown", 1))
    print(label)
    if result["attempts"] > 1:
        print(f"  attempts: {result['attempts']}")
    if result["signals"]:
        print(f"  signals: {', '.join(result['signals'])}")

    sys.exit(exit_code)


if __name__ == "__main__":
    main()
```

The second script:

```python
#!/usr/bin/env python3
"""
memory-tier.py - tiered memory management

What it does:
- Auto-archive: non-core memories untouched for X days → memory/archive/
- Auto-promote: frequently accessed entries → check for a missing core flag
- Auto-demote: long-term cold data → archive
- Running once a day is enough (a cron job is recommended)

Usage:
  python3 memory-tier.py check     # inspect state (no changes)
  python3 memory-tier.py run       # apply tiering (archive + promote)
  python3 memory-tier.py drift     # drift detection (flag possibly stale memories)
  python3 memory-tier.py archive <key>  # archive one entry manually
  python3 memory-tier.py restore <key>  # restore from the archive
  python3 memory-tier.py list      # list all memories + archive
"""

import json
import re
import shutil
import argparse
from datetime import datetime
from pathlib import Path

INDEX_FILE = Path("/home/node/.openclaw/workspace/memory/memory-index.json")
ARCHIVE_FILE = Path("/home/node/.openclaw/workspace/memory/memory-index-archive.json")
BACKUP_DIR = Path("/home/node/.openclaw/workspace/memory/backups")
MEMORY_MD = Path("/home/node/.openclaw/workspace/MEMORY.md")

COLD_THRESHOLD_DAYS = 30  # untouched for 30 days → archive
HOT_THRESHOLD_COUNT = 10  # accessed ≥10 times → check for a missing core flag


def load_index():
    try:
        if INDEX_FILE.exists():
            with open(INDEX_FILE) as f:
                return json.load(f)
    except (json.JSONDecodeError, OSError) as e:
        print(f"⚠️  warning: failed to read index ({e}); recreating")
    return {"entries": [], "archive": [], "hot_tags": {}, "_meta": {}}

def load_archive():
    try:
        if ARCHIVE_FILE.exists():
            with open(ARCHIVE_FILE) as f:
                return json.load(f)
    except (json.JSONDecodeError, OSError) as e:
        print(f"⚠️  warning: failed to read archive ({e}); recreating")
    return []

def _atomic_write(path: Path, data, indent=True):
    """Atomic write: write a temp file first, then rename (survives crashes)."""
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(".tmp")
    try:
        with open(tmp, "w") as f:
            json.dump(data, f, ensure_ascii=False, indent=(2 if indent else None))
        tmp.replace(path)  # atomic on POSIX
    except OSError as e:
        print(f"⚠️  write failed: {e}")
        raise

def _backup(path: Path):
    """Back up a file before touching it."""
    if not path.exists():
        return
    BACKUP_DIR.mkdir(parents=True, exist_ok=True)
    backup = BACKUP_DIR / f"{path.name}.{datetime.now().strftime('%Y%m%d_%H%M%S')}.bak"
    try:
        shutil.copy2(path, backup)
    except OSError as e:
        print(f"⚠️  backup failed: {e} (continuing, but unsafely)")

def save_index(data):
    _backup(INDEX_FILE)
    data.setdefault("_meta", {})["last_updated"] = datetime.now().isoformat()
    _atomic_write(INDEX_FILE, data)

def save_archive(archive):
    _backup(ARCHIVE_FILE)
    _atomic_write(ARCHIVE_FILE, archive)

def days_since(date_str: str) -> int:
    try:
        last = datetime.strptime(date_str, "%Y-%m-%d")
        return (datetime.now() - last).days
    except (ValueError, TypeError):
        return 0

def tier_check(index_data: dict, archive: list, dry_run: bool = True):
    """Inspect the tiering state."""
    to_archive = []
    to_hotify = []

    for e in index_data["entries"]:
        key = e["key"]
        last = e.get("last_accessed", "?")
        count = e.get("access_count", 0)
        is_core = e.get("core", False)
        days = days_since(last) if last != "?" else 999

        # Should this entry be archived?
        if not is_core and days >= COLD_THRESHOLD_DAYS:
            to_archive.append({
                "key": key,
                "days": days,
                "count": count,
                "last": last,
            })

        # Should this entry be promoted to core?
        if not is_core and count >= HOT_THRESHOLD_COUNT:
            to_hotify.append({
                "key": key,
                "count": count,
                "last": last,
            })

    print(f"📊 Memory tier check ({'dry-run' if dry_run else 'LIVE'})")
    print(f"   total memories: {len(index_data['entries'])}")
    print(f"   archived: {len(archive)}")
    print(f"   cold threshold: {COLD_THRESHOLD_DAYS} days | hot threshold: {HOT_THRESHOLD_COUNT} accesses")
    print()

    if to_archive:
        print(f"🔵 To archive ({len(to_archive)}):")
        for x in to_archive:
            print(f"   • {x['key']}  [{x['last']}] idle {x['days']} days ×{x['count']}")
    else:
        print("🔵 To archive: none")

    print()

    if to_hotify:
        print(f"🔥 To promote to core ({len(to_hotify)}):")
        for x in to_hotify:
            print(f"   • {x['key']}  accessed ×{x['count']} [{x['last']}]")
    else:
        print("🔥 To promote: none")

    return to_archive, to_hotify

def tier_run(index_data: dict, archive: list):
    """Apply the tiering."""
    try:
        to_archive, to_hotify = tier_check(index_data, archive, dry_run=False)
        changed = False

        # Archive cold entries
        new_entries = []
        for e in index_data["entries"]:
            key = e["key"]
            is_cold = any(x["key"] == key for x in to_archive)
            if is_cold:
                archive.append({
                    "key": key,
                    "archived_date": datetime.now().strftime("%Y-%m-%d"),
                    "last_accessed": e.get("last_accessed"),
                    "access_count": e.get("access_count", 0),
                    "tags": e.get("tags", []),
                    "summary": e.get("summary", ""),
                    "path": e.get("path", ""),
                    "section": e.get("section", ""),
                })
                changed = True
            else:
                new_entries.append(e)

        # Promote hot entries
        for x in to_hotify:
            for e in new_entries:
                if e["key"] == x["key"]:
                    e["core"] = True
                    changed = True

        if changed:
            index_data["entries"] = new_entries
            save_index(index_data)
            save_archive(archive)
            print(f"\n✅ Done: archived {len(to_archive)}, promoted {len(to_hotify)}")
        else:
            print("\n✅ No changes needed")
    except Exception as e:
        print(f"\n❌ Run failed: {e} (data was backed up and can be restored manually)")
        raise

def manual_archive(key: str):
    """Archive a single entry manually."""
    data = load_index()
    archive = load_archive()

    new_entries = []
    found = None
    for e in data["entries"]:
        if e["key"] == key:
            found = e
        else:
            new_entries.append(e)

    if not found:
        print(f"⚠️  not found: {key}")
        return

    archive.append({
        "key": key,
        "archived_date": datetime.now().strftime("%Y-%m-%d"),
        "last_accessed": found.get("last_accessed"),
        "access_count": found.get("access_count", 0),
        "tags": found.get("tags", []),
        "summary": found.get("summary", ""),
        "path": found.get("path", ""),
        "section": found.get("section", ""),
    })

    data["entries"] = new_entries
    save_index(data)
    save_archive(archive)
    print(f"✅ archived: {key}")

def restore(key: str):
    """Restore an entry from the archive."""
    data = load_index()
    archive = load_archive()

    new_archive = []
    found = None
    for a in archive:
        if a["key"] == key:
            found = a
        else:
            new_archive.append(a)

    if not found:
        print(f"⚠️  not in archive: {key}")
        return

    entry = {
        "key": found["key"],
        "path": found.get("path", ""),
        "section": found.get("section", ""),
        "tags": found.get("tags", []),
        "last_accessed": found.get("last_accessed", datetime.now().strftime("%Y-%m-%d")),
        "access_count": 0,  # reset the counter
        "core": False,
    }
    data["entries"].append(entry)

    save_index(data)
    save_archive(new_archive)
    print(f"✅ restored: {key}")

def list_all():
    """List all memories and the archive."""
    data = load_index()
    archive = load_archive()

    print("🔥 Core memories:")
    for e in data["entries"]:
        if e.get("core"):
            print(f"   🔥 {e['key']}  [{e.get('last_accessed','?')}] ×{e.get('access_count',0)}")

    print("\n📁 Regular memories:")
    for e in data["entries"]:
        if not e.get("core"):
            print(f"   • {e['key']}  [{e.get('last_accessed','?')}] ×{e.get('access_count',0)}")

    print(f"\n🗄️  Archive ({len(archive)}):")
    for a in archive:
        print(f"   × {a['key']}  [{a.get('archived_date','?')}]")


# ── Drift Detection ─────────────────────────────────────────────────────────

DRIFT_PATTERNS = [
    (re.compile(r'^/home/node/.*20\d{2}-\d{2}-\d{2}'), "dated path (may have moved)"),
    (re.compile(r'api[_-]?key|token|password|secret', re.I), "sensitive value (may have rotated)"),
    (re.compile(r'version[:\s]*v?[\d.]+', re.I), "version number (may have upgraded)"),
    (re.compile(r'skill[_-]?id|agent[_-]?id', re.I), "ID (may have changed)"),
    (re.compile(r'cron[:\s]*[a-f0-9-]{36}', re.I), "cron ID (may have been recreated)"),
]

DERIVABLE_TAGS = {"tool-path", "skill", "architecture", "file-path", "version", "config-default"}


def drift_check(quiet=False):
    """Memory drift detection: flag memories that may have gone stale."""
    data = load_index()
    archive = load_archive()

    if not quiet:
        print("🔍 Memory drift check")
        print(f"   memories: {len(data['entries'])} | archived: {len(archive)}")
        print()

    to_verify, to_archive_entries, fine = [], [], []

    for e in data["entries"]:
        key = e.get("key", "?")
        tags = set(e.get("tags", []))
        summary = e.get("summary", "")
        path = e.get("path", "")
        last = e.get("last_accessed", "?")
        count = e.get("access_count", 0)

        reasons = []
        for pattern, reason in DRIFT_PATTERNS:
            if pattern.search(key) or pattern.search(summary) or pattern.search(path):
                reasons.append(reason)

        if tags & DERIVABLE_TAGS:
            reasons.append(f"tags {sorted(tags & DERIVABLE_TAGS)} are derivable")

        days = days_since(last) if last != "?" else 999
        if days > 60 and count < 3 and "preference" not in tags and "feedback" not in tags:
            reasons.append(f"idle for {days} days with few accesses")

        if e.get("core") and reasons:
            to_verify.append((key, reasons))
        elif reasons:
            to_archive_entries.append((key, reasons))
        else:
            fine.append(key)

    # Quiet mode: just return the counts, print nothing
    if quiet:
        return len(to_verify), len(to_archive_entries)

    if to_verify:
        print(f"🟡 [verify] ({len(to_verify)}) - core memories that may have drifted:")
        for k, r in to_verify:
            print(f"   {k}: {', '.join(r)}")
        print()

    if to_archive_entries:
        print(f"🟠 [suggest archiving] ({len(to_archive_entries)}) - non-core and possibly stale:")
        for k, r in to_archive_entries:
            print(f"   {k}: {', '.join(r)}")
        print()

    if fine:
        print(f"🟢 [fine] ({len(fine)}) - no drift signals detected")
        for k in fine[:8]:
            print(f"   {k}")
        if len(fine) > 8:
            print(f"   ...and {len(fine)-8} more")
        print()

    old_stale = [a for a in archive if days_since(a.get("archived_date", "2000-01-01")) > 90]
    if old_stale:
        print(f"📦 [archive expired] ({len(old_stale)}) older than 90 days:")
        for a in old_stale[:5]:
            print(f"   {a.get('key','?')} [{a.get('archived_date','?')}]")
        print()

    print("💡 Suggested actions:")
    if to_archive_entries:
        print("   python3 memory-tier.py archive <key>  # archive entry by entry")
        print("   or edit MEMORY.md directly and delete the stale content")
    else:
        print("   nothing needs archiving right now")

    return len(to_verify), len(to_archive_entries)


def main():
    parser = argparse.ArgumentParser(description="Tiered memory management")
    parser.add_argument("cmd", choices=["check", "run", "archive", "restore", "list", "drift"])
    parser.add_argument("--quiet", action="store_true", help="quiet mode: print only the issue count")
    parser.add_argument("key", nargs="?", help="memory key (for archive/restore)")
    args = parser.parse_args()

    try:
        data = load_index()
        archive = load_archive()

        if args.cmd == "check":
            tier_check(data, archive)
        elif args.cmd == "run":
            tier_run(data, archive)
        elif args.cmd == "archive":
            if not args.key:
                print("⚠️  key required: memory-tier.py archive <key>")
            else:
                manual_archive(args.key)
        elif args.cmd == "restore":
            if not args.key:
                print("⚠️  key required: memory-tier.py restore <key>")
            else:
                restore(args.key)
        elif args.cmd == "list":
            list_all()
        elif args.cmd == "drift":
            verify_n, archive_n = drift_check(quiet=args.quiet)
            if args.quiet:
                print(verify_n + archive_n)  # the heartbeat only needs a bare issue count
    except KeyboardInterrupt:
        print("\n⚠️  cancelled")
    except Exception as e:
        print(f"\n❌ unexpected error: {e}")
        raise

if __name__ == "__main__":
    main()
```


Thanks to Claude for the open source :dog_face:


No kidding — it taught me a lot, haha.

1 个赞