Clawith/backend/app/api/discord_bot.py

377 lines
16 KiB
Python

"""Discord Bot Channel API routes (slash command interactions)."""
import os
import uuid
from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
from loguru import logger
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.permissions import check_agent_access, is_agent_creator
from app.core.security import get_current_user
from app.database import get_db
from app.models.channel_config import ChannelConfig
from app.models.user import User
from app.schemas.schemas import ChannelConfigOut
router = APIRouter(tags=["discord"])
DISCORD_MSG_LIMIT = 2000 # Discord message char limit
# ─── Config CRUD ────────────────────────────────────────
@router.post("/agents/{agent_id}/discord-channel", response_model=ChannelConfigOut, status_code=201)
async def configure_discord_channel(
agent_id: uuid.UUID,
data: dict,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Configure Discord bot for an agent.
Gateway mode fields: bot_token (+ connection_mode='gateway').
Webhook mode fields: application_id, bot_token, public_key.
"""
agent, _ = await check_agent_access(db, current_user, agent_id)
if not is_agent_creator(current_user, agent):
raise HTTPException(status_code=403, detail="Only creator can configure channel")
connection_mode = data.get("connection_mode", "webhook").strip()
bot_token = data.get("bot_token", "").strip()
application_id = data.get("application_id", "").strip()
public_key = data.get("public_key", "").strip()
if not bot_token:
raise HTTPException(status_code=422, detail="bot_token is required")
if connection_mode == "webhook" and (not application_id or not public_key):
raise HTTPException(status_code=422, detail="application_id and public_key are required for webhook mode")
extra_config = {"connection_mode": connection_mode}
result = await db.execute(
select(ChannelConfig).where(
ChannelConfig.agent_id == agent_id,
ChannelConfig.channel_type == "discord",
)
)
existing = result.scalar_one_or_none()
if existing:
existing.app_id = application_id or existing.app_id
existing.app_secret = bot_token
existing.encrypt_key = public_key or existing.encrypt_key
existing.extra_config = extra_config
existing.is_configured = True
await db.flush()
else:
existing = ChannelConfig(
agent_id=agent_id,
channel_type="discord",
app_id=application_id,
app_secret=bot_token,
encrypt_key=public_key,
extra_config=extra_config,
is_configured=True,
)
db.add(existing)
await db.flush()
# Mode-specific post-configuration
if connection_mode == "gateway":
# Start Gateway bot
from app.services.discord_gateway import discord_gateway_manager
await discord_gateway_manager.start_client(agent_id, bot_token)
else:
# Register slash commands for webhook mode
try:
reg = await _register_slash_commands(application_id, bot_token)
logger.info(f"[Discord] Slash command registration: {reg['status']}")
except Exception as e:
logger.warning(f"[Discord] Could not register slash commands: {e}")
return ChannelConfigOut.model_validate(existing)
@router.get("/agents/{agent_id}/discord-channel", response_model=ChannelConfigOut)
async def get_discord_channel(
agent_id: uuid.UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
await check_agent_access(db, current_user, agent_id)
result = await db.execute(
select(ChannelConfig).where(
ChannelConfig.agent_id == agent_id,
ChannelConfig.channel_type == "discord",
)
)
config = result.scalar_one_or_none()
if not config:
raise HTTPException(status_code=404, detail="Discord not configured")
return ChannelConfigOut.model_validate(config)
@router.get("/agents/{agent_id}/discord-channel/webhook-url")
async def get_discord_webhook_url(agent_id: uuid.UUID, request: Request, db: AsyncSession = Depends(get_db)):
from app.services.platform_service import platform_service
public_base = await platform_service.get_public_base_url(db, request)
return {"webhook_url": f"{public_base}/api/channel/discord/{agent_id}/webhook"}
@router.delete("/agents/{agent_id}/discord-channel", status_code=204)
async def delete_discord_channel(
agent_id: uuid.UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
agent, _ = await check_agent_access(db, current_user, agent_id)
if not is_agent_creator(current_user, agent):
raise HTTPException(status_code=403, detail="Only creator can remove channel")
result = await db.execute(
select(ChannelConfig).where(
ChannelConfig.agent_id == agent_id,
ChannelConfig.channel_type == "discord",
)
)
config = result.scalar_one_or_none()
if not config:
raise HTTPException(status_code=404, detail="Discord not configured")
# Stop Gateway client if running
try:
from app.services.discord_gateway import discord_gateway_manager
await discord_gateway_manager.stop_client(agent_id)
except Exception:
pass
await db.delete(config)
# ─── Slash Command Registration ─────────────────────────
async def _register_slash_commands(application_id: str, bot_token: str) -> dict:
"""Register /ask global slash command with Discord API."""
import httpx
import os
command = {
"name": "ask",
"description": "Ask the AI agent a question",
"options": [
{
"name": "message",
"description": "Your question or message to the agent",
"type": 3, # STRING
"required": True,
}
],
}
url = f"https://discord.com/api/v10/applications/{application_id}/commands"
proxy = os.environ.get("DISCORD_PROXY") or os.environ.get("HTTPS_PROXY") or None
async with httpx.AsyncClient(timeout=15, proxy=proxy) as client:
resp = await client.put(
url,
headers={"Authorization": f"Bot {bot_token}", "Content-Type": "application/json"},
json=[command],
)
return {"status": resp.status_code, "body": resp.text}
# ─── Interactions Webhook ───────────────────────────────
def _verify_discord_signature(public_key: str, body: bytes, headers: dict) -> bool:
"""Verify Discord ed25519 signature."""
try:
from nacl.signing import VerifyKey
from nacl.exceptions import BadSignatureError
timestamp = headers.get("x-signature-timestamp", "")
signature = headers.get("x-signature-ed25519", "")
if not timestamp or not signature:
return False
verify_key = VerifyKey(bytes.fromhex(public_key))
verify_key.verify(f"{timestamp}".encode() + body, bytes.fromhex(signature))
return True
except Exception:
return False
async def _send_discord_followup(application_id: str, bot_token: str, interaction_token: str, text: str) -> None:
"""Send follow-up message(s) to Discord Interactions, chunked at 2000 chars."""
import httpx
chunks = [text[i:i + DISCORD_MSG_LIMIT] for i in range(0, len(text), DISCORD_MSG_LIMIT)]
proxy = os.environ.get("DISCORD_PROXY") or os.environ.get("HTTPS_PROXY") or None
async with httpx.AsyncClient(timeout=10, proxy=proxy) as client:
for i, chunk in enumerate(chunks):
if i == 0:
# Edit the original deferred response
await client.patch(
f"https://discord.com/api/v10/webhooks/{application_id}/{interaction_token}/messages/@original",
headers={"Authorization": f"Bot {bot_token}", "Content-Type": "application/json"},
json={"content": chunk},
)
else:
# Additional chunks as follow-up messages
await client.post(
f"https://discord.com/api/v10/webhooks/{application_id}/{interaction_token}",
headers={"Authorization": f"Bot {bot_token}", "Content-Type": "application/json"},
json={"content": chunk},
)
@router.post("/channel/discord/{agent_id}/webhook")
async def discord_interaction_webhook(
agent_id: uuid.UUID,
request: Request,
db: AsyncSession = Depends(get_db),
):
"""Handle Discord Interaction webhooks (PING + slash commands)."""
body_bytes = await request.body()
# Get channel config
result = await db.execute(
select(ChannelConfig).where(
ChannelConfig.agent_id == agent_id,
ChannelConfig.channel_type == "discord",
)
)
config = result.scalar_one_or_none()
if not config:
return Response(status_code=404)
# Verify Discord signature
public_key = config.encrypt_key or ""
if public_key and not _verify_discord_signature(public_key, body_bytes, dict(request.headers)):
return Response(content="Invalid signature", status_code=401)
import json
import asyncio
body = json.loads(body_bytes)
interaction_type = body.get("type", 0)
# Type 1: PING — Discord URL verification
if interaction_type == 1:
return {"type": 1}
# Type 2: APPLICATION_COMMAND (slash command)
if interaction_type == 2:
data_obj = body.get("data", {})
command_name = data_obj.get("name", "")
options = data_obj.get("options", [])
user_text = ""
for opt in options:
if opt.get("name") == "message":
user_text = opt.get("value", "").strip()
break
if not user_text:
return {"type": 4, "data": {"content": "⚠️ 请提供消息内容。Usage: `/ask message:<你的问题>`"}}
interaction_token = body.get("token", "")
application_id = config.app_id or ""
sender_id = body.get("member", {}).get("user", {}).get("id") or body.get("user", {}).get("id", "")
channel_id = body.get("channel_id", "")
# Discord: guild interactions are group chats, DM interactions are P2P
_is_group_discord = bool(body.get("guild_id"))
conv_id = f"discord_{channel_id}" if channel_id else f"discord_dm_{sender_id}"
logger.info(f"[Discord] /{command_name} from {sender_id}: {user_text[:80]}")
# Defer response immediately (Discord requires response within 3 seconds)
# We return type 5 (DEFERRED_CHANNEL_MESSAGE_WITH_SOURCE) and reply later
async def handle_in_background():
from app.models.audit import ChatMessage
from app.models.agent import Agent as AgentModel
from app.api.feishu import _call_agent_llm
from app.services.channel_session import find_or_create_channel_session
from app.database import async_session
from datetime import datetime, timezone
async with async_session() as bg_db:
# Load agent
agent_r = await bg_db.execute(select(AgentModel).where(AgentModel.id == agent_id))
agent_obj = agent_r.scalar_one_or_none()
creator_id = agent_obj.creator_id if agent_obj else agent_id
from app.models.agent import DEFAULT_CONTEXT_WINDOW_SIZE
ctx_size = (agent_obj.context_window_size or DEFAULT_CONTEXT_WINDOW_SIZE) if agent_obj else DEFAULT_CONTEXT_WINDOW_SIZE
# Find-or-create platform user for this Discord sender via unified service
from app.services.channel_user_service import channel_user_service
_discord_username = body.get("member", {}).get("user", {}).get("username") or body.get("user", {}).get("username", "")
_display = _discord_username or f"Discord User {sender_id[:8]}"
_extra_info = {"name": _display}
_platform_user = await channel_user_service.resolve_channel_user(
db=bg_db,
agent=agent_obj,
channel_type="discord",
external_user_id=sender_id,
extra_info=_extra_info,
)
# Update display_name if we now have a better name
if _discord_username and _platform_user.display_name and _platform_user.display_name.startswith("Discord User ") and _platform_user.display_name != _discord_username:
_platform_user.display_name = _discord_username
await bg_db.flush()
platform_user_id = _platform_user.id
# Find-or-create ChatSession for this Discord conversation
sess = await find_or_create_channel_session(
db=bg_db,
agent_id=agent_id,
user_id=creator_id if _is_group_discord else platform_user_id,
external_conv_id=conv_id,
source_channel="discord",
first_message_title=user_text,
is_group=_is_group_discord,
group_name=f"Discord Channel {channel_id[:8]}" if _is_group_discord else None,
)
session_conv_id = str(sess.id)
# Load history from session
history_r = await bg_db.execute(
select(ChatMessage)
.where(ChatMessage.agent_id == agent_id, ChatMessage.conversation_id == session_conv_id)
.order_by(ChatMessage.created_at.desc())
.limit(ctx_size)
)
history = [{"role": m.role, "content": m.content} for m in reversed(history_r.scalars().all())]
# Save user message
bg_db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="user", content=user_text, conversation_id=session_conv_id))
sess.last_message_at = datetime.now(timezone.utc)
await bg_db.commit()
# Call LLM
reply_text = await _call_agent_llm(bg_db, agent_id, user_text, history=history)
logger.info(f"[Discord] LLM reply: {reply_text[:80]}")
# Save reply
bg_db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="assistant", content=reply_text, conversation_id=session_conv_id))
sess.last_message_at = datetime.now(timezone.utc)
await bg_db.commit()
# Bot token stored in config — read from DB to avoid detached ORM issues
from sqlalchemy import select as _sel
cfg_r = await bg_db.execute(_sel(ChannelConfig).where(
ChannelConfig.agent_id == agent_id,
ChannelConfig.channel_type == "discord",
))
cfg = cfg_r.scalar_one_or_none()
bot_token_bg = cfg.app_secret if cfg else ""
app_id_bg = cfg.app_id if cfg else ""
# Send chunked reply via Discord follow-up
if bot_token_bg and interaction_token and app_id_bg:
try:
await _send_discord_followup(app_id_bg, bot_token_bg, interaction_token, reply_text)
except Exception as e:
logger.error(f"[Discord] Failed to send follow-up: {e}")
asyncio.create_task(handle_in_background())
# Return DEFERRED_CHANNEL_MESSAGE_WITH_SOURCE — shows "thinking..." to user
return {"type": 5}
# Unsupported interaction type
return {"type": 1}