Compare commits

...

17 Commits

Author SHA1 Message Date
97cd19edce chore:快捷Skill按钮维护 2026-05-18 14:50:25 +08:00
75d9a595bd dev:版本推进 2026-05-15 16:38:48 +08:00
71c0ece6a0 chore: 快捷Skill按钮维护 2026-05-15 16:38:01 +08:00
0bbd3ec3f7 chore:隐藏管理记忆的入口 2026-05-13 16:33:11 +08:00
fc979b6841 feat: 本地清理旧用户遗留的localstorage 2026-05-13 16:29:34 +08:00
c5847d3222 feat(ThreadMemoryPanel): 新增会话记忆下拉面板并完成 i18n 接入 2026-05-09 11:14:36 +08:00
5736a2520c fix: 修复图标属性名为小驼峰 2026-05-09 10:25:26 +08:00
adf1f011f2 refactor(memory): 切换线程记忆为纯 memory_json 存储
移除 thread_memory 对 memory_md/Markdown 解析的运行时依赖,仅保留 memory_json 读写路径。\n同步更新 SQLite/MySQL 存储实现与测试基线,并补充迁移文档的最终状态说明。
2026-05-09 10:22:44 +08:00
a629f8855a fix(workspace): 修复复制消息时误带隐藏上下文内容 2026-05-08 17:09:00 +08:00
f1ffdf9ed8 feat: 使用大模型美观输出,等待用户输入之后,大模型输出规范json,再反序列化存入数据库。 2026-05-08 14:33:09 +08:00
b574316e39 fix(thread-memory): 修复语言识别与队列健壮性 2026-05-08 11:45:56 +08:00
6829d41895 feat: 对齐df的注入模式 2026-05-08 11:31:07 +08:00
7b7ba7698e feat:写入跟用户相同的语言的记忆 2026-05-08 11:26:08 +08:00
f0fe2d63c3 feat: 数据结构向df的memory.json对齐 2026-05-08 11:10:26 +08:00
ebd22a1a55 feat: 增加MD列 2026-05-08 10:46:43 +08:00
d6bba71524 feat:json会话记忆 2026-05-08 10:19:09 +08:00
5592f81c2b feat: 工具调用的description使用用户的语言 2026-05-07 17:27:56 +08:00
34 changed files with 2878 additions and 65 deletions

View File

@ -21,7 +21,13 @@ from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel, Field
from app.gateway.deps import get_checkpointer, get_store
from deerflow.agents.memory.thread_summary import (
ThreadMemoryConflictError,
apply_thread_memory_summary,
render_thread_memory_summary,
)
from deerflow.config.paths import Paths, get_paths
from deerflow.agents.memory.thread_storage import delete_thread_memory_data
from deerflow.runtime import serialize_channel_values
# ---------------------------------------------------------------------------
@ -121,6 +127,27 @@ class ThreadHistoryRequest(BaseModel):
before: str | None = Field(default=None, description="Cursor for pagination")
class ThreadMemorySummaryResponse(BaseModel):
threadId: str
memoryVersion: int
summary: str
class ThreadMemorySummaryUpdateRequest(BaseModel):
summary: str = Field(..., min_length=1, description="User-edited natural language memory summary")
memoryVersion: int = Field(..., ge=0, description="Expected memory version for CAS update")
class ThreadMemoryRecordResponse(BaseModel):
threadId: str
ownerId: str | None = None
user: dict[str, Any] = Field(default_factory=dict)
history: dict[str, Any] = Field(default_factory=dict)
facts: list[dict[str, Any]] = Field(default_factory=list)
memoryVersion: int = 0
lastUpdated: str = ""
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
@ -244,6 +271,17 @@ async def delete_thread_data(thread_id: str, request: Request) -> ThreadDeleteRe
return response
@router.delete("/{thread_id}/memory", response_model=ThreadDeleteResponse)
async def delete_thread_memory(thread_id: str) -> ThreadDeleteResponse:
"""Delete per-thread memory only (explicit trigger)."""
try:
delete_thread_memory_data(thread_id)
except Exception as exc:
logger.exception("Failed to delete thread memory for %s", thread_id)
raise HTTPException(status_code=500, detail="Failed to delete thread memory.") from exc
return ThreadDeleteResponse(success=True, message=f"Deleted thread memory for {thread_id}")
@router.post("", response_model=ThreadResponse)
async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadResponse:
"""Create a new thread.
@ -680,3 +718,27 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request
raise HTTPException(status_code=500, detail="Failed to get thread history")
return entries
@router.get("/{thread_id}/memory-summary", response_model=ThreadMemorySummaryResponse)
async def get_thread_memory_summary(thread_id: str) -> ThreadMemorySummaryResponse:
"""Render per-thread memory as human-readable text for user inspection/editing."""
try:
payload = render_thread_memory_summary(thread_id)
except Exception as exc:
logger.exception("Failed to render thread memory summary for %s", thread_id)
raise HTTPException(status_code=500, detail="Failed to render thread memory summary.") from exc
return ThreadMemorySummaryResponse(**payload)
@router.post("/{thread_id}/memory-summary", response_model=ThreadMemoryRecordResponse)
async def update_thread_memory_summary(thread_id: str, body: ThreadMemorySummaryUpdateRequest) -> ThreadMemoryRecordResponse:
"""Apply edited natural-language summary back into structured thread memory."""
try:
payload = apply_thread_memory_summary(thread_id, body.summary, body.memoryVersion)
except ThreadMemoryConflictError as exc:
raise HTTPException(status_code=409, detail="Thread memory changed; refresh and retry.") from exc
except Exception as exc:
logger.exception("Failed to apply thread memory summary for %s", thread_id)
raise HTTPException(status_code=500, detail="Failed to apply thread memory summary.") from exc
return ThreadMemoryRecordResponse(**payload)

View File

@ -0,0 +1,65 @@
# Thread Memory Storage Migration: `memory_md` -> `memory_json`
## Summary
Per-thread memory now uses `thread_memory.memory_json` as the primary storage format.
- New writes persist structured JSON into `memory_json`.
- Reads prefer `memory_json`.
- Runtime no longer depends on `memory_md`.
## Why
`memory_md` stores structured state inside Markdown fenced blocks. This is readable for humans, but costly for:
- querying and analytics
- schema evolution
- migration reliability
`memory_json` keeps the same logical payload while making storage machine-friendly.
## Runtime behavior
- Read path uses `memory_json` only.
- Write path uses `memory_json` only.
## Auto migration behavior
- SQLite: on startup, adds `memory_json` column when missing.
- MySQL: on startup, adds `memory_json` column when missing.
No destructive migration is required for existing data.
## One-shot operational backfill (legacy command)
For faster cleanup in production, run:
```bash
cd backend
UV_CACHE_DIR=/tmp/uv-cache uv run python scripts/backfill_thread_memory_json.py --dry-run
UV_CACHE_DIR=/tmp/uv-cache uv run python scripts/backfill_thread_memory_json.py
```
Current codebase keeps this command for compatibility. In fully migrated environments it returns zero legacy rows.
## Final cleanup: drop `memory_md` column
After confirming all environments are migrated, run:
```bash
cd backend
UV_CACHE_DIR=/tmp/uv-cache uv run python scripts/drop_thread_memory_md_column.py --dry-run
UV_CACHE_DIR=/tmp/uv-cache uv run python scripts/drop_thread_memory_md_column.py
```
Notes:
- SQLite migration rebuilds `thread_memory` table and preserves data.
- MySQL migration runs `ALTER TABLE ... DROP COLUMN memory_md`.
## Follow-up (optional)
After all active environments have fully migrated and no legacy rows remain:
1. backfill any remaining rows that still rely on `memory_md`
2. remove `memory_md` column from schema
3. remove Markdown parsing fallback code

View File

@ -391,9 +391,34 @@ def _get_memory_context(agent_name: str | None = None) -> str:
"""
try:
from deerflow.agents.memory import format_memory_for_injection, get_memory_data
from deerflow.agents.memory.thread_prompt import format_thread_memory_for_injection
from deerflow.agents.memory.thread_storage import get_thread_memory_data
from deerflow.config.memory_config import get_memory_config
from deerflow.config.thread_memory_config import get_thread_memory_config
from langgraph.config import get_config
config = get_memory_config()
thread_config = get_thread_memory_config()
config_data = get_config()
thread_id = config_data.get("configurable", {}).get("thread_id")
if thread_config.enabled and thread_config.injection_enabled and thread_id:
thread_memory = get_thread_memory_data(thread_id)
if thread_memory is not None:
thread_content = format_thread_memory_for_injection(
{
"user": thread_memory.get("user", {}),
"history": thread_memory.get("history", {}),
"facts": thread_memory.get("facts", []),
},
max_tokens=thread_config.max_injection_tokens,
)
if thread_content.strip():
return f"""<memory>
{thread_content}
</memory>
"""
if not config.enabled or not config.injection_enabled:
return ""

View File

@ -0,0 +1,143 @@
"""Prompt and formatting helpers for per-thread memory."""
from __future__ import annotations
import json
import re
from typing import Any
from langchain_core.messages import HumanMessage
from deerflow.agents.memory.prompt import format_conversation_for_update, format_memory_for_injection
THREAD_MEMORY_UPDATE_PROMPT = """You are a user profile memory system.
Current per-thread memory:
<existing_memory>
{existing_memory}
</existing_memory>
Conversation:
<conversation>
{conversation}
</conversation>
Preferred memory language: {preferred_language}
Return JSON only with this schema:
{{
"user": {{
"workContext": {{"summary": string, "updatedAt": string}},
"personalContext": {{"summary": string, "updatedAt": string}},
"topOfMind": {{"summary": string, "updatedAt": string}}
}},
"history": {{
"recentMonths": {{"summary": string, "updatedAt": string}},
"earlierContext": {{"summary": string, "updatedAt": string}},
"longTermBackground": {{"summary": string, "updatedAt": string}}
}},
"facts": [
{{
"content": string,
"category": "tech_stack"|"preference"|"personal"|"context"|"goal",
"confidence": number
}}
]
}}
Rules:
- Keep only stable and useful user profile facts.
- Do not store sensitive personal data (phone/email/address/password/token/id/bank).
- Deduplicate and keep high-confidence facts.
- Write all human-readable text fields (`summary`, `content`, and similar prose) in the preferred memory language.
- Return valid JSON only.
"""
def create_empty_thread_memory() -> dict[str, Any]:
return {
"user": {
"workContext": {"summary": "", "updatedAt": ""},
"personalContext": {"summary": "", "updatedAt": ""},
"topOfMind": {"summary": "", "updatedAt": ""},
},
"history": {
"recentMonths": {"summary": "", "updatedAt": ""},
"earlierContext": {"summary": "", "updatedAt": ""},
"longTermBackground": {"summary": "", "updatedAt": ""},
},
"facts": [],
}
def _extract_human_text(content: Any) -> str:
if isinstance(content, str):
return content.strip()
if isinstance(content, list):
chunks: list[str] = []
for item in content:
if isinstance(item, str):
stripped = item.strip()
if stripped:
chunks.append(stripped)
elif isinstance(item, dict):
text_val = item.get("text")
if isinstance(text_val, str):
stripped = text_val.strip()
if stripped:
chunks.append(stripped)
return "\n".join(chunks).strip()
return ""
def _infer_preferred_memory_language(messages: list[Any]) -> str:
user_texts: list[str] = []
for msg in messages:
if isinstance(msg, HumanMessage):
extracted = _extract_human_text(getattr(msg, "content", None))
if extracted:
user_texts.append(extracted)
if not user_texts:
return "same as the user's latest message"
# Prioritize the latest user message; fallback to a short recent window.
recent_window = user_texts[-3:]
language_sample = "\n".join(recent_window)
# If user explicitly provides locale hints, prefer them.
locale_match = re.search(r"\b([a-z]{2}-[A-Z]{2})\b", language_sample)
if locale_match:
return locale_match.group(1)
# Script-based heuristic (dynamic, not hard-coded to two languages).
script_patterns = {
"zh-Hans": r"[\u4e00-\u9fff]",
"ja-JP": r"[\u3040-\u30ff]",
"ko-KR": r"[\uac00-\ud7af]",
"ru-RU": r"[\u0400-\u04FF]",
"ar": r"[\u0600-\u06FF]",
"hi-IN": r"[\u0900-\u097F]",
"th-TH": r"[\u0E00-\u0E7F]",
"he-IL": r"[\u0590-\u05FF]",
"el-GR": r"[\u0370-\u03FF]",
}
counts = {lang: len(re.findall(pattern, language_sample)) for lang, pattern in script_patterns.items()}
best_lang, best_count = max(counts.items(), key=lambda item: item[1])
if best_count > 0:
return best_lang
# Latin-script fallback: ask model to keep same language as the user's latest message.
return "same as the user's latest message"
def format_thread_memory_for_injection(memory_data: dict[str, Any], max_tokens: int = 2000) -> str:
return format_memory_for_injection(memory_data, max_tokens=max_tokens)
def build_thread_memory_prompt(existing_memory: dict[str, Any], messages: list[Any]) -> str:
return THREAD_MEMORY_UPDATE_PROMPT.format(
existing_memory=json.dumps(existing_memory, ensure_ascii=False, indent=2),
conversation=format_conversation_for_update(messages),
preferred_language=_infer_preferred_memory_language(messages),
)

View File

@ -0,0 +1,76 @@
"""Debounced queue for per-thread memory updates."""
from __future__ import annotations
import threading
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from deerflow.config.thread_memory_config import get_thread_memory_config
@dataclass
class ThreadConversationContext:
thread_id: str
messages: list[Any]
timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))
class ThreadMemoryUpdateQueue:
def __init__(self):
self._queue_by_thread: dict[str, ThreadConversationContext] = {}
self._lock = threading.Lock()
self._timers: dict[str, threading.Timer] = {}
self._processing_threads: set[str] = set()
def add(self, thread_id: str, messages: list[Any]) -> None:
config = get_thread_memory_config()
if not config.enabled:
return
with self._lock:
self._queue_by_thread[thread_id] = ThreadConversationContext(thread_id=thread_id, messages=messages)
self._reset_timer(thread_id)
def _reset_timer(self, thread_id: str) -> None:
config = get_thread_memory_config()
timer = self._timers.get(thread_id)
if timer is not None:
timer.cancel()
timer = threading.Timer(config.debounce_seconds, self._process_thread, args=(thread_id,))
timer.daemon = True
self._timers[thread_id] = timer
timer.start()
def _process_thread(self, thread_id: str) -> None:
from deerflow.agents.memory.thread_updater import ThreadMemoryUpdater
with self._lock:
if thread_id in self._processing_threads:
self._reset_timer(thread_id)
return
context = self._queue_by_thread.pop(thread_id, None)
if context is None:
self._timers.pop(thread_id, None)
return
self._processing_threads.add(thread_id)
self._timers.pop(thread_id, None)
try:
updater = ThreadMemoryUpdater()
updater.update_memory(context.messages, context.thread_id)
finally:
with self._lock:
self._processing_threads.discard(thread_id)
_thread_queue: ThreadMemoryUpdateQueue | None = None
_lock = threading.Lock()
def get_thread_memory_queue() -> ThreadMemoryUpdateQueue:
global _thread_queue
with _lock:
if _thread_queue is None:
_thread_queue = ThreadMemoryUpdateQueue()
return _thread_queue

View File

@ -0,0 +1,279 @@
"""Storage providers for per-thread memory."""
from __future__ import annotations
import abc
import json
import logging
import sqlite3
import threading
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from deerflow.agents.memory.thread_prompt import create_empty_thread_memory
from deerflow.config.paths import get_paths
from deerflow.config.thread_memory_config import get_thread_memory_config
logger = logging.getLogger(__name__)
class ThreadMemoryStorage(abc.ABC):
@abc.abstractmethod
def load(self, thread_id: str) -> dict[str, Any] | None:
pass
@abc.abstractmethod
def save(self, thread_id: str, data: dict[str, Any], expected_version: int | None = None) -> bool:
pass
@abc.abstractmethod
def delete(self, thread_id: str) -> bool:
pass
def _row_to_memory(row: tuple[Any, ...]) -> dict[str, Any]:
thread_id, owner_id_col, memory_json_raw, memory_version, last_updated = row
decoded: dict[str, Any] = {}
if isinstance(memory_json_raw, str) and memory_json_raw.strip():
try:
parsed_json = json.loads(memory_json_raw)
if isinstance(parsed_json, dict):
decoded = parsed_json
except Exception:
decoded = {}
owner_id = decoded.get("ownerId")
if owner_id is None:
owner_id = owner_id_col
user = decoded.get("user", create_empty_thread_memory()["user"])
history = decoded.get("history", create_empty_thread_memory()["history"])
facts = decoded.get("facts", [])
return {
"threadId": thread_id,
"ownerId": owner_id,
"user": user,
"history": history,
"facts": facts,
"memoryVersion": int(memory_version),
"lastUpdated": str(last_updated),
}
class SqliteThreadMemoryStorage(ThreadMemoryStorage):
def __init__(self, db_path: str):
path = Path(db_path)
if not path.is_absolute():
path = get_paths().base_dir / path
path.parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(str(path), check_same_thread=False)
self._lock = threading.Lock()
with self._lock:
self._conn.execute(
"""
CREATE TABLE IF NOT EXISTS thread_memory (
thread_id TEXT PRIMARY KEY,
owner_id TEXT NULL,
memory_json TEXT NOT NULL DEFAULT '',
memory_version INTEGER NOT NULL DEFAULT 0,
last_updated TEXT NOT NULL DEFAULT (datetime('now'))
)
"""
)
self._ensure_memory_json_column()
self._conn.execute("CREATE INDEX IF NOT EXISTS idx_thread_memory_owner_id ON thread_memory(owner_id)")
self._conn.commit()
def _ensure_memory_json_column(self) -> None:
columns = self._conn.execute("PRAGMA table_info(thread_memory)").fetchall()
has_memory_json = any(col[1] == "memory_json" for col in columns)
if not has_memory_json:
self._conn.execute("ALTER TABLE thread_memory ADD COLUMN memory_json TEXT NOT NULL DEFAULT ''")
def load(self, thread_id: str) -> dict[str, Any] | None:
with self._lock:
row = self._conn.execute(
"SELECT thread_id, owner_id, memory_json, memory_version, last_updated "
"FROM thread_memory WHERE thread_id = ?",
(thread_id,),
).fetchone()
if row is None:
return None
return _row_to_memory(row)
def save(self, thread_id: str, data: dict[str, Any], expected_version: int | None = None) -> bool:
now = datetime.now(UTC).isoformat().replace("+00:00", "Z")
owner_id = data.get("ownerId")
if expected_version is None:
expected_version = 0
with self._lock:
cur = self._conn.execute(
"""
INSERT INTO thread_memory (thread_id, owner_id, memory_json, memory_version, last_updated)
VALUES (?, ?, ?, 0, ?)
ON CONFLICT(thread_id) DO NOTHING
""",
(
thread_id,
owner_id,
json.dumps(data, ensure_ascii=False),
now,
),
)
if cur.rowcount == 1:
self._conn.commit()
return True
cur = self._conn.execute(
"""
UPDATE thread_memory
SET owner_id = ?, memory_json = ?, memory_version = memory_version + 1, last_updated = ?
WHERE thread_id = ? AND memory_version = ?
""",
(
owner_id,
json.dumps(data, ensure_ascii=False),
now,
thread_id,
expected_version,
),
)
self._conn.commit()
return cur.rowcount == 1
def delete(self, thread_id: str) -> bool:
with self._lock:
self._conn.execute("DELETE FROM thread_memory WHERE thread_id = ?", (thread_id,))
self._conn.commit()
return True
def count_legacy_rows(self) -> int:
return 0
def backfill_legacy_rows(self, *, limit: int | None = None) -> dict[str, int]:
_ = limit
return {"scanned": 0, "updated": 0, "skipped": 0, "failed": 0}
class MysqlThreadMemoryStorage(ThreadMemoryStorage):
def __init__(self, host: str, port: int, user: str, password: str, database: str):
import pymysql
self._conn = pymysql.connect(host=host, port=port, user=user, password=password, database=database, charset="utf8mb4")
with self._conn.cursor() as cur:
cur.execute(
"""
CREATE TABLE IF NOT EXISTS thread_memory (
thread_id VARCHAR(64) PRIMARY KEY,
owner_id VARCHAR(64) NULL,
memory_json LONGTEXT NOT NULL,
memory_version INT NOT NULL DEFAULT 0,
last_updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_owner_id (owner_id)
)
"""
)
cur.execute("SHOW COLUMNS FROM thread_memory LIKE 'memory_json'")
if cur.fetchone() is None:
cur.execute("ALTER TABLE thread_memory ADD COLUMN memory_json LONGTEXT NOT NULL DEFAULT ''")
self._conn.commit()
def load(self, thread_id: str) -> dict[str, Any] | None:
with self._conn.cursor() as cur:
cur.execute(
"SELECT thread_id, owner_id, memory_json, memory_version, last_updated FROM thread_memory WHERE thread_id = %s",
(thread_id,),
)
row = cur.fetchone()
if row is None:
return None
return _row_to_memory(row)
def save(self, thread_id: str, data: dict[str, Any], expected_version: int | None = None) -> bool:
if expected_version is None:
expected_version = 0
owner_id = data.get("ownerId")
with self._conn.cursor() as cur:
cur.execute(
"""
INSERT INTO thread_memory (thread_id, owner_id, memory_json, memory_version)
VALUES (%s, %s, %s, 0)
ON DUPLICATE KEY UPDATE thread_id = thread_id
""",
(
thread_id,
owner_id,
json.dumps(data, ensure_ascii=False),
),
)
if cur.rowcount == 1:
self._conn.commit()
return True
cur.execute(
"""
UPDATE thread_memory
SET owner_id = %s, memory_json = %s, memory_version = memory_version + 1
WHERE thread_id = %s AND memory_version = %s
""",
(
owner_id,
json.dumps(data, ensure_ascii=False),
thread_id,
expected_version,
),
)
self._conn.commit()
return cur.rowcount == 1
def delete(self, thread_id: str) -> bool:
with self._conn.cursor() as cur:
cur.execute("DELETE FROM thread_memory WHERE thread_id = %s", (thread_id,))
self._conn.commit()
return True
def count_legacy_rows(self) -> int:
return 0
def backfill_legacy_rows(self, *, limit: int | None = None) -> dict[str, int]:
_ = limit
return {"scanned": 0, "updated": 0, "skipped": 0, "failed": 0}
_thread_storage: ThreadMemoryStorage | None = None
_thread_storage_lock = threading.Lock()
def get_thread_memory_storage() -> ThreadMemoryStorage:
global _thread_storage
if _thread_storage is not None:
return _thread_storage
with _thread_storage_lock:
if _thread_storage is not None:
return _thread_storage
config = get_thread_memory_config()
if config.database.type == "mysql":
mysql = config.database.mysql
_thread_storage = MysqlThreadMemoryStorage(
host=mysql.host,
port=mysql.port,
user=mysql.user,
password=mysql.password,
database=mysql.database,
)
else:
_thread_storage = SqliteThreadMemoryStorage(config.database.sqlite.path)
return _thread_storage
def get_thread_memory_data(thread_id: str) -> dict[str, Any] | None:
return get_thread_memory_storage().load(thread_id)
def delete_thread_memory_data(thread_id: str) -> bool:
return get_thread_memory_storage().delete(thread_id)
def initial_thread_memory_record() -> dict[str, Any]:
return {"ownerId": None, **create_empty_thread_memory()}

View File

@ -0,0 +1,300 @@
"""Thread memory summary generation and application helpers."""
from __future__ import annotations
import json
import logging
import re
import hashlib
from typing import Any
from deerflow.agents.memory.thread_prompt import create_empty_thread_memory
from deerflow.agents.memory.thread_storage import get_thread_memory_storage
from deerflow.agents.memory.thread_updater import ThreadMemoryUpdater
from deerflow.agents.memory.updater import _extract_text
from deerflow.config.thread_memory_config import get_thread_memory_config
from deerflow.models import create_chat_model
logger = logging.getLogger(__name__)
SUMMARY_RENDER_PROMPT = """You are an assistant that renders thread memory into natural language.
Thread memory JSON:
<memory_json>
{memory_json}
</memory_json>
Task:
- Output a concise, human-friendly editable profile summary.
- Keep the original language of the memory content where possible.
- Cover user profile, history, and key facts.
- Return plain text only (no markdown code fences).
"""
SUMMARY_PARSE_PROMPT = """You convert user-edited natural-language memory into a structured patch JSON.
Current thread memory JSON:
<current_memory_json>
{current_memory_json}
</current_memory_json>
Edited summary text:
<edited_summary>
{edited_summary}
</edited_summary>
Return JSON only with this schema (all fields optional):
{{
"user": {{
"workContext": {{"summary": string}},
"personalContext": {{"summary": string}},
"topOfMind": {{"summary": string}}
}},
"history": {{
"recentMonths": {{"summary": string}},
"earlierContext": {{"summary": string}},
"longTermBackground": {{"summary": string}}
}},
"facts": [
{{
"content": string,
"category": "preference"|"knowledge"|"context"|"behavior"|"goal"|"correction",
"confidence": number
}}
]
}}
"""
class ThreadMemoryConflictError(RuntimeError):
"""Raised when compare-and-swap save fails due to version mismatch."""
def _get_summary_model():
config = get_thread_memory_config()
return create_chat_model(name=config.model_name, thinking_enabled=False, stream_usage=False)
def _strip_code_fence(text: str) -> str:
cleaned = text.strip()
if not cleaned.startswith("```"):
return cleaned
lines = cleaned.split("\n")
return "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:]).strip()
def _extract_json_object(text: str) -> dict[str, Any] | None:
cleaned = _strip_code_fence(text)
try:
parsed = json.loads(cleaned)
return parsed if isinstance(parsed, dict) else None
except json.JSONDecodeError:
repaired = _escape_inner_quotes_in_json_strings(cleaned)
if repaired != cleaned:
try:
parsed = json.loads(repaired)
if isinstance(parsed, dict):
logger.warning("THREAD_SUMMARY_DEBUG parse_repaired mode=full_text")
return parsed
except json.JSONDecodeError:
pass
match = re.search(r"\{.*\}", cleaned, flags=re.DOTALL)
if not match:
return None
try:
parsed = json.loads(match.group(0))
return parsed if isinstance(parsed, dict) else None
except json.JSONDecodeError:
candidate = match.group(0)
repaired = _escape_inner_quotes_in_json_strings(candidate)
if repaired != candidate:
try:
parsed = json.loads(repaired)
if isinstance(parsed, dict):
logger.warning("THREAD_SUMMARY_DEBUG parse_repaired mode=regex_object")
return parsed
except json.JSONDecodeError:
return None
return None
def _escape_inner_quotes_in_json_strings(text: str) -> str:
"""Heuristically repair unescaped inner double quotes inside JSON strings.
If a quote appears while inside a string but the next non-space character is
not a valid string terminator (comma, object/array close, or key colon), it is
treated as content and escaped.
"""
out: list[str] = []
in_string = False
escape = False
n = len(text)
i = 0
while i < n:
ch = text[i]
if not in_string:
out.append(ch)
if ch == '"':
in_string = True
i += 1
continue
if escape:
out.append(ch)
escape = False
i += 1
continue
if ch == "\\":
out.append(ch)
escape = True
i += 1
continue
if ch == '"':
j = i + 1
while j < n and text[j].isspace():
j += 1
next_char = text[j] if j < n else ""
# Valid JSON string terminators in context:
# - key string: :
# - value string: , } ]
if next_char in {":", ",", "}", "]", ""}:
out.append(ch)
in_string = False
else:
out.append('\\"')
i += 1
continue
out.append(ch)
i += 1
return "".join(out)
def _merge_summary_patch(base: dict[str, Any], patch: dict[str, Any]) -> dict[str, Any]:
merged = {"ownerId": base.get("ownerId"), **create_empty_thread_memory()}
merged["user"] = dict(base.get("user", {})) if isinstance(base.get("user"), dict) else merged["user"]
merged["history"] = dict(base.get("history", {})) if isinstance(base.get("history"), dict) else merged["history"]
merged["facts"] = list(base.get("facts", [])) if isinstance(base.get("facts"), list) else []
for section_name in ("user", "history"):
section_patch = patch.get(section_name, {})
if not isinstance(section_patch, dict):
continue
for key, value in section_patch.items():
if key not in merged[section_name] or not isinstance(value, dict):
continue
summary = value.get("summary")
if isinstance(summary, str):
merged[section_name][key]["summary"] = summary
facts_patch = patch.get("facts")
if isinstance(facts_patch, list):
merged["facts"] = facts_patch
return merged
def render_thread_memory_summary(thread_id: str) -> dict[str, Any]:
storage = get_thread_memory_storage()
current = storage.load(thread_id)
memory = {"ownerId": None, **create_empty_thread_memory()} if current is None else current
memory_payload = {
"user": memory.get("user", {}),
"history": memory.get("history", {}),
"facts": memory.get("facts", []),
}
prompt = SUMMARY_RENDER_PROMPT.format(memory_json=json.dumps(memory_payload, ensure_ascii=False, indent=2))
response = _get_summary_model().invoke(prompt)
text = _strip_code_fence(_extract_text(response.content))
return {
"threadId": thread_id,
"memoryVersion": int(memory.get("memoryVersion", 0)),
"summary": text,
}
def apply_thread_memory_summary(thread_id: str, edited_summary: str, expected_version: int) -> dict[str, Any]:
storage = get_thread_memory_storage()
current = storage.load(thread_id)
base = {"ownerId": None, **create_empty_thread_memory()} if current is None else current
memory_payload = {
"user": base.get("user", {}),
"history": base.get("history", {}),
"facts": base.get("facts", []),
}
prompt = SUMMARY_PARSE_PROMPT.format(
current_memory_json=json.dumps(memory_payload, ensure_ascii=False, indent=2),
edited_summary=edited_summary,
)
response = _get_summary_model().invoke(prompt)
raw = _extract_text(response.content)
raw_hash = hashlib.sha256(raw.encode("utf-8")).hexdigest()
logger.warning(
"THREAD_SUMMARY_DEBUG parse_raw_meta thread=%s raw_length=%d raw_sha256=%s",
thread_id,
len(raw),
raw_hash,
)
patch = _extract_json_object(raw)
if patch is None:
cleaned = _strip_code_fence(raw)
decode_error = None
try:
json.loads(cleaned)
except json.JSONDecodeError as exc:
decode_error = exc
if decode_error is not None:
logger.warning(
"THREAD_SUMMARY_DEBUG parse_error thread=%s msg=%s line=%d col=%d pos=%d snippet=%r",
thread_id,
decode_error.msg,
decode_error.lineno,
decode_error.colno,
decode_error.pos,
cleaned[max(0, decode_error.pos - 80): decode_error.pos + 80],
)
else:
logger.warning(
"THREAD_SUMMARY_DEBUG parse_error thread=%s msg=no_json_object_extracted raw_head=%r",
thread_id,
cleaned[:200],
)
logger.warning("THREAD_SUMMARY_DEBUG parse_fallback thread=%s", thread_id)
patch = {
"user": {
"topOfMind": {
"summary": edited_summary.strip(),
}
}
}
else:
logger.warning(
"THREAD_SUMMARY_DEBUG parse_success thread=%s patch=%s",
thread_id,
json.dumps(patch, ensure_ascii=False)[:2000],
)
merged = _merge_summary_patch(base, patch if isinstance(patch, dict) else {})
cleaned = ThreadMemoryUpdater()._scrub_sensitive(merged, thread_id)
cleaned["ownerId"] = base.get("ownerId")
logger.warning(
"THREAD_SUMMARY_DEBUG apply_cleaned thread=%s cleaned=%s",
thread_id,
json.dumps(
{
"user": cleaned.get("user", {}),
"history": cleaned.get("history", {}),
"facts_count": len(cleaned.get("facts", []) if isinstance(cleaned.get("facts"), list) else []),
},
ensure_ascii=False,
)[:2000],
)
if not storage.save(thread_id, cleaned, expected_version=expected_version):
raise ThreadMemoryConflictError(f"Thread memory version conflict for {thread_id}")
latest = storage.load(thread_id)
return latest if latest is not None else {"threadId": thread_id, "memoryVersion": expected_version, **cleaned}

View File

@ -0,0 +1,148 @@
"""Per-thread memory updater."""
from __future__ import annotations
import json
import logging
import re
import uuid
from datetime import UTC, datetime
from typing import Any
from deerflow.agents.memory.updater import _extract_text
from deerflow.agents.memory.thread_prompt import build_thread_memory_prompt, create_empty_thread_memory
from deerflow.agents.memory.thread_storage import get_thread_memory_storage
from deerflow.config.thread_memory_config import get_thread_memory_config
from deerflow.models import create_chat_model
logger = logging.getLogger(__name__)
_SENSITIVE_PATTERNS = (
re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"),
re.compile(r"\b(?:\+?\d[\d -]{7,}\d)\b"),
re.compile(r"\b(?:api[_-]?key|token|password|passwd|secret)\b", re.IGNORECASE),
re.compile(r"\b\d{15,19}\b"), # bank-card like
)
class ThreadMemoryUpdater:
def __init__(self, model_name: str | None = None):
self._model_name = model_name
def _get_model(self):
config = get_thread_memory_config()
# Non-stream invoke path: some OpenAI-compatible gateways reject
# stream_options when stream=false, so force stream_usage off here.
return create_chat_model(
name=self._model_name or config.model_name,
thinking_enabled=False,
stream_usage=False,
)
def _scrub_sensitive(self, data: dict[str, Any], thread_id: str) -> dict[str, Any]:
def safe_confidence(val: Any, default: float = 0.5) -> float:
try:
parsed = float(val)
except (TypeError, ValueError):
return default
return max(0.0, min(1.0, parsed))
def safe_text(val: Any) -> str | None:
if not isinstance(val, str):
return None
text = val.strip()
if not text:
return None
if any(p.search(text) for p in _SENSITIVE_PATTERNS):
logger.info("thread_memory sensitive value dropped for thread=%s", thread_id)
return None
return text
user = data.get("user", {})
history = data.get("history", {})
facts = data.get("facts", [])
cleaned = create_empty_thread_memory()
def copy_summary_section(target_parent: dict[str, Any], target_key: str, source_parent: Any):
if not isinstance(source_parent, dict):
return
source_section = source_parent.get(target_key)
if not isinstance(source_section, dict):
return
summary = safe_text(source_section.get("summary"))
updated_at = safe_text(source_section.get("updatedAt"))
if summary:
target_parent[target_key]["summary"] = summary
if updated_at:
target_parent[target_key]["updatedAt"] = updated_at
elif summary:
target_parent[target_key]["updatedAt"] = datetime.now(UTC).isoformat().replace("+00:00", "Z")
copy_summary_section(cleaned["user"], "workContext", user)
copy_summary_section(cleaned["user"], "personalContext", user)
copy_summary_section(cleaned["user"], "topOfMind", user)
copy_summary_section(cleaned["history"], "recentMonths", history)
copy_summary_section(cleaned["history"], "earlierContext", history)
copy_summary_section(cleaned["history"], "longTermBackground", history)
seen: set[str] = set()
for fact in facts if isinstance(facts, list) else []:
if not isinstance(fact, dict):
continue
content = safe_text(fact.get("content"))
if not content:
continue
key = content.casefold()
if key in seen:
continue
seen.add(key)
confidence = safe_confidence(fact.get("confidence", 0.5))
cleaned["facts"].append(
{
"id": f"fact_{uuid.uuid4().hex[:8]}",
"content": content,
"category": str(fact.get("category", "context")).strip() or "context",
"confidence": confidence,
"createdAt": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
"source": thread_id,
}
)
return cleaned
def update_memory(self, messages: list[Any], thread_id: str) -> bool:
config = get_thread_memory_config()
if not config.enabled or not messages or not thread_id:
return False
storage = get_thread_memory_storage()
current = storage.load(thread_id)
base_memory = create_empty_thread_memory() if current is None else {
"user": current.get("user", {}),
"history": current.get("history", {}),
"facts": current.get("facts", []),
}
prompt = build_thread_memory_prompt(base_memory, messages)
if not prompt.strip():
return False
try:
response = self._get_model().invoke(prompt)
response_text = _extract_text(response.content).strip()
if response_text.startswith("```"):
lines = response_text.split("\n")
response_text = "\n".join(lines[1:-1] if lines[-1] == "```" else lines[1:])
parsed = json.loads(response_text)
cleaned = self._scrub_sensitive(parsed, thread_id)
expected_version = 0 if current is None else int(current.get("memoryVersion", 0))
if storage.save(thread_id, cleaned, expected_version=expected_version):
return True
# conflict retry once
latest = storage.load(thread_id)
latest_version = 0 if latest is None else int(latest.get("memoryVersion", 0))
logger.info("thread_memory conflict detected, retrying once: thread=%s version=%s", thread_id, latest_version)
return storage.save(thread_id, cleaned, expected_version=latest_version)
except Exception:
logger.exception("Thread memory update failed for thread=%s", thread_id)
return False

View File

@ -10,7 +10,9 @@ from langgraph.config import get_config
from langgraph.runtime import Runtime
from deerflow.agents.memory.queue import get_memory_queue
from deerflow.agents.memory.thread_queue import get_thread_memory_queue
from deerflow.config.memory_config import get_memory_config
from deerflow.config.thread_memory_config import get_thread_memory_config
logger = logging.getLogger(__name__)
@ -206,8 +208,9 @@ class MemoryMiddleware(AgentMiddleware[MemoryMiddlewareState]):
Returns:
None (no state changes needed from this middleware).
"""
config = get_memory_config()
if not config.enabled:
global_config = get_memory_config()
thread_config = get_thread_memory_config()
if not global_config.enabled and not thread_config.enabled:
return None
# Get thread ID from runtime context first, then fall back to LangGraph's configurable metadata
@ -239,13 +242,19 @@ class MemoryMiddleware(AgentMiddleware[MemoryMiddlewareState]):
# Queue the filtered conversation for memory update
correction_detected = detect_correction(filtered_messages)
reinforcement_detected = not correction_detected and detect_reinforcement(filtered_messages)
queue = get_memory_queue()
queue.add(
thread_id=thread_id,
messages=filtered_messages,
agent_name=self._agent_name,
correction_detected=correction_detected,
reinforcement_detected=reinforcement_detected,
)
if global_config.enabled:
queue = get_memory_queue()
queue.add(
thread_id=thread_id,
messages=filtered_messages,
agent_name=self._agent_name,
correction_detected=correction_detected,
reinforcement_detected=reinforcement_detected,
)
if thread_config.enabled:
get_thread_memory_queue().add(
thread_id=thread_id,
messages=filtered_messages,
)
return None

View File

@ -2,6 +2,7 @@ from .app_config import get_app_config
from .billing_config import BillingConfig
from .extensions_config import ExtensionsConfig, get_extensions_config
from .memory_config import MemoryConfig, get_memory_config
from .thread_memory_config import ThreadMemoryConfig, get_thread_memory_config
from .paths import Paths, get_paths
from .skills_config import SkillsConfig
from .tracing_config import (
@ -22,6 +23,8 @@ __all__ = [
"get_extensions_config",
"MemoryConfig",
"get_memory_config",
"ThreadMemoryConfig",
"get_thread_memory_config",
"get_tracing_config",
"get_explicitly_enabled_tracing_providers",
"get_enabled_tracing_providers",

View File

@ -25,6 +25,7 @@ from deerflow.config.title_config import TitleConfig, load_title_config_from_dic
from deerflow.config.token_usage_config import TokenUsageConfig
from deerflow.config.tool_config import ToolConfig, ToolGroupConfig
from deerflow.config.tool_search_config import ToolSearchConfig, load_tool_search_config_from_dict
from deerflow.config.thread_memory_config import ThreadMemoryConfig, load_thread_memory_config_from_dict
load_dotenv()
@ -55,6 +56,7 @@ class AppConfig(BaseModel):
title: TitleConfig = Field(default_factory=TitleConfig, description="Automatic title generation configuration")
summarization: SummarizationConfig = Field(default_factory=SummarizationConfig, description="Conversation summarization configuration")
memory: MemoryConfig = Field(default_factory=MemoryConfig, description="Memory subsystem configuration")
thread_memory: ThreadMemoryConfig = Field(default_factory=ThreadMemoryConfig, description="Per-thread memory subsystem configuration")
subagents: SubagentsAppConfig = Field(default_factory=SubagentsAppConfig, description="Subagent runtime configuration")
guardrails: GuardrailsConfig = Field(default_factory=GuardrailsConfig, description="Guardrail middleware configuration")
model_config = ConfigDict(extra="allow", frozen=False)
@ -118,6 +120,8 @@ class AppConfig(BaseModel):
# Load memory config if present
if "memory" in config_data:
load_memory_config_from_dict(config_data["memory"])
if "thread_memory" in config_data:
load_thread_memory_config_from_dict(config_data["thread_memory"])
# Load subagents config if present
if "subagents" in config_data:

View File

@ -0,0 +1,50 @@
"""Configuration for per-thread memory mechanism."""
from pydantic import BaseModel, Field
class ThreadMemorySqliteConfig(BaseModel):
path: str = Field(default="thread_memory.db", description="SQLite database file path")
class ThreadMemoryMysqlConfig(BaseModel):
host: str = Field(default="localhost")
port: int = Field(default=3306)
user: str = Field(default="root")
password: str = Field(default="")
database: str = Field(default="deerflow")
class ThreadMemoryDatabaseConfig(BaseModel):
type: str = Field(default="sqlite", description="Database type: sqlite or mysql")
sqlite: ThreadMemorySqliteConfig = Field(default_factory=ThreadMemorySqliteConfig)
mysql: ThreadMemoryMysqlConfig = Field(default_factory=ThreadMemoryMysqlConfig)
class ThreadMemoryConfig(BaseModel):
enabled: bool = Field(default=True)
debounce_seconds: int = Field(default=30, ge=1, le=300)
model_name: str | None = Field(default=None)
max_facts: int = Field(default=100, ge=10, le=500)
fact_confidence_threshold: float = Field(default=0.7, ge=0.0, le=1.0)
injection_enabled: bool = Field(default=True)
max_injection_tokens: int = Field(default=2000, ge=100, le=8000)
bootstrap_from_global: bool = Field(default=False)
database: ThreadMemoryDatabaseConfig = Field(default_factory=ThreadMemoryDatabaseConfig)
_thread_memory_config: ThreadMemoryConfig = ThreadMemoryConfig()
def get_thread_memory_config() -> ThreadMemoryConfig:
return _thread_memory_config
def set_thread_memory_config(config: ThreadMemoryConfig) -> None:
global _thread_memory_config
_thread_memory_config = config
def load_thread_memory_config_from_dict(config_dict: dict) -> None:
global _thread_memory_config
_thread_memory_config = ThreadMemoryConfig(**config_dict)

View File

@ -88,18 +88,24 @@ def create_chat_model(name: str | None = None, thinking_enabled: bool = False, *
if not has_stream_usage:
model_settings_from_config["stream_usage"] = True
effective_stream_usage = kwargs.get("stream_usage", model_settings_from_config.get("stream_usage"))
# Some OpenAI-compatible providers only return usage in streaming mode
# when stream_options.include_usage is explicitly enabled.
stream_options_source = "kwargs" if "stream_options" in kwargs else "config"
stream_options = kwargs.get("stream_options") if stream_options_source == "kwargs" else model_settings_from_config.get("stream_options")
if stream_options is None:
model_settings_from_config["stream_options"] = {"include_usage": True}
elif isinstance(stream_options, dict) and "include_usage" not in stream_options:
patched_stream_options = {**stream_options, "include_usage": True}
if stream_options_source == "kwargs":
kwargs["stream_options"] = patched_stream_options
else:
model_settings_from_config["stream_options"] = patched_stream_options
if effective_stream_usage:
stream_options_source = "kwargs" if "stream_options" in kwargs else "config"
stream_options = kwargs.get("stream_options") if stream_options_source == "kwargs" else model_settings_from_config.get("stream_options")
if stream_options is None:
model_settings_from_config["stream_options"] = {"include_usage": True}
elif isinstance(stream_options, dict) and "include_usage" not in stream_options:
patched_stream_options = {**stream_options, "include_usage": True}
if stream_options_source == "kwargs":
kwargs["stream_options"] = patched_stream_options
else:
model_settings_from_config["stream_options"] = patched_stream_options
else:
# Some OpenAI-compatible endpoints reject stream_options when stream is false.
model_settings_from_config.pop("stream_options", None)
kwargs.pop("stream_options", None)
except Exception:
# Keep model creation robust when langchain_openai isn't available.
pass

View File

@ -973,7 +973,7 @@ def bash_tool(runtime: ToolRuntime[ContextT, ThreadState], description: str, com
- Use `python -m pip` (inside the virtual environment) to install Python packages.
Args:
description: Explain why you are running this command in short words. ALWAYS PROVIDE THIS PARAMETER FIRST.
description: Explain why you are running this command in short words in the user's language. ALWAYS PROVIDE THIS PARAMETER FIRST.
command: The bash command to execute. Always use absolute paths for files and directories.
"""
try:
@ -1017,7 +1017,7 @@ def ls_tool(runtime: ToolRuntime[ContextT, ThreadState], description: str, path:
"""List the contents of a directory up to 2 levels deep in tree format.
Args:
description: Explain why you are listing this directory in short words. ALWAYS PROVIDE THIS PARAMETER FIRST.
description: Explain why you are listing this directory in short words in the user's language. ALWAYS PROVIDE THIS PARAMETER FIRST.
path: The **absolute** path to the directory to list.
"""
try:
@ -1060,7 +1060,7 @@ def glob_tool(
"""Find files or directories that match a glob pattern under a root directory.
Args:
description: Explain why you are searching for these paths in short words. ALWAYS PROVIDE THIS PARAMETER FIRST.
description: Explain why you are searching for these paths in short words in the user's language. ALWAYS PROVIDE THIS PARAMETER FIRST.
pattern: The glob pattern to match relative to the root path, for example `**/*.py`.
path: The **absolute** root directory to search under.
include_dirs: Whether matching directories should also be returned. Default is False.
@ -1112,7 +1112,7 @@ def grep_tool(
"""Search for matching lines inside text files under a root directory.
Args:
description: Explain why you are searching file contents in short words. ALWAYS PROVIDE THIS PARAMETER FIRST.
description: Explain why you are searching file contents in short words in the user's language. ALWAYS PROVIDE THIS PARAMETER FIRST.
pattern: The string or regex pattern to search for.
path: The **absolute** root directory to search under.
glob: Optional glob filter for candidate files, for example `**/*.py`.
@ -1179,7 +1179,7 @@ def read_file_tool(
"""Read the contents of a text file. Use this to examine source code, configuration files, logs, or any text-based file.
Args:
description: Explain why you are reading this file in short words. ALWAYS PROVIDE THIS PARAMETER FIRST.
description: Explain why you are reading this file in short words in the user's language. ALWAYS PROVIDE THIS PARAMETER FIRST.
path: The **absolute** path to the file to read.
start_line: Optional starting line number (1-indexed, inclusive). Use with end_line to read a specific range.
end_line: Optional ending line number (1-indexed, inclusive). Use with start_line to read a specific range.
@ -1234,7 +1234,7 @@ def write_file_tool(
"""Write text content to a file.
Args:
description: Explain why you are writing to this file in short words. ALWAYS PROVIDE THIS PARAMETER FIRST.
description: Explain why you are writing to this file in short words in the user's language. ALWAYS PROVIDE THIS PARAMETER FIRST.
path: The **absolute** path to the file to write to. ALWAYS PROVIDE THIS PARAMETER SECOND.
content: The content to write to the file. ALWAYS PROVIDE THIS PARAMETER THIRD.
"""
@ -1276,7 +1276,7 @@ def str_replace_tool(
If `replace_all` is False (default), the substring to replace must appear **exactly once** in the file.
Args:
description: Explain why you are replacing the substring in short words. ALWAYS PROVIDE THIS PARAMETER FIRST.
description: Explain why you are replacing the substring in short words in the user's language. ALWAYS PROVIDE THIS PARAMETER FIRST.
path: The **absolute** path to the file to replace the substring in. ALWAYS PROVIDE THIS PARAMETER SECOND.
old_str: The substring to replace. ALWAYS PROVIDE THIS PARAMETER THIRD.
new_str: The new substring. ALWAYS PROVIDE THIS PARAMETER FOURTH.

View File

@ -54,7 +54,7 @@ async def task_tool(
- Tasks requiring user interaction or clarification
Args:
description: A short (3-5 word) description of the task for logging/display. ALWAYS PROVIDE THIS PARAMETER FIRST.
description: A short (3-5 word) description of the task for logging/display, in the user's language. ALWAYS PROVIDE THIS PARAMETER FIRST.
prompt: The task description for the subagent. Be specific and clear about what needs to be done. ALWAYS PROVIDE THIS PARAMETER SECOND.
subagent_type: The type of subagent to use. ALWAYS PROVIDE THIS PARAMETER THIRD.
max_turns: Optional maximum number of agent turns. Defaults to subagent's configured max.

View File

@ -0,0 +1,31 @@
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch
import pytest
from app.gateway.routers import threads
@pytest.mark.anyio
async def test_delete_thread_does_not_delete_thread_memory():
request = SimpleNamespace(app=SimpleNamespace(state=SimpleNamespace(checkpointer=None, store=None)))
with (
patch("app.gateway.routers.threads._delete_thread_data", return_value=threads.ThreadDeleteResponse(success=True, message="ok")),
patch("app.gateway.routers.threads.get_store", return_value=None),
patch("app.gateway.routers.threads.delete_thread_memory_data") as delete_memory,
):
response = await threads.delete_thread_data("thread-1", request)
assert response.success is True
delete_memory.assert_not_called()
@pytest.mark.anyio
async def test_delete_thread_memory_endpoint_calls_cleanup():
with patch("app.gateway.routers.threads.delete_thread_memory_data") as delete_memory:
response = await threads.delete_thread_memory("thread-1")
assert response.success is True
assert response.message == "Deleted thread memory for thread-1"
delete_memory.assert_called_once_with("thread-1")

View File

@ -0,0 +1,32 @@
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from langchain_core.messages import AIMessage, HumanMessage
from deerflow.agents.middlewares.memory_middleware import MemoryMiddleware
from deerflow.config.memory_config import MemoryConfig
from deerflow.config.thread_memory_config import ThreadMemoryConfig
def test_thread_memory_queue_runs_even_if_global_memory_disabled():
middleware = MemoryMiddleware()
state = {"messages": [HumanMessage(content="My name is Alice"), AIMessage(content="Nice to meet you")]}
runtime = SimpleNamespace(context={"thread_id": "thread-test"})
mock_global_queue = MagicMock()
mock_thread_queue = MagicMock()
with (
patch("deerflow.agents.middlewares.memory_middleware.get_memory_config", return_value=MemoryConfig(enabled=False)),
patch(
"deerflow.agents.middlewares.memory_middleware.get_thread_memory_config",
return_value=ThreadMemoryConfig(enabled=True),
),
patch("deerflow.agents.middlewares.memory_middleware.get_memory_queue", return_value=mock_global_queue),
patch("deerflow.agents.middlewares.memory_middleware.get_thread_memory_queue", return_value=mock_thread_queue),
):
middleware.after_agent(state, runtime)
mock_global_queue.add.assert_not_called()
mock_thread_queue.add.assert_called_once()

View File

@ -0,0 +1,81 @@
from langchain_core.messages import AIMessage, HumanMessage
from deerflow.agents.memory.thread_prompt import build_thread_memory_prompt, format_thread_memory_for_injection
def test_thread_memory_injection_keeps_profile_and_preferences_under_small_budget(monkeypatch):
monkeypatch.setattr("deerflow.agents.memory.prompt._count_tokens", lambda text, encoding_name="cl100k_base": len(text))
memory = {
"user": {
"workContext": {"summary": "Building APIs", "updatedAt": "2026-05-08T00:00:00Z"},
"personalContext": {"summary": "Engineer using Python and React", "updatedAt": "2026-05-08T00:00:00Z"},
"topOfMind": {"summary": "Improving thread memory", "updatedAt": "2026-05-08T00:00:00Z"},
},
"history": {
"recentMonths": {"summary": "Shipped memory features", "updatedAt": "2026-05-08T00:00:00Z"},
"earlierContext": {"summary": "Started from TS projects", "updatedAt": "2026-05-08T00:00:00Z"},
"longTermBackground": {"summary": "Frontend developer", "updatedAt": "2026-05-08T00:00:00Z"},
},
"facts": [
{"content": "Fact one that might be trimmed", "category": "context", "confidence": 0.9},
{"content": "Fact two that might be trimmed", "category": "context", "confidence": 0.8},
],
}
result = format_thread_memory_for_injection(memory, max_tokens=140)
assert "User Context:" in result
assert "History:" in result
def test_build_thread_memory_prompt_does_not_raise_format_key_error():
prompt = build_thread_memory_prompt(
{"user": {}, "history": {}, "facts": []},
[HumanMessage(content="My name is Alice.")],
)
assert "Current per-thread memory" in prompt
assert '"user"' in prompt
assert "Preferred memory language: same as the user's latest message" in prompt
def test_build_thread_memory_prompt_prefers_chinese_for_chinese_conversation():
prompt = build_thread_memory_prompt(
{"user": {}, "history": {}, "facts": []},
[HumanMessage(content="我叫小明,我更喜欢中文交流。")],
)
assert "Preferred memory language: zh-Hans" in prompt
def test_build_thread_memory_prompt_prefers_japanese_for_japanese_conversation():
prompt = build_thread_memory_prompt(
{"user": {}, "history": {}, "facts": []},
[HumanMessage(content="私は日本語で会話したいです。")],
)
assert "Preferred memory language: ja-JP" in prompt
def test_build_thread_memory_prompt_uses_user_messages_only_for_language_inference():
prompt = build_thread_memory_prompt(
{"user": {}, "history": {}, "facts": []},
[
HumanMessage(content="请用中文记录记忆"),
AIMessage(content="Sure, I will answer in English with many many words."),
AIMessage(content="More English content that should not change language inference."),
],
)
assert "Preferred memory language: zh-Hans" in prompt
def test_build_thread_memory_prompt_handles_structured_human_content():
prompt = build_thread_memory_prompt(
{"user": {}, "history": {}, "facts": []},
[
HumanMessage(
content=[
{"type": "text", "text": "我希望记忆使用中文。"},
{"type": "text", "text": "请继续。"},
]
),
AIMessage(content="I can also reply in English."),
],
)
assert "Preferred memory language: zh-Hans" in prompt

View File

@ -0,0 +1,33 @@
from unittest.mock import patch
from deerflow.agents.memory.thread_queue import ThreadMemoryUpdateQueue
def test_thread_queue_keeps_latest_message_per_thread():
queue = ThreadMemoryUpdateQueue()
with patch.object(queue, "_reset_timer"):
queue.add("thread-a", ["msg-1"])
queue.add("thread-b", ["msg-2"])
queue.add("thread-a", ["msg-3"])
assert set(queue._queue_by_thread.keys()) == {"thread-a", "thread-b"}
assert queue._queue_by_thread["thread-a"].messages == ["msg-3"]
def test_thread_queue_processes_single_thread_without_affecting_others():
queue = ThreadMemoryUpdateQueue()
with patch.object(queue, "_reset_timer"):
queue.add("thread-a", ["a-msg"])
queue.add("thread-b", ["b-msg"])
updater_calls: list[tuple[list[str], str]] = []
class _FakeUpdater:
def update_memory(self, messages, thread_id):
updater_calls.append((messages, thread_id))
with patch("deerflow.agents.memory.thread_updater.ThreadMemoryUpdater", _FakeUpdater):
queue._process_thread("thread-a")
assert updater_calls == [(["a-msg"], "thread-a")]
assert "thread-b" in queue._queue_by_thread

View File

@ -0,0 +1,99 @@
import json
from deerflow.agents.memory.thread_storage import SqliteThreadMemoryStorage
def _payload():
return {
"ownerId": None,
"user": {
"workContext": {"summary": "Frontend engineer", "updatedAt": "2026-05-08T00:00:00Z"},
"personalContext": {"summary": "Prefers Chinese", "updatedAt": "2026-05-08T00:00:00Z"},
"topOfMind": {"summary": "Thread memory migration", "updatedAt": "2026-05-08T00:00:00Z"},
},
"history": {
"recentMonths": {"summary": "Worked on memory features", "updatedAt": "2026-05-08T00:00:00Z"},
"earlierContext": {"summary": "", "updatedAt": ""},
"longTermBackground": {"summary": "Builds web products", "updatedAt": "2026-05-08T00:00:00Z"},
},
"facts": [],
}
def test_sqlite_thread_memory_compare_and_swap(tmp_path):
storage = SqliteThreadMemoryStorage(str(tmp_path / "thread-memory.db"))
thread_id = "thread-1"
assert storage.save(thread_id, _payload(), expected_version=0) is True
loaded = storage.load(thread_id)
assert loaded is not None
assert loaded["memoryVersion"] == 0
# wrong expected version should fail
assert storage.save(thread_id, _payload(), expected_version=9) is False
# correct version should pass and increment
assert storage.save(thread_id, _payload(), expected_version=0) is True
loaded2 = storage.load(thread_id)
assert loaded2 is not None
assert loaded2["memoryVersion"] == 1
def test_sqlite_thread_memory_saves_json_payload(tmp_path):
db_path = tmp_path / "thread-memory.db"
storage = SqliteThreadMemoryStorage(str(db_path))
thread_id = "thread-md"
assert storage.save(thread_id, _payload(), expected_version=0) is True
with storage._lock:
row = storage._conn.execute("SELECT memory_json FROM thread_memory WHERE thread_id = ?", (thread_id,)).fetchone()
assert row is not None
assert isinstance(row[0], str)
parsed = json.loads(row[0])
assert parsed["user"]["workContext"]["summary"] == "Frontend engineer"
def test_sqlite_thread_memory_uses_owner_id_column_when_json_missing_owner(tmp_path):
db_path = tmp_path / "thread-memory.db"
storage = SqliteThreadMemoryStorage(str(db_path))
thread_id = "thread-load"
payload = _payload()
with storage._lock:
storage._conn.execute(
"""
INSERT INTO thread_memory (thread_id, owner_id, memory_json, memory_version, last_updated)
VALUES (?, ?, ?, 0, datetime('now'))
""",
(
thread_id,
"owner-1",
json.dumps(
{
"user": payload["user"],
"history": payload["history"],
"facts": [],
},
ensure_ascii=False,
),
),
)
storage._conn.commit()
loaded = storage.load(thread_id)
assert loaded is not None
assert loaded["ownerId"] == "owner-1"
assert loaded["user"]["workContext"]["summary"] == "Frontend engineer"
assert loaded["facts"] == []
def test_sqlite_thread_memory_backfill_is_noop_after_migration(tmp_path):
db_path = tmp_path / "thread-memory.db"
storage = SqliteThreadMemoryStorage(str(db_path))
assert storage.count_legacy_rows() == 0
stats = storage.backfill_legacy_rows()
assert stats["scanned"] == 0
assert stats["updated"] == 0
assert stats["failed"] == 0
assert storage.count_legacy_rows() == 0

View File

@ -0,0 +1,103 @@
from unittest.mock import patch
import pytest
from deerflow.agents.memory.thread_summary import (
ThreadMemoryConflictError,
_extract_json_object,
apply_thread_memory_summary,
render_thread_memory_summary,
)
def test_render_thread_memory_summary_returns_text():
fake_storage = type(
"S",
(),
{"load": lambda self, tid: {"threadId": tid, "user": {}, "history": {}, "facts": [], "memoryVersion": 2}},
)()
fake_model = type("M", (), {"invoke": lambda self, prompt: type("R", (), {"content": "用户总结"})()})()
with (
patch("deerflow.agents.memory.thread_summary.get_thread_memory_storage", return_value=fake_storage),
patch("deerflow.agents.memory.thread_summary._get_summary_model", return_value=fake_model),
):
result = render_thread_memory_summary("t1")
assert result["threadId"] == "t1"
assert result["memoryVersion"] == 2
assert result["summary"] == "用户总结"
def test_apply_thread_memory_summary_raises_conflict_on_cas_failure():
class _Storage:
def load(self, _tid):
return {"threadId": "t1", "ownerId": None, "user": {}, "history": {}, "facts": [], "memoryVersion": 1}
def save(self, _tid, _data, expected_version=None):
return False
fake_model = type("M", (), {"invoke": lambda self, prompt: type("R", (), {"content": "{}"})()})()
fake_updater = type("U", (), {"_scrub_sensitive": lambda self, data, _thread_id: data})()
with (
patch("deerflow.agents.memory.thread_summary.get_thread_memory_storage", return_value=_Storage()),
patch("deerflow.agents.memory.thread_summary._get_summary_model", return_value=fake_model),
patch("deerflow.agents.memory.thread_summary.ThreadMemoryUpdater", return_value=fake_updater),
):
with pytest.raises(ThreadMemoryConflictError):
apply_thread_memory_summary("t1", "更新内容", 1)
def test_apply_thread_memory_summary_falls_back_when_model_output_is_not_json():
class _Storage:
def __init__(self):
self.saved = None
def load(self, _tid):
if self.saved is not None:
return {"threadId": "t1", "memoryVersion": 2, **self.saved}
return {
"threadId": "t1",
"ownerId": None,
"user": {"topOfMind": {"summary": ""}},
"history": {},
"facts": [],
"memoryVersion": 1,
}
def save(self, _tid, data, expected_version=None):
self.saved = data
return True
storage = _Storage()
fake_model = type("M", (), {"invoke": lambda self, prompt: type("R", (), {"content": "这是自然语言不是JSON"})()})()
fake_updater = type("U", (), {"_scrub_sensitive": lambda self, data, _thread_id: data})()
with (
patch("deerflow.agents.memory.thread_summary.get_thread_memory_storage", return_value=storage),
patch("deerflow.agents.memory.thread_summary._get_summary_model", return_value=fake_model),
patch("deerflow.agents.memory.thread_summary.ThreadMemoryUpdater", return_value=fake_updater),
):
result = apply_thread_memory_summary("t1", "我最近在做线程记忆功能", 1)
assert storage.saved is not None
assert storage.saved["user"]["topOfMind"]["summary"] == "我最近在做线程记忆功能"
assert result["user"]["topOfMind"]["summary"] == "我最近在做线程记忆功能"
def test_extract_json_object_repairs_inner_unescaped_quotes():
raw = """
{
"user": {
"topOfMind": {
"summary": "反感“作为 AI"这种句式认为回答不用寒暄直接说重点"
}
},
"history": {},
"facts": []
}
""".strip()
parsed = _extract_json_object(raw)
assert parsed is not None
assert parsed["user"]["topOfMind"]["summary"].startswith("反感“作为 AI")

View File

@ -0,0 +1,20 @@
from deerflow.agents.memory.thread_updater import ThreadMemoryUpdater
def test_scrub_sensitive_tolerates_non_numeric_confidence():
updater = ThreadMemoryUpdater()
cleaned = updater._scrub_sensitive(
{
"user": {},
"history": {},
"facts": [
{"content": "Uses React", "category": "knowledge", "confidence": "high"},
{"content": "Uses TypeScript", "category": "knowledge", "confidence": None},
],
},
"thread-test",
)
assert len(cleaned["facts"]) == 2
assert cleaned["facts"][0]["confidence"] == 0.5
assert cleaned["facts"][1]["confidence"] == 0.5

View File

@ -0,0 +1,760 @@
# Per-Thread Memory Brainstorm
Date: 2026-05-07
## Background
Deerflow 现有的记忆功能是单租户的——不同会话都属于同一个用户,所有对话共享一份全局 `memory.json`
要做一个新的记忆功能:不同对话属于不同用户,每个会话都有一个长期记忆,内容包括用户的使用习惯、个人信息、个人喜好和偏好语气。
## 现有记忆系统
- **存储**:单一全局 `backend/.deer-flow/memory.json`,所有会话共享
- **认证**没有用户认证没有用户隔离better-auth 已搭建但未启用)
- **结构**
- `user`: workContext / personalContext / topOfMind
- `history`: recentMonths / earlierContext / longTermBackground
- `facts[]`: id, content, category, confidence, source
- **读路径**system prompt 生成时注入 `<memory>...</memory>` XML 标签
- **写路径**MemoryMiddleware 在对话后过滤消息 → MemoryUpdateQueue debounce 30s → MemoryUpdater 调 LLM 提取更新 → 原子写入
- **配置**`config.yaml > memory`enabled, debounce_seconds, max_facts, max_injection_tokens 等)
---
## 决策记录
### 存储方式: 数据库
~~文件存储 `threads/{thread_id}/profile-memory.json`~~ → **改为数据库表**,通过 `thread_id` 区分用户。
### 数据库: SQLite本地/测试) + MySQL生产环境
### 表结构: 单表 + JSON 列Option A
### 依赖: 最小化,不引入 SQLAlchemy
SQLite 用标准库 `sqlite3`MySQL 用 `pymysql`(纯 Python轻量
### 与全局记忆关系: 策略 Bfallback
Per-thread 有记忆就用 per-thread 的,没有就 fallback 到全局记忆。
### 首次对话: 不主动询问用户偏好
---
## 1. 数据库表设计
```sql
-- SQLite
CREATE TABLE IF NOT EXISTS thread_memory (
thread_id TEXT PRIMARY KEY,
profile TEXT NOT NULL DEFAULT '{}',
preferences TEXT NOT NULL DEFAULT '{}',
facts TEXT NOT NULL DEFAULT '[]',
last_updated TEXT NOT NULL DEFAULT (datetime('now'))
);
-- MySQL
CREATE TABLE IF NOT EXISTS thread_memory (
thread_id VARCHAR(64) PRIMARY KEY,
profile JSON NOT NULL,
preferences JSON NOT NULL,
facts JSON NOT NULL,
last_updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
```
**profile** ({})
| 字段 | 类型 | 说明 |
|------|------|------|
| `name` | `string \| null` | 用户称呼 |
| `role` | `string \| null` | 职业/角色 |
| `expertise` | `string[]` | 技术栈/专业领域 |
| `language` | `"zh-CN" \| "en-US" \| null` | 使用的语言 |
| `context` | `string \| null` | 其他上下文(自由文本) |
**preferences** ({})
| 字段 | 类型 | 说明 |
|------|------|------|
| `tone` | `"casual" \| "formal" \| "technical" \| "friendly" \| null` | 语气偏好 |
| `verbosity` | `"concise" \| "detailed" \| null` | 回答详细程度 |
| `codeStyle` | `string \| null` | 代码风格偏好 |
| `other` | `string \| null` | 其他偏好(自由文本) |
**facts** ([]):复用现有全局记忆的 fact 结构
```json
{
"id": "fact_abc123",
"content": "用户在使用 React + TypeScript",
"category": "tech_stack | preference | personal | context | goal",
"confidence": 0.9,
"createdAt": "2026-05-07T...",
"source": "thread_id"
}
```
**说明**:三个 JSON 字段在 SQLite 中存为 TEXTsqlite3 标准库没有原生 JSON 类型),在 MySQL 中存为 JSON。代码层面读写时做 `json.dumps` / `json.loads`,对上层透明。
## 2. config.yaml 新增配置段
```yaml
thread_memory:
enabled: true
debounce_seconds: 30
model_name: null # null = 使用默认模型
max_facts: 100
fact_confidence_threshold: 0.7
injection_enabled: true
max_injection_tokens: 2000
database:
type: sqlite # sqlite | mysql
sqlite:
path: "thread_memory.db"
mysql:
host: "localhost"
port: 3306
user: "root"
password: "$MYSQL_PASSWORD"
database: "deerflow"
```
大部分字段和现有 `memory` 配置段语义相同,可以在两个配置段之间复用。`database` 段按 type 取子段,工厂函数只读自己需要的部分。
## 3. 存储层设计
### 3.1 抽象接口
```python
# deerflow/agents/memory/thread_storage.py
import abc
import json
import sqlite3
from datetime import datetime
from typing import Any
class ThreadMemoryStorage(abc.ABC):
@abc.abstractmethod
def load(self, thread_id: str) -> dict[str, Any] | None:
"""加载指定 thread 的记忆,不存在返回 None。"""
...
@abc.abstractmethod
def save(self, thread_id: str, data: dict[str, Any]) -> bool:
"""保存指定 thread 的记忆upsert。"""
...
@abc.abstractmethod
def delete(self, thread_id: str) -> bool:
"""删除指定 thread 的记忆thread 被删除时联动)。"""
...
def _create_empty_memory() -> dict[str, Any]:
"""Per-thread 记忆的初始空结构。"""
return {
"profile": {
"name": None,
"role": None,
"expertise": [],
"language": None,
"context": None,
},
"preferences": {
"tone": None,
"verbosity": None,
"codeStyle": None,
"other": None,
},
"facts": [],
}
def _row_to_memory(row: tuple) -> dict[str, Any]:
"""将数据库行转为 memory dict。SQLite 的 JSON 列存的是 TEXT需要 parse。"""
return {
"threadId": row[0],
"profile": json.loads(row[1]),
"preferences": json.loads(row[2]),
"facts": json.loads(row[3]),
"lastUpdated": row[4],
}
```
### 3.2 SQLite 实现(本地测试)
```python
class SqliteThreadMemoryStorage(ThreadMemoryStorage):
def __init__(self, db_path: str):
self._conn = sqlite3.connect(db_path)
self._conn.execute("""
CREATE TABLE IF NOT EXISTS thread_memory (
thread_id TEXT PRIMARY KEY,
profile TEXT NOT NULL DEFAULT '{}',
preferences TEXT NOT NULL DEFAULT '{}',
facts TEXT NOT NULL DEFAULT '[]',
last_updated TEXT NOT NULL DEFAULT (datetime('now'))
)
""")
self._conn.commit()
def load(self, thread_id: str) -> dict | None:
row = self._conn.execute(
"SELECT thread_id, profile, preferences, facts, last_updated "
"FROM thread_memory WHERE thread_id = ?",
(thread_id,)
).fetchone()
return _row_to_memory(row) if row else None
def save(self, thread_id: str, data: dict) -> bool:
now = datetime.utcnow().isoformat() + "Z"
self._conn.execute("""
INSERT INTO thread_memory (thread_id, profile, preferences, facts, last_updated)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(thread_id) DO UPDATE SET
profile = excluded.profile,
preferences = excluded.preferences,
facts = excluded.facts,
last_updated = excluded.last_updated
""", (
thread_id,
json.dumps(data["profile"], ensure_ascii=False),
json.dumps(data["preferences"], ensure_ascii=False),
json.dumps(data["facts"], ensure_ascii=False),
now,
))
self._conn.commit()
return True
def delete(self, thread_id: str) -> bool:
self._conn.execute("DELETE FROM thread_memory WHERE thread_id = ?", (thread_id,))
self._conn.commit()
return True
```
### 3.3 MySQL 实现(生产环境)
```python
class MysqlThreadMemoryStorage(ThreadMemoryStorage):
def __init__(self, host: str, port: int, user: str, password: str, database: str):
import pymysql
self._conn = pymysql.connect(
host=host, port=port, user=user, password=password, database=database,
charset="utf8mb4",
)
with self._conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS thread_memory (
thread_id VARCHAR(64) PRIMARY KEY,
profile JSON NOT NULL,
preferences JSON NOT NULL,
facts JSON NOT NULL,
last_updated TIMESTAMP NOT NULL
DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP
)
""")
self._conn.commit()
def load(self, thread_id: str) -> dict | None:
with self._conn.cursor() as cur:
cur.execute(
"SELECT thread_id, profile, preferences, facts, last_updated "
"FROM thread_memory WHERE thread_id = %s",
(thread_id,)
)
row = cur.fetchone()
return _row_to_memory(row) if row else None
def save(self, thread_id: str, data: dict) -> bool:
now = datetime.utcnow()
with self._conn.cursor() as cur:
cur.execute("""
INSERT INTO thread_memory (thread_id, profile, preferences, facts, last_updated)
VALUES (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
profile = VALUES(profile),
preferences = VALUES(preferences),
facts = VALUES(facts),
last_updated = VALUES(last_updated)
""", (
thread_id,
json.dumps(data["profile"], ensure_ascii=False),
json.dumps(data["preferences"], ensure_ascii=False),
json.dumps(data["facts"], ensure_ascii=False),
now,
))
self._conn.commit()
return True
def delete(self, thread_id: str) -> bool:
with self._conn.cursor() as cur:
cur.execute("DELETE FROM thread_memory WHERE thread_id = %s", (thread_id,))
self._conn.commit()
return True
```
### 3.4 工厂函数
```python
def get_thread_memory_storage() -> ThreadMemoryStorage:
"""从 config 读取 database 配置,构建对应的 storage 实例(单例)。"""
config = get_thread_memory_config()
db = config.database
if db.type == "sqlite":
return SqliteThreadMemoryStorage(db.sqlite.path)
elif db.type == "mysql":
return MysqlThreadMemoryStorage(
host=db.mysql.host,
port=db.mysql.port,
user=db.mysql.user,
password=db.mysql.password,
database=db.mysql.database,
)
else:
raise ValueError(f"Unknown thread_memory database type: {db.type}")
```
### 3.5 注意事项
- **JSON 在 SQLite 中存为 TEXT**`sqlite3` 标准库没有 JSON 类型,用 TEXT 存储 `json.dumps` 的结果。读写时做序列化/反序列化。MySQL 用原生 JSON 列,`pymysql` 自动处理。
- **upsert 语法差异**SQLite 用 `ON CONFLICT ... DO UPDATE SET`MySQL 用 `ON DUPLICATE KEY UPDATE`,语义等价。
- **连接管理**:两个实现都在 `__init__` 创建连接并持有。单线程场景没问题。如果将来需要并发,可以加连接池或改为每次操作创建连接。
---
## 4. upsert 语义:全量替换 vs 合并更新
### 两种模式
**模式 A — 增量合并**LLM 出 delta应用层合并
```
LLM 输入: 现有记忆 + 新对话
LLM 输出: { profile: { name: "新值", shouldUpdate: true }, newFacts: [...], factsToRemove: [...] }
应用层: 读取现有记忆 → 按 delta 逐字段合并 → 写入
```
现有全局记忆用的就是这个模式。LLM 输出里带 `shouldUpdate` 标记和 `factsToRemove` 列表,应用代码做合并。
**模式 B — 全量替换**LLM 出完整状态,应用层直接覆盖):
```
LLM 输入: 现有记忆 + 新对话
LLM 输出: { profile: { name: "...", role: "...", ... }, preferences: {...}, facts: [...] }
应用层: INSERT ... ON CONFLICT DO UPDATE整行覆盖
```
### 选择模式 B 的理由
1. **profile 和 preferences 本身很小**。每个对象 5-6 个字段,全部输出最多几十个 token增量节省的 token 可以忽略。
2. **去重和淘汰由 LLM 负责,应用层零逻辑**。LLM 看到了完整的现有记忆,在 prompt 中就能决定哪些 facts 要保留、哪些过时了要删、哪些要合并。应用代码只需要 `json.dumps` + upsert。
3. **避免字段删除的尴尬**。如果 LLM 想把 `profile.context``"前端开发者"` 改成 `null`(表示不再确定这个信息),增量模式需要额外表达"显式置 null"还是"不变",全量替换没有歧义。
4. **和现有全局记忆的模式不同是合理的**。全局记忆的 `history` 有大量的对话摘要文本不适合全量替换。Per-thread 记忆的 profile/preferences 是结构化的元数据,全量输出成本低。
### 具体流程
```
用户对话结束
MemoryMiddleware.after_agent() 提取 user + final AI 消息
queue.add(thread_id, messages) # debounce 30s
ThreadMemoryUpdater.update()
1. 从 DB 读取现有记忆(不存在就用 _create_empty_memory()
2. 构建 prompt: "以下是用户的现有画像和偏好:{existing_memory},以下是新的对话:{conversation},请更新用户画像。"
3. LLM 返回完整的 profile + preferences + facts
4. storage.save(thread_id, data) # upsert 整行覆盖
```
**关键点**LLM 的 prompt 里放了**现有记忆**LLM 看到之后自己决定:
- 保留哪些 facts
- 更新哪些 profile 字段
- 新增什么偏好
- 删除过时的信息(不输出就是删除)
应用代码不做任何合并判断,只负责把 LLM 输出写入数据库。
---
## 5. 更新路径
### 5.1 MemoryMiddleware 改造(最小改动)
在现有 `MemoryMiddleware.after_agent()` 中加一段逻辑,当 `thread_id` 存在时,同时向 per-thread 记忆的 queue 推一条:
```python
# 现有逻辑:全局记忆
queue = get_memory_queue()
queue.add(thread_id=thread_id, messages=filtered_messages, ...)
# 新增per-thread 记忆
if thread_id:
thread_queue = get_thread_memory_queue()
thread_queue.add(thread_id=thread_id, messages=filtered_messages)
```
### 5.2 ThreadMemoryUpdater
新类,结构类似现有的 `MemoryUpdater`,但使用不同的 prompt 和存储后端:
```python
class ThreadMemoryUpdater:
def update(self, messages, thread_id):
storage = get_thread_memory_storage()
existing = storage.load(thread_id) or _create_empty_memory()
prompt = THREAD_MEMORY_UPDATE_PROMPT.format(
existing_memory=json.dumps(existing, ensure_ascii=False),
conversation=format_conversation(messages),
)
response = model.invoke(prompt)
new_memory = parse_llm_output(response) # { profile, preferences, facts }
storage.save(thread_id, new_memory)
```
### 5.3 Prompt 设计要点
与全局记忆 prompt 的关键区别:
| | 全局记忆 prompt | Per-thread 记忆 prompt |
|---|---|---|
| **目标** | "对话中发生了什么" | "这个人是谁、喜欢什么" |
| **输出** | user context 摘要 + history 摘要 + facts | profile + preferences + facts |
| **侧重** | 保留对话内容的事实性信息 | 推断用户的身份、偏好、风格 |
| **语气影响** | 无 | 输出 `preferences.tone` 直接影响后续回复风格 |
---
## 6. 读取路径(注入 System Prompt
```python
def inject_thread_memory(system_prompt: str, thread_id: str) -> str:
storage = get_thread_memory_storage()
memory = storage.load(thread_id)
if memory is None:
# fallback 到全局记忆
return inject_global_memory(system_prompt)
# 生成 <memory profile="..."> 标签注入 system prompt
profile_xml = _format_profile_xml(memory)
return system_prompt + "\n" + profile_xml
```
注入内容的 XML 结构示例:
```xml
<memory>
<profile>
<name>张三</name>
<role>全栈工程师</role>
<expertise>React, TypeScript, Python</expertise>
<language>zh-CN</language>
<context>在做一个电商项目</context>
</profile>
<preferences>
<tone>casual</tone>
<verbosity>detailed</verbosity>
<codeStyle>prefers functional components with hooks</codeStyle>
</preferences>
</memory>
```
语气偏好(`preferences.tone`)不直接改 system prompt 模板,而是放在 `<preferences>` XML 里让 LLM 自己理解。方式简单,不用维护 prompt 模板的分支逻辑。如果发现 LLM 不遵循,再考虑动态改写 prompt 模板。
---
## 7. Thread 删除时的联动
Gateway 已有 `DELETE /api/threads/{id}`。在现有 handler 中加一行:
```python
# app/gateway/routers/threads.py
@router.delete("/api/threads/{thread_id}")
async def delete_thread(thread_id: str):
# ... 现有清理逻辑 ...
# 新增:删除 per-thread 记忆
get_thread_memory_storage().delete(thread_id)
```
---
## 8. 实施步骤
1. **新增配置模型**`thread_memory_config.py`(参考现有 `memory_config.py`
2. **新增存储层**`thread_storage.py``ThreadMemoryStorage` + `SqliteThreadMemoryStorage` + `MysqlThreadMemoryStorage`
3. **新增 prompt**`thread_memory_prompt.py`(用于 LLM 提取用户画像)
4. **新增 updater** — 或扩展现有 `MemoryUpdater`,根据 `thread_id` 参数路由到不同逻辑
5. **改造 middleware**`MemoryMiddleware` 中加 per-thread 记忆的 queue 逻辑
6. **改造注入** — system prompt 生成时注入 `<memory>` 标签
7. **扩展 thread 删除 handler** — 联动删除 DB 记录
8. **写入测试**`test_thread_memory_storage.py`, `test_thread_memory_updater.py`
## 9. 待确认事项
- [ ] pymysql 作为新依赖是否 OK
- [ ] `database` 配置段结构是否合适?
- [ ] upsert 使用全量替换模式(模式 B是否认同
## 10. 第二轮脑暴(风险前置)
下面这轮不是改大方向,而是把容易在落地时踩坑的点先钉住。
### 10.1 隔离键:`thread_id` 是否足够?
当前设计用 `thread_id` 作为主键隔离用户记忆,简单可行。但有一个隐含前提:
- 一个 thread 永远只对应一个真实用户
如果未来支持“同一用户多 thread 共享画像”或“thread 可能转移 owner”只用 `thread_id` 会限制扩展。
可选路径:
- 路径 A维持现状推荐短期主键 `thread_id`,最快上线。
- 路径 B兼容未来增加 `owner_id`(可空),并加索引 `(owner_id, thread_id)`
建议:
- 第一版继续 `thread_id`,但在表里预留 `owner_id` nullable 字段,避免后续大迁移。
### 10.2 并发一致性:同一 thread 的并发写覆盖问题
场景:同一 thread 在短时间内触发多次 update后到达的旧结果可能覆盖先到达的新结果。
可选保护:
- 方案 A`last_updated` 乐观锁(更新时带 where 条件)
- 方案 B`memory_version` 整数版本号(推荐)
- 方案 C严格串行队列单 thread 单 worker
建议:
- 加 `memory_version`(默认 0。`save` 时做 compare-and-swap 语义:
- 读取 version = n
- 写入时要求 version 仍为 n成功后 version = n+1
- 失败则重试一次(重新 load + merge prompt 再写)
这样不需要分布式锁,也能规避“旧结果回写”。
### 10.3 记忆质量控制:防止噪声和幻觉固化
LLM 抽取用户画像时,最大风险是把一次性表达当长期偏好。
建议加三道门:
1. 事实类别阈值
- `preference` 类阈值可略低(如 0.7
- `personal` 类阈值更高(如 0.85
2. 稳定性规则
- 同类偏好至少被 2 次独立对话支持,才提升为 profile/preference 的强字段
3. 冲突降级
- 新旧事实冲突时,不立刻删旧值
- 先把旧值降权并标记 `supersededBy`,下一轮再淘汰
### 10.4 隐私与合规:先定义“不能记”的边界
建议在 prompt 与代码都加 denylist双保险
- 默认不写入:身份证号、手机号、邮箱、住址、银行卡、密码/API Key 等敏感信息
- 允许写入:技术偏好、工作语境、沟通风格、项目目标
实现上:
- 在 `ThreadMemoryUpdater` parse 后做一次 server-side scrub
- 命中敏感模式就丢弃并打审计日志(不落库原文)
### 10.5 注入预算:避免 memory 挤爆上下文
当前有 `max_injection_tokens`,但还缺“裁剪策略”。
建议固定优先级:
1. profile最高
2. preferences
3. facts按 confidence + recency 排序后截断)
当超预算时:
- 永远保留 profile/preference
- 只裁剪 facts
### 10.6 可观测性:上线后如何判断有效
建议最小指标集:
- `thread_memory_update_total{status=ok|error}`
- `thread_memory_injection_tokens`
- `thread_memory_fact_count`
- `thread_memory_update_latency_ms`
- `thread_memory_conflict_retry_total`
加两条抽样日志:
- 更新前后摘要 diff脱敏后
- 注入片段长度与截断原因
### 10.7 迁移与回滚策略(从全局记忆过渡)
你已选 fallback 策略,这很好。建议再补两个机制:
- 冷启动导入(可选)
- 首次访问 thread 且无 per-thread 记录时,从全局记忆抽取一份“弱画像”写入
- 打 `bootstrapped_from_global=true`
- 一键回滚
- 配置开关 `thread_memory.injection_enabled=false` 时,立刻只走全局注入
- 更新链路可继续跑,便于回滚期间保留数据
### 10.8 API 语义建议(便于后续运维)
即使第一版 UI 不暴露,也建议预留内部接口:
- `GET /internal/thread-memory/{thread_id}`(脱敏视图)
- `DELETE /internal/thread-memory/{thread_id}`
- `POST /internal/thread-memory/{thread_id}/rebuild`
这样排障时不用直接查库。
---
## 11. 第三轮决策清单(进入实现前最后拍板)
- [ ] 表结构是否预留 `owner_id``memory_version`
- [ ] 是否采用 `memory_version` 方案处理并发覆盖?
- [ ] 敏感信息 denylist 范围是否按 10.4 执行?
- [ ] 注入裁剪优先级是否固定为 profile > preferences > facts
- [ ] 是否需要“冷启动导入”全局记忆到 per-thread
- [ ] 是否要在首版就加内部运维接口?
如果以上 6 项确定,基本就能把实现风险压到可控范围内。
## 12. 默认拍板方案(建议直接采用)
目标:在不显著增加复杂度的前提下,拿到“可上线 + 可回滚 + 可演进”的第一版。
### 12.1 表结构默认值
采用:**预留 `owner_id` + 引入 `memory_version`**。
SQLite
```sql
CREATE TABLE IF NOT EXISTS thread_memory (
thread_id TEXT PRIMARY KEY,
owner_id TEXT NULL,
profile TEXT NOT NULL DEFAULT '{}',
preferences TEXT NOT NULL DEFAULT '{}',
facts TEXT NOT NULL DEFAULT '[]',
memory_version INTEGER NOT NULL DEFAULT 0,
last_updated TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_thread_memory_owner_id ON thread_memory(owner_id);
```
MySQL
```sql
CREATE TABLE IF NOT EXISTS thread_memory (
thread_id VARCHAR(64) PRIMARY KEY,
owner_id VARCHAR(64) NULL,
profile JSON NOT NULL,
preferences JSON NOT NULL,
facts JSON NOT NULL,
memory_version INT NOT NULL DEFAULT 0,
last_updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_owner_id (owner_id)
);
```
### 12.2 并发一致性默认值
采用:**`memory_version` 乐观并发控制 + 失败重试 1 次**。
保存逻辑:
- `load()` 读出 `memory_version=n`
- `save()` 时执行条件更新(`WHERE thread_id=? AND memory_version=n`
- 成功则 `memory_version=n+1`
- 如果受影响行数为 0说明被并发写抢先重读并重试一次
这能防止“旧更新覆盖新更新”,同时实现复杂度可控。
### 12.3 隐私策略默认值
采用:**默认拒绝敏感信息入库(代码层 hard filter**。
默认 denylist
- 手机号
- 邮箱
- 身份证号/护照号
- 银行卡号
- 密码/API Key/Token
- 详细住址
规则:
- 命中则从 `profile/preferences/facts` 中删除该片段
- 仅记录脱敏审计信息(类型 + 时间 + thread_id不记录原文
### 12.4 注入裁剪默认值
采用固定优先级:**`profile > preferences > facts`**。
当超过 `max_injection_tokens`
- 必保留:`profile`、`preferences`
- 裁剪:`facts`(按 `confidence DESC, createdAt DESC` 排序后截断)
这能保证人格与风格信息稳定注入,不被历史 facts 挤掉。
### 12.5 冷启动策略默认值
采用:**首版不开启自动冷启动导入**`bootstrap_from_global=false`)。
理由:
- 降低“全局脏数据复制到 thread”风险
- 逻辑更清晰,便于观察 per-thread 记忆真实质量
补充:
- 保留 fallback你当前已定
- 后续若需要可加后台任务做可控回填
### 12.6 内部运维接口默认值
采用:**首版只加读接口,写接口延后**。
第一版建议:
- `GET /internal/thread-memory/{thread_id}`(脱敏后返回)
暂不做:
- `DELETE /internal/thread-memory/{thread_id}`(已有 thread delete 联动可覆盖主场景)
- `POST /internal/thread-memory/{thread_id}/rebuild`(二期再加)
这样可以先满足排障可见性,避免过早扩大运维面。
---
## 13. 实施前冻结版 Checklist可直接转开发
- [ ] DDL 按 12.1 落地(含 `owner_id`, `memory_version`, index
- [ ] Storage `save()` 改为 compare-and-swap 语义
- [ ] Updater 增加一次冲突重试
- [ ] parse 后执行敏感信息 scrub
- [ ] 注入模块按 `profile > preferences > facts` 裁剪
- [ ] fallback 保持开启,冷启动导入保持关闭
- [ ] 增加最小指标与脱敏 diff 日志
- [ ] 增加内部只读排障接口
到这一步,方案已经可以进入实现,不需要再做大改。

View File

@ -0,0 +1,213 @@
# Thread Memory 手动测试清单
日期:`2026-05-08`
测试人:`__________`
---
## 0. 前置检查
- [ ] 已拉取包含以下修复的最新代码并重启后端进程
- `memory.enabled=false` 时仍允许 `thread_memory` 更新
- `thread_prompt` 的 JSON 模板转义修复(避免 `KeyError: "profile"`
- `thread_updater` 使用非流式安全参数(避免 `stream_options` 400
- [ ] `config.yaml` 中已启用 `thread_memory.enabled: true`
- [ ] 确认使用的是预期配置文件(当前项目根目录 `config.yaml`
---
## 1. 基础写入与读取
前置条件:
- 选择一个新的 `thread_id`(例:`1f571481-e3ae-42b5-a513-945bf8f1cbef`
步骤:
1. 在该线程发送 2-3 轮消息,包含姓名、角色、偏好语气等信息
2. 等待 `debounce_seconds`(默认 30 秒)
3. 查询 `thread_memory`
期望:
- 出现该 `thread_id` 记录
- `profile/preferences/facts` 有对应内容
结果:
- [1] 通过
- [ ] 失败(备注:`________________`
---
## 2. Per-Thread 隔离
前置条件:
- 准备两个线程 `thread_A`、`thread_B`
步骤:
1. 在 A 中输入“前端背景”信息
2. 在 B 中输入“后端背景”信息
3. 分别等待写入完成后查看两条记录
期望:
- A 仅保存 A 的画像B 仅保存 B 的画像
- 两个线程不串数据
结果:
- [1] 通过
- [ ] 失败(备注:`________________`
---
## 3. 全局记忆 Fallback
前置条件:
- 全局 memory 有内容
- 新建一个尚无 per-thread 记录的线程
步骤:
1. 先在该新线程发一轮普通消息
2. 观察回复是否体现全局记忆
3. 再继续对话触发 per-thread 写入后观察注入变化
期望:
- 无 per-thread 时可 fallback 到全局
- 有 per-thread 后优先使用 per-thread
结果:
- [ ] 通过
- [ ] 失败(备注:`未执行N/A当前环境 memory.enabled=false全局记忆关闭本用例不适用`
---
## 4. 注入裁剪优先级Profile > Preferences > Facts
前置条件:
- 某线程已有大量 facts
步骤:
1. 人为积累 facts 到接近/超过注入预算
2. 保持 profile/preferences 有值
3. 观察注入后的表现
期望:
- 超预算时保留 profile + preferences
- 优先裁剪 facts
结果:
- [1 ] 通过
- [ ] 失败(备注:`________________`
---
## 5. 敏感信息过滤
步骤:
1. 在对话中输入邮箱、手机号、token/password 等敏感样例
2. 等待写入后查库
期望:
- 敏感信息不应落入 `profile/preferences/facts`
结果:
- [1] 通过
- [ ] 失败(备注:`________________`
---
## 6. 并发覆盖保护CAS + version
步骤:
1. 同一 `thread_id` 短时间内触发两次更新(尽量并发)
2. 观察最终数据与日志
期望:
- 不出现明显“旧数据覆盖新数据”
- 冲突时可见重试行为(日志)
结果:
- [1] 通过
- [ ] 失败(备注:`________________`
---
## 7. Debounce 生效
步骤:
1. 在 30 秒内连续发送多条消息
2. 观察写库频率
期望:
- 多条输入被合并处理,不是每条都立即写库
结果:
- [1] 通过
- [ ] 失败(备注:`________________`
---
## 8. 线程删除联动清理
步骤:
1. 对已有 per-thread 记录的线程调用 `DELETE /api/threads/{thread_id}`
2. 查询 `thread_memory`
期望:
- 对应 `thread_id` 记录被删除
结果:
- [ ] 通过
- [ ] 失败(备注:`未执行:当前产品决策不接受“删线程即删记忆”,需改为用户显式触发清除后再复测`
---
## 9. SQLite 自动建表与路径
步骤:
1. 删除现有 `thread_memory.db`(测试环境)
2. 重启服务并触发一轮写入
3. 检查 DB 文件和表结构
期望:
- 自动创建 DB 文件与 `thread_memory`
- 索引 `idx_thread_memory_owner_id` 存在
结果:
- [1] 通过
- [ ] 失败(备注:`________________`
---
## 10. 配置开关验证
步骤:
1. 关闭 `thread_memory.enabled`,重启并测试写入
2. 开启 `thread_memory.enabled`,关闭 `thread_memory.injection_enabled`,重启并测试注入
期望:
- `enabled=false`:不更新 per-thread
- `injection_enabled=false`:不注入 per-thread可 fallback
结果:
- [1] 通过
- [ ] 失败(备注:`________________`
---
## 11. 已知错误回归验证
### 11.1 `KeyError: "profile"` 回归
- [ 1] 未再出现 `thread_prompt.py``KeyError` 报错
### 11.2 `stream_options` 400 回归
- [ 1] 未再出现 `"'stream_options' only set this when you set stream: true"` 报错
备注:`________________`
---
## 测试总结
- 总用例数:`11`
- 通过数:`____`
- 失败数:`____`
- 结论:
- [ ] 可上线
- [ ] 需修复后复测

View File

@ -440,7 +440,7 @@ export default function ChatPage() {
onClick={() => setShowExitDialog(true)}
>
<svg xmlns="http://www.w3.org/2000/svg" width="18" height="18" viewBox="0 0 18 18" fill="none">
<path d="M2 4H6M16 4H12M6 4H12M6 4C6 2.89543 6.89543 2 8 2H10C11.1046 2 12 2.89543 12 4M4 6V14C4 15.1046 4.89543 16 6 16H12C13.1046 16 14 15.1046 14 14V6M7 8V13M11 8V13" stroke="#150033" stroke-linecap="round" />
<path d="M2 4H6M16 4H12M6 4H12M6 4C6 2.89543 6.89543 2 8 2H10C11.1046 2 12 2.89543 12 4M4 6V14C4 15.1046 4.89543 16 6 16H12C13.1046 16 14 15.1046 14 14V6M7 8V13M11 8V13" stroke="#150033" strokeLinecap="round" />
</svg>
{t.common.resetThread}
</Button>
@ -458,10 +458,10 @@ export default function ChatPage() {
}}
>
<svg xmlns="http://www.w3.org/2000/svg" width="18" height="18" viewBox="0 0 18 18" fill="none">
<path d="M16 7V4C16 2.89543 15.1046 2 14 2H4C2.89543 2 2 2.89543 2 4V14C2 15.1046 2.89543 16 4 16H9" stroke="#150033" stroke-linecap="round" />
<path d="M5 5H9M5 8H7" stroke="#150033" stroke-linecap="round" stroke-linejoin="round" />
<path d="M16 7V4C16 2.89543 15.1046 2 14 2H4C2.89543 2 2 2.89543 2 4V14C2 15.1046 2.89543 16 4 16H9" stroke="#150033" strokeLinecap="round" />
<path d="M5 5H9M5 8H7" stroke="#150033" strokeLinecap="round" strokeLinejoin="round" />
<circle cx="11.5" cy="10.5" r="3" stroke="#150033" />
<path d="M15.5 14.5L14 13" stroke="#150033" stroke-linecap="round" stroke-linejoin="round" />
<path d="M15.5 14.5L14 13" stroke="#150033" strokeLinecap="round" strokeLinejoin="round" />
</svg>
{t.common.artifacts}
</Button>

View File

@ -4,6 +4,7 @@ import type { ChatStatus } from "ai";
import { Tour } from "antd";
import {
CheckIcon,
BrainIcon,
GraduationCapIcon,
LightbulbIcon,
Loader2Icon,
@ -97,6 +98,7 @@ import { Suggestion, Suggestions } from "../ai-elements/suggestion";
import { ScrollArea } from "../ui/scroll-area";
import { ModeHoverGuide } from "./mode-hover-guide";
import { ThreadMemoryPanel } from "./thread-memory-panel";
import { Tooltip } from "./tooltip";
@ -280,6 +282,7 @@ export function InputBox({
null,
);
const [isFocused, setIsFocused] = useState(false);
const [memoryPanelOpen, setMemoryPanelOpen] = useState(false);
const [references, setReferences] = useState<PromptInputReference[]>([]);
const [mentionQuery, setMentionQuery] = useState("");
const [mentionOpen, setMentionOpen] = useState(false);
@ -293,7 +296,8 @@ export function InputBox({
const { data: referenceFilesData } = useReferenceFiles(threadIdFromProps);
// Welcome 态下禁用收缩,始终保持展开
const effectiveIsFocused = (showWelcomeStyle ?? false) || isFocused;
const effectiveIsFocused =
(showWelcomeStyle ?? false) || isFocused || memoryPanelOpen;
const shouldShowSuggestionList =
showWelcomeStyle && searchParams.get("mode") !== "skill";
@ -965,6 +969,7 @@ export function InputBox({
/>
</div>
)}
{/* {!showWelcomeStyle && (
<div className="shrink-0 h-full">
<ExitChattingButton
@ -976,6 +981,22 @@ export function InputBox({
<div ref={attachmentsButtonTourRef} className="shrink-0 h-full">
<AddAttachmentsButton />
</div>
{/* 记忆按钮 */}
{/* <div className="shrink-0 h-full">
<DropdownMenu open={memoryPanelOpen} onOpenChange={setMemoryPanelOpen}>
<DropdownMenuTrigger asChild>
<WorkspaceToolButton
className="h-full"
disabled={!threadIdFromProps || threadIdFromProps === "new"}
>
<BrainIcon className="size-4" />
</WorkspaceToolButton>
</DropdownMenuTrigger>
<DropdownMenuContent align="start" className="w-auto p-0">
<ThreadMemoryPanel threadId={threadIdFromProps} />
</DropdownMenuContent>
</DropdownMenu>
</div> */}
<div className="min-w-0 grow basis-0 h-full">
<IframeSkillDialogButton
skillButtonRef={skillButtonTourRef}

View File

@ -53,6 +53,22 @@ function localizeAssistantFixedCopy(content: string, localized: string): string
return content;
}
function buildClipboardData(message: Message): string {
const raw =
extractContentFromMessage(message) ??
extractReasoningContentFromMessage(message) ??
"";
if (!raw) {
return "";
}
const cleaned = stripUploadedFilesTag(raw);
if (message.type === "human") {
return normalizeHumanMessageDisplayText(stripPriorityHintSuffix(cleaned));
}
return cleaned;
}
export function MessageListItem({
className,
message,
@ -90,13 +106,7 @@ export function MessageListItem({
)}
>
<div className="flex gap-1">
<CopyButton
clipboardData={
extractContentFromMessage(message) ??
extractReasoningContentFromMessage(message) ??
""
}
/>
<CopyButton clipboardData={buildClipboardData(message)} />
</div>
</MessageToolbar>
)}

View File

@ -0,0 +1,138 @@
"use client";
import { useState } from "react";
import { toast } from "sonner";
import { Button } from "@/components/ui/button";
import { Textarea } from "@/components/ui/textarea";
import { getBackendBaseURL } from "@/core/config";
import { useI18n } from "@/core/i18n/hooks";
type ThreadMemoryPanelProps = {
threadId?: string;
};
export function ThreadMemoryPanel({ threadId }: ThreadMemoryPanelProps) {
const [memorySummary, setMemorySummary] = useState("");
const [memoryVersion, setMemoryVersion] = useState<number | null>(null);
const [loadingSummary, setLoadingSummary] = useState(false);
const [savingSummary, setSavingSummary] = useState(false);
const [deletingMemory, setDeletingMemory] = useState(false);
const { t } = useI18n();
if (!threadId || threadId === "new") return null;
const handleLoadMemorySummary = async () => {
setLoadingSummary(true);
try {
const res = await fetch(
`${getBackendBaseURL()}/api/threads/${encodeURIComponent(threadId)}/memory-summary`,
);
if (!res.ok) throw new Error(`HTTP ${res.status}`);
const data = (await res.json()) as { summary: string; memoryVersion: number };
setMemorySummary(data.summary ?? "");
setMemoryVersion(data.memoryVersion ?? 0);
toast.success(t.threadMemoryPanel.toastLoadSuccess);
} catch {
toast.error(t.threadMemoryPanel.toastLoadFailed);
} finally {
setLoadingSummary(false);
}
};
const handleSaveMemorySummary = async () => {
if (memoryVersion == null) return;
setSavingSummary(true);
try {
const res = await fetch(
`${getBackendBaseURL()}/api/threads/${encodeURIComponent(threadId)}/memory-summary`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ summary: memorySummary, memoryVersion }),
},
);
if (res.status === 409) {
toast.error(t.threadMemoryPanel.toastConflict);
return;
}
if (!res.ok) throw new Error(`HTTP ${res.status}`);
const data = (await res.json()) as { memoryVersion?: number };
if (typeof data.memoryVersion === "number") setMemoryVersion(data.memoryVersion);
toast.success(t.threadMemoryPanel.toastSaveSuccess);
} catch {
toast.error(t.threadMemoryPanel.toastSaveFailed);
} finally {
setSavingSummary(false);
}
};
const handleDeleteMemory = async () => {
setDeletingMemory(true);
try {
const res = await fetch(
`${getBackendBaseURL()}/api/threads/${encodeURIComponent(threadId)}/memory`,
{ method: "DELETE" },
);
if (!res.ok) throw new Error(`HTTP ${res.status}`);
setMemorySummary("");
setMemoryVersion(0);
toast.success(t.threadMemoryPanel.toastDeleteSuccess);
} catch {
toast.error(t.threadMemoryPanel.toastDeleteFailed);
} finally {
setDeletingMemory(false);
}
};
return (
<div className="w-[380px] space-y-2 rounded-lg border border-ws-divider bg-ws-surface-elevated p-3 shadow-lg">
<div className="text-sm font-semibold">
<span className="hidden sm:inline">{t.threadMemoryPanel.title}</span>
</div>
<div className="space-y-2">
<div className="flex items-center gap-2">
<Button
size="sm"
variant="outline"
onClick={() => {
void handleLoadMemorySummary();
}}
disabled={loadingSummary}
>
{loadingSummary ? t.threadMemoryPanel.loading : t.threadMemoryPanel.load}
</Button>
<Button
size="sm"
onClick={() => {
void handleSaveMemorySummary();
}}
disabled={savingSummary || memoryVersion == null}
>
{savingSummary ? t.threadMemoryPanel.saving : t.threadMemoryPanel.save}
</Button>
<Button
size="sm"
variant="destructive"
onClick={() => {
void handleDeleteMemory();
}}
disabled={deletingMemory}
>
{deletingMemory ? t.threadMemoryPanel.removing : t.threadMemoryPanel.remove}
</Button>
</div>
<div className="text-xs text-ws-text-subtle-strong">
{t.threadMemoryPanel.threadId}: {threadId.slice(0, 8)}... |{" "}
{t.threadMemoryPanel.version}:{" "}
{memoryVersion == null ? t.threadMemoryPanel.unavailableVersion : memoryVersion}
</div>
<Textarea
value={memorySummary}
onChange={(e) => setMemorySummary(e.target.value)}
placeholder={t.threadMemoryPanel.summaryPlaceholder}
className="min-h-32 bg-white/80"
/>
</div>
</div>
);
}

View File

@ -65,7 +65,7 @@ export function WorkspaceHeader({ className }: { className?: string }) {
<div className="text-primary ml-2 cursor-default font-serif">
{/* TODO: 测试标识 */}
XClaw{" "}
<span className="text-sm text-ws-text-subtle-strong">v3.2.9 </span>{" "}
<span className="text-sm text-ws-text-subtle-strong">v3.3.0 </span>{" "}
<span
className={cn(
"text-xs font-mono",

View File

@ -264,6 +264,27 @@ export const enUS: Translations = {
scrollToBottom: "Scroll to bottom",
},
threadMemoryPanel: {
title: "Thread Memory",
load: "Load memory",
loading: "Loading...",
save: "Save memory",
saving: "Saving...",
remove: "Delete memory",
removing: "Deleting...",
threadId: "Thread ID",
version: "Version",
unavailableVersion: "-",
summaryPlaceholder: "Thread memory summary is shown here. Edit it and save.",
toastLoadSuccess: "Thread memory loaded",
toastLoadFailed: "Failed to load thread memory",
toastConflict: "Memory changed. Please reload before saving.",
toastSaveSuccess: "Thread memory saved",
toastSaveFailed: "Failed to save thread memory",
toastDeleteSuccess: "Thread memory deleted",
toastDeleteFailed: "Failed to delete thread memory",
},
// Workspace Chat Page
chatPage: {
defaultSlogan: "Let's study and work together",

View File

@ -194,6 +194,27 @@ export interface Translations {
scrollToBottom: string;
};
threadMemoryPanel: {
title: string;
load: string;
loading: string;
save: string;
saving: string;
remove: string;
removing: string;
threadId: string;
version: string;
unavailableVersion: string;
summaryPlaceholder: string;
toastLoadSuccess: string;
toastLoadFailed: string;
toastConflict: string;
toastSaveSuccess: string;
toastSaveFailed: string;
toastDeleteSuccess: string;
toastDeleteFailed: string;
};
// Workspace Chat Page
chatPage: {
defaultSlogan: string;

View File

@ -145,17 +145,29 @@ export const zhCN: Translations = {
children: [{ id: "6133", name: "音乐生成器" }],
},
{
suggestion: "excel数据处理",
suggestion: "excel填表神器",
prompt: "对[Excel文件/数据]进行分析,生成数据洞察和可视化建议。",
icon: MicroscopeIcon,
children: [{ id: "17", name: "Excel处理" }],
},
{
suggestion: "word填表神器",
prompt: "对[word文件/数据]进行分析",
icon: MicroscopeIcon,
children: [{ id: "6195", name: "docx表格填充器" }],
},
{
suggestion: "微信文章撰写",
prompt: "针对[行业/产品]进行市场调研,分析市场规模、竞品和趋势。",
icon: ShapesIcon,
children: [{ id: "6134", name: "微信文章撰写" }],
},
{
suggestion: "精美ppt制作",
prompt: "针对[行业/产品]进行市场调研,分析市场规模、竞品和趋势。",
icon: ShapesIcon,
children: [{ id: "6192", name: "精美ppt制作" }],
},
],
suggestionsCreate: [
{
@ -252,6 +264,27 @@ export const zhCN: Translations = {
scrollToBottom: "滚动到底部",
},
threadMemoryPanel: {
title: "会话记忆",
load: "查看记忆",
loading: "加载中...",
save: "保存记忆",
saving: "保存中...",
remove: "删除记忆",
removing: "删除中...",
threadId: "threadId",
version: "版本",
unavailableVersion: "-",
summaryPlaceholder: "这里显示会话记忆总结,可编辑后保存",
toastLoadSuccess: "已加载会话记忆",
toastLoadFailed: "加载会话记忆失败",
toastConflict: "记忆已更新,请先重新加载再保存",
toastSaveSuccess: "会话记忆已保存",
toastSaveFailed: "保存会话记忆失败",
toastDeleteSuccess: "当前会话记忆已删除",
toastDeleteFailed: "删除会话记忆失败",
},
// Workspace Chat Page
chatPage: {
defaultSlogan: "来,一起学习工作吧",

View File

@ -32,32 +32,16 @@ export interface LocalSettings {
};
}
function clearLocalSettingsStorage() {
localStorage.removeItem(LOCAL_SETTINGS_KEY);
}
export function getLocalSettings(): LocalSettings {
if (typeof window === "undefined") {
return DEFAULT_LOCAL_SETTINGS;
}
const json = localStorage.getItem(LOCAL_SETTINGS_KEY);
try {
if (json) {
const settings = JSON.parse(json);
const mergedSettings = {
...DEFAULT_LOCAL_SETTINGS,
context: {
...DEFAULT_LOCAL_SETTINGS.context,
...settings.context,
},
layout: {
...DEFAULT_LOCAL_SETTINGS.layout,
...settings.layout,
},
notification: {
...DEFAULT_LOCAL_SETTINGS.notification,
...settings.notification,
},
};
return mergedSettings;
}
} catch {}
clearLocalSettingsStorage();
return DEFAULT_LOCAL_SETTINGS;
}
@ -65,6 +49,6 @@ export function saveLocalSettings(settings: LocalSettings) {
void settings;
// 注释了,因为本地存储会污染模型配置
console.log("localStorage设置已经注释");
localStorage.removeItem(LOCAL_SETTINGS_KEY);
clearLocalSettingsStorage();
// localStorage.setItem(LOCAL_SETTINGS_KEY, JSON.stringify(settings));
}

View File

@ -0,0 +1,43 @@
import { expect, test } from "@playwright/test";
import { invalidNewChatUrl } from "./support/chat-helpers";
const LOCAL_SETTINGS_KEY = "deerflow.local-settings";
test.describe("本地设置清理", () => {
test("禁用持久化后会在进入工作台时清除历史 localStorage 设置", async ({
page,
}) => {
await page.addInitScript(
({ key, value }: { key: string; value: string }) => {
window.localStorage.setItem(key, value);
},
{
key: LOCAL_SETTINGS_KEY,
value: JSON.stringify({
context: {
model_name: "gpt-5",
mode: "pro",
reasoning_effort: "high",
},
layout: {
sidebar_collapsed: true,
},
notification: {
enabled: false,
},
}),
},
);
await page.goto(invalidNewChatUrl());
await expect(page.locator("textarea[name='message']")).toBeVisible();
await expect
.poll(
() => page.evaluate((key) => window.localStorage.getItem(key), LOCAL_SETTINGS_KEY),
{ message: "expected deprecated local settings storage to be cleared" },
)
.toBeNull();
});
});