68 lines
2.3 KiB
Python
68 lines
2.3 KiB
Python
from typing import Annotated, NotRequired, TypedDict
|
|
|
|
from langchain.agents import AgentState
|
|
|
|
ARTIFACTS_REPLACE_SENTINEL = "__deerflow_replace_artifacts__"
|
|
|
|
|
|
class SandboxState(TypedDict):
|
|
sandbox_id: NotRequired[str | None]
|
|
|
|
|
|
class ThreadDataState(TypedDict):
|
|
workspace_path: NotRequired[str | None]
|
|
uploads_path: NotRequired[str | None]
|
|
outputs_path: NotRequired[str | None]
|
|
|
|
|
|
class ViewedImageData(TypedDict):
|
|
base64: str
|
|
mime_type: str
|
|
|
|
|
|
def merge_artifacts(existing: list[str] | None, new: list[str] | None) -> list[str]:
|
|
"""Reducer for artifacts list - merges and deduplicates artifacts."""
|
|
def _clean(values: list[str] | None) -> list[str]:
|
|
if not values:
|
|
return []
|
|
return [v for v in values if isinstance(v, str) and v != ARTIFACTS_REPLACE_SENTINEL]
|
|
|
|
cleaned_existing = _clean(existing)
|
|
cleaned_new = _clean(new)
|
|
|
|
if new and new[0] == ARTIFACTS_REPLACE_SENTINEL:
|
|
return list(dict.fromkeys(cleaned_new))
|
|
if existing is None:
|
|
return cleaned_new
|
|
if new is None:
|
|
return cleaned_existing
|
|
# Use dict.fromkeys to deduplicate while preserving order
|
|
return list(dict.fromkeys(cleaned_existing + cleaned_new))
|
|
|
|
|
|
def merge_viewed_images(existing: dict[str, ViewedImageData] | None, new: dict[str, ViewedImageData] | None) -> dict[str, ViewedImageData]:
|
|
"""Reducer for viewed_images dict - merges image dictionaries.
|
|
|
|
Special case: If new is an empty dict {}, it clears the existing images.
|
|
This allows middlewares to clear the viewed_images state after processing.
|
|
"""
|
|
if existing is None:
|
|
return new or {}
|
|
if new is None:
|
|
return existing
|
|
# Special case: empty dict means clear all viewed images
|
|
if len(new) == 0:
|
|
return {}
|
|
# Merge dictionaries, new values override existing ones for same keys
|
|
return {**existing, **new}
|
|
|
|
|
|
class ThreadState(AgentState):
|
|
sandbox: NotRequired[SandboxState | None]
|
|
thread_data: NotRequired[ThreadDataState | None]
|
|
title: NotRequired[str | None]
|
|
artifacts: Annotated[list[str], merge_artifacts]
|
|
todos: NotRequired[list | None]
|
|
uploaded_files: NotRequired[list[dict] | None]
|
|
viewed_images: Annotated[dict[str, ViewedImageData], merge_viewed_images] # image_path -> {base64, mime_type}
|