Compare commits

...

3 Commits

7 changed files with 94 additions and 41 deletions

View File

@ -161,6 +161,14 @@ async def _handle_submit(
await _finalize_zero(frozen_id, record.proxy_call_id, "error exception")
raise HTTPException(status_code=502, detail=f"Provider unreachable: {exc}") from exc
# HTTP-level failure
if status_code >= 400:
reason = f"error_http_{status_code}"
await _finalize_zero(frozen_id, record.proxy_call_id, reason)
if resp_json is not None:
ledger.update_response(record.proxy_call_id, resp_json)
return Response(content=resp_body, status_code=status_code, headers=resp_headers, media_type="application/json")
resp_json = _try_parse_json(resp_body)
if resp_json is None:
@ -189,20 +197,8 @@ async def _handle_submit(
media_type = resp_headers.get("content-type")
return Response(content=resp_body, status_code=status_code, headers=resp_headers, media_type=media_type)
# HTTP-level failure
if status_code >= 400:
reason = f"error_http_{status_code}"
await _finalize_zero(frozen_id, record.proxy_call_id, reason)
if resp_json is not None:
ledger.update_response(record.proxy_call_id, resp_json)
return Response(content=resp_body, status_code=status_code, headers=resp_headers, media_type="application/json")
# Extract task_id from response; no task_id means provider rejected at business level
provider_task_id: str | None = None
if resp_json is not None:
raw = proxy.jsonpath_get(resp_json, task_id_jsonpath)
if raw is not None:
provider_task_id = str(raw)
provider_task_id = _extract_provider_task_id(resp_json, task_id_jsonpath)
if provider_task_id:
ledger.set_running(record.proxy_call_id, provider_task_id)
@ -374,6 +370,14 @@ async def _handle_query(
"[ThirdPartyProxy] finalize claim denied (already processed): proxy_call_id=%s",
record.proxy_call_id,
)
elif (is_success or is_failure) and provider_task_id:
logger.warning(
"[ThirdPartyProxy] terminal query without ledger record: provider=%s provider_task_id=%s status=%s. "
"Likely causes: submit route not matched, task_id_jsonpath extraction mismatch, or gateway process restart.",
provider,
provider_task_id,
status_str,
)
proxy_call_id = record.proxy_call_id if record else None
return _proxy_response(resp_json, proxy_call_id, status_code, resp_headers)
@ -439,6 +443,33 @@ def _try_parse_json(data: bytes) -> dict[str, Any] | None:
return None
def _extract_provider_task_id(resp_json: dict[str, Any] | None, configured_path: str) -> str | None:
"""Extract provider task ID from submit response with config-first strategy.
Priority:
1) configured path from submit route
2) common fallback paths for provider inconsistencies
"""
if not isinstance(resp_json, dict):
return None
raw = proxy.jsonpath_get(resp_json, configured_path)
if raw is not None:
return str(raw)
for fallback_path in ("taskId", "data.taskId", "id"):
raw = proxy.jsonpath_get(resp_json, fallback_path)
if raw is not None:
logger.warning(
"[ThirdPartyProxy] submit task id extracted via fallback path: configured=%s fallback=%s",
configured_path,
fallback_path,
)
return str(raw)
return None
def _resolve_final_amount(resp_json: dict[str, Any], query_route) -> float:
"""Resolve final billing amount from configured usage paths.

View File

@ -215,15 +215,14 @@ async def forward_request(
) -> tuple[int, dict[str, str], bytes]:
"""Forward *method* *path* to the provider and return (status_code, headers, body).
The provider's API key (read from the environment variable named in
``provider_config.api_key_env``) is injected automatically, replacing
any Authorization header the caller might have sent.
If configured, the provider API key from ``provider_config.api_key_env``
is used to replace API key marker placeholders in forwarded headers/body.
"""
target_url = provider_config.base_url.rstrip("/") + "/" + path.lstrip("/")
if query_params:
target_url += "?" + query_params
# Build forwarded headers: drop internal/hop-by-hop, then inject API key
# Build forwarded headers: drop internal/hop-by-hop, then replace API key markers.
forward_headers = {
k: v for k, v in headers.items() if k.lower() not in _STRIP_REQUEST_HEADERS
}
@ -233,7 +232,6 @@ async def forward_request(
# Dependency-injection style: replace marker placeholders first.
forward_headers = _replace_api_key_marker_in_headers(forward_headers, api_key)
body = _replace_api_key_marker_in_body(forward_headers, body, api_key)
forward_headers[provider_config.api_key_header] = provider_config.api_key_prefix + api_key
else:
logger.warning(
"[ThirdPartyProxy] api_key_env '%s' is not set for provider",

View File

@ -79,14 +79,6 @@ class ThirdPartyProviderConfig(BaseModel):
default=None,
description="Name of the environment variable holding the API key",
)
api_key_header: str = Field(
default="Authorization",
description="Request header name for the API key",
)
api_key_prefix: str = Field(
default="Bearer ",
description="String prepended to the API key value in the header",
)
timeout_seconds: float = Field(
default=30.0,
gt=0,

View File

@ -4,6 +4,7 @@ from __future__ import annotations
from app.gateway.third_party_proxy.ledger import CallLedger
from app.gateway.routers.third_party import (
_extract_provider_task_id,
_extract_usage_tokens,
_extract_usage_tokens_from_submit_stream,
_resolve_final_amount,
@ -290,3 +291,20 @@ class TestApiKeyMarkerReplacement:
replaced = _replace_api_key_marker_in_body(headers, body, "real-key")
assert b'"apiKey":"real-key"' in replaced
assert b'"token":"Bearer real-key"' in replaced
class TestExtractProviderTaskId:
    """Checks the config-first lookup order of ``_extract_provider_task_id``."""

    def test_use_configured_path_first(self):
        # Both the configured path and a fallback path exist; the configured one wins.
        payload = {"data": {"taskId": "task-a"}, "taskId": "task-b"}
        task_id = _extract_provider_task_id(payload, "data.taskId")
        assert task_id == "task-a"

    def test_fallback_to_task_id(self):
        # Configured path is absent, so the top-level "taskId" fallback is used.
        payload = {"taskId": "task-b"}
        task_id = _extract_provider_task_id(payload, "data.taskId")
        assert task_id == "task-b"

    def test_fallback_to_id(self):
        # Only the last fallback path ("id") is present.
        payload = {"id": "task-c"}
        task_id = _extract_provider_task_id(payload, "data.taskId")
        assert task_id == "task-c"

    def test_return_none_for_non_dict(self):
        # A missing/unparsed body must not raise; it simply has no task ID.
        task_id = _extract_provider_task_id(None, "taskId")
        assert task_id is None

View File

@ -61,8 +61,6 @@ third_party_proxy:
runninghub:
base_url: https://www.runninghub.cn
api_key_env: RUNNINGHUB_API_KEY
api_key_header: Authorization
api_key_prefix: "Bearer "
timeout_seconds: 30.0
frozen_amount: 10.0
frozen_type: 2
@ -102,8 +100,6 @@ third_party_proxy:
dashscope:
base_url: https://dashscope.aliyuncs.com
api_key_env: DASHSCOPE_API_KEY
api_key_header: Authorization
api_key_prefix: "Bearer "
timeout_seconds: 60.0
frozen_token: 32768
submit_routes:

View File

@ -106,6 +106,8 @@ services:
- DEER_FLOW_HOST_BASE_DIR=${DEER_FLOW_HOME}
- DEER_FLOW_HOST_SKILLS_PATH=${DEER_FLOW_REPO_ROOT}/skills
- DEER_FLOW_SANDBOX_HOST=host.docker.internal
- PIP_INDEX_URL=https://pypi.tuna.tsinghua.edu.cn/simple
- UV_INDEX_URL=https://pypi.tuna.tsinghua.edu.cn/simple
env_file:
- ../.env
extra_hosts:
@ -159,6 +161,8 @@ services:
- DEER_FLOW_HOST_BASE_DIR=${DEER_FLOW_HOME}
- DEER_FLOW_HOST_SKILLS_PATH=${DEER_FLOW_REPO_ROOT}/skills
- DEER_FLOW_SANDBOX_HOST=host.docker.internal
- PIP_INDEX_URL=https://pypi.tuna.tsinghua.edu.cn/simple
- UV_INDEX_URL=https://pypi.tuna.tsinghua.edu.cn/simple
# LangSmith tracing: set LANGSMITH_TRACING=true and LANGSMITH_API_KEY in .env to enable.
env_file:
- ../.env

View File

@ -9,7 +9,7 @@ Applicable scenarios:
## 1. Migration Goals
1. The skill no longer calls third-party domains directly.
2. The skill no longer manages third-party API keys itself.
2. Real API keys are replaced by an API key marker in scripts being migrated.
3. All requests go through `/api/proxy/{provider}/...`.
4. Gateway handles:
- API key injection
@ -22,7 +22,11 @@ Applicable scenarios:
1. Keep provider names stable (for example, `runninghub`); do not encode model paths in provider names.
2. Only submit requests should carry `X-Idempotency-Key`; query requests should not.
3. Use `X-Thread-Id` as a common context header whenever available.
4. Use shorthand dot-paths in config extraction fields:
4. Replace each API key usage in scripts with the marker (`API_KEY_MARKER = "__API_KEY_MARKER__"`) at the location where the key is actually used:
- If the script uses header auth, replace with `Authorization: Bearer __API_KEY_MARKER__`.
- If the script uses body auth, replace the body `apiKey` field value with `__API_KEY_MARKER__`.
- Choose header/body based on the script's real provider contract; do not force a single style.
5. Use shorthand dot-paths in config extraction fields:
- Correct: `taskId`, `status`, `usage.thirdPartyConsumeMoney`
- Incorrect: `$.taskId`, `'$'.taskId`
@ -45,6 +49,7 @@ Implement:
- always sets `Content-Type: application/json`
- optionally sets `X-Thread-Id`
- sets `X-Idempotency-Key` only when `include_idempotency=True`
- if this script authenticates via header, set `Authorization: Bearer __API_KEY_MARKER__`
### Step 3: Route submit calls through gateway
@ -68,13 +73,15 @@ With:
And use:
- `headers=build_proxy_headers()`
### Step 5: Remove third-party API key logic from the skill
### Step 5: Replace third-party API key values with marker (do not keep real keys in scripts)
Remove:
- Loading `RUNNINGHUB_API_KEY` in the script
- Building `Authorization: Bearer ...` in the script
Replace in the script being migrated:
- Any real `Authorization: Bearer <real-key>` value -> `Authorization: Bearer __API_KEY_MARKER__`
- Any real body `apiKey` value -> `"apiKey": "__API_KEY_MARKER__"`
Reason: third-party credentials are injected by gateway.
Keep the original auth location (header vs body) unchanged unless the provider's API requirements have changed.
Reason: the gateway injects the real credentials by replacing marker values when it forwards the request to the provider.
### Step 6: Keep essential error handling
@ -115,8 +122,6 @@ third_party_proxy:
runninghub:
base_url: https://www.runninghub.cn
api_key_env: RUNNINGHUB_API_KEY
api_key_header: Authorization
api_key_prefix: "Bearer "
timeout_seconds: 30.0
frozen_amount: 10.0
frozen_type: 2
@ -150,10 +155,12 @@ from pathlib import Path
from dotenv import dotenv_values
API_KEY_MARKER = "__API_KEY_MARKER__"
def load_skill_env() -> dict[str, str]:
"""Load skill-local .env values."""
env_path = Path(__file__).parent.parent / ".env"
env_path = Path(__file__).parent / ".env"
return {
key: value
for key, value in dotenv_values(env_path).items()
@ -184,6 +191,13 @@ def build_proxy_headers(*, include_idempotency: bool = False) -> dict[str, str]:
from uuid import uuid4
headers["X-Idempotency-Key"] = str(uuid4())
return headers
# Optional helper when provider requires header auth in script contract:
def build_proxy_headers_with_auth(*, include_idempotency: bool = False) -> dict[str, str]:
headers = build_proxy_headers(include_idempotency=include_idempotency)
headers["Authorization"] = f"Bearer {API_KEY_MARKER}"
return headers
```
## 6. Common Pitfalls
@ -206,7 +220,7 @@ For Docker-based sandbox execution, use:
## 7. Validation Checklist
1. No direct third-party domain calls remain in the skill script.
2. The skill script no longer reads third-party API keys.
2. The skill script no longer contains real third-party API key values.
3. Submit uses proxy URL + `include_idempotency=True`.
4. Query uses proxy URL + `include_idempotency=False`.
5. Config extraction fields use shorthand dot-paths only.