deerflow2/backend/tests/test_thread_memory_queue.py

34 lines
1.2 KiB
Python

from unittest.mock import patch
from deerflow.agents.memory.thread_queue import ThreadMemoryUpdateQueue
def test_thread_queue_keeps_latest_message_per_thread():
queue = ThreadMemoryUpdateQueue()
with patch.object(queue, "_reset_timer"):
queue.add("thread-a", ["msg-1"])
queue.add("thread-b", ["msg-2"])
queue.add("thread-a", ["msg-3"])
assert set(queue._queue_by_thread.keys()) == {"thread-a", "thread-b"}
assert queue._queue_by_thread["thread-a"].messages == ["msg-3"]
def test_thread_queue_processes_single_thread_without_affecting_others():
queue = ThreadMemoryUpdateQueue()
with patch.object(queue, "_reset_timer"):
queue.add("thread-a", ["a-msg"])
queue.add("thread-b", ["b-msg"])
updater_calls: list[tuple[list[str], str]] = []
class _FakeUpdater:
def update_memory(self, messages, thread_id):
updater_calls.append((messages, thread_id))
with patch("deerflow.agents.memory.thread_updater.ThreadMemoryUpdater", _FakeUpdater):
queue._process_thread("thread-a")
assert updater_calls == [(["a-msg"], "thread-a")]
assert "thread-b" in queue._queue_by_thread