Clawith/backend/app/api/dingtalk.py

349 lines
13 KiB
Python

"""DingTalk Channel API routes.
Provides Config CRUD and message handling for DingTalk bots using Stream mode.
"""
import uuid
from fastapi import APIRouter, Depends, HTTPException, status
from loguru import logger
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.permissions import check_agent_access, is_agent_creator
from app.core.security import get_current_user
from app.database import get_db
from app.models.channel_config import ChannelConfig
from app.models.user import User
from app.schemas.schemas import ChannelConfigOut
router = APIRouter(tags=["dingtalk"])
# ─── Config CRUD ────────────────────────────────────────
@router.post("/agents/{agent_id}/dingtalk-channel", response_model=ChannelConfigOut, status_code=201)
async def configure_dingtalk_channel(
agent_id: uuid.UUID,
data: dict,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Configure DingTalk bot for an agent. Fields: app_key, app_secret, agent_id (optional)."""
agent, _ = await check_agent_access(db, current_user, agent_id)
if not is_agent_creator(current_user, agent):
raise HTTPException(status_code=403, detail="Only creator can configure channel")
app_key = data.get("app_key", "").strip()
app_secret = data.get("app_secret", "").strip()
if not app_key or not app_secret:
raise HTTPException(status_code=422, detail="app_key and app_secret are required")
# Handle connection mode (Stream/WebSocket vs Webhook) and agent_id
extra_config = data.get("extra_config", {})
conn_mode = extra_config.get("connection_mode", "websocket")
dingtalk_agent_id = extra_config.get("agent_id", "") # DingTalk AgentId for API messaging
result = await db.execute(
select(ChannelConfig).where(
ChannelConfig.agent_id == agent_id,
ChannelConfig.channel_type == "dingtalk",
)
)
existing = result.scalar_one_or_none()
if existing:
existing.app_id = app_key
existing.app_secret = app_secret
existing.is_configured = True
existing.extra_config = {**existing.extra_config, "connection_mode": conn_mode, "agent_id": dingtalk_agent_id}
await db.flush()
# Restart Stream client if in websocket mode
if conn_mode == "websocket":
from app.services.dingtalk_stream import dingtalk_stream_manager
import asyncio
asyncio.create_task(dingtalk_stream_manager.start_client(agent_id, app_key, app_secret))
else:
# Stop existing Stream client if switched to webhook
from app.services.dingtalk_stream import dingtalk_stream_manager
import asyncio
asyncio.create_task(dingtalk_stream_manager.stop_client(agent_id))
return ChannelConfigOut.model_validate(existing)
config = ChannelConfig(
agent_id=agent_id,
channel_type="dingtalk",
app_id=app_key,
app_secret=app_secret,
is_configured=True,
extra_config={"connection_mode": conn_mode},
)
db.add(config)
await db.flush()
# Start Stream client if in websocket mode
if conn_mode == "websocket":
from app.services.dingtalk_stream import dingtalk_stream_manager
import asyncio
asyncio.create_task(dingtalk_stream_manager.start_client(agent_id, app_key, app_secret))
return ChannelConfigOut.model_validate(config)
@router.get("/agents/{agent_id}/dingtalk-channel", response_model=ChannelConfigOut)
async def get_dingtalk_channel(
agent_id: uuid.UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
await check_agent_access(db, current_user, agent_id)
result = await db.execute(
select(ChannelConfig).where(
ChannelConfig.agent_id == agent_id,
ChannelConfig.channel_type == "dingtalk",
)
)
config = result.scalar_one_or_none()
if not config:
raise HTTPException(status_code=404, detail="DingTalk not configured")
return ChannelConfigOut.model_validate(config)
@router.delete("/agents/{agent_id}/dingtalk-channel", status_code=204)
async def delete_dingtalk_channel(
agent_id: uuid.UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
agent, _ = await check_agent_access(db, current_user, agent_id)
if not is_agent_creator(current_user, agent):
raise HTTPException(status_code=403, detail="Only creator can remove channel")
result = await db.execute(
select(ChannelConfig).where(
ChannelConfig.agent_id == agent_id,
ChannelConfig.channel_type == "dingtalk",
)
)
config = result.scalar_one_or_none()
if not config:
raise HTTPException(status_code=404, detail="DingTalk not configured")
await db.delete(config)
# Stop Stream client
from app.services.dingtalk_stream import dingtalk_stream_manager
import asyncio
asyncio.create_task(dingtalk_stream_manager.stop_client(agent_id))
# ─── Message Processing (called by Stream callback) ────
async def process_dingtalk_message(
agent_id: uuid.UUID,
sender_staff_id: str,
user_text: str,
conversation_id: str,
conversation_type: str,
session_webhook: str,
):
"""Process an incoming DingTalk bot message and reply via session webhook."""
import json
import httpx
from datetime import datetime, timezone
from sqlalchemy import select as _select
from app.database import async_session
from app.models.agent import Agent as AgentModel
from app.models.audit import ChatMessage
from app.services.channel_session import find_or_create_channel_session
from app.services.channel_user_service import channel_user_service
from app.api.feishu import _call_agent_llm
async with async_session() as db:
# Load agent
agent_r = await db.execute(_select(AgentModel).where(AgentModel.id == agent_id))
agent_obj = agent_r.scalar_one_or_none()
if not agent_obj:
logger.warning(f"[DingTalk] Agent {agent_id} not found")
return
creator_id = agent_obj.creator_id
from app.models.agent import DEFAULT_CONTEXT_WINDOW_SIZE
ctx_size = (agent_obj.context_window_size or DEFAULT_CONTEXT_WINDOW_SIZE) if agent_obj else DEFAULT_CONTEXT_WINDOW_SIZE
# Determine conv_id for session isolation
if conversation_type == "2":
# Group chat
conv_id = f"dingtalk_group_{conversation_id}"
else:
# P2P / single chat
conv_id = f"dingtalk_p2p_{sender_staff_id}"
# Resolve channel user via unified service (uses OrgMember + SSO patterns)
platform_user = await channel_user_service.resolve_channel_user(
db=db,
agent=agent_obj,
channel_type="dingtalk",
external_user_id=sender_staff_id,
extra_info={"unionid": sender_staff_id},
)
platform_user_id = platform_user.id
# Find or create session
sess = await find_or_create_channel_session(
db=db,
agent_id=agent_id,
user_id=platform_user_id,
external_conv_id=conv_id,
source_channel="dingtalk",
first_message_title=user_text,
)
session_conv_id = str(sess.id)
# Load history
history_r = await db.execute(
_select(ChatMessage)
.where(ChatMessage.agent_id == agent_id, ChatMessage.conversation_id == session_conv_id)
.order_by(ChatMessage.created_at.desc())
.limit(ctx_size)
)
history = [{"role": m.role, "content": m.content} for m in reversed(history_r.scalars().all())]
# Save user message
db.add(ChatMessage(
agent_id=agent_id, user_id=platform_user_id,
role="user", content=user_text,
conversation_id=session_conv_id,
))
sess.last_message_at = datetime.now(timezone.utc)
await db.commit()
# Call LLM
reply_text = await _call_agent_llm(
db, agent_id, user_text,
history=history, user_id=platform_user_id,
)
logger.info(f"[DingTalk] LLM reply: {reply_text[:100]}")
# Reply via session webhook (markdown)
try:
async with httpx.AsyncClient(timeout=10) as client:
await client.post(session_webhook, json={
"msgtype": "markdown",
"markdown": {
"title": agent_obj.name or "AI Reply",
"text": reply_text,
},
})
except Exception as e:
logger.error(f"[DingTalk] Failed to reply via webhook: {e}")
# Fallback: try plain text
try:
async with httpx.AsyncClient(timeout=10) as client:
await client.post(session_webhook, json={
"msgtype": "text",
"text": {"content": reply_text},
})
except Exception as e2:
logger.error(f"[DingTalk] Fallback text reply also failed: {e2}")
# Save assistant reply
db.add(ChatMessage(
agent_id=agent_id, user_id=platform_user_id,
role="assistant", content=reply_text,
conversation_id=session_conv_id,
))
sess.last_message_at = datetime.now(timezone.utc)
await db.commit()
# Log activity
from app.services.activity_logger import log_activity
await log_activity(
agent_id, "chat_reply",
f"Replied to DingTalk message: {reply_text[:80]}",
detail={"channel": "dingtalk", "user_text": user_text[:200], "reply": reply_text[:500]},
)
# ─── OAuth Callback (SSO) ──────────────────────────────
@router.get("/auth/dingtalk/callback")
async def dingtalk_callback(
authCode: str, # DingTalk uses authCode parameter
state: str = None,
db: AsyncSession = Depends(get_db),
):
"""Callback for DingTalk OAuth2 login."""
from app.models.identity import SSOScanSession
from app.core.security import create_access_token
from fastapi.responses import HTMLResponse
from app.services.auth_registry import auth_provider_registry
# 1. Resolve session to get tenant context
tenant_id = None
if state:
try:
sid = uuid.UUID(state)
s_res = await db.execute(select(SSOScanSession).where(SSOScanSession.id == sid))
session = s_res.scalar_one_or_none()
if session:
tenant_id = session.tenant_id
except (ValueError, AttributeError):
pass
# 2. Get DingTalk provider config
auth_provider = await auth_provider_registry.get_provider(db, "dingtalk", str(tenant_id) if tenant_id else None)
if not auth_provider:
return HTMLResponse("Auth failed: DingTalk provider not configured for this tenant")
# 3. Exchange code for token and get user info
try:
# Step 1: Exchange authCode for userAccessToken
token_data = await auth_provider.exchange_code_for_token(authCode)
access_token = token_data.get("access_token")
if not access_token:
logger.error(f"DingTalk token exchange failed: {token_data}")
return HTMLResponse(f"Auth failed: Token exchange error")
# Step 2: Get user info using modern v1.0 API
user_info = await auth_provider.get_user_info(access_token)
if not user_info.provider_union_id:
logger.error(f"DingTalk user info missing unionId: {user_info.raw_data}")
return HTMLResponse("Auth failed: No unionid returned")
# Step 3: Find or create user (handles OrgMember linking)
user, is_new = await auth_provider.find_or_create_user(
db, user_info, tenant_id=str(tenant_id) if tenant_id else None
)
if not user:
return HTMLResponse("Auth failed: User resolution failed")
except Exception as e:
logger.error(f"DingTalk login error: {e}")
return HTMLResponse(f"Auth failed: {str(e)}")
# 4. Standard login
token = create_access_token(str(user.id), user.role)
if state:
try:
sid = uuid.UUID(state)
s_res = await db.execute(select(SSOScanSession).where(SSOScanSession.id == sid))
session = s_res.scalar_one_or_none()
if session:
session.status = "authorized"
session.provider_type = "dingtalk"
session.user_id = user.id
session.access_token = token
session.error_msg = None
await db.commit()
return HTMLResponse(
f"""<html><head><meta charset="utf-8" /></head>
<body style="font-family: sans-serif; padding: 24px;">
<div>SSO login successful. Redirecting...</div>
<script>window.location.href = "/sso/entry?sid={sid}&complete=1";</script>
</body></html>"""
)
except Exception as e:
logger.exception("Failed to update SSO session (dingtalk) %s", e)
return HTMLResponse(f"Logged in. Token: {token}")