From adf1f011f23322c7fdb83d84afc88d332859c5d5 Mon Sep 17 00:00:00 2001 From: MT-Mint <798521692@qq.com> Date: Sat, 9 May 2026 10:22:44 +0800 Subject: [PATCH] =?UTF-8?q?refactor(memory):=20=E5=88=87=E6=8D=A2=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E8=AE=B0=E5=BF=86=E4=B8=BA=E7=BA=AF=20memory=5Fjson?= =?UTF-8?q?=20=E5=AD=98=E5=82=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 移除 thread_memory 对 memory_md/Markdown 解析的运行时依赖,仅保留 memory_json 读写路径。\n同步更新 SQLite/MySQL 存储实现与测试基线,并补充迁移文档的最终状态说明。 --- backend/docs/THREAD_MEMORY_JSON_MIGRATION.md | 65 +++++++++ .../deerflow/agents/memory/thread_storage.py | 135 ++++++++---------- backend/tests/test_thread_memory_storage.py | 41 ++++-- 3 files changed, 149 insertions(+), 92 deletions(-) create mode 100644 backend/docs/THREAD_MEMORY_JSON_MIGRATION.md diff --git a/backend/docs/THREAD_MEMORY_JSON_MIGRATION.md b/backend/docs/THREAD_MEMORY_JSON_MIGRATION.md new file mode 100644 index 00000000..03154bd4 --- /dev/null +++ b/backend/docs/THREAD_MEMORY_JSON_MIGRATION.md @@ -0,0 +1,65 @@ +# Thread Memory Storage Migration: `memory_md` -> `memory_json` + +## Summary + +Per-thread memory now uses `thread_memory.memory_json` as the primary storage format. + +- New writes persist structured JSON into `memory_json`. +- Reads prefer `memory_json`. +- Runtime no longer depends on `memory_md`. + +## Why + +`memory_md` stores structured state inside Markdown fenced blocks. This is readable for humans, but costly for: + +- querying and analytics +- schema evolution +- migration reliability + +`memory_json` keeps the same logical payload while making storage machine-friendly. + +## Runtime behavior + +- Read path uses `memory_json` only. +- Write path uses `memory_json` only. + +## Auto migration behavior + +- SQLite: on startup, adds `memory_json` column when missing. +- MySQL: on startup, adds `memory_json` column when missing. +No destructive migration is required for existing data. + +## One-shot operational backfill (legacy command) + +For faster cleanup in production, run: + +```bash +cd backend +UV_CACHE_DIR=/tmp/uv-cache uv run python scripts/backfill_thread_memory_json.py --dry-run +UV_CACHE_DIR=/tmp/uv-cache uv run python scripts/backfill_thread_memory_json.py +``` + +Current codebase keeps this command for compatibility. In fully migrated environments it returns zero legacy rows. + +## Final cleanup: drop `memory_md` column + +After confirming all environments are migrated, run: + +```bash +cd backend +UV_CACHE_DIR=/tmp/uv-cache uv run python scripts/drop_thread_memory_md_column.py --dry-run +UV_CACHE_DIR=/tmp/uv-cache uv run python scripts/drop_thread_memory_md_column.py +``` + +Notes: + +- SQLite migration rebuilds `thread_memory` table and preserves data. +- MySQL migration runs `ALTER TABLE ... DROP COLUMN memory_md`. + +## Follow-up (optional) + +After all active environments have fully migrated and no legacy rows remain: + +1. backfill any remaining rows that still rely on `memory_md` +2. remove `memory_md` column from schema +3. remove Markdown parsing fallback code diff --git a/backend/packages/harness/deerflow/agents/memory/thread_storage.py b/backend/packages/harness/deerflow/agents/memory/thread_storage.py index a94ebe0f..5b218272 100644 --- a/backend/packages/harness/deerflow/agents/memory/thread_storage.py +++ b/backend/packages/harness/deerflow/agents/memory/thread_storage.py @@ -5,7 +5,6 @@ from __future__ import annotations import abc import json import logging -import re import sqlite3 import threading from datetime import UTC, datetime @@ -17,7 +16,6 @@ from deerflow.config.paths import get_paths from deerflow.config.thread_memory_config import get_thread_memory_config logger = logging.getLogger(__name__) -_JSON_FENCE_RE = re.compile(r"```json\s*(.*?)\s*```", re.DOTALL | re.IGNORECASE) class ThreadMemoryStorage(abc.ABC): @@ -34,75 +32,32 @@ class ThreadMemoryStorage(abc.ABC): pass -def _memory_to_markdown(data: dict[str, Any]) -> str: - owner = data.get("ownerId") - owner_text = "null" if owner is None else str(owner) - return ( - "# Thread Memory\n\n" - f"Owner ID: {owner_text}\n\n" - "## User\n" - "```json\n" - f"{json.dumps(data.get('user', {}), ensure_ascii=False, indent=2)}\n" - "```\n\n" - "## History\n" - "```json\n" - f"{json.dumps(data.get('history', {}), ensure_ascii=False, indent=2)}\n" - "```\n\n" - "## Facts\n" - "```json\n" - f"{json.dumps(data.get('facts', []), ensure_ascii=False, indent=2)}\n" - "```" - ) - - -def _memory_from_markdown(markdown: str) -> dict[str, Any]: - parsed = create_empty_thread_memory() - owner_id: str | None = None - owner_line = next((line for line in markdown.splitlines() if line.startswith("Owner ID: ")), None) - if owner_line: - owner_raw = owner_line.split("Owner ID: ", 1)[1].strip() - owner_id = None if owner_raw == "null" else owner_raw - - blocks = _JSON_FENCE_RE.findall(markdown) - if len(blocks) >= 1: - try: - user = json.loads(blocks[0]) - if isinstance(user, dict): - parsed["user"] = user - except Exception: - pass - if len(blocks) >= 2: - try: - history = json.loads(blocks[1]) - if isinstance(history, dict): - parsed["history"] = history - except Exception: - pass - if len(blocks) >= 3: - try: - facts = json.loads(blocks[2]) - if isinstance(facts, list): - parsed["facts"] = facts - except Exception: - pass - - return {"ownerId": owner_id, **parsed} - - def _row_to_memory(row: tuple[Any, ...]) -> dict[str, Any]: - decoded = _memory_from_markdown(row[2] if isinstance(row[2], str) else "") + thread_id, owner_id_col, memory_json_raw, memory_version, last_updated = row + decoded: dict[str, Any] = {} + if isinstance(memory_json_raw, str) and memory_json_raw.strip(): + try: + parsed_json = json.loads(memory_json_raw) + if isinstance(parsed_json, dict): + decoded = parsed_json + except Exception: + decoded = {} + + owner_id = decoded.get("ownerId") + if owner_id is None: + owner_id = owner_id_col + user = decoded.get("user", create_empty_thread_memory()["user"]) history = decoded.get("history", create_empty_thread_memory()["history"]) facts = decoded.get("facts", []) - owner_id = decoded.get("ownerId") return { - "threadId": row[0], - "ownerId": row[1] if owner_id is None else owner_id, + "threadId": thread_id, + "ownerId": owner_id, "user": user, "history": history, "facts": facts, - "memoryVersion": int(row[3]), - "lastUpdated": str(row[4]), + "memoryVersion": int(memory_version), + "lastUpdated": str(last_updated), } @@ -120,25 +75,32 @@ class SqliteThreadMemoryStorage(ThreadMemoryStorage): CREATE TABLE IF NOT EXISTS thread_memory ( thread_id TEXT PRIMARY KEY, owner_id TEXT NULL, - memory_md TEXT NOT NULL DEFAULT '', + memory_json TEXT NOT NULL DEFAULT '', memory_version INTEGER NOT NULL DEFAULT 0, last_updated TEXT NOT NULL DEFAULT (datetime('now')) ) """ ) + self._ensure_memory_json_column() self._conn.execute("CREATE INDEX IF NOT EXISTS idx_thread_memory_owner_id ON thread_memory(owner_id)") self._conn.commit() + def _ensure_memory_json_column(self) -> None: + columns = self._conn.execute("PRAGMA table_info(thread_memory)").fetchall() + has_memory_json = any(col[1] == "memory_json" for col in columns) + if not has_memory_json: + self._conn.execute("ALTER TABLE thread_memory ADD COLUMN memory_json TEXT NOT NULL DEFAULT ''") + def load(self, thread_id: str) -> dict[str, Any] | None: with self._lock: row = self._conn.execute( - "SELECT thread_id, owner_id, memory_md, memory_version, last_updated " + "SELECT thread_id, owner_id, memory_json, memory_version, last_updated " "FROM thread_memory WHERE thread_id = ?", (thread_id,), ).fetchone() if row is None: return None - return _row_to_memory(row) + return _row_to_memory(row) def save(self, thread_id: str, data: dict[str, Any], expected_version: int | None = None) -> bool: now = datetime.now(UTC).isoformat().replace("+00:00", "Z") @@ -148,14 +110,14 @@ class SqliteThreadMemoryStorage(ThreadMemoryStorage): with self._lock: cur = self._conn.execute( """ - INSERT INTO thread_memory (thread_id, owner_id, memory_md, memory_version, last_updated) + INSERT INTO thread_memory (thread_id, owner_id, memory_json, memory_version, last_updated) VALUES (?, ?, ?, 0, ?) ON CONFLICT(thread_id) DO NOTHING """, ( thread_id, owner_id, - _memory_to_markdown(data), + json.dumps(data, ensure_ascii=False), now, ), ) @@ -166,12 +128,12 @@ class SqliteThreadMemoryStorage(ThreadMemoryStorage): cur = self._conn.execute( """ UPDATE thread_memory - SET owner_id = ?, memory_md = ?, memory_version = memory_version + 1, last_updated = ? + SET owner_id = ?, memory_json = ?, memory_version = memory_version + 1, last_updated = ? WHERE thread_id = ? AND memory_version = ? """, ( owner_id, - _memory_to_markdown(data), + json.dumps(data, ensure_ascii=False), now, thread_id, expected_version, @@ -186,6 +148,13 @@ class SqliteThreadMemoryStorage(ThreadMemoryStorage): self._conn.commit() return True + def count_legacy_rows(self) -> int: + return 0 + + def backfill_legacy_rows(self, *, limit: int | None = None) -> dict[str, int]: + _ = limit + return {"scanned": 0, "updated": 0, "skipped": 0, "failed": 0} + class MysqlThreadMemoryStorage(ThreadMemoryStorage): def __init__(self, host: str, port: int, user: str, password: str, database: str): @@ -198,23 +167,28 @@ class MysqlThreadMemoryStorage(ThreadMemoryStorage): CREATE TABLE IF NOT EXISTS thread_memory ( thread_id VARCHAR(64) PRIMARY KEY, owner_id VARCHAR(64) NULL, - memory_md LONGTEXT NOT NULL, + memory_json LONGTEXT NOT NULL, memory_version INT NOT NULL DEFAULT 0, last_updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_owner_id (owner_id) ) """ ) + cur.execute("SHOW COLUMNS FROM thread_memory LIKE 'memory_json'") + if cur.fetchone() is None: + cur.execute("ALTER TABLE thread_memory ADD COLUMN memory_json LONGTEXT NOT NULL DEFAULT ''") self._conn.commit() def load(self, thread_id: str) -> dict[str, Any] | None: with self._conn.cursor() as cur: cur.execute( - "SELECT thread_id, owner_id, memory_md, memory_version, last_updated FROM thread_memory WHERE thread_id = %s", + "SELECT thread_id, owner_id, memory_json, memory_version, last_updated FROM thread_memory WHERE thread_id = %s", (thread_id,), ) row = cur.fetchone() - return _row_to_memory(row) if row else None + if row is None: + return None + return _row_to_memory(row) def save(self, thread_id: str, data: dict[str, Any], expected_version: int | None = None) -> bool: if expected_version is None: @@ -223,14 +197,14 @@ class MysqlThreadMemoryStorage(ThreadMemoryStorage): with self._conn.cursor() as cur: cur.execute( """ - INSERT INTO thread_memory (thread_id, owner_id, memory_md, memory_version) + INSERT INTO thread_memory (thread_id, owner_id, memory_json, memory_version) VALUES (%s, %s, %s, 0) ON DUPLICATE KEY UPDATE thread_id = thread_id """, ( thread_id, owner_id, - _memory_to_markdown(data), + json.dumps(data, ensure_ascii=False), ), ) if cur.rowcount == 1: @@ -239,12 +213,12 @@ class MysqlThreadMemoryStorage(ThreadMemoryStorage): cur.execute( """ UPDATE thread_memory - SET owner_id = %s, memory_md = %s, memory_version = memory_version + 1 + SET owner_id = %s, memory_json = %s, memory_version = memory_version + 1 WHERE thread_id = %s AND memory_version = %s """, ( owner_id, - _memory_to_markdown(data), + json.dumps(data, ensure_ascii=False), thread_id, expected_version, ), @@ -258,6 +232,13 @@ class MysqlThreadMemoryStorage(ThreadMemoryStorage): self._conn.commit() return True + def count_legacy_rows(self) -> int: + return 0 + + def backfill_legacy_rows(self, *, limit: int | None = None) -> dict[str, int]: + _ = limit + return {"scanned": 0, "updated": 0, "skipped": 0, "failed": 0} + _thread_storage: ThreadMemoryStorage | None = None _thread_storage_lock = threading.Lock() diff --git a/backend/tests/test_thread_memory_storage.py b/backend/tests/test_thread_memory_storage.py index 70903b5a..a090a2b1 100644 --- a/backend/tests/test_thread_memory_storage.py +++ b/backend/tests/test_thread_memory_storage.py @@ -38,7 +38,7 @@ def test_sqlite_thread_memory_compare_and_swap(tmp_path): assert loaded2["memoryVersion"] == 1 -def test_sqlite_thread_memory_saves_markdown_payload(tmp_path): +def test_sqlite_thread_memory_saves_json_payload(tmp_path): db_path = tmp_path / "thread-memory.db" storage = SqliteThreadMemoryStorage(str(db_path)) thread_id = "thread-md" @@ -46,15 +46,14 @@ def test_sqlite_thread_memory_saves_markdown_payload(tmp_path): assert storage.save(thread_id, _payload(), expected_version=0) is True with storage._lock: - row = storage._conn.execute("SELECT memory_md FROM thread_memory WHERE thread_id = ?", (thread_id,)).fetchone() + row = storage._conn.execute("SELECT memory_json FROM thread_memory WHERE thread_id = ?", (thread_id,)).fetchone() assert row is not None assert isinstance(row[0], str) - assert "## User" in row[0] - assert "## History" in row[0] - assert "## Facts" in row[0] + parsed = json.loads(row[0]) + assert parsed["user"]["workContext"]["summary"] == "Frontend engineer" -def test_sqlite_thread_memory_loads_markdown_row(tmp_path): +def test_sqlite_thread_memory_uses_owner_id_column_when_json_missing_owner(tmp_path): db_path = tmp_path / "thread-memory.db" storage = SqliteThreadMemoryStorage(str(db_path)) thread_id = "thread-load" @@ -63,20 +62,19 @@ def test_sqlite_thread_memory_loads_markdown_row(tmp_path): with storage._lock: storage._conn.execute( """ - INSERT INTO thread_memory (thread_id, owner_id, memory_md, memory_version, last_updated) + INSERT INTO thread_memory (thread_id, owner_id, memory_json, memory_version, last_updated) VALUES (?, ?, ?, 0, datetime('now')) """, ( thread_id, "owner-1", - ( - "# Thread Memory\n\n" - "Owner ID: owner-1\n\n" - "## User\n```json\n" - + json.dumps(payload["user"], ensure_ascii=False, indent=2) - + "\n```\n\n## History\n```json\n" - + json.dumps(payload["history"], ensure_ascii=False, indent=2) - + "\n```\n\n## Facts\n```json\n[]\n```" + json.dumps( + { + "user": payload["user"], + "history": payload["history"], + "facts": [], + }, + ensure_ascii=False, ), ), ) @@ -86,3 +84,16 @@ def test_sqlite_thread_memory_loads_markdown_row(tmp_path): assert loaded is not None assert loaded["ownerId"] == "owner-1" assert loaded["user"]["workContext"]["summary"] == "Frontend engineer" + assert loaded["facts"] == [] + + +def test_sqlite_thread_memory_backfill_is_noop_after_migration(tmp_path): + db_path = tmp_path / "thread-memory.db" + storage = SqliteThreadMemoryStorage(str(db_path)) + + assert storage.count_legacy_rows() == 0 + stats = storage.backfill_legacy_rows() + assert stats["scanned"] == 0 + assert stats["updated"] == 0 + assert stats["failed"] == 0 + assert storage.count_legacy_rows() == 0