191 lines
6.1 KiB
Python
191 lines
6.1 KiB
Python
"""Thin async billing client for the third-party proxy.
|
|
|
|
Calls the same reserve/finalize HTTP endpoints as BillingMiddleware,
|
|
but with semantics appropriate for third-party task calls:
|
|
- estimatedTokens = 0 (not applicable)
|
|
- finalAmount = actual provider monetary charge (thirdPartyConsumeMoney)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
|
|
import httpx
|
|
|
|
from deerflow.config.app_config import get_app_config
|
|
|
|
logger = logging.getLogger(__name__)

# Integer values of a billing response's ``status``/``code`` field that are
# treated as success. Kept immutable so the accepted set cannot be altered
# at runtime by accident.
_SUCCESS_STATUS_CODES = frozenset({200, 1000})
|
|
|
|
|
|
async def reserve(
    *,
    thread_id: str | None,
    call_id: str,
    provider: str,
    operation: str,
    frozen_amount: float,
    frozen_type: int | None,
) -> str | None:
    """Reserve billing before forwarding a submit call.

    Posts a reserve request to ``cfg.reserve_url`` with token estimates set
    to 0 and an expiry of ``cfg.default_expire_seconds`` from now.

    Args:
        thread_id: Sent as ``sessionId``; may be None.
        call_id: Sent as ``callId`` and used to correlate log lines.
        provider: Sent as ``modelName``.
        operation: Operation path; only the segment after the last ``/`` is
            included in the ``question`` text.
        frozen_amount: Sent as ``frozenAmount``.
        frozen_type: Sent as ``frozenType``; falls back to ``cfg.frozen_type``
            when None.

    Returns:
        The frozen_id string on success, or None if billing is disabled or
        the reserve call fails for any reason (transport/HTTP error, invalid
        or unexpected response body, rejection, missing ``frozenId``). The
        call is non-blocking: failures are logged, never raised, so the proxy
        can continue.
    """
    cfg = get_app_config().billing
    if not cfg.enabled or not cfg.reserve_url:
        logger.info(
            "[ThirdPartyProxy][Billing] reserve skipped: enabled=%s reserve_url=%s call_id=%s",
            cfg.enabled,
            cfg.reserve_url,
            call_id,
        )
        return None

    # NOTE(review): expireAt is built from naive local time; confirm the
    # billing service interprets it in the proxy host's timezone.
    expire_at = datetime.now() + timedelta(seconds=cfg.default_expire_seconds)
    payload = {
        "sessionId": thread_id,
        "callId": call_id,
        "modelName": provider,
        "question": f"skill invokes {operation.split('/')[-1]}",
        "frozenAmount": frozen_amount,
        "frozenType": frozen_type if frozen_type is not None else cfg.frozen_type,
        "estimatedInputTokens": 0,
        "estimatedOutputTokens": 0,
        "expireAt": expire_at.strftime("%Y-%m-%d %H:%M:%S"),
    }

    logger.info(
        "[ThirdPartyProxy][Billing] reserve request: url=%s call_id=%s provider=%s thread_id=%s",
        cfg.reserve_url,
        call_id,
        provider,
        thread_id,
    )
    logger.debug("[ThirdPartyProxy][Billing] reserve payload: %s", payload)
    try:
        async with httpx.AsyncClient(timeout=cfg.timeout_seconds) as client:
            resp = await client.post(cfg.reserve_url, headers=cfg.headers, json=payload)
            resp.raise_for_status()
            data = resp.json()
    except Exception as exc:
        # Deliberately broad: any transport, HTTP-status or JSON-decoding
        # failure must degrade to "no reservation" instead of propagating.
        logger.warning("[ThirdPartyProxy][Billing] reserve HTTP error: %s", exc)
        return None

    logger.info(
        "[ThirdPartyProxy][Billing] reserve response: call_id=%s status_code=%s",
        call_id,
        resp.status_code,
    )
    logger.debug("[ThirdPartyProxy][Billing] reserve response body: %s", data)

    # Valid JSON is not necessarily an object (it may be a list, string,
    # number or null). Reject those here so the ``.get`` calls below cannot
    # raise AttributeError and break the non-blocking contract.
    if not isinstance(data, dict):
        logger.warning(
            "[ThirdPartyProxy][Billing] reserve response is not a JSON object: call_id=%s payload=%s",
            call_id,
            data,
        )
        return None

    if not _is_success(data):
        logger.warning(
            "[ThirdPartyProxy][Billing] reserve rejected: call_id=%s status=%s payload=%s",
            call_id,
            data.get("status") or data.get("code"),
            data,
        )
        return None

    # ``data`` may be absent, null, or (unexpectedly) not an object; only a
    # dict can carry ``frozenId``.
    result = data.get("data")
    frozen_id = result.get("frozenId") if isinstance(result, dict) else None
    if not isinstance(frozen_id, str) or not frozen_id:
        logger.warning(
            "[ThirdPartyProxy][Billing] reserve response missing frozenId: call_id=%s payload=%s",
            call_id,
            data,
        )
        return None

    logger.info("[ThirdPartyProxy][Billing] reserve ok: call_id=%s frozen_id=%s", call_id, frozen_id)
    logger.debug(
        "[ThirdPartyProxy][Billing] reserve success details: provider=%s operation=%s expire_at=%s",
        provider,
        operation,
        payload["expireAt"],
    )
    return frozen_id
|
|
|
|
|
|
async def finalize(
    *,
    frozen_id: str,
    final_amount: float,
    finalize_reason: str,
) -> bool:
    """Finalize billing after a third-party call reaches a terminal state.

    Posts a finalize request to ``cfg.finalize_url`` with all token usage
    fields set to 0.

    Args:
        frozen_id: Sent as ``frozenId``.
        final_amount: The actual provider charge (e.g. thirdPartyConsumeMoney
            from RunningHub). Pass 0 for failed/cancelled calls.
        finalize_reason: Sent as ``finalizeReason``.

    Returns:
        True when the billing service accepts the finalize request, and also
        when billing is disabled or no finalize URL is configured. False on
        any transport/HTTP error, invalid or unexpected response body, or
        rejection. Failures are logged, never raised.
    """
    cfg = get_app_config().billing
    if not cfg.enabled or not cfg.finalize_url:
        # Billing not configured — treat as success so the caller marks the record finalized
        logger.info(
            "[ThirdPartyProxy][Billing] finalize skipped: enabled=%s finalize_url=%s frozen_id=%s",
            cfg.enabled,
            cfg.finalize_url,
            frozen_id,
        )
        return True

    payload = {
        "frozenId": frozen_id,
        "finalAmount": final_amount,
        "usageInputTokens": 0,
        "usageOutputTokens": 0,
        "usageTotalTokens": 0,
        "finalizeReason": finalize_reason,
    }

    logger.info(
        "[ThirdPartyProxy][Billing] finalize request: frozen_id=%s amount=%s reason=%s url=%s",
        frozen_id,
        final_amount,
        finalize_reason,
        cfg.finalize_url,
    )
    logger.debug("[ThirdPartyProxy][Billing] finalize payload: %s", payload)
    try:
        async with httpx.AsyncClient(timeout=cfg.timeout_seconds) as client:
            resp = await client.post(cfg.finalize_url, headers=cfg.headers, json=payload)
            resp.raise_for_status()
            data = resp.json()
    except Exception as exc:
        # Deliberately broad: any transport, HTTP-status or JSON-decoding
        # failure is reported to the caller as False instead of propagating.
        logger.warning("[ThirdPartyProxy][Billing] finalize HTTP error: frozen_id=%s err=%s", frozen_id, exc)
        return False

    logger.info(
        "[ThirdPartyProxy][Billing] finalize response: frozen_id=%s status_code=%s",
        frozen_id,
        resp.status_code,
    )
    logger.debug("[ThirdPartyProxy][Billing] finalize response body: %s", data)

    # Valid JSON is not necessarily an object (it may be a list, string,
    # number or null). Reject those here so the ``.get`` calls below cannot
    # raise AttributeError out of this function.
    if not isinstance(data, dict):
        logger.warning(
            "[ThirdPartyProxy][Billing] finalize response is not a JSON object: frozen_id=%s payload=%s",
            frozen_id,
            data,
        )
        return False

    if not _is_success(data):
        logger.warning(
            "[ThirdPartyProxy][Billing] finalize rejected: frozen_id=%s status=%s payload=%s",
            frozen_id,
            data.get("status") or data.get("code"),
            data,
        )
        return False

    logger.info("[ThirdPartyProxy][Billing] finalize ok: frozen_id=%s", frozen_id)
    logger.debug(
        "[ThirdPartyProxy][Billing] finalize success details: amount=%s reason=%s",
        final_amount,
        finalize_reason,
    )
    return True
|
|
|
|
|
|
def _is_success(data: dict) -> bool:
    """Tell whether a billing response body reports success.

    The body counts as successful when its ``status`` field (or ``code``,
    if ``status`` is missing or falsy) is an integer listed in
    ``_SUCCESS_STATUS_CODES``; otherwise only when its ``success`` field is
    exactly ``True``.
    """
    code = data.get("status") or data.get("code")
    numeric_ok = isinstance(code, int) and code in _SUCCESS_STATUS_CODES
    return numeric_ok or data.get("success") is True
|