feat: implement provider task ID extraction with fallback paths and add unit tests
This commit is contained in:
parent
f101d8bd3e
commit
ab9555255a
|
|
@ -198,11 +198,7 @@ async def _handle_submit(
|
||||||
return Response(content=resp_body, status_code=status_code, headers=resp_headers, media_type=media_type)
|
return Response(content=resp_body, status_code=status_code, headers=resp_headers, media_type=media_type)
|
||||||
|
|
||||||
# Extract task_id from response; no task_id means provider rejected at business level
|
# Extract task_id from response; no task_id means provider rejected at business level
|
||||||
provider_task_id: str | None = None
|
provider_task_id = _extract_provider_task_id(resp_json, task_id_jsonpath)
|
||||||
if resp_json is not None:
|
|
||||||
raw = proxy.jsonpath_get(resp_json, task_id_jsonpath)
|
|
||||||
if raw is not None:
|
|
||||||
provider_task_id = str(raw)
|
|
||||||
|
|
||||||
if provider_task_id:
|
if provider_task_id:
|
||||||
ledger.set_running(record.proxy_call_id, provider_task_id)
|
ledger.set_running(record.proxy_call_id, provider_task_id)
|
||||||
|
|
@ -374,6 +370,14 @@ async def _handle_query(
|
||||||
"[ThirdPartyProxy] finalize claim denied (already processed): proxy_call_id=%s",
|
"[ThirdPartyProxy] finalize claim denied (already processed): proxy_call_id=%s",
|
||||||
record.proxy_call_id,
|
record.proxy_call_id,
|
||||||
)
|
)
|
||||||
|
elif (is_success or is_failure) and provider_task_id:
|
||||||
|
logger.warning(
|
||||||
|
"[ThirdPartyProxy] terminal query without ledger record: provider=%s provider_task_id=%s status=%s. "
|
||||||
|
"Likely causes: submit route not matched, task_id_jsonpath extraction mismatch, or gateway process restart.",
|
||||||
|
provider,
|
||||||
|
provider_task_id,
|
||||||
|
status_str,
|
||||||
|
)
|
||||||
|
|
||||||
proxy_call_id = record.proxy_call_id if record else None
|
proxy_call_id = record.proxy_call_id if record else None
|
||||||
return _proxy_response(resp_json, proxy_call_id, status_code, resp_headers)
|
return _proxy_response(resp_json, proxy_call_id, status_code, resp_headers)
|
||||||
|
|
@ -439,6 +443,33 @@ def _try_parse_json(data: bytes) -> dict[str, Any] | None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_provider_task_id(resp_json: dict[str, Any] | None, configured_path: str) -> str | None:
|
||||||
|
"""Extract provider task ID from submit response with config-first strategy.
|
||||||
|
|
||||||
|
Priority:
|
||||||
|
1) configured path from submit route
|
||||||
|
2) common fallback paths for provider inconsistencies
|
||||||
|
"""
|
||||||
|
if not isinstance(resp_json, dict):
|
||||||
|
return None
|
||||||
|
|
||||||
|
raw = proxy.jsonpath_get(resp_json, configured_path)
|
||||||
|
if raw is not None:
|
||||||
|
return str(raw)
|
||||||
|
|
||||||
|
for fallback_path in ("taskId", "data.taskId", "id"):
|
||||||
|
raw = proxy.jsonpath_get(resp_json, fallback_path)
|
||||||
|
if raw is not None:
|
||||||
|
logger.warning(
|
||||||
|
"[ThirdPartyProxy] submit task id extracted via fallback path: configured=%s fallback=%s",
|
||||||
|
configured_path,
|
||||||
|
fallback_path,
|
||||||
|
)
|
||||||
|
return str(raw)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _resolve_final_amount(resp_json: dict[str, Any], query_route) -> float:
|
def _resolve_final_amount(resp_json: dict[str, Any], query_route) -> float:
|
||||||
"""Resolve final billing amount from configured usage paths.
|
"""Resolve final billing amount from configured usage paths.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ from __future__ import annotations
|
||||||
|
|
||||||
from app.gateway.third_party_proxy.ledger import CallLedger
|
from app.gateway.third_party_proxy.ledger import CallLedger
|
||||||
from app.gateway.routers.third_party import (
|
from app.gateway.routers.third_party import (
|
||||||
|
_extract_provider_task_id,
|
||||||
_extract_usage_tokens,
|
_extract_usage_tokens,
|
||||||
_extract_usage_tokens_from_submit_stream,
|
_extract_usage_tokens_from_submit_stream,
|
||||||
_resolve_final_amount,
|
_resolve_final_amount,
|
||||||
|
|
@ -290,3 +291,20 @@ class TestApiKeyMarkerReplacement:
|
||||||
replaced = _replace_api_key_marker_in_body(headers, body, "real-key")
|
replaced = _replace_api_key_marker_in_body(headers, body, "real-key")
|
||||||
assert b'"apiKey":"real-key"' in replaced
|
assert b'"apiKey":"real-key"' in replaced
|
||||||
assert b'"token":"Bearer real-key"' in replaced
|
assert b'"token":"Bearer real-key"' in replaced
|
||||||
|
|
||||||
|
|
||||||
|
class TestExtractProviderTaskId:
|
||||||
|
def test_use_configured_path_first(self):
|
||||||
|
resp_json = {"data": {"taskId": "task-a"}, "taskId": "task-b"}
|
||||||
|
assert _extract_provider_task_id(resp_json, "data.taskId") == "task-a"
|
||||||
|
|
||||||
|
def test_fallback_to_task_id(self):
|
||||||
|
resp_json = {"taskId": "task-b"}
|
||||||
|
assert _extract_provider_task_id(resp_json, "data.taskId") == "task-b"
|
||||||
|
|
||||||
|
def test_fallback_to_id(self):
|
||||||
|
resp_json = {"id": "task-c"}
|
||||||
|
assert _extract_provider_task_id(resp_json, "data.taskId") == "task-c"
|
||||||
|
|
||||||
|
def test_return_none_for_non_dict(self):
|
||||||
|
assert _extract_provider_task_id(None, "taskId") is None
|
||||||
|
|
|
||||||
|
|
@ -106,6 +106,8 @@ services:
|
||||||
- DEER_FLOW_HOST_BASE_DIR=${DEER_FLOW_HOME}
|
- DEER_FLOW_HOST_BASE_DIR=${DEER_FLOW_HOME}
|
||||||
- DEER_FLOW_HOST_SKILLS_PATH=${DEER_FLOW_REPO_ROOT}/skills
|
- DEER_FLOW_HOST_SKILLS_PATH=${DEER_FLOW_REPO_ROOT}/skills
|
||||||
- DEER_FLOW_SANDBOX_HOST=host.docker.internal
|
- DEER_FLOW_SANDBOX_HOST=host.docker.internal
|
||||||
|
- PIP_INDEX_URL=https://pypi.tuna.tsinghua.edu.cn/simple
|
||||||
|
- UV_INDEX_URL=https://pypi.tuna.tsinghua.edu.cn/simple
|
||||||
env_file:
|
env_file:
|
||||||
- ../.env
|
- ../.env
|
||||||
extra_hosts:
|
extra_hosts:
|
||||||
|
|
@ -159,6 +161,8 @@ services:
|
||||||
- DEER_FLOW_HOST_BASE_DIR=${DEER_FLOW_HOME}
|
- DEER_FLOW_HOST_BASE_DIR=${DEER_FLOW_HOME}
|
||||||
- DEER_FLOW_HOST_SKILLS_PATH=${DEER_FLOW_REPO_ROOT}/skills
|
- DEER_FLOW_HOST_SKILLS_PATH=${DEER_FLOW_REPO_ROOT}/skills
|
||||||
- DEER_FLOW_SANDBOX_HOST=host.docker.internal
|
- DEER_FLOW_SANDBOX_HOST=host.docker.internal
|
||||||
|
- PIP_INDEX_URL=https://pypi.tuna.tsinghua.edu.cn/simple
|
||||||
|
- UV_INDEX_URL=https://pypi.tuna.tsinghua.edu.cn/simple
|
||||||
# LangSmith tracing: set LANGSMITH_TRACING=true and LANGSMITH_API_KEY in .env to enable.
|
# LangSmith tracing: set LANGSMITH_TRACING=true and LANGSMITH_API_KEY in .env to enable.
|
||||||
env_file:
|
env_file:
|
||||||
- ../.env
|
- ../.env
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue