diff --git a/.gitignore b/.gitignore index 97cb8ed9..15a9b64e 100644 --- a/.gitignore +++ b/.gitignore @@ -56,3 +56,5 @@ backend/Dockerfile.langgraph config.yaml.bak .playwright-mcp .gstack/ + +.planning/ \ No newline at end of file diff --git a/.planning/STATE.md b/.planning/STATE.md index 51750b06..ffad3890 100644 --- a/.planning/STATE.md +++ b/.planning/STATE.md @@ -3,8 +3,8 @@ gsd_state_version: 1.0 milestone: v1.0 milestone_name: milestone status: v1.0 milestone complete -last_updated: "2026-04-17T06:09:01.300Z" -last_activity: 2026-04-17 +last_updated: "2026-04-22T02:08:30.000Z" +last_activity: 2026-04-22 progress: total_phases: 8 completed_phases: 7 @@ -53,5 +53,6 @@ See: .planning/PROJECT.md (updated 2026-04-07) |---|-------------|------|--------|-----------| | 260415-owq | 归档当前git diff为Phase 06验收后补丁:检查改动、更新06-UAT/06-VERIFICATION/06-SUMMARY(必要时)与STATE,再做原子提交 | 2026-04-15 | atomic | [260415-owq-git-diff-phase-06-06-uat-06-verification](./quick/260415-owq-git-diff-phase-06-06-uat-06-verification/) | | 260416-koe | 归档 Phase 06 明确指代(“这张图”)语义修复到 GSD 流程(已验收,通过人工确认,免验证) | 2026-04-16 | pending | [260416-koe-phase-06](./quick/260416-koe-phase-06/) | +| 260422-e2i | 后端为会话历史消息增加时间戳字段(前端不显示) | 2026-04-22 | pending | [260422-e2i-message-timestamp](./quick/260422-e2i-message-timestamp/) | -Last activity: 2026-04-17 +Last activity: 2026-04-22 diff --git a/backend/packages/harness/deerflow/agents/lead_agent/agent.py b/backend/packages/harness/deerflow/agents/lead_agent/agent.py index c7e9d77b..6410ae1d 100644 --- a/backend/packages/harness/deerflow/agents/lead_agent/agent.py +++ b/backend/packages/harness/deerflow/agents/lead_agent/agent.py @@ -7,6 +7,7 @@ from langchain_core.runnables import RunnableConfig from deerflow.agents.lead_agent.prompt import apply_prompt_template from deerflow.agents.middlewares.clarification_middleware import ClarificationMiddleware from deerflow.agents.middlewares.loop_detection_middleware import LoopDetectionMiddleware +from deerflow.agents.middlewares.message_timestamp_middleware import MessageTimestampMiddleware from deerflow.agents.middlewares.memory_middleware import MemoryMiddleware from deerflow.agents.middlewares.subagent_limit_middleware import SubagentLimitMiddleware from deerflow.agents.middlewares.title_middleware import TitleMiddleware @@ -233,6 +234,9 @@ def _build_middlewares(config: RunnableConfig, model_name: str | None, agent_nam if get_app_config().token_usage.enabled: middlewares.append(TokenUsageMiddleware()) + # Stamp every conversation message with backend timestamp metadata. + middlewares.append(MessageTimestampMiddleware()) + # Add TitleMiddleware middlewares.append(TitleMiddleware()) diff --git a/backend/packages/harness/deerflow/agents/middlewares/message_timestamp_middleware.py b/backend/packages/harness/deerflow/agents/middlewares/message_timestamp_middleware.py new file mode 100644 index 00000000..203d9f18 --- /dev/null +++ b/backend/packages/harness/deerflow/agents/middlewares/message_timestamp_middleware.py @@ -0,0 +1,89 @@ +"""Middleware that stamps conversation messages with backend timestamps.""" + +from __future__ import annotations + +from datetime import datetime, timedelta, timezone +from typing import Any +from typing import override +from zoneinfo import ZoneInfo + +from langchain.agents import AgentState +from langchain.agents.middleware import AgentMiddleware +from langgraph.runtime import Runtime + +_TIMESTAMP_KEY = "deerflow_created_at" +try: + _BEIJING_TZ = ZoneInfo("Asia/Shanghai") +except Exception: + # Fallback when zoneinfo database is unavailable. + _BEIJING_TZ = timezone(timedelta(hours=8)) + + +def _beijing_iso_millis(dt: datetime) -> str: + return dt.astimezone(_BEIJING_TZ).isoformat(timespec="milliseconds") + + +def _extract_existing_timestamp(message: Any) -> str | None: + if isinstance(message, dict): + top = message.get("created_at") + if isinstance(top, str) and top: + return top + additional_kwargs = message.get("additional_kwargs") + if isinstance(additional_kwargs, dict): + value = additional_kwargs.get(_TIMESTAMP_KEY) or additional_kwargs.get("created_at") + if isinstance(value, str) and value: + return value + return None + + additional_kwargs = getattr(message, "additional_kwargs", None) + if isinstance(additional_kwargs, dict): + value = additional_kwargs.get(_TIMESTAMP_KEY) or additional_kwargs.get("created_at") + if isinstance(value, str) and value: + return value + return None + + +def _stamp_message(message: Any, timestamp: str) -> None: + if _extract_existing_timestamp(message): + return + + if isinstance(message, dict): + additional_kwargs = message.get("additional_kwargs") + if not isinstance(additional_kwargs, dict): + additional_kwargs = {} + message["additional_kwargs"] = additional_kwargs + additional_kwargs[_TIMESTAMP_KEY] = timestamp + return + + additional_kwargs = getattr(message, "additional_kwargs", None) + if not isinstance(additional_kwargs, dict): + additional_kwargs = {} + try: + setattr(message, "additional_kwargs", additional_kwargs) + except Exception: + return + additional_kwargs[_TIMESTAMP_KEY] = timestamp + + +def _stamp_messages(messages: list[Any]) -> None: + now = datetime.now(_BEIJING_TZ) + for idx, message in enumerate(messages): + _stamp_message(message, _beijing_iso_millis(now + timedelta(milliseconds=idx))) + + +class MessageTimestampMiddleware(AgentMiddleware): + """Ensure every persisted conversation message has a backend timestamp.""" + + @override + def after_model(self, state: AgentState, runtime: Runtime) -> dict | None: + messages = state.get("messages") + if isinstance(messages, list): + _stamp_messages(messages) + return None + + @override + async def aafter_model(self, state: AgentState, runtime: Runtime) -> dict | None: + messages = state.get("messages") + if isinstance(messages, list): + _stamp_messages(messages) + return None diff --git a/backend/packages/harness/deerflow/runtime/serialization.py b/backend/packages/harness/deerflow/runtime/serialization.py index 48853dfb..aecdb34e 100644 --- a/backend/packages/harness/deerflow/runtime/serialization.py +++ b/backend/packages/harness/deerflow/runtime/serialization.py @@ -12,6 +12,49 @@ from __future__ import annotations from typing import Any +_TIMESTAMP_KEYS: tuple[str, ...] = ("deerflow_created_at", "created_at", "timestamp", "sent_at") +_MESSAGE_TYPES: set[str] = {"human", "ai", "tool", "system", "function", "chat"} + + +def _read_message_timestamp(message: dict[str, Any]) -> str | None: + top = message.get("created_at") + if isinstance(top, str) and top: + return top + + additional_kwargs = message.get("additional_kwargs") + if isinstance(additional_kwargs, dict): + for key in _TIMESTAMP_KEYS: + value = additional_kwargs.get(key) + if isinstance(value, str) and value: + return value + + response_metadata = message.get("response_metadata") + if isinstance(response_metadata, dict): + for key in _TIMESTAMP_KEYS: + value = response_metadata.get(key) + if isinstance(value, str) and value: + return value + + return None + + +def _attach_created_at(message: Any) -> Any: + if not isinstance(message, dict): + return message + if message.get("type") not in _MESSAGE_TYPES: + return message + + timestamp = _read_message_timestamp(message) + if timestamp: + message["created_at"] = timestamp + return message + + +def _normalize_message_timestamps(payload: Any) -> Any: + if isinstance(payload, list): + return [_attach_created_at(item) for item in payload] + return _attach_created_at(payload) + def serialize_lc_object(obj: Any) -> Any: """Recursively serialize a LangChain object to a JSON-serialisable dict.""" @@ -52,7 +95,10 @@ def serialize_channel_values(channel_values: dict[str, Any]) -> dict[str, Any]: for key, value in channel_values.items(): if key.startswith("__pregel_") or key == "__interrupt__": continue - result[key] = serialize_lc_object(value) + serialized = serialize_lc_object(value) + if key == "messages": + serialized = _normalize_message_timestamps(serialized) + result[key] = serialized return result @@ -60,7 +106,8 @@ def serialize_messages_tuple(obj: Any) -> Any: """Serialize a messages-mode tuple ``(chunk, metadata)``.""" if isinstance(obj, tuple) and len(obj) == 2: chunk, metadata = obj - return [serialize_lc_object(chunk), metadata if isinstance(metadata, dict) else {}] + serialized_chunk = _normalize_message_timestamps(serialize_lc_object(chunk)) + return [serialized_chunk, metadata if isinstance(metadata, dict) else {}] return serialize_lc_object(obj) diff --git a/backend/tests/test_message_timestamp_middleware.py b/backend/tests/test_message_timestamp_middleware.py new file mode 100644 index 00000000..089f769c --- /dev/null +++ b/backend/tests/test_message_timestamp_middleware.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +from langchain_core.messages import AIMessage, HumanMessage + +from deerflow.agents.middlewares.message_timestamp_middleware import MessageTimestampMiddleware + + +def test_after_model_stamps_missing_message_timestamps(): + middleware = MessageTimestampMiddleware() + state = { + "messages": [ + HumanMessage(content="hello"), + AIMessage(content="hi"), + ] + } + + middleware.after_model(state, runtime=None) # type: ignore[arg-type] + + timestamps = [msg.additional_kwargs.get("deerflow_created_at") for msg in state["messages"]] + assert all(isinstance(ts, str) and ts.endswith("+08:00") for ts in timestamps) + + +def test_after_model_keeps_existing_timestamp(): + middleware = MessageTimestampMiddleware() + human = HumanMessage(content="hello") + human.additional_kwargs["deerflow_created_at"] = "2026-04-22T01:00:00.000Z" + state = {"messages": [human, AIMessage(content="hi")]} + + middleware.after_model(state, runtime=None) # type: ignore[arg-type] + + assert state["messages"][0].additional_kwargs["deerflow_created_at"] == "2026-04-22T01:00:00.000Z" diff --git a/backend/tests/test_serialization.py b/backend/tests/test_serialization.py index b707d714..c3c55d54 100644 --- a/backend/tests/test_serialization.py +++ b/backend/tests/test_serialization.py @@ -114,6 +114,22 @@ def test_serialize_channel_values_serializes_objects(): assert result == {"obj": {"key": "v2"}} +def test_serialize_channel_values_promotes_message_created_at(): + from deerflow.runtime.serialization import serialize_channel_values + + raw = { + "messages": [ + { + "type": "human", + "content": "hello", + "additional_kwargs": {"deerflow_created_at": "2026-04-22T01:23:45.000Z"}, + } + ] + } + result = serialize_channel_values(raw) + assert result["messages"][0]["created_at"] == "2026-04-22T01:23:45.000Z" + + def test_serialize_messages_tuple(): from deerflow.runtime.serialization import serialize_messages_tuple @@ -130,6 +146,18 @@ def test_serialize_messages_tuple_non_dict_metadata(): assert result == [{"key": "v2"}, {}] +def test_serialize_messages_tuple_promotes_message_created_at(): + from deerflow.runtime.serialization import serialize_messages_tuple + + chunk = { + "type": "ai", + "content": "hi", + "additional_kwargs": {"deerflow_created_at": "2026-04-22T01:23:45.000Z"}, + } + result = serialize_messages_tuple((chunk, {"langgraph_node": "agent"})) + assert result[0]["created_at"] == "2026-04-22T01:23:45.000Z" + + def test_serialize_messages_tuple_fallback(): from deerflow.runtime.serialization import serialize_messages_tuple