fix(memory): use asyncio.to_thread for blocking file I/O in aupdate_memory (#2220)
* fix(memory): use asyncio.to_thread for blocking file I/O in aupdate_memory
`_finalize_update` performs synchronous blocking operations (os.mkdir,
file open/write/rename/stat) that were called directly from the async
`aupdate_memory` method, causing `BlockingError` from blockbuster when
running under an ASGI server. Wrap the call with `asyncio.to_thread` to
offload all blocking I/O to a thread pool.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* fix(memory): use unique temp filename to prevent concurrent write collision
`file_path.with_suffix(".tmp")` produces a fixed path — concurrent saves
for the same agent (now possible after wrapping _finalize_update in
asyncio.to_thread) would clobber the same temp file. Use a UUID-suffixed
temp file so each write is isolated.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* fix(memory): also offload _prepare_update_prompt to thread pool
FileMemoryStorage.load() inside _prepare_update_prompt performs
synchronous stat() and file read, blocking the event loop just like
_finalize_update did. Wrap _prepare_update_prompt in asyncio.to_thread
for the same reason.
The async path now has no blocking file I/O on the event loop:
to_thread(_prepare_update_prompt) → await model.ainvoke() → to_thread(_finalize_update)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
---------
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
4ba3167f48
commit
8760937439
@ -4,6 +4,7 @@ import abc
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import threading
|
import threading
|
||||||
|
import uuid
|
||||||
from datetime import UTC, datetime
|
from datetime import UTC, datetime
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
@ -144,7 +145,7 @@ class FileMemoryStorage(MemoryStorage):
|
|||||||
file_path.parent.mkdir(parents=True, exist_ok=True)
|
file_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
memory_data["lastUpdated"] = utc_now_iso_z()
|
memory_data["lastUpdated"] = utc_now_iso_z()
|
||||||
|
|
||||||
temp_path = file_path.with_suffix(".tmp")
|
temp_path = file_path.with_suffix(f".{uuid.uuid4().hex}.tmp")
|
||||||
with open(temp_path, "w", encoding="utf-8") as f:
|
with open(temp_path, "w", encoding="utf-8") as f:
|
||||||
json.dump(memory_data, f, indent=2, ensure_ascii=False)
|
json.dump(memory_data, f, indent=2, ensure_ascii=False)
|
||||||
|
|
||||||
|
|||||||
@ -394,7 +394,8 @@ class MemoryUpdater:
|
|||||||
) -> bool:
|
) -> bool:
|
||||||
"""Update memory asynchronously based on conversation messages."""
|
"""Update memory asynchronously based on conversation messages."""
|
||||||
try:
|
try:
|
||||||
prepared = self._prepare_update_prompt(
|
prepared = await asyncio.to_thread(
|
||||||
|
self._prepare_update_prompt,
|
||||||
messages=messages,
|
messages=messages,
|
||||||
agent_name=agent_name,
|
agent_name=agent_name,
|
||||||
correction_detected=correction_detected,
|
correction_detected=correction_detected,
|
||||||
@ -406,7 +407,8 @@ class MemoryUpdater:
|
|||||||
current_memory, prompt = prepared
|
current_memory, prompt = prepared
|
||||||
model = self._get_model()
|
model = self._get_model()
|
||||||
response = await model.ainvoke(prompt)
|
response = await model.ainvoke(prompt)
|
||||||
return self._finalize_update(
|
return await asyncio.to_thread(
|
||||||
|
self._finalize_update,
|
||||||
current_memory=current_memory,
|
current_memory=current_memory,
|
||||||
response_content=response.content,
|
response_content=response.content,
|
||||||
thread_id=thread_id,
|
thread_id=thread_id,
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user