* fix(backend): stream DeerFlowClient AI text as token deltas (#1969)

DeerFlowClient.stream() subscribed to LangGraph stream_mode=["values", "custom"] which only delivers full-state snapshots at graph-node boundaries, so AI replies were dumped as a single messages-tuple event per node instead of streaming token-by-token. `client.stream("hello")` looked identical to `client.chat("hello")`, which is the bug reported in #1969.

Subscribe to "messages" mode as well, forward AIMessageChunk deltas as messages-tuple events with delta semantics (consumers accumulate by id), and dedup the values-snapshot path so it does not re-synthesize AI text that was already streamed. Introduce a per-id usage_metadata counter so the final AIMessage in the values snapshot and the final "messages" chunk (which carry the same cumulative usage) are not double-counted.

chat() now accumulates per-id deltas and returns the last message's full accumulated text. Non-streaming mock sources (single event per id) are a degenerate case of the same logic, keeping existing callers and tests backward compatible.

Verified end-to-end against a real LLM: a 15-number count emits 35 messages-tuple events with BPE subword boundaries clearly visible ("eleven" -> "ele" / "ven", "twelve" -> "tw" / "elve"), 476ms across the window, end-event usage matches the values-snapshot usage exactly (not doubled). tests/test_client_live.py::TestLiveStreaming passes.

New unit tests:
- test_messages_mode_emits_token_deltas: 3 AIMessageChunks produce 3 delta events with correct content/id/usage, values-snapshot does not duplicate, usage counted once.
- test_chat_accumulates_streamed_deltas: chat() rebuilds full text from deltas.
- test_messages_mode_tool_message: ToolMessage delivered via messages mode is not duplicated by the values-snapshot synthesis path.

The stream() docstring now documents why this client does not reuse Gateway's run_agent() / StreamBridge pipeline (sync vs async, raw LangChain objects vs serialized dicts, single caller vs HTTP fan-out).

Fixes #1969

* refactor(backend): simplify DeerFlowClient streaming helpers (#1969)

Post-review cleanup for the token-level streaming fix. No behavior change for correct inputs; one efficiency regression fixed.

Fix: chat() O(n²) accumulator
-----------------------------
`chat()` accumulated per-id text via `buffers[id] = buffers.get(id,"") + delta`, which is O(n) per concat → O(n²) total over a streamed response. At ~2 KB cumulative text this becomes user-visible; at 50 KB / 5000 chunks it costs roughly 100-300 ms of pure copying. Switched to `dict[str, list[str]]` + `"".join()` once at return.

Cleanup
-------
- Extract `_serialize_tool_calls`, `_ai_text_event`, `_ai_tool_calls_event`, and `_tool_message_event` static helpers. The messages-mode and values-mode branches previously repeated four inline dict literals each; they now call the same builders.
- `StreamEvent.type` is now typed as `Literal["values", "messages-tuple", "custom", "end"]` via a `StreamEventType` alias. Makes the closed set explicit and catches typos at type-check time.
- Direct attribute access on `AIMessage`/`AIMessageChunk`: `.usage_metadata`, `.tool_calls`, `.id` all have default values on the base class, so the `getattr(..., None)` fallbacks were dead code. Removed from the hot path.
- `_account_usage` parameter type loosened to `Any` so that LangChain's `UsageMetadata` TypedDict is accepted under strict type checking.
- Trimmed narrating comments on `seen_ids` / `streamed_ids` / the values-synthesis skip block; kept the non-obvious ones that document the cross-mode dedup invariant.

Net diff: -15 lines. All 132 unit tests + harness boundary test still pass; ruff check and ruff format pass.

* docs(backend): add STREAMING.md design note (#1969)

Dedicated design document for the token-level streaming architecture, prompted by the bug investigation in #1969.

Contents:
- Why two parallel streaming paths exist (Gateway HTTP/async vs DeerFlowClient sync/in-process) and why they cannot be merged.
- LangGraph's three-layer mode naming (Graph "messages" vs Platform SDK "messages-tuple" vs HTTP SSE) and why a shared string constant would be harmful.
- Gateway path: run_agent + StreamBridge + sse_consumer with a sequence diagram.
- DeerFlowClient path: sync generator + direct yield, delta semantics, chat() accumulator.
- Why the three id sets (seen_ids / streamed_ids / counted_usage_ids) each carry an independent invariant and cannot be collapsed.
- End-to-end sequence for a real conversation turn.
- Lessons from #1969: why mock-based tests missed the bug, why BPE subword boundaries in live output are the strongest correctness signal, and the regression test that locks it in.
- Source code location index.

Also:
- Link from backend/CLAUDE.md Embedded Client section.
- Link from backend/docs/README.md under Feature Documentation.

* test(backend): add refactor regression guards for stream() (#1969)

Three new tests in TestStream that lock the contract introduced by PR #1974 so any future refactor (sync->async migration, sharing a core with Gateway's run_agent, dedup strategy change) cannot silently change behavior.

- test_dedup_requires_messages_before_values_invariant: canary that documents the order-dependence of cross-mode dedup. streamed_ids is populated only by the messages branch, so values-before-messages for the same id produces duplicate AI text events. Real LangGraph never inverts this order, but a refactor that does (or that makes dedup idempotent) must update this test deliberately.
- test_messages_mode_golden_event_sequence: locks the *exact* event sequence (4 events: 2 messages-tuple deltas, 1 values snapshot, 1 end) for a canonical streaming turn. List equality gives a clear diff on any drift in order, type, or payload shape.
- test_chat_accumulates_in_linear_time: perf canary for the O(n^2) fix in commit 1f11ba10. 10,000 single-char chunks must accumulate in under 1s; the threshold is wide enough to pass on slow CI but tight enough to fail if buffer = buffer + delta is restored.

All three tests pass alongside the existing 12 TestStream tests (15/15). ruff check + ruff format clean.

* docs(backend): clarify stream() docstring on JSON serialization (#1969)

Replace the misleading "raw LangChain objects (AIMessage, usage_metadata as dataclasses), not dicts" claim in the "Why not reuse Gateway's run_agent?" section. The implementation already yields plain Python dicts (StreamEvent.data is dict, and usage_metadata is a TypedDict), so the original wording suggested a richer return type than the API actually delivers.

The corrected wording focuses on what is actually true and relevant: this client skips the JSON/SSE serialization layer that Gateway adds for HTTP wire transmission, and yields stream event payloads directly as Python data structures.

Addresses Copilot review feedback on PR #1974.

* test(backend): document none-id messages dedup limitation (#1969)

Add test_none_id_chunks_produce_duplicates_known_limitation to TestStream that explicitly documents and asserts the current behavior when an LLM provider emits AIMessageChunk with id=None (vLLM, certain custom backends).

The cross-mode dedup machinery cannot record a None id in streamed_ids (guarded by ``if msg_id:``), so the values snapshot's reassembled AIMessage with a real id falls through and synthesizes a duplicate AI text event. The test asserts len == 2 and locks this as a known limitation rather than silently letting future contributors hit it without context.

Why this is documented rather than fixed:
  * Falling back to ``metadata.get("id")`` does not help: LangGraph's messages-mode metadata never carries the message id.
  * Synthesizing ``f"_synth_{id(msg_chunk)}"`` only helps if the values snapshot uses the same fallback, which it does not.
  * A real fix requires provider cooperation (always emit chunk ids) or content-based dedup (false-positive risk), neither of which belongs in this PR.

If a real fix lands, replace this test with a positive assertion that dedup works for None-id chunks.

Addresses Copilot review feedback on PR #1974 (client.py:515).

* fix(frontend): UI polish - fix CSS typo, dark mode border, and hardcoded colors (#1942)

- Fix `font-norma` typo to `font-normal` in message-list subtask count
- Fix dark mode `--border` using reddish hue (22.216) instead of neutral
- Replace hardcoded `rgb(184,184,192)` in hero with `text-muted-foreground`
- Replace hardcoded `bg-[#a3a1a1]` in streaming indicator with `bg-muted-foreground`
- Add missing `font-sans` to welcome description `<pre>` for consistency
- Make case-study-section padding responsive (`px-4 md:px-20`)

Closes #1940

* docs: clarify deployment sizing guidance (#1963)

* fix(frontend): prevent stale 'new' thread ID from triggering 422 history requests (#1960)

After history.replaceState updates the URL from /chats/new to /chats/{UUID}, Next.js useParams does not update because replaceState bypasses the router. The useEffect in useThreadChat would then set threadIdFromPath ('new') as the threadId, causing the LangGraph SDK to call POST /threads/new/history which returns HTTP 422 (Invalid thread ID: must be a UUID).

This fix adds a guard to skip the threadId update when threadIdFromPath is the literal string 'new', preserving the already-correct UUID that was set when the thread was created.

* fix(frontend): avoid using route new as thread id (#1967)

Co-authored-by: luoxiao6645 <luoxiao6645@gmail.com>

* Fix(subagent): Event loop conflict in SubagentExecutor.execute() (#1965)

  * Fix event loop conflict in SubagentExecutor.execute()

    When SubagentExecutor.execute() is called from within an already-running event loop (e.g., when the parent agent uses async/await), calling asyncio.run() creates a new event loop that conflicts with asyncio primitives (like httpx.AsyncClient) that were created in and bound to the parent loop.

    This fix detects if we're already in a running event loop, and if so, runs the subagent in a separate thread with its own isolated event loop to avoid conflicts.

    Fixes: sub-task cards not appearing in Ultra mode when using async parent agents

    Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

  * fix(subagent): harden isolated event loop execution

  ---------

  Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(backend): remove dead getattr in _tool_message_event

---------

Co-authored-by: greatmengqi <chenmengqi.0376@bytedance.com>
Co-authored-by: Xinmin Zeng <135568692+fancyboi999@users.noreply.github.com>
Co-authored-by: 13ernkastel <LennonCMJ@live.com>
Co-authored-by: siwuai <458372151@qq.com>
Co-authored-by: 肖 <168966994+luoxiao6645@users.noreply.github.com>
Co-authored-by: luoxiao6645 <luoxiao6645@gmail.com>
Co-authored-by: Saber <11769524+hawkli-1994@users.noreply.github.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
parent 654354c624
commit b1aabe88b8
|
|
@ -395,14 +395,16 @@ Both can be modified at runtime via Gateway API endpoints or `DeerFlowClient` me
|
||||||
**Architecture**: Imports the same `deerflow` modules that LangGraph Server and Gateway API use. Shares the same config files and data directories. No FastAPI dependency.
|
**Architecture**: Imports the same `deerflow` modules that LangGraph Server and Gateway API use. Shares the same config files and data directories. No FastAPI dependency.
|
||||||
|
|
||||||
**Agent Conversation** (replaces LangGraph Server):
|
**Agent Conversation** (replaces LangGraph Server):
|
||||||
- `chat(message, thread_id)` — synchronous, returns final text
|
- `chat(message, thread_id)` — synchronous, accumulates streaming deltas per message-id and returns the final AI text
|
||||||
- `stream(message, thread_id)` — yields `StreamEvent` aligned with LangGraph SSE protocol:
|
- `stream(message, thread_id)` — subscribes to LangGraph `stream_mode=["values", "messages", "custom"]` and yields `StreamEvent`:
|
||||||
- `"values"` — full state snapshot (title, messages, artifacts)
|
- `"values"` — full state snapshot (title, messages, artifacts); AI text already delivered via `messages` mode is **not** re-synthesized here to avoid duplicate deliveries
|
||||||
- `"messages-tuple"` — per-message update (AI text, tool calls, tool results)
|
- `"messages-tuple"` — per-chunk update: for AI text this is a **delta** (concat per `id` to rebuild the full message); tool calls and tool results are emitted once each
|
||||||
- `"end"` — stream finished
|
- `"custom"` — forwarded from `StreamWriter`
|
||||||
|
- `"end"` — stream finished (carries cumulative `usage` counted once per message id)
|
||||||
- Agent created lazily via `create_agent()` + `_build_middlewares()`, same as `make_lead_agent`
|
- Agent created lazily via `create_agent()` + `_build_middlewares()`, same as `make_lead_agent`
|
||||||
- Supports `checkpointer` parameter for state persistence across turns
|
- Supports `checkpointer` parameter for state persistence across turns
|
||||||
- `reset_agent()` forces agent recreation (e.g. after memory or skill changes)
|
- `reset_agent()` forces agent recreation (e.g. after memory or skill changes)
|
||||||
|
- See [docs/STREAMING.md](docs/STREAMING.md) for the full design: why Gateway and DeerFlowClient are parallel paths, LangGraph's `stream_mode` semantics, the per-id dedup invariants, and regression testing strategy
|
||||||
|
|
||||||
**Gateway Equivalent Methods** (replaces Gateway API):
|
**Gateway Equivalent Methods** (replaces Gateway API):
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ This directory contains detailed documentation for the DeerFlow backend.
|
||||||
|
|
||||||
| Document | Description |
|
| Document | Description |
|
||||||
|----------|-------------|
|
|----------|-------------|
|
||||||
|
| [STREAMING.md](STREAMING.md) | Token-level streaming design: Gateway vs DeerFlowClient paths, `stream_mode` semantics, per-id dedup |
|
||||||
| [FILE_UPLOAD.md](FILE_UPLOAD.md) | File upload functionality |
|
| [FILE_UPLOAD.md](FILE_UPLOAD.md) | File upload functionality |
|
||||||
| [PATH_EXAMPLES.md](PATH_EXAMPLES.md) | Path types and usage examples |
|
| [PATH_EXAMPLES.md](PATH_EXAMPLES.md) | Path types and usage examples |
|
||||||
| [summarization.md](summarization.md) | Context summarization feature |
|
| [summarization.md](summarization.md) | Context summarization feature |
|
||||||
|
|
@ -47,6 +48,7 @@ docs/
|
||||||
├── PATH_EXAMPLES.md # Path usage examples
|
├── PATH_EXAMPLES.md # Path usage examples
|
||||||
├── summarization.md # Summarization feature
|
├── summarization.md # Summarization feature
|
||||||
├── plan_mode_usage.md # Plan mode feature
|
├── plan_mode_usage.md # Plan mode feature
|
||||||
|
├── STREAMING.md # Token-level streaming design
|
||||||
├── AUTO_TITLE_GENERATION.md # Title generation
|
├── AUTO_TITLE_GENERATION.md # Title generation
|
||||||
├── TITLE_GENERATION_IMPLEMENTATION.md # Title implementation details
|
├── TITLE_GENERATION_IMPLEMENTATION.md # Title implementation details
|
||||||
└── TODO.md # Roadmap and issues
|
└── TODO.md # Roadmap and issues
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,351 @@
|
||||||
|
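# DeerFlow Streaming Design

This document explains how DeerFlow delivers a LangGraph agent's event stream end to end to two kinds of consumers (HTTP clients and embedded Python callers): why the two paths **must** coexist, what contract each one honors, and which non-obvious invariants the design depends on.

---

## TL;DR

- DeerFlow has **two parallel** streaming paths. The **Gateway path** (async / HTTP SSE / JSON serialization) serves browsers and IM channels; the **DeerFlowClient path** (sync / in-process / native Python data with no JSON encoding) serves Jupyter, scripts, and tests. They **cannot be merged** because their consumer models differ.
- Both paths start from the `create_agent()` factory, and both are built on subscribing to LangGraph's `stream_mode=["values", "messages", "custom"]`. `values` is a node-level state snapshot, `messages` is an LLM token-level delta, and `custom` carries explicit `StreamWriter` events. **These three modes are not levels of verbosity; they are three independent event sources**, so token streaming requires subscribing to `messages` explicitly.
- The embedded client keeps three `set[str]` per `stream()` call: `seen_ids`, `streamed_ids`, and `counted_usage_ids`. They look similar but guard **three independent invariants** and cannot be merged.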
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Why there are two streaming paths

The two paths serve fundamentally different consumer models:

| Dimension | Gateway path | DeerFlowClient path |
|---|---|---|
| Entry point | FastAPI `/runs/stream` endpoint | `DeerFlowClient.stream(message)` |
| Trigger layer | `runtime/runs/worker.py::run_agent` | `packages/harness/deerflow/client.py::DeerFlowClient.stream` |
| Execution model | `async def` + `agent.astream()` | sync generator + `agent.stream()` |
| Event transport | `StreamBridge` (asyncio Queue) + `sse_consumer` | direct `yield` |
| Serialization | `serialize(chunk)` → plain JSON dict matching the LangGraph Platform wire format | `StreamEvent.data`, plain Python dicts with no JSON/SSE encoding step |
| Consumers | frontend `useStream` React hook, Feishu/Slack/Telegram channels, LangGraph SDK clients | Jupyter notebooks, integration tests, internal Python scripts |
| Lifecycle management | `RunManager`: run_id tracking, disconnect semantics, multitask strategy, heartbeat | none; the stream ends when the function returns |
| Reconnection | `Last-Event-ID` SSE reconnect | not needed |

**Having two paths is a deliberate compromise on DRY**: all of Gateway's infrastructure (async + Queue + JSON + RunManager) **exists to carry events across a network boundary to HTTP consumers**. When the producer (the agent) and the consumer (the Python call stack) live in the same process, all of it is pure overhead.

### Why DeerFlowClient cannot reuse Gateway

Three reuse options were considered and rejected:

1. **Turn `client.stream()` into `async def client.astream()`**
   A breaking change. It forces `async for` / `asyncio.run()`, which users do not need, into Jupyter notebooks and synchronous scripts. One of DeerFlowClient's main selling points ("call the agent like an ordinary function") disappears.

2. **Start a dedicated event-loop thread inside `client.stream()` and use `StreamBridge` to bridge sync and async**
   This brings in a thread pool, queues, and semaphores. In the name of "removing duplication" it trades lines of code for **complexity**. It is a textbook "wrong abstraction": the overhead outweighs the benefit of reuse.

3. **Make `run_agent` itself support a sync mode**
   This adds a dead branch that Gateway never uses and dilutes the focus of worker.py.

So the event-handling logic of the two paths is **similar but not shared**. This is by design, not an oversight.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## LangGraph `stream_mode`: three-layer semantics

LangGraph's `agent.stream(stream_mode=[...])` is a **multiplexed** interface: one call subscribes to several modes, and each mode is an independent event source. The three core modes:

```mermaid
flowchart LR
    classDef values fill:#B8C5D1,stroke:#5A6B7A,color:#2C3E50
    classDef messages fill:#C9B8A8,stroke:#7A6B5A,color:#2C3E50
    classDef custom fill:#B5C4B1,stroke:#5A7A5A,color:#2C3E50

    subgraph LG["LangGraph agent graph"]
        direction TB
        Node1["node: LLM call"]
        Node2["node: tool call"]
        Node3["node: reducer"]
    end

    LG -->|"after each node completes"| V["values: full state snapshot"]
    Node1 -->|"each token the LLM produces"| M["messages: (AIMessageChunk, meta)"]
    Node1 -->|"StreamWriter.write()"| C["custom: arbitrary dict"]

    class V values
    class M messages
    class C custom
```

| Mode | Emitted when | Payload | Granularity |
|---|---|---|---|
| `values` | After each graph node completes | Full state dict (title, messages, artifacts) | Node level |
| `messages` | Each time the LLM yields a chunk; when a tool node completes | `(AIMessageChunk \| ToolMessage, metadata_dict)` | Token level |
| `custom` | User code explicitly calls `StreamWriter.write()` | Arbitrary dict | Application defined |
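To make the "independent event sources" point concrete, here is a minimal sketch that needs no LangGraph install. `fake_agent_stream` is a hypothetical stand-in for `agent.stream()`, and its payloads are simplified dicts rather than real `AIMessageChunk` objects; the only behavior it mirrors is that a list-valued `stream_mode` yields `(mode, chunk)` pairs and that unsubscribed modes are never delivered.

```python
from collections.abc import Iterator
from typing import Any


def fake_agent_stream(stream_mode: list[str]) -> Iterator[tuple[str, Any]]:
    """Hypothetical stand-in for agent.stream(); yields (mode, chunk) pairs."""
    events: list[tuple[str, Any]] = [
        ("values", {"messages": ["human: hi"]}),
        ("messages", ({"content": "Hel", "id": "ai-1"}, {"node": "model"})),
        ("messages", ({"content": "lo", "id": "ai-1"}, {"node": "model"})),
        ("custom", {"progress": 0.5}),
        ("values", {"messages": ["human: hi", "ai: Hello"]}),
    ]
    # Only modes that were explicitly subscribed are delivered.
    return (event for event in events if event[0] in stream_mode)


def count_by_mode(stream_mode: list[str]) -> dict[str, int]:
    """Count how many events arrive per mode for a given subscription."""
    counts: dict[str, int] = {}
    for mode, _chunk in fake_agent_stream(stream_mode):
        counts[mode] = counts.get(mode, 0) + 1
    return counts


without_messages = count_by_mode(["values", "custom"])
with_messages = count_by_mode(["values", "messages", "custom"])
print(without_messages)
print(with_messages)
```

Dropping `"messages"` from the subscription removes the token-level events entirely; the `values` snapshots do not become more detailed to compensate.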
|
||||||
|
|
||||||
|
### Where the two sets of names come from

The same thing has three names across **three protocol layers**:

```
Application                   HTTP / SSE                    LangGraph Graph
┌──────────────┐              ┌──────────────┐              ┌──────────────┐
│ frontend     │              │ LangGraph    │              │ agent.astream│
│ useStream    │──"messages-  │ Platform SDK │──"messages"──│ graph.astream│
│ Feishu IM    │  tuple"──────│ HTTP wire    │              │              │
└──────────────┘              └──────────────┘              └──────────────┘
```

- **Graph layer** (`agent.stream` / `agent.astream`): the direct LangGraph Python API; the mode is called **`"messages"`**.
- **Platform SDK layer** (`langgraph-sdk` HTTP client): the cross-process HTTP contract; the mode is called **`"messages-tuple"`**.
- **Gateway worker** translates explicitly: `if m == "messages-tuple": lg_modes.append("messages")` (`runtime/runs/worker.py:117-121`).

**Consequence**: `DeerFlowClient.stream()` calls `agent.stream()` directly (Graph layer), so it must pass `"messages"`. `app/channels/manager.py` goes through the HTTP SDK via `langgraph-sdk`, so it passes `"messages-tuple"`. **The two strings are not interchangeable**, and they should not be factored into "one shared constant": they are names that belong to different protocol layers, and sharing one would only force some layer to speak a language that is not its own.
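The translation step quoted above can be sketched as a small pure function. `to_graph_modes` is a hypothetical helper written for illustration, not the code in `worker.py`; the only fact taken from the source is that `"messages-tuple"` on the wire maps to `"messages"` at the Graph layer.

```python
def to_graph_modes(requested: list[str]) -> list[str]:
    """Map wire-level mode names to Graph-level names (hypothetical helper)."""
    graph_modes: list[str] = []
    for mode in requested:
        # "messages-tuple" is the Platform SDK / HTTP name;
        # the Graph API calls the same event source "messages".
        name = "messages" if mode == "messages-tuple" else mode
        if name not in graph_modes:
            graph_modes.append(name)
    return graph_modes


print(to_graph_modes(["values", "messages-tuple", "custom"]))
```

Keeping the mapping at the boundary between layers means each layer only ever sees its own vocabulary.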
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Gateway path: async + HTTP SSE

```mermaid
sequenceDiagram
    participant Client as HTTP Client
    participant API as FastAPI<br/>thread_runs.py
    participant Svc as services.py<br/>start_run
    participant Worker as worker.py<br/>run_agent (async)
    participant Bridge as StreamBridge<br/>(asyncio.Queue)
    participant Agent as LangGraph<br/>agent.astream
    participant SSE as sse_consumer

    Client->>API: POST /runs/stream
    API->>Svc: start_run(body)
    Svc->>Bridge: create bridge
    Svc->>Worker: asyncio.create_task(run_agent(...))
    Svc-->>API: StreamingResponse(sse_consumer)
    API-->>Client: event-stream opens

    par worker (producer)
        Worker->>Agent: astream(stream_mode=lg_modes)
        loop each chunk
            Agent-->>Worker: (mode, chunk)
            Worker->>Bridge: publish(run_id, event, serialize(chunk))
        end
        Worker->>Bridge: publish_end(run_id)
    and sse_consumer (consumer)
        SSE->>Bridge: subscribe(run_id)
        loop each event
            Bridge-->>SSE: StreamEvent
            SSE-->>Client: "event: <name>\ndata: <json>\n\n"
        end
    end
```

Key components:

- `runtime/runs/worker.py::run_agent`: runs `agent.astream()` inside an `asyncio.Task`, converts each chunk to JSON with `serialize(chunk, mode=mode)`, then calls `bridge.publish()`.
- `runtime/stream_bridge`: an abstract Queue. `publish/subscribe` decouples producer from consumer and supports `Last-Event-ID` reconnects, heartbeats, and fan-out to multiple subscribers.
- `app/gateway/services.py::sse_consumer`: subscribes to the bridge and formats events as SSE wire frames.
- `runtime/serialization.py::serialize`: mode-aware serialization; in `messages` mode, `serialize_messages_tuple` converts `(chunk, metadata)` into `[chunk.model_dump(), metadata]`.

**Why `StreamBridge` exists**: when the producer (the `run_agent` task) and the consumer (the HTTP connection) run in different asyncio tasks, something has to carry events between tasks. The Queue also serves as the buffer for reconnects and as the fan-out point for multiple subscribers.
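The producer/consumer split can be illustrated with a bare `asyncio.Queue`. This is a minimal sketch under stated assumptions, not the real `StreamBridge`: the `producer`, `consumer`, and `_END` names are hypothetical, and reconnect buffering, heartbeats, and multi-subscriber fan-out are all left out. It shows only how one task publishes events while another formats them as SSE frames.

```python
import asyncio
import json

_END = object()  # hypothetical sentinel marking the end of a run


async def producer(queue: asyncio.Queue, chunks: list[tuple[str, dict]]) -> None:
    """Stand-in for run_agent: publish each (event, payload) pair, then the end marker."""
    for event, payload in chunks:
        await queue.put((event, payload))
    await queue.put(_END)


async def consumer(queue: asyncio.Queue) -> list[str]:
    """Stand-in for sse_consumer: turn each queued event into an SSE frame."""
    frames: list[str] = []
    while True:
        item = await queue.get()
        if item is _END:
            break
        event, payload = item
        frames.append(f"event: {event}\ndata: {json.dumps(payload)}\n\n")
    return frames


async def main() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    chunks = [("messages-tuple", {"content": "Hel"}), ("values", {"title": "t"})]
    # The producer runs as its own task, like run_agent under asyncio.create_task.
    task = asyncio.create_task(producer(queue, chunks))
    frames = await consumer(queue)
    await task
    return frames


frames = asyncio.run(main())
print(frames)
```

None of this machinery is needed when the caller and the agent share one call stack, which is the argument for the direct-`yield` path described next.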
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## DeerFlowClient path: sync + in-process

```mermaid
sequenceDiagram
    participant User as Python caller
    participant Client as DeerFlowClient.stream
    participant Agent as LangGraph<br/>agent.stream (sync)

    User->>Client: for event in client.stream("hi"):
    Client->>Agent: stream(stream_mode=["values","messages","custom"])
    loop each chunk
        Agent-->>Client: (mode, chunk)
        Client->>Client: dispatch on mode<br/>build StreamEvent
        Client-->>User: yield StreamEvent
    end
    Client-->>User: yield StreamEvent(type="end")
```

By comparison, the sync path has far fewer moving parts at every step:

- No `RunManager`: one `stream()` call is one lifecycle, so no run_id is needed.
- No `StreamBridge`: events are yielded directly; producer and consumer share one Python call stack, so no cross-task intermediary is needed.
- No JSON serialization: `StreamEvent.data` holds Python data taken directly from the LangChain messages (`AIMessage.content`, the `UsageMetadata` TypedDict for `usage_metadata`), with no JSON/SSE encoding step in between.
- No asyncio: callers write `for event in ...` directly instead of `async for`.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Consumption semantics: delta vs cumulative

LangGraph's `messages` mode delivers **deltas**: each `AIMessageChunk.content` contains only the tokens newly yielded in that step, **not** the text accumulated from the start.

This matches LangChain's streaming convention (in the style of an fs2 `Stream`): **upstream emits increments, downstream accumulates them**. On the Gateway path the frontend `useStream` React hook maintains its own accumulator; on the DeerFlowClient path the `chat()` method accumulates on the caller's behalf.

### The O(n) accumulator in `DeerFlowClient.chat()`

```python
chunks: dict[str, list[str]] = {}
last_id: str = ""
for event in self.stream(message, thread_id=thread_id, **kwargs):
    if event.type == "messages-tuple" and event.data.get("type") == "ai":
        msg_id = event.data.get("id") or ""
        delta = event.data.get("content", "")
        if delta:
            chunks.setdefault(msg_id, []).append(delta)
            last_id = msg_id
return "".join(chunks.get(last_id, ()))
```

**Why not `buffers[id] = buffers.get(id,"") + delta`**: CPython's in-place string concatenation optimization only applies when the refcount is 1 and the left-hand side is a local name. Here the string lives in a dict and is reassigned, so the optimization does not apply; every concat is an O(n) copy, which is O(n²) overall. In measurements, a 50 KB / 5000-chunk reply costs 100-300 ms of pure copying. `list` + `"".join()` is O(n).
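The excerpt above is the body of a method and does not run on its own. The sketch below wraps the same logic in a standalone function so the per-id behavior can be checked directly. `FakeEvent` is a hypothetical stand-in for `StreamEvent`, and the event list is invented for illustration.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeEvent:
    """Hypothetical stand-in for StreamEvent."""

    type: str
    data: dict[str, Any] = field(default_factory=dict)


def accumulate(events: list[FakeEvent]) -> str:
    """Rebuild the last AI message's text from per-id deltas in O(n)."""
    chunks: dict[str, list[str]] = {}
    last_id = ""
    for event in events:
        if event.type == "messages-tuple" and event.data.get("type") == "ai":
            msg_id = event.data.get("id") or ""
            delta = event.data.get("content", "")
            if delta:
                # Appending to a list is amortized O(1); no string is copied here.
                chunks.setdefault(msg_id, []).append(delta)
                last_id = msg_id
    # One join at the end copies each character exactly once.
    return "".join(chunks.get(last_id, ()))


events = [
    FakeEvent("messages-tuple", {"type": "ai", "id": "ai-1", "content": "Let me check. "}),
    FakeEvent("messages-tuple", {"type": "tool", "id": "t-1", "content": "42"}),
    FakeEvent("messages-tuple", {"type": "ai", "id": "ai-2", "content": "ele"}),
    FakeEvent("messages-tuple", {"type": "ai", "id": "ai-2", "content": "ven"}),
    FakeEvent("end", {}),
]
result = accumulate(events)
print(result)
```

Only the deltas of the last AI message id are joined, so text from an earlier AI turn (`ai-1` here) and tool results are not mixed into the return value.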
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Why the three id sets cannot be merged

`DeerFlowClient.stream()` maintains three `set[str]` for the lifetime of one call:

```python
seen_ids: set[str] = set()           # dedup within the values path
streamed_ids: set[str] = set()       # cross-mode dedup, messages → values
counted_usage_ids: set[str] = set()  # idempotent usage_metadata accounting
```

At first glance they look like "three copies of nearly the same thing", but each one guards a **different invariant**.

| Set | Invariant it guards | Populated by | Queried by |
|---|---|---|---|
| `seen_ids` | The same message appearing in two consecutive `values` snapshots produces only one `messages-tuple` event | The values branch adds each message as it processes it | The values branch checks before processing the next message |
| `streamed_ids` | If a message has already streamed token by token via `messages` mode, the values snapshot must **not** synthesize a full `messages-tuple` again when it arrives | The messages branch adds the id each time it emits an AI/tool event | The values branch checks when it sees a message |
| `counted_usage_ids` | The same `usage_metadata` is carried by both the final messages chunk and the final AIMessage in the values snapshot; **the cumulative total counts it only once** | `_account_usage()` adds the id each time it accepts usage | `_account_usage()` checks on every call |
|
||||||
|
|
||||||
|
### Why a single set is not enough

Key observation: **the same message id enters each of the three sets at a different time**.

```mermaid
sequenceDiagram
    participant M as messages mode
    participant V as values mode
    participant SS as streamed_ids
    participant SU as counted_usage_ids
    participant SE as seen_ids

    Note over M: first AI text chunk arrives
    M->>SS: add(msg_id)
    Note over M: last chunk carries usage
    M->>SU: add(msg_id)
    Note over V: snapshot arrives, containing the same AI message
    V->>SE: add(msg_id)
    V->>SS: lookup → present, skip text synthesis
    V->>SU: lookup → present, do not count again
```

- `seen_ids` is populated **only when a values snapshot arrives**, so it marks "already handled by values". A message that appears only in the messages stream (rare but possible) never enters `seen_ids`.
- `streamed_ids` is populated **at the first effective event in the messages stream**. A non-AI message that arrives only via a values snapshot (a HumanMessage, a truncated tool message) never enters `streamed_ids`.
- `counted_usage_ids` is populated **only when a non-empty `usage_metadata` is seen**. A message with no usage at all (a tool message, an error message) never enters it.

**Set containment**: `counted_usage_ids ⊆ (streamed_ids ∪ seen_ids)` roughly holds, but the sets are **not interchangeable**: a message can finish streaming its text in messages mode yet be overtaken by a values snapshot **before the final chunk that carries usage**. At that point it is already in `streamed_ids` but not yet in `counted_usage_ids`. Merging the three into one dict-of-flags would make this subtle timing dependency **disappear from the type system** and survive only as a sentence in a comment. Three separate sets make the invariants explicit: each set's name corresponds to a question that can be answered out loud.
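The interplay of the three sets can be simulated in a few lines. This is a simplified sketch, not the code in `client.py`: `Msg` and `run` are hypothetical, messages are plain dataclasses rather than LangChain objects, tool calls are ignored, and usage is reduced to a single `total_tokens` number.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Msg:
    """Hypothetical stand-in for an AI message or message chunk."""

    id: str
    content: str = ""
    usage: Optional[dict[str, int]] = None


def run(stream: list[tuple[str, Any]]) -> tuple[list[tuple[str, str]], int]:
    """Replay (mode, payload) pairs; return emitted AI text events and total tokens."""
    seen_ids: set[str] = set()           # dedup within the values path
    streamed_ids: set[str] = set()       # cross-mode dedup, messages -> values
    counted_usage_ids: set[str] = set()  # count each message's usage once
    emitted: list[tuple[str, str]] = []
    total_tokens = 0

    def account_usage(msg: Msg) -> None:
        nonlocal total_tokens
        if msg.usage and msg.id not in counted_usage_ids:
            counted_usage_ids.add(msg.id)
            total_tokens += msg.usage["total_tokens"]

    for mode, payload in stream:
        if mode == "messages":
            account_usage(payload)
            if payload.content:  # usage-only chunks emit nothing
                streamed_ids.add(payload.id)
                emitted.append((payload.id, payload.content))
        elif mode == "values":
            for msg in payload:
                if msg.id in seen_ids:
                    continue  # already handled in an earlier snapshot
                seen_ids.add(msg.id)
                account_usage(msg)
                if msg.id not in streamed_ids:
                    emitted.append((msg.id, msg.content))
    return emitted, total_tokens


usage = {"total_tokens": 12}
stream = [
    ("messages", Msg("ai-1", "Hel")),
    ("messages", Msg("ai-1", "lo")),
    ("messages", Msg("ai-1", "", usage)),
    ("values", [Msg("ai-1", "Hello", usage)]),
    ("values", [Msg("ai-1", "Hello", usage)]),
]
emitted, total_tokens = run(stream)
print(emitted, total_tokens)
```

Feeding the `values` snapshot before the `messages` chunks makes the same sketch emit the full text and then the deltas, which is the order dependence the regression test `test_dedup_requires_messages_before_values_invariant` is described as guarding.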
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## End to end: event timeline of one real conversation turn

Suppose the caller runs `client.stream("Count from 1 to 15")`, the LLM replies "one\ntwo\n...\nfifteen" (88 characters), and the tokenizer splits it into ~35 BPE chunks. Below is a condensed version of the event arrival sequence:

```mermaid
sequenceDiagram
    participant U as User
    participant C as DeerFlowClient
    participant A as LangGraph<br/>agent.stream

    U->>C: stream("Count ... 15")
    C->>A: stream(mode=["values","messages","custom"])

    A-->>C: ("values", {messages: [HumanMessage]})
    C-->>U: StreamEvent(type="values", ...)

    Note over A,C: LLM starts yielding tokens
    loop 35 times, about 476ms
        A-->>C: ("messages", (AIMessageChunk(content="ele"), meta))
        C->>C: streamed_ids.add(ai-1)
        C-->>U: StreamEvent(type="messages-tuple",<br/>data={type:ai, content:"ele", id:ai-1})
    end

    Note over A: LLM finish_reason=stop, last chunk carries usage
    A-->>C: ("messages", (AIMessageChunk(content="", usage_metadata={...}), meta))
    C->>C: counted_usage_ids.add(ai-1)<br/>(no text, nothing yielded)

    A-->>C: ("values", {messages: [..., AIMessage(complete)]})
    C->>C: ai-1 in streamed_ids → skip synthesis
    C->>C: capture usage (already in counted_usage_ids, no-op)
    C-->>U: StreamEvent(type="values", ...)

    C-->>U: StreamEvent(type="end", data={usage:{...}})
```

Key observations:

1. The user sees **35 messages-tuple events** spread over about 476ms, each carrying one token delta and the same `id=ai-1`.
2. The `AIMessage` in the final `values` snapshot does **not** trigger another full `messages-tuple` event, because `ai-1 in streamed_ids` skips the synthesis.
3. The `usage` in the `end` event equals the single cumulative usage exactly, **not twice that**: `counted_usage_ids` already absorbed it on the final messages chunk, so the second visit from the values branch is a no-op.
4. The `content` that consumers receive is a **delta**: "ele" is just 3 characters, not "one\ntwo\n...ele". To get the full text, accumulate by `id`; `chat()` already does this for you.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Why this design is bug-prone, and the testing strategy

This document was prompted directly by bytedance/deer-flow#1969: `DeerFlowClient.stream()` originally subscribed only to `["values", "custom"]` and **left out `"messages"`**. As a result, `client.stream("hello")` effectively returned everything at once and was visually indistinguishable from `chat()`.

Bugs of this kind have three structural causes:

1. **Names differ across protocol layers**: `messages` / `messages-tuple` / HTTP SSE `messages` are three names for the same concept. A mistake in one layer raises no error in the other two.
2. **Multiple consumer models**: Gateway and DeerFlowClient are two independent implementations with **no single source of truth for "which modes to subscribe to"**. The former subscribing correctly says nothing about the latter.
3. **Mock tests bypassed the real path**: the old tests used `agent.stream.return_value = iter([dict_chunk, ...])` to feed values-shaped dicts that simulate state snapshots. Input built this way **never reaches the `messages` mode branch**, so CI stays green even when `stream_mode` is missing an element.

### Defenses

The real line of defense is to **assert explicitly that "messages" mode is subscribed, and to mock with the real chunk shape**:

```python
# tests/test_client.py::test_messages_mode_emits_token_deltas
agent.stream.return_value = iter([
    ("messages", (AIMessageChunk(content="Hel", id="ai-1"), {})),
    ("messages", (AIMessageChunk(content="lo ", id="ai-1"), {})),
    ("messages", (AIMessageChunk(content="world!", id="ai-1"), {})),
    ("values", {"messages": [HumanMessage(...), AIMessage(content="Hello world!", id="ai-1")]}),
])
# ...
assert [e.data["content"] for e in ai_text_events] == ["Hel", "lo ", "world!"]
assert len(ai_text_events) == 3  # values snapshot must NOT re-synthesize
assert "messages" in agent.stream.call_args.kwargs["stream_mode"]
```

**Why this works better than "extracting a shared constant"**: a shared constant only guarantees that "whoever uses it writes the string correctly", and someone adding a new consumer may not even know the constant exists. A behavioral assertion forces every change through the **actual execution path**: reverting to `["values", "custom"]` immediately fails `assert "messages" in ...`.
|
||||||
|
|
||||||
|
### Live signal: BPE subword boundaries

The final check for this regression is to have a real LLM count from 1 to 15 and look for the tokenizer's subword splits in the output:

```
[5.460s] 'ele' / 'ven'           eleven split into two tokens
[5.508s] 'tw' / 'elve'           twelve split into two
[5.568s] 'th' / 'irteen'         thirteen split into two
[5.623s] 'four'/ 'teen'          fourteen split into two
[5.677s] 'f' / 'if' / 'teen'     fifteen split into three
```

Subword splits are an external fact about the tokenizer and **cannot be faked**. Seeing them shows that the data crossed the whole pipeline **chunk by chunk**, with no intermediate layer buffering it into one block. In a streaming system, this kind of "live signal" is higher-confidence evidence than a unit test.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Source code index

| What you care about | Where to look |
|---|---|
| DeerFlowClient embedded stream | `packages/harness/deerflow/client.py::DeerFlowClient.stream` |
| `chat()` delta accumulator | `packages/harness/deerflow/client.py::DeerFlowClient.chat` |
| Gateway async stream | `packages/harness/deerflow/runtime/runs/worker.py::run_agent` |
| HTTP SSE frame output | `app/gateway/services.py::sse_consumer` / `format_sse` |
| Serialization to the wire format | `packages/harness/deerflow/runtime/serialization.py` |
| LangGraph mode name translation | `packages/harness/deerflow/runtime/runs/worker.py:117-121` |
| Incremental card updates for the Feishu channel | `app/channels/manager.py::_handle_streaming_chat` |
| Channels' own defensive delta/cumulative accumulation | `app/channels/manager.py::_merge_stream_text` |
| Mode set supported by the frontend useStream | `frontend/src/core/api/stream-mode.ts` |
| Core regression test | `backend/tests/test_client.py::TestStream::test_messages_mode_emits_token_deltas` |
|
||||||
|
|
@@ -25,7 +25,7 @@ import uuid
 from collections.abc import Generator, Sequence
 from dataclasses import dataclass, field
 from pathlib import Path
-from typing import Any
+from typing import Any, Literal
 
 from langchain.agents import create_agent
 from langchain.agents.middleware import AgentMiddleware
@@ -55,6 +55,9 @@ from deerflow.uploads.manager import (
 logger = logging.getLogger(__name__)
 
 
+StreamEventType = Literal["values", "messages-tuple", "custom", "end"]
+
+
 @dataclass
 class StreamEvent:
     """A single event from the streaming agent response.
@@ -69,7 +72,7 @@ class StreamEvent:
         data: Event payload. Contents vary by type.
     """
 
-    type: str
+    type: StreamEventType
     data: dict[str, Any] = field(default_factory=dict)
 
 
@@ -254,13 +257,53 @@ class DeerFlowClient:
 
         return get_available_tools(model_name=model_name, subagent_enabled=subagent_enabled)
 
+    @staticmethod
+    def _serialize_tool_calls(tool_calls) -> list[dict]:
+        """Reshape LangChain tool_calls into the wire format used in events."""
+        return [{"name": tc["name"], "args": tc["args"], "id": tc.get("id")} for tc in tool_calls]
+
+    @staticmethod
+    def _ai_text_event(msg_id: str | None, text: str, usage: dict | None) -> "StreamEvent":
+        """Build a ``messages-tuple`` AI text event, attaching usage when present."""
+        data: dict[str, Any] = {"type": "ai", "content": text, "id": msg_id}
+        if usage:
+            data["usage_metadata"] = usage
+        return StreamEvent(type="messages-tuple", data=data)
+
+    @staticmethod
+    def _ai_tool_calls_event(msg_id: str | None, tool_calls) -> "StreamEvent":
+        """Build a ``messages-tuple`` AI tool-calls event."""
+        return StreamEvent(
+            type="messages-tuple",
+            data={
+                "type": "ai",
+                "content": "",
+                "id": msg_id,
+                "tool_calls": DeerFlowClient._serialize_tool_calls(tool_calls),
+            },
+        )
+
+    @staticmethod
+    def _tool_message_event(msg: ToolMessage) -> "StreamEvent":
+        """Build a ``messages-tuple`` tool-result event from a ToolMessage."""
+        return StreamEvent(
+            type="messages-tuple",
+            data={
+                "type": "tool",
+                "content": DeerFlowClient._extract_text(msg.content),
+                "name": msg.name,
+                "tool_call_id": msg.tool_call_id,
+                "id": msg.id,
+            },
+        )
+
     @staticmethod
     def _serialize_message(msg) -> dict:
         """Serialize a LangChain message to a plain dict for values events."""
         if isinstance(msg, AIMessage):
             d: dict[str, Any] = {"type": "ai", "content": msg.content, "id": getattr(msg, "id", None)}
             if msg.tool_calls:
-                d["tool_calls"] = [{"name": tc["name"], "args": tc["args"], "id": tc.get("id")} for tc in msg.tool_calls]
+                d["tool_calls"] = DeerFlowClient._serialize_tool_calls(msg.tool_calls)
             if getattr(msg, "usage_metadata", None):
                 d["usage_metadata"] = msg.usage_metadata
             return d
@@ -438,6 +481,53 @@ class DeerFlowClient:
         consumers can switch between HTTP streaming and embedded mode
         without changing their event-handling logic.
 
+        Token-level streaming
+        ~~~~~~~~~~~~~~~~~~~~~
+        This method subscribes to LangGraph's ``messages`` stream mode, so
+        ``messages-tuple`` events for AI text are emitted as **deltas** as
+        the model generates tokens, not as one cumulative dump at node
+        completion. Each delta carries a stable ``id`` — consumers that
+        want the full text must accumulate ``content`` per ``id``.
+        ``chat()`` already does this for you.
+
+        Tool calls and tool results are still emitted once per logical
+        message. ``values`` events continue to carry full state snapshots
+        after each graph node finishes; AI text already delivered via the
+        ``messages`` stream is **not** re-synthesized from the snapshot to
+        avoid duplicate deliveries.
+
+        Why not reuse Gateway's ``run_agent``?
+        ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+        Gateway (``runtime/runs/worker.py``) has a complete streaming
+        pipeline: ``run_agent`` → ``StreamBridge`` → ``sse_consumer``. It
+        looks like this client duplicates that work, but the two paths
+        serve different audiences and **cannot** share execution:
+
+        * ``run_agent`` is ``async def`` and uses ``agent.astream()``;
+          this method is a sync generator using ``agent.stream()`` so
+          callers can write ``for event in client.stream(...)`` without
+          touching asyncio. Bridging the two would require spinning up
+          an event loop + thread per call.
+        * Gateway events are JSON-serialized by ``serialize()`` for SSE
+          wire transmission. This client yields in-process stream event
+          payloads directly as Python data structures (``StreamEvent``
+          with ``data`` as a plain ``dict``), without the extra
+          JSON/SSE serialization layer used for HTTP delivery.
+        * ``StreamBridge`` is an asyncio-queue decoupling producers from
+          consumers across an HTTP boundary (``Last-Event-ID`` replay,
+          heartbeats, multi-subscriber fan-out). A single in-process
+          caller with a direct iterator needs none of that.
+
+        So ``DeerFlowClient.stream()`` is a parallel, sync, in-process
+        consumer of the same ``create_agent()`` factory — not a wrapper
+        around Gateway. The two paths **should** stay in sync on which
+        LangGraph stream modes they subscribe to; that invariant is
+        enforced by ``tests/test_client.py::test_messages_mode_emits_token_deltas``
+        rather than by a shared constant, because the three layers
+        (Graph, Platform SDK, HTTP) each use their own naming
+        (``messages`` vs ``messages-tuple``) and cannot literally share
+        a string.
+
         Args:
             message: User message text.
             thread_id: Thread ID for conversation context. Auto-generated if None.
@@ -448,8 +538,8 @@ class DeerFlowClient:
             StreamEvent with one of:
             - type="values" data={"title": str|None, "messages": [...], "artifacts": [...]}
             - type="custom" data={...}
-            - type="messages-tuple" data={"type": "ai", "content": str, "id": str}
-            - type="messages-tuple" data={"type": "ai", "content": str, "id": str, "usage_metadata": {...}}
+            - type="messages-tuple" data={"type": "ai", "content": <delta>, "id": str}
+            - type="messages-tuple" data={"type": "ai", "content": <delta>, "id": str, "usage_metadata": {...}}
             - type="messages-tuple" data={"type": "ai", "content": "", "id": str, "tool_calls": [...]}
             - type="messages-tuple" data={"type": "tool", "content": str, "name": str, "tool_call_id": str, "id": str}
             - type="end" data={"usage": {"input_tokens": int, "output_tokens": int, "total_tokens": int}}
@@ -466,13 +556,47 @@ class DeerFlowClient:
             context["agent_name"] = self._agent_name
 
         seen_ids: set[str] = set()
+        # Cross-mode handoff: ids already streamed via LangGraph ``messages``
+        # mode so the ``values`` path skips re-synthesis of the same message.
+        streamed_ids: set[str] = set()
+        # The same message id carries identical cumulative ``usage_metadata``
+        # in both the final ``messages`` chunk and the values snapshot —
+        # count it only on whichever arrives first.
+        counted_usage_ids: set[str] = set()
         cumulative_usage: dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
 
+        def _account_usage(msg_id: str | None, usage: Any) -> dict | None:
+            """Add *usage* to cumulative totals if this id has not been counted.
+
+            ``usage`` is a ``langchain_core.messages.UsageMetadata`` TypedDict
+            or ``None``; typed as ``Any`` because TypedDicts are not
+            structurally assignable to plain ``dict`` under strict type
+            checking. Returns the normalized usage dict (for attaching
+            to an event) when we accepted it, otherwise ``None``.
+            """
+            if not usage:
+                return None
+            if msg_id and msg_id in counted_usage_ids:
+                return None
+            if msg_id:
+                counted_usage_ids.add(msg_id)
+            input_tokens = usage.get("input_tokens", 0) or 0
+            output_tokens = usage.get("output_tokens", 0) or 0
+            total_tokens = usage.get("total_tokens", 0) or 0
+            cumulative_usage["input_tokens"] += input_tokens
+            cumulative_usage["output_tokens"] += output_tokens
+            cumulative_usage["total_tokens"] += total_tokens
+            return {
+                "input_tokens": input_tokens,
+                "output_tokens": output_tokens,
+                "total_tokens": total_tokens,
+            }
+
         for item in self._agent.stream(
             state,
             config=config,
             context=context,
-            stream_mode=["values", "custom"],
+            stream_mode=["values", "messages", "custom"],
         ):
             if isinstance(item, tuple) and len(item) == 2:
                 mode, chunk = item
@@ -484,6 +608,36 @@ class DeerFlowClient:
                 yield StreamEvent(type="custom", data=chunk)
                 continue
 
+            if mode == "messages":
+                # LangGraph ``messages`` mode emits ``(message_chunk, metadata)``.
+                if isinstance(chunk, tuple) and len(chunk) == 2:
+                    msg_chunk, _metadata = chunk
+                else:
+                    msg_chunk = chunk
+
+                msg_id = getattr(msg_chunk, "id", None)
+
+                if isinstance(msg_chunk, AIMessage):
+                    text = self._extract_text(msg_chunk.content)
+                    counted_usage = _account_usage(msg_id, msg_chunk.usage_metadata)
+
+                    if text:
+                        if msg_id:
+                            streamed_ids.add(msg_id)
+                        yield self._ai_text_event(msg_id, text, counted_usage)
+
+                    if msg_chunk.tool_calls:
+                        if msg_id:
+                            streamed_ids.add(msg_id)
+                        yield self._ai_tool_calls_event(msg_id, msg_chunk.tool_calls)
+
+                elif isinstance(msg_chunk, ToolMessage):
+                    if msg_id:
+                        streamed_ids.add(msg_id)
+                    yield self._tool_message_event(msg_chunk)
+                continue
+
+            # mode == "values"
             messages = chunk.get("messages", [])
 
             for msg in messages:
@@ -493,47 +647,25 @@ class DeerFlowClient:
                 if msg_id:
                     seen_ids.add(msg_id)
 
+                # Already streamed via ``messages`` mode; only (defensively)
+                # capture usage here and skip re-synthesizing the event.
+                if msg_id and msg_id in streamed_ids:
+                    if isinstance(msg, AIMessage):
+                        _account_usage(msg_id, getattr(msg, "usage_metadata", None))
+                    continue
+
                 if isinstance(msg, AIMessage):
-                    # Track token usage from AI messages
-                    usage = getattr(msg, "usage_metadata", None)
-                    if usage:
-                        cumulative_usage["input_tokens"] += usage.get("input_tokens", 0) or 0
-                        cumulative_usage["output_tokens"] += usage.get("output_tokens", 0) or 0
-                        cumulative_usage["total_tokens"] += usage.get("total_tokens", 0) or 0
+                    counted_usage = _account_usage(msg_id, msg.usage_metadata)
 
                     if msg.tool_calls:
-                        yield StreamEvent(
-                            type="messages-tuple",
-                            data={
-                                "type": "ai",
-                                "content": "",
-                                "id": msg_id,
-                                "tool_calls": [{"name": tc["name"], "args": tc["args"], "id": tc.get("id")} for tc in msg.tool_calls],
-                            },
-                        )
+                        yield self._ai_tool_calls_event(msg_id, msg.tool_calls)
 
                     text = self._extract_text(msg.content)
                     if text:
-                        event_data: dict[str, Any] = {"type": "ai", "content": text, "id": msg_id}
-                        if usage:
-                            event_data["usage_metadata"] = {
-                                "input_tokens": usage.get("input_tokens", 0) or 0,
-                                "output_tokens": usage.get("output_tokens", 0) or 0,
-                                "total_tokens": usage.get("total_tokens", 0) or 0,
-                            }
-                        yield StreamEvent(type="messages-tuple", data=event_data)
+                        yield self._ai_text_event(msg_id, text, counted_usage)
 
                 elif isinstance(msg, ToolMessage):
-                    yield StreamEvent(
-                        type="messages-tuple",
-                        data={
-                            "type": "tool",
-                            "content": self._extract_text(msg.content),
-                            "name": getattr(msg, "name", None),
-                            "tool_call_id": getattr(msg, "tool_call_id", None),
-                            "id": msg_id,
-                        },
-                    )
+                    yield self._tool_message_event(msg)
 
             # Emit a values event for each state snapshot
             yield StreamEvent(
@@ -550,10 +682,12 @@ class DeerFlowClient:
     def chat(self, message: str, *, thread_id: str | None = None, **kwargs) -> str:
         """Send a message and return the final text response.
 
-        Convenience wrapper around :meth:`stream` that returns only the
-        **last** AI text from ``messages-tuple`` events. If the agent emits
-        multiple text segments in one turn, intermediate segments are
-        discarded. Use :meth:`stream` directly to capture all events.
+        Convenience wrapper around :meth:`stream` that accumulates delta
+        ``messages-tuple`` events per ``id`` and returns the text of the
+        **last** AI message to complete. Intermediate AI messages (e.g.
+        planner drafts) are discarded — only the final id's accumulated
+        text is returned. Use :meth:`stream` directly if you need every
+        delta as it arrives.
 
         Args:
             message: User message text.
@@ -561,15 +695,21 @@ class DeerFlowClient:
             **kwargs: Override client defaults (same as stream()).
 
         Returns:
-            The last AI message text, or empty string if no response.
+            The accumulated text of the last AI message, or empty string
+            if no AI text was produced.
         """
-        last_text = ""
+        # Per-id delta lists joined once at the end — avoids the O(n²) cost
+        # of repeated ``str + str`` on a growing buffer for long responses.
+        chunks: dict[str, list[str]] = {}
+        last_id: str = ""
         for event in self.stream(message, thread_id=thread_id, **kwargs):
             if event.type == "messages-tuple" and event.data.get("type") == "ai":
-                content = event.data.get("content", "")
-                if content:
-                    last_text = content
-        return last_text
+                msg_id = event.data.get("id") or ""
+                delta = event.data.get("content", "")
+                if delta:
+                    chunks.setdefault(msg_id, []).append(delta)
+                    last_id = msg_id
+        return "".join(chunks.get(last_id, ()))
 
     # ------------------------------------------------------------------
     # Public API — configuration queries

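The `stream()` docstring asks consumers that want full text to accumulate `content` per `id`. A minimal consumer-side sketch of that pattern follows, assuming events arrive as `(event_type, data)` pairs shaped like the documented `messages-tuple` payloads. `accumulate_ai_text` is a hypothetical helper, not part of the client. Like `chat()`, it appends deltas to per-id lists and joins once at the end.

```python
from collections import defaultdict
from typing import Any, Iterable


def accumulate_ai_text(events: Iterable[tuple[str, dict[str, Any]]]) -> dict[str, str]:
    """Rebuild the full AI text for each message id from delta events."""
    chunks: dict[str, list[str]] = defaultdict(list)
    for event_type, data in events:
        # Only AI text deltas contribute; tool results and snapshots are skipped.
        if event_type != "messages-tuple" or data.get("type") != "ai":
            continue
        delta = data.get("content", "")
        if delta:
            chunks[data.get("id") or ""].append(delta)
    # One join per id keeps the total cost linear in the text length.
    return {msg_id: "".join(parts) for msg_id, parts in chunks.items()}
```

Unlike `chat()`, which returns only the last id's text, this sketch keeps every id so intermediate messages stay available.
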
@@ -10,7 +10,7 @@ from pathlib import Path
 from unittest.mock import MagicMock, patch
 
 import pytest
-from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage  # noqa: F401
+from langchain_core.messages import AIMessage, AIMessageChunk, HumanMessage, SystemMessage, ToolMessage  # noqa: F401
 
 from app.gateway.routers.mcp import McpConfigResponse
 from app.gateway.routers.memory import MemoryConfigResponse, MemoryStatusResponse
@@ -225,7 +225,9 @@ class TestStream:
 
         agent.stream.assert_called_once()
         call_kwargs = agent.stream.call_args.kwargs
-        assert call_kwargs["stream_mode"] == ["values", "custom"]
+        # ``messages`` enables token-level streaming of AI text deltas;
+        # see DeerFlowClient.stream() docstring and GitHub issue #1969.
+        assert call_kwargs["stream_mode"] == ["values", "messages", "custom"]
 
         assert events[0].type == "custom"
         assert events[0].data == {"type": "task_started", "task_id": "task-1"}
@@ -351,6 +353,123 @@ class TestStream:
         # Should not raise; end event proves it completed
         assert events[-1].type == "end"
 
+    def test_messages_mode_emits_token_deltas(self, client):
+        """stream() forwards LangGraph ``messages`` mode chunks as delta events.
+
+        Regression for bytedance/deer-flow#1969 — before the fix the client
+        only subscribed to ``values`` mode, so LLM output was delivered as
+        a single cumulative dump after each graph node finished instead of
+        token-by-token deltas as the model generated them.
+        """
+        # Three AI chunks sharing the same id, followed by a terminal
+        # values snapshot with the fully assembled message — this matches
+        # the shape LangGraph emits when ``stream_mode`` includes both
+        # ``messages`` and ``values``.
+        assembled = AIMessage(content="Hel lo world!", id="ai-1", usage_metadata={"input_tokens": 3, "output_tokens": 4, "total_tokens": 7})
+        agent = MagicMock()
+        agent.stream.return_value = iter(
+            [
+                ("messages", (AIMessageChunk(content="Hel", id="ai-1"), {})),
+                ("messages", (AIMessageChunk(content=" lo ", id="ai-1"), {})),
+                (
+                    "messages",
+                    (
+                        AIMessageChunk(
+                            content="world!",
+                            id="ai-1",
+                            usage_metadata={"input_tokens": 3, "output_tokens": 4, "total_tokens": 7},
+                        ),
+                        {},
+                    ),
+                ),
+                ("values", {"messages": [HumanMessage(content="hi", id="h-1"), assembled]}),
+            ]
+        )
+
+        with (
+            patch.object(client, "_ensure_agent"),
+            patch.object(client, "_agent", agent),
+        ):
+            events = list(client.stream("hi", thread_id="t-stream"))
+
+        # Three delta messages-tuple events, all with the same id, each
+        # carrying only its own delta (not cumulative).
+        ai_text_events = [e for e in events if e.type == "messages-tuple" and e.data.get("type") == "ai" and e.data.get("content")]
+        assert [e.data["content"] for e in ai_text_events] == ["Hel", " lo ", "world!"]
+        assert all(e.data["id"] == "ai-1" for e in ai_text_events)
+
+        # The values snapshot MUST NOT re-synthesize an AI text event for
+        # the already-streamed id (otherwise consumers see duplicated text).
+        assert len(ai_text_events) == 3
+
+        # Usage metadata attached only to the chunk that actually carried
+        # it, and counted into cumulative usage exactly once (the values
+        # snapshot's duplicate usage on the assembled AIMessage must not
+        # be double-counted).
+        events_with_usage = [e for e in ai_text_events if "usage_metadata" in e.data]
+        assert len(events_with_usage) == 1
+        assert events_with_usage[0].data["usage_metadata"] == {"input_tokens": 3, "output_tokens": 4, "total_tokens": 7}
+        end_event = events[-1]
+        assert end_event.type == "end"
+        assert end_event.data["usage"] == {"input_tokens": 3, "output_tokens": 4, "total_tokens": 7}
+
+        # The values snapshot itself is still emitted.
+        assert any(e.type == "values" for e in events)
+
+        # stream_mode includes ``messages`` — the whole point of this fix.
+        call_kwargs = agent.stream.call_args.kwargs
+        assert "messages" in call_kwargs["stream_mode"]
+
+    def test_chat_accumulates_streamed_deltas(self, client):
+        """chat() concatenates per-id deltas from messages mode."""
+        agent = MagicMock()
+        agent.stream.return_value = iter(
+            [
+                ("messages", (AIMessageChunk(content="Hel", id="ai-1"), {})),
+                ("messages", (AIMessageChunk(content="lo ", id="ai-1"), {})),
+                ("messages", (AIMessageChunk(content="world!", id="ai-1"), {})),
+                ("values", {"messages": [HumanMessage(content="hi", id="h-1"), AIMessage(content="Hello world!", id="ai-1")]}),
+            ]
+        )
+
+        with (
+            patch.object(client, "_ensure_agent"),
+            patch.object(client, "_agent", agent),
+        ):
+            result = client.chat("hi", thread_id="t-chat-stream")
+
+        assert result == "Hello world!"
+
+    def test_messages_mode_tool_message(self, client):
+        """stream() forwards ToolMessage chunks from messages mode."""
+        agent = MagicMock()
+        agent.stream.return_value = iter(
+            [
+                (
+                    "messages",
+                    (
+                        ToolMessage(content="file.txt", id="tm-1", tool_call_id="tc-1", name="bash"),
+                        {},
+                    ),
+                ),
+                ("values", {"messages": [HumanMessage(content="ls", id="h-1"), ToolMessage(content="file.txt", id="tm-1", tool_call_id="tc-1", name="bash")]}),
+            ]
+        )
+
+        with (
+            patch.object(client, "_ensure_agent"),
+            patch.object(client, "_agent", agent),
+        ):
+            events = list(client.stream("ls", thread_id="t-tool-stream"))
+
+        tool_events = [e for e in events if e.type == "messages-tuple" and e.data.get("type") == "tool"]
+        # The tool result must be delivered exactly once (from messages
+        # mode), not duplicated by the values-snapshot synthesis path.
+        assert len(tool_events) == 1
+        assert tool_events[0].data["content"] == "file.txt"
+        assert tool_events[0].data["name"] == "bash"
+        assert tool_events[0].data["tool_call_id"] == "tc-1"
+
     def test_list_content_blocks(self, client):
         """stream() handles AIMessage with list-of-blocks content."""
         ai = AIMessage(
@ -373,6 +492,253 @@ class TestStream:
|
||||||
assert len(msg_events) == 1
|
assert len(msg_events) == 1
|
||||||
assert msg_events[0].data["content"] == "result"
|
assert msg_events[0].data["content"] == "result"
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Refactor regression guards (PR #1974 follow-up safety)
|
||||||
|
#
|
||||||
|
# The three tests below are not bug-fix tests — they exist to lock
|
||||||
|
# the *exact* contract of stream() so a future refactor (e.g. moving
|
||||||
|
# to ``agent.astream()``, sharing a core with Gateway's run_agent,
|
||||||
|
# changing the dedup strategy) cannot silently change behavior.
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_dedup_requires_messages_before_values_invariant(self, client):
|
||||||
|
"""Canary: locks the order-dependence of cross-mode dedup.
|
||||||
|
|
||||||
|
``streamed_ids`` is populated only by the ``messages`` branch.
|
||||||
|
If a ``values`` snapshot arrives BEFORE its corresponding
|
||||||
|
``messages`` chunks for the same id, the values path falls
|
||||||
|
through and synthesizes its own AI text event, then the
|
||||||
|
messages chunk emits another delta — consumers see the same
|
||||||
|
id twice.
|
||||||
|
|
||||||
|
Under normal LangGraph operation this never happens (messages
|
||||||
|
chunks are emitted during LLM streaming, the values snapshot
|
||||||
|
after the node completes), so the implicit invariant is safe
|
||||||
|
in production. This test exists as a tripwire for refactors
|
||||||
|
that switch to ``agent.astream()`` or share a core with
|
||||||
|
Gateway: if the ordering ever changes, this test fails and
|
||||||
|
forces the refactor to either (a) preserve the ordering or
|
||||||
|
(b) deliberately re-baseline to a stronger order-independent
|
||||||
|
dedup contract — and document the new contract here.
|
||||||
|
"""
|
||||||
|
agent = MagicMock()
|
||||||
|
agent.stream.return_value = iter(
|
||||||
|
[
|
||||||
|
# values arrives FIRST — streamed_ids still empty.
|
||||||
|
("values", {"messages": [HumanMessage(content="hi", id="h-1"), AIMessage(content="Hello", id="ai-1")]}),
|
||||||
|
# messages chunk for the same id arrives SECOND.
|
||||||
|
("messages", (AIMessageChunk(content="Hello", id="ai-1"), {})),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch.object(client, "_ensure_agent"),
|
||||||
|
patch.object(client, "_agent", agent),
|
||||||
|
):
|
||||||
|
events = list(client.stream("hi", thread_id="t-order-canary"))
|
||||||
|
|
||||||
|
ai_text_events = [e for e in events if e.type == "messages-tuple" and e.data.get("type") == "ai" and e.data.get("content")]
|
||||||
|
# Current behavior: 2 events (values synthesis + messages delta).
|
||||||
|
# If a refactor makes dedup order-independent, this becomes 1 —
|
||||||
|
# update the assertion AND the docstring above to record the
|
||||||
|
# new contract, do not silently fix this number.
|
||||||
|
assert len(ai_text_events) == 2
|
||||||
|
assert all(e.data["id"] == "ai-1" for e in ai_text_events)
|
||||||
|
assert [e.data["content"] for e in ai_text_events] == ["Hello", "Hello"]
|
||||||
|
|
||||||
|
def test_messages_mode_golden_event_sequence(self, client):
|
||||||
|
"""Locks the **exact** event sequence for a canonical streaming turn.
|
||||||
|
|
||||||
|
This is a strong regression guard: any future refactor that
|
||||||
|
changes the order, type, or shape of emitted events fails this
|
||||||
|
test with a clear list-equality diff, forcing either a
|
||||||
|
preserved sequence or a deliberate re-baseline.
|
||||||
|
|
||||||
|
Input shape:
|
||||||
|
messages chunk 1 — text "Hel", no usage
|
||||||
|
messages chunk 2 — text "lo", with cumulative usage
|
||||||
|
values snapshot — assembled AIMessage with same usage
|
||||||
|
|
||||||
|
Locked behavior:
|
||||||
|
* Two messages-tuple AI text events (one per chunk), each
|
||||||
|
carrying ONLY its own delta — not cumulative.
|
||||||
|
* ``usage_metadata`` attached only to the chunk that
|
||||||
|
delivered it (not the first chunk).
|
||||||
|
* The values event is still emitted, but its embedded
|
||||||
|
``messages`` list is the *serialized* form — no
|
||||||
|
synthesized messages-tuple events for the already-
|
||||||
|
streamed id.
|
||||||
|
* ``end`` event carries cumulative usage counted exactly
|
||||||
|
once across both modes.
|
||||||
|
"""
|
||||||
|
        # Inline the usage literal at construction sites so Pyright can
        # narrow ``dict[str, int]`` to ``UsageMetadata`` (TypedDict
        # narrowing only works on literals, not on bound variables).
        # The local ``usage`` is reused only for assertion comparisons
        # below, where structural dict equality is sufficient.
        usage = {"input_tokens": 3, "output_tokens": 2, "total_tokens": 5}
        agent = MagicMock()
        agent.stream.return_value = iter(
            [
                ("messages", (AIMessageChunk(content="Hel", id="ai-1"), {})),
                ("messages", (AIMessageChunk(content="lo", id="ai-1", usage_metadata={"input_tokens": 3, "output_tokens": 2, "total_tokens": 5}), {})),
                (
                    "values",
                    {
                        "messages": [
                            HumanMessage(content="hi", id="h-1"),
                            AIMessage(content="Hello", id="ai-1", usage_metadata={"input_tokens": 3, "output_tokens": 2, "total_tokens": 5}),
                        ]
                    },
                ),
            ]
        )

        with (
            patch.object(client, "_ensure_agent"),
            patch.object(client, "_agent", agent),
        ):
            events = list(client.stream("hi", thread_id="t-golden"))

        actual = [(e.type, e.data) for e in events]
        expected = [
            ("messages-tuple", {"type": "ai", "content": "Hel", "id": "ai-1"}),
            ("messages-tuple", {"type": "ai", "content": "lo", "id": "ai-1", "usage_metadata": usage}),
            (
                "values",
                {
                    "title": None,
                    "messages": [
                        {"type": "human", "content": "hi", "id": "h-1"},
                        {"type": "ai", "content": "Hello", "id": "ai-1", "usage_metadata": usage},
                    ],
                    "artifacts": [],
                },
            ),
            ("end", {"usage": usage}),
        ]
        assert actual == expected
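
The golden sequence above implies a specific contract for consumers: AI text arrives as per-id deltas, and usage should be read from the ``end`` event rather than summed across events. The following is a minimal sketch of such a consumer, assuming events are available as ``(type, data)`` pairs shaped like the ``expected`` list. ``accumulate_ai_text`` is a hypothetical helper written for illustration; it is not part of ``DeerFlowClient``.

```python
from collections import defaultdict


def accumulate_ai_text(events):
    """Rebuild full AI text per message id from delta-style events.

    Hypothetical helper: `events` is an iterable of (event_type, data)
    pairs shaped like the `expected` list in the golden test.
    """
    parts = defaultdict(list)  # message id -> list of text deltas
    usage = None
    for event_type, data in events:
        if event_type == "messages-tuple" and data.get("type") == "ai":
            # Each event carries only its own delta, so append it.
            parts[data["id"]].append(data.get("content", ""))
        elif event_type == "end":
            # Read usage from the end event only; summing the per-chunk
            # usage_metadata as well would count the same tokens twice.
            usage = data.get("usage")
    texts = {msg_id: "".join(chunks) for msg_id, chunks in parts.items()}
    return texts, usage


usage = {"input_tokens": 3, "output_tokens": 2, "total_tokens": 5}
events = [
    ("messages-tuple", {"type": "ai", "content": "Hel", "id": "ai-1"}),
    ("messages-tuple", {"type": "ai", "content": "lo", "id": "ai-1", "usage_metadata": usage}),
    ("values", {"title": None, "messages": [], "artifacts": []}),
    ("end", {"usage": usage}),
]
texts, total = accumulate_ai_text(events)
```

The ``values`` event is deliberately ignored here, since under the locked behavior it does not introduce new text for an id that was already streamed.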

    def test_chat_accumulates_in_linear_time(self, client):
        """``chat()`` must use a non-quadratic accumulation strategy.

        PR #1974 commit 2 replaced ``buffer = buffer + delta`` with
        ``list[str].append`` + ``"".join`` to fix an O(n²) regression
        introduced in commit 1. This test guards against a future
        refactor accidentally restoring the quadratic path.

        Threshold rationale (10,000 single-char chunks, 1 second):
          * Current O(n) implementation: ~50-200 ms total, including
            all mock + event yield overhead.
          * O(n²) regression at n=10,000: chat accumulation alone
            becomes ~500 ms-2 s (50 M character copies), reliably
            over the bound on any reasonable CI.

        If this test ever flakes on slow CI, do NOT raise the threshold
        blindly — first confirm the implementation still uses
        ``"".join``, then consider whether the test should move to a
        benchmark suite that excludes mock overhead.
        """
        import time

        n = 10_000
        chunks: list = [("messages", (AIMessageChunk(content="x", id="ai-1"), {})) for _ in range(n)]
        chunks.append(
            (
                "values",
                {
                    "messages": [
                        HumanMessage(content="go", id="h-1"),
                        AIMessage(content="x" * n, id="ai-1"),
                    ]
                },
            )
        )
        agent = MagicMock()
        agent.stream.return_value = iter(chunks)

        with (
            patch.object(client, "_ensure_agent"),
            patch.object(client, "_agent", agent),
        ):
            start = time.monotonic()
            result = client.chat("go", thread_id="t-perf")
            elapsed = time.monotonic() - start

        assert result == "x" * n
        assert elapsed < 1.0, f"chat() took {elapsed:.3f}s for {n} chunks — possible O(n^2) regression (see PR #1974 commit 2 for the original fix)"
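
For readers unfamiliar with the two accumulation strategies the docstring contrasts, here is a small side-by-side sketch. It mirrors the approach described in the commit message (``dict[str, list[str]]`` plus a single ``"".join()``), but the function names and the ``(msg_id, delta)`` input shape are assumptions made for illustration, not the actual ``chat()`` code.

```python
def accumulate_by_concat(deltas):
    """Quadratic variant: each ``+`` may copy the whole buffer built so far."""
    buffers: dict[str, str] = {}
    for msg_id, delta in deltas:
        buffers[msg_id] = buffers.get(msg_id, "") + delta
    return buffers


def accumulate_by_join(deltas):
    """Linear variant: append fragments, then join once per id at the end."""
    parts: dict[str, list[str]] = {}
    for msg_id, delta in deltas:
        parts.setdefault(msg_id, []).append(delta)
    return {msg_id: "".join(chunks) for msg_id, chunks in parts.items()}


# Interleaved deltas for two message ids (illustrative input).
deltas = [("ai-1", "Hel"), ("ai-2", "x"), ("ai-1", "lo")]
joined = accumulate_by_join(deltas)
concatenated = accumulate_by_concat(deltas)
```

Both variants return the same mapping; they differ only in how much copying happens along the way, which is what the timing bound in the test above is meant to detect.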

    def test_none_id_chunks_produce_duplicates_known_limitation(self, client):
        """Documents a known dedup limitation: ``messages`` chunks with ``id=None``.

        Some LLM providers (vLLM, certain custom backends) emit
        ``AIMessageChunk`` instances without an ``id``. In that case
        the cross-mode dedup machinery cannot record the chunk in
        ``streamed_ids`` (the implementation guards on ``if msg_id``
        before adding), and a subsequent ``values`` snapshot whose
        reassembled ``AIMessage`` carries a real id will fall through
        the dedup check and synthesize a second AI text event for the
        same logical message — consumers see duplicated text.

        Why this is documented rather than fixed
        ----------------------------------------
        Falling back to ``metadata.get("id")`` does **not** help:
        LangGraph's messages-mode metadata never carries the message
        id (it carries ``langgraph_node`` / ``langgraph_step`` /
        ``checkpoint_ns`` / ``tags`` etc.). Synthesizing a fallback
        like ``f"_synth_{id(msg_chunk)}"`` only helps if the values
        snapshot uses the same fallback, which it does not. A real
        fix requires either provider cooperation (always emit chunk
        ids — out of scope for this PR) or content-based dedup (risks
        false positives for two distinct short messages with identical
        text).

        This test makes the limitation **explicit and discoverable**
        so a future contributor debugging "duplicate text in vLLM
        streaming" finds the answer immediately. If a real fix lands,
        replace this test with a positive assertion that dedup works
        for the None-id case.

        See PR #1974 Copilot review comment on ``client.py:515``.
        """
        agent = MagicMock()
        agent.stream.return_value = iter(
            [
                # Realistic shape: chunk has no id (provider didn't set one),
                # values snapshot's reassembled AIMessage has a fresh id
                # assigned somewhere downstream (langgraph or middleware).
                ("messages", (AIMessageChunk(content="Hello", id=None), {})),
                (
                    "values",
                    {
                        "messages": [
                            HumanMessage(content="hi", id="h-1"),
                            AIMessage(content="Hello", id="ai-1"),
                        ]
                    },
                ),
            ]
        )

        with (
            patch.object(client, "_ensure_agent"),
            patch.object(client, "_agent", agent),
        ):
            events = list(client.stream("hi", thread_id="t-none-id-limitation"))

        ai_text_events = [e for e in events if e.type == "messages-tuple" and e.data.get("type") == "ai" and e.data.get("content")]
        # KNOWN LIMITATION: 2 events for the same logical message.
        #   1) from messages chunk (id=None, NOT added to streamed_ids
        #      because of ``if msg_id:`` guard at client.py line ~522)
        #   2) from values-snapshot synthesis (ai-1 not in streamed_ids,
        #      so the skip-branch at line ~549 doesn't trigger)
        # If this becomes 1, someone fixed the limitation — update this
        # test to a positive assertion and document the fix.
        assert len(ai_text_events) == 2
        assert ai_text_events[0].data["id"] is None
        assert ai_text_events[1].data["id"] == "ai-1"
        assert all(e.data["content"] == "Hello" for e in ai_text_events)
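
The fall-through described in the docstring can be reproduced with a much smaller model. The sketch below is a simplified stand-in, not the code in ``client.py``: it reduces every event to its message id, and ``simulate_dedup`` is a hypothetical function. It assumes only what the docstring and comments state, namely that ids are recorded in ``streamed_ids`` behind an ``if msg_id`` guard and that the values-snapshot path skips ids found in that set.

```python
def simulate_dedup(chunk_ids, snapshot_ids):
    """Return the ids for which an AI text event would be emitted.

    Simplified model (an assumption, not the real implementation):
    every messages-mode chunk is forwarded, and a values-snapshot
    message is synthesized only if its id was not streamed before.
    """
    streamed_ids = set()
    emitted = []
    for msg_id in chunk_ids:
        emitted.append(msg_id)  # chunks are always forwarded as deltas
        if msg_id:  # guard: a None id cannot be recorded
            streamed_ids.add(msg_id)
    for msg_id in snapshot_ids:
        if msg_id in streamed_ids:
            continue  # already streamed, so skip synthesis
        emitted.append(msg_id)
    return emitted


# Normal case: chunk ids match the snapshot id, so no duplicate appears.
with_ids = simulate_dedup(["ai-1", "ai-1"], ["ai-1"])
# Limitation: the chunk has no id, so the snapshot message is emitted again.
without_ids = simulate_dedup([None], ["ai-1"])
```

In the second call the snapshot id has nothing to match against, which corresponds to the two events asserted in the test above.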


class TestChat:
    def test_returns_last_message(self, client):