"""DingTalk Channel API routes. Provides Config CRUD and message handling for DingTalk bots using Stream mode. """ import uuid from fastapi import APIRouter, Depends, HTTPException, status from loguru import logger from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.permissions import check_agent_access, is_agent_creator from app.core.security import get_current_user from app.database import get_db from app.models.channel_config import ChannelConfig from app.models.user import User from app.schemas.schemas import ChannelConfigOut router = APIRouter(tags=["dingtalk"]) # ─── Config CRUD ──────────────────────────────────────── @router.post("/agents/{agent_id}/dingtalk-channel", response_model=ChannelConfigOut, status_code=201) async def configure_dingtalk_channel( agent_id: uuid.UUID, data: dict, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Configure DingTalk bot for an agent. Fields: app_key, app_secret, agent_id (optional).""" agent, _ = await check_agent_access(db, current_user, agent_id) if not is_agent_creator(current_user, agent): raise HTTPException(status_code=403, detail="Only creator can configure channel") app_key = data.get("app_key", "").strip() app_secret = data.get("app_secret", "").strip() if not app_key or not app_secret: raise HTTPException(status_code=422, detail="app_key and app_secret are required") # Handle connection mode (Stream/WebSocket vs Webhook) and agent_id extra_config = data.get("extra_config", {}) conn_mode = extra_config.get("connection_mode", "websocket") dingtalk_agent_id = extra_config.get("agent_id", "") # DingTalk AgentId for API messaging result = await db.execute( select(ChannelConfig).where( ChannelConfig.agent_id == agent_id, ChannelConfig.channel_type == "dingtalk", ) ) existing = result.scalar_one_or_none() if existing: existing.app_id = app_key existing.app_secret = app_secret existing.is_configured = True existing.extra_config = {**existing.extra_config, "connection_mode": conn_mode, "agent_id": dingtalk_agent_id} await db.flush() # Restart Stream client if in websocket mode if conn_mode == "websocket": from app.services.dingtalk_stream import dingtalk_stream_manager import asyncio asyncio.create_task(dingtalk_stream_manager.start_client(agent_id, app_key, app_secret)) else: # Stop existing Stream client if switched to webhook from app.services.dingtalk_stream import dingtalk_stream_manager import asyncio asyncio.create_task(dingtalk_stream_manager.stop_client(agent_id)) return ChannelConfigOut.model_validate(existing) config = ChannelConfig( agent_id=agent_id, channel_type="dingtalk", app_id=app_key, app_secret=app_secret, is_configured=True, extra_config={"connection_mode": conn_mode}, ) db.add(config) await db.flush() # Start Stream client if in websocket mode if conn_mode == "websocket": from app.services.dingtalk_stream import dingtalk_stream_manager import asyncio asyncio.create_task(dingtalk_stream_manager.start_client(agent_id, app_key, app_secret)) return ChannelConfigOut.model_validate(config) @router.get("/agents/{agent_id}/dingtalk-channel", response_model=ChannelConfigOut) async def get_dingtalk_channel( agent_id: uuid.UUID, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): await check_agent_access(db, current_user, agent_id) result = await db.execute( select(ChannelConfig).where( ChannelConfig.agent_id == agent_id, ChannelConfig.channel_type == "dingtalk", ) ) config = result.scalar_one_or_none() if not config: raise HTTPException(status_code=404, detail="DingTalk not configured") return ChannelConfigOut.model_validate(config) @router.delete("/agents/{agent_id}/dingtalk-channel", status_code=204) async def delete_dingtalk_channel( agent_id: uuid.UUID, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): agent, _ = await check_agent_access(db, current_user, agent_id) if not is_agent_creator(current_user, agent): raise HTTPException(status_code=403, detail="Only creator can remove channel") result = await db.execute( select(ChannelConfig).where( ChannelConfig.agent_id == agent_id, ChannelConfig.channel_type == "dingtalk", ) ) config = result.scalar_one_or_none() if not config: raise HTTPException(status_code=404, detail="DingTalk not configured") await db.delete(config) # Stop Stream client from app.services.dingtalk_stream import dingtalk_stream_manager import asyncio asyncio.create_task(dingtalk_stream_manager.stop_client(agent_id)) # ─── Message Processing (called by Stream callback) ──── async def process_dingtalk_message( agent_id: uuid.UUID, sender_staff_id: str, user_text: str, conversation_id: str, conversation_type: str, session_webhook: str, ): """Process an incoming DingTalk bot message and reply via session webhook.""" import json import httpx from datetime import datetime, timezone from sqlalchemy import select as _select from app.database import async_session from app.models.agent import Agent as AgentModel from app.models.audit import ChatMessage from app.services.channel_session import find_or_create_channel_session from app.services.channel_user_service import channel_user_service from app.api.feishu import _call_agent_llm async with async_session() as db: # Load agent agent_r = await db.execute(_select(AgentModel).where(AgentModel.id == agent_id)) agent_obj = agent_r.scalar_one_or_none() if not agent_obj: logger.warning(f"[DingTalk] Agent {agent_id} not found") return creator_id = agent_obj.creator_id from app.models.agent import DEFAULT_CONTEXT_WINDOW_SIZE ctx_size = (agent_obj.context_window_size or DEFAULT_CONTEXT_WINDOW_SIZE) if agent_obj else DEFAULT_CONTEXT_WINDOW_SIZE # Determine conv_id for session isolation if conversation_type == "2": # Group chat conv_id = f"dingtalk_group_{conversation_id}" else: # P2P / single chat conv_id = f"dingtalk_p2p_{sender_staff_id}" # Resolve channel user via unified service (uses OrgMember + SSO patterns) platform_user = await channel_user_service.resolve_channel_user( db=db, agent=agent_obj, channel_type="dingtalk", external_user_id=sender_staff_id, extra_info={"unionid": sender_staff_id}, ) platform_user_id = platform_user.id # Find or create session sess = await find_or_create_channel_session( db=db, agent_id=agent_id, user_id=platform_user_id, external_conv_id=conv_id, source_channel="dingtalk", first_message_title=user_text, ) session_conv_id = str(sess.id) # Load history history_r = await db.execute( _select(ChatMessage) .where(ChatMessage.agent_id == agent_id, ChatMessage.conversation_id == session_conv_id) .order_by(ChatMessage.created_at.desc()) .limit(ctx_size) ) history = [{"role": m.role, "content": m.content} for m in reversed(history_r.scalars().all())] # Save user message db.add(ChatMessage( agent_id=agent_id, user_id=platform_user_id, role="user", content=user_text, conversation_id=session_conv_id, )) sess.last_message_at = datetime.now(timezone.utc) await db.commit() # Call LLM reply_text = await _call_agent_llm( db, agent_id, user_text, history=history, user_id=platform_user_id, ) logger.info(f"[DingTalk] LLM reply: {reply_text[:100]}") # Reply via session webhook (markdown) try: async with httpx.AsyncClient(timeout=10) as client: await client.post(session_webhook, json={ "msgtype": "markdown", "markdown": { "title": agent_obj.name or "AI Reply", "text": reply_text, }, }) except Exception as e: logger.error(f"[DingTalk] Failed to reply via webhook: {e}") # Fallback: try plain text try: async with httpx.AsyncClient(timeout=10) as client: await client.post(session_webhook, json={ "msgtype": "text", "text": {"content": reply_text}, }) except Exception as e2: logger.error(f"[DingTalk] Fallback text reply also failed: {e2}") # Save assistant reply db.add(ChatMessage( agent_id=agent_id, user_id=platform_user_id, role="assistant", content=reply_text, conversation_id=session_conv_id, )) sess.last_message_at = datetime.now(timezone.utc) await db.commit() # Log activity from app.services.activity_logger import log_activity await log_activity( agent_id, "chat_reply", f"Replied to DingTalk message: {reply_text[:80]}", detail={"channel": "dingtalk", "user_text": user_text[:200], "reply": reply_text[:500]}, ) # ─── OAuth Callback (SSO) ────────────────────────────── @router.get("/auth/dingtalk/callback") async def dingtalk_callback( authCode: str, # DingTalk uses authCode parameter state: str = None, db: AsyncSession = Depends(get_db), ): """Callback for DingTalk OAuth2 login.""" from app.models.identity import SSOScanSession from app.core.security import create_access_token from fastapi.responses import HTMLResponse from app.services.auth_registry import auth_provider_registry # 1. Resolve session to get tenant context tenant_id = None if state: try: sid = uuid.UUID(state) s_res = await db.execute(select(SSOScanSession).where(SSOScanSession.id == sid)) session = s_res.scalar_one_or_none() if session: tenant_id = session.tenant_id except (ValueError, AttributeError): pass # 2. Get DingTalk provider config auth_provider = await auth_provider_registry.get_provider(db, "dingtalk", str(tenant_id) if tenant_id else None) if not auth_provider: return HTMLResponse("Auth failed: DingTalk provider not configured for this tenant") # 3. Exchange code for token and get user info try: # Step 1: Exchange authCode for userAccessToken token_data = await auth_provider.exchange_code_for_token(authCode) access_token = token_data.get("access_token") if not access_token: logger.error(f"DingTalk token exchange failed: {token_data}") return HTMLResponse(f"Auth failed: Token exchange error") # Step 2: Get user info using modern v1.0 API user_info = await auth_provider.get_user_info(access_token) if not user_info.provider_union_id: logger.error(f"DingTalk user info missing unionId: {user_info.raw_data}") return HTMLResponse("Auth failed: No unionid returned") # Step 3: Find or create user (handles OrgMember linking) user, is_new = await auth_provider.find_or_create_user( db, user_info, tenant_id=str(tenant_id) if tenant_id else None ) if not user: return HTMLResponse("Auth failed: User resolution failed") except Exception as e: logger.error(f"DingTalk login error: {e}") return HTMLResponse(f"Auth failed: {str(e)}") # 4. Standard login token = create_access_token(str(user.id), user.role) if state: try: sid = uuid.UUID(state) s_res = await db.execute(select(SSOScanSession).where(SSOScanSession.id == sid)) session = s_res.scalar_one_or_none() if session: session.status = "authorized" session.provider_type = "dingtalk" session.user_id = user.id session.access_token = token session.error_msg = None await db.commit() return HTMLResponse( f"""
SSO login successful. Redirecting...
""" ) except Exception as e: logger.exception("Failed to update SSO session (dingtalk) %s", e) return HTMLResponse(f"Logged in. Token: {token}")