"""Debounced queue for per-thread memory updates.""" from __future__ import annotations import threading from dataclasses import dataclass, field from datetime import datetime from typing import Any from deerflow.config.thread_memory_config import get_thread_memory_config @dataclass class ThreadConversationContext: thread_id: str messages: list[Any] timestamp: datetime = field(default_factory=datetime.utcnow) class ThreadMemoryUpdateQueue: def __init__(self): self._queue: list[ThreadConversationContext] = [] self._lock = threading.Lock() self._timer: threading.Timer | None = None self._processing = False def add(self, thread_id: str, messages: list[Any]) -> None: config = get_thread_memory_config() if not config.enabled: return with self._lock: self._queue = [c for c in self._queue if c.thread_id != thread_id] self._queue.append(ThreadConversationContext(thread_id=thread_id, messages=messages)) self._reset_timer() def _reset_timer(self) -> None: config = get_thread_memory_config() if self._timer is not None: self._timer.cancel() self._timer = threading.Timer(config.debounce_seconds, self._process_queue) self._timer.daemon = True self._timer.start() def _process_queue(self) -> None: from deerflow.agents.memory.thread_updater import ThreadMemoryUpdater with self._lock: if self._processing: self._reset_timer() return if not self._queue: return self._processing = True contexts = self._queue.copy() self._queue.clear() self._timer = None try: updater = ThreadMemoryUpdater() for context in contexts: updater.update_memory(context.messages, context.thread_id) finally: with self._lock: self._processing = False _thread_queue: ThreadMemoryUpdateQueue | None = None _lock = threading.Lock() def get_thread_memory_queue() -> ThreadMemoryUpdateQueue: global _thread_queue with _lock: if _thread_queue is None: _thread_queue = ThreadMemoryUpdateQueue() return _thread_queue