deerflow2/backend/tests/test_thread_memory_storage.py

89 lines
3.2 KiB
Python

import json
from deerflow.agents.memory.thread_storage import SqliteThreadMemoryStorage
def _payload():
return {
"ownerId": None,
"user": {
"workContext": {"summary": "Frontend engineer", "updatedAt": "2026-05-08T00:00:00Z"},
"personalContext": {"summary": "Prefers Chinese", "updatedAt": "2026-05-08T00:00:00Z"},
"topOfMind": {"summary": "Thread memory migration", "updatedAt": "2026-05-08T00:00:00Z"},
},
"history": {
"recentMonths": {"summary": "Worked on memory features", "updatedAt": "2026-05-08T00:00:00Z"},
"earlierContext": {"summary": "", "updatedAt": ""},
"longTermBackground": {"summary": "Builds web products", "updatedAt": "2026-05-08T00:00:00Z"},
},
"facts": [],
}
def test_sqlite_thread_memory_compare_and_swap(tmp_path):
storage = SqliteThreadMemoryStorage(str(tmp_path / "thread-memory.db"))
thread_id = "thread-1"
assert storage.save(thread_id, _payload(), expected_version=0) is True
loaded = storage.load(thread_id)
assert loaded is not None
assert loaded["memoryVersion"] == 0
# wrong expected version should fail
assert storage.save(thread_id, _payload(), expected_version=9) is False
# correct version should pass and increment
assert storage.save(thread_id, _payload(), expected_version=0) is True
loaded2 = storage.load(thread_id)
assert loaded2 is not None
assert loaded2["memoryVersion"] == 1
def test_sqlite_thread_memory_saves_markdown_payload(tmp_path):
db_path = tmp_path / "thread-memory.db"
storage = SqliteThreadMemoryStorage(str(db_path))
thread_id = "thread-md"
assert storage.save(thread_id, _payload(), expected_version=0) is True
with storage._lock:
row = storage._conn.execute("SELECT memory_md FROM thread_memory WHERE thread_id = ?", (thread_id,)).fetchone()
assert row is not None
assert isinstance(row[0], str)
assert "## User" in row[0]
assert "## History" in row[0]
assert "## Facts" in row[0]
def test_sqlite_thread_memory_loads_markdown_row(tmp_path):
db_path = tmp_path / "thread-memory.db"
storage = SqliteThreadMemoryStorage(str(db_path))
thread_id = "thread-load"
payload = _payload()
with storage._lock:
storage._conn.execute(
"""
INSERT INTO thread_memory (thread_id, owner_id, memory_md, memory_version, last_updated)
VALUES (?, ?, ?, 0, datetime('now'))
""",
(
thread_id,
"owner-1",
(
"# Thread Memory\n\n"
"Owner ID: owner-1\n\n"
"## User\n```json\n"
+ json.dumps(payload["user"], ensure_ascii=False, indent=2)
+ "\n```\n\n## History\n```json\n"
+ json.dumps(payload["history"], ensure_ascii=False, indent=2)
+ "\n```\n\n## Facts\n```json\n[]\n```"
),
),
)
storage._conn.commit()
loaded = storage.load(thread_id)
assert loaded is not None
assert loaded["ownerId"] == "owner-1"
assert loaded["user"]["workContext"]["summary"] == "Frontend engineer"