# backend/app/services/wecom_stream.py
"""WeCom (企业微信) AI Bot WebSocket Long Connection Manager.
Uses the wecom-aibot-sdk-python SDK for WebSocket-based message reception.
No callback URL or domain verification needed.
"""
import asyncio
import uuid
from typing import Dict
from loguru import logger
from sqlalchemy import select
from app.database import async_session
from app.models.channel_config import ChannelConfig
class WeComStreamManager:
    """Manages WeCom AI Bot WebSocket clients for all agents.

    Keeps one SDK client and one background asyncio task per agent id.
    Clients are started individually via ``start_client`` or in bulk via
    ``start_all``; ``status`` reports which tasks are still running.
    """

    def __init__(self):
        # Live SDK client objects, keyed by agent id. Typed as ``object``
        # because the SDK is imported lazily inside _run_client.
        self._clients: Dict[uuid.UUID, object] = {}
        # Background tasks running _run_client, keyed by agent id.
        self._tasks: Dict[uuid.UUID, asyncio.Task] = {}

    async def start_client(
        self,
        agent_id: uuid.UUID,
        bot_id: str,
        bot_secret: str,
        stop_existing: bool = True,
    ):
        """Start a WeCom AI Bot WebSocket client for a specific agent.

        Args:
            agent_id: Owning agent's UUID; used as the registry key.
            bot_id: WeCom AI Bot id credential.
            bot_secret: WeCom AI Bot secret credential.
            stop_existing: When True, cancel any client already running
                for this agent before starting the new one.
        """
        if not bot_id or not bot_secret:
            logger.warning(f"[WeCom Stream] Missing bot_id or bot_secret for {agent_id}, skipping")
            return
        logger.info(f"[WeCom Stream] Starting client for agent {agent_id} (BotID: {bot_id[:12]}...)")
        # Stop existing client if any
        if stop_existing:
            await self.stop_client(agent_id)
        # The connect/reconnect loop runs in the background for this agent.
        task = asyncio.create_task(
            self._run_client(agent_id, bot_id, bot_secret),
            name=f"wecom-stream-{str(agent_id)[:8]}",
        )
        self._tasks[agent_id] = task

    async def _run_client(
        self,
        agent_id: uuid.UUID,
        bot_id: str,
        bot_secret: str,
    ):
        """Run the WeCom WebSocket client (async, runs in the main event loop).

        Imports the SDK lazily, registers message/event handlers, then loops
        forever reconnecting with exponential backoff until the task is
        cancelled. Registry entries are always cleaned up in ``finally``.
        """
        try:
            from wecom_aibot_sdk import WSClient, generate_req_id
        except ImportError:
            # SDK is optional at install time; degrade gracefully.
            logger.warning(
                "[WeCom Stream] wecom-aibot-sdk-python not installed. "
                "Install with: pip install wecom-aibot-sdk-python"
            )
            return
        try:
            client = WSClient({
                "bot_id": bot_id,
                "secret": bot_secret,
                "max_reconnect_attempts": -1,  # infinite reconnect
                "heartbeat_interval": 30000,  # 30s heartbeat
            })
            self._clients[agent_id] = client

            # ── Message handler: text ──
            async def on_text(frame):
                try:
                    body = frame.body or {}
                    text_obj = body.get("text", {})
                    user_text = text_obj.get("content", "").strip()
                    if not user_text:
                        return
                    sender = body.get("from", {})
                    # Sender key varies across payloads ("user_id" vs
                    # "userid"); accept either spelling.
                    sender_id = sender.get("user_id", "") or sender.get("userid", "")
                    chat_id = body.get("chatid", "")
                    # WeCom SDK's 'chattype' is unreliable (always 'single').
                    # The real group indicator is the PRESENCE of 'chatid' field.
                    is_group_msg = bool(chat_id)
                    # Debug: log full body to understand the data structure
                    logger.info(
                        f"[WeCom Stream] Text from {sender_id}, "
                        f"is_group={is_group_msg}, chat_id={chat_id or 'N/A'}, "
                        f"body_keys={list(body.keys())}: {user_text[:80]}"
                    )
                    # Process message and get reply
                    reply_text = await _process_wecom_stream_message(
                        agent_id=agent_id,
                        sender_id=sender_id,
                        user_text=user_text,
                        chat_id=chat_id,
                        is_group=is_group_msg,
                    )
                    # Reply via streaming
                    stream_id = generate_req_id("stream")
                    await client.reply_stream(frame, stream_id, reply_text, finish=True)
                    logger.info(f"[WeCom Stream] Replied to {sender_id}: {reply_text[:80]}")
                except Exception as e:
                    logger.error(f"[WeCom Stream] Error handling text message: {e}")
                    import traceback
                    traceback.print_exc()
                    # Best-effort error reply to the user; ignore failures here.
                    try:
                        stream_id = generate_req_id("stream")
                        await client.reply_stream(
                            frame, stream_id,
                            f"Processing error: {str(e)[:100]}",
                            finish=True,
                        )
                    except Exception:
                        pass

            # ── Message handler: image ──
            async def on_image(frame):
                try:
                    body = frame.body or {}
                    sender = body.get("from", {})
                    sender_id = sender.get("user_id", "") or sender.get("userid", "")
                    logger.info(f"[WeCom Stream] Image message from {sender_id} (not yet handled)")
                    stream_id = generate_req_id("stream")
                    await client.reply_stream(
                        frame, stream_id,
                        "Received your image. Image processing is not yet supported.",
                        finish=True,
                    )
                except Exception as e:
                    logger.error(f"[WeCom Stream] Error handling image: {e}")

            # ── Message handler: file ──
            async def on_file(frame):
                try:
                    body = frame.body or {}
                    sender = body.get("from", {})
                    sender_id = sender.get("user_id", "") or sender.get("userid", "")
                    logger.info(f"[WeCom Stream] File message from {sender_id} (not yet handled)")
                    stream_id = generate_req_id("stream")
                    await client.reply_stream(
                        frame, stream_id,
                        "Received your file. File processing is not yet supported.",
                        finish=True,
                    )
                except Exception as e:
                    logger.error(f"[WeCom Stream] Error handling file: {e}")

            # ── Enter chat event: send welcome ──
            async def on_enter_chat(frame):
                try:
                    # Look up agent's welcome message
                    from app.models.agent import Agent as AgentModel
                    async with async_session() as db:
                        r = await db.execute(select(AgentModel).where(AgentModel.id == agent_id))
                        agent = r.scalar_one_or_none()
                    # Fall back to a generic greeting if the agent row is
                    # missing or has no welcome_message configured.
                    welcome = (agent.welcome_message if agent else None) or "Hello! How can I help you?"
                    await client.reply_welcome(frame, {
                        "msgtype": "text",
                        "text": {"content": welcome},
                    })
                    logger.info(f"[WeCom Stream] Sent welcome message for agent {agent_id}")
                except Exception as e:
                    logger.error(f"[WeCom Stream] Error sending welcome: {e}")

            # Register event handlers
            client.on("message.text", on_text)
            client.on("message.image", on_image)
            client.on("message.file", on_file)
            client.on("event.enter_chat", on_enter_chat)

            # Connect and run (with retry on failure)
            retry_delay = 5  # Start with 5 seconds
            max_retry_delay = 120  # Cap at 2 minutes
            while True:
                try:
                    logger.info(f"[WeCom Stream] Connecting for agent {agent_id}...")
                    await client.connect_async()
                    # Keep alive
                    retry_delay = 5  # Reset on successful connect
                    while client.is_connected:
                        await asyncio.sleep(1)
                    # NOTE(review): on this clean-disconnect path there is no
                    # sleep before re-entering the loop, so reconnection is
                    # immediate despite the "{retry_delay}s" wording below —
                    # confirm whether a backoff sleep was intended here.
                    logger.info(f"[WeCom Stream] Client disconnected for agent {agent_id}, reconnecting in {retry_delay}s...")
                except asyncio.CancelledError:
                    raise  # Propagate cancellation
                except Exception as e:
                    logger.error(f"[WeCom Stream] Connection error for {agent_id}: {e}, retrying in {retry_delay}s...")
                    # Exponential backoff, capped at max_retry_delay.
                    await asyncio.sleep(retry_delay)
                    retry_delay = min(retry_delay * 2, max_retry_delay)
        except asyncio.CancelledError:
            logger.info(f"[WeCom Stream] Client task cancelled for agent {agent_id}")
            # Best-effort socket close on cancellation.
            if agent_id in self._clients:
                try:
                    await self._clients[agent_id].disconnect()
                except Exception:
                    pass
        except Exception as e:
            logger.error(f"[WeCom Stream] Fatal client error for {agent_id}: {e}")
            import traceback
            traceback.print_exc()
        finally:
            # Always drop registry entries so status() stays accurate.
            self._clients.pop(agent_id, None)
            self._tasks.pop(agent_id, None)

    async def stop_client(self, agent_id: uuid.UUID):
        """Stop a running WebSocket client for an agent.

        Safe to call when no client is running: both pops default to None.
        """
        task = self._tasks.pop(agent_id, None)
        if task and not task.done():
            task.cancel()
            logger.info(f"[WeCom Stream] Stopped client for agent {agent_id}")
        client = self._clients.pop(agent_id, None)
        if client:
            # Disconnect directly as well; _run_client's cleanup is async
            # and may not have run yet.
            try:
                await client.disconnect()
            except Exception:
                pass

    async def start_all(self):
        """Start WebSocket clients for all configured WeCom agents with bot credentials."""
        logger.info("[WeCom Stream] Initializing all active WeCom AI Bot channels...")
        async with async_session() as db:
            result = await db.execute(
                select(ChannelConfig).where(
                    ChannelConfig.is_configured == True,
                    ChannelConfig.channel_type == "wecom",
                )
            )
            configs = result.scalars().all()
            started = 0
            for config in configs:
                extra = config.extra_config or {}
                bot_id = extra.get("bot_id", "")
                bot_secret = extra.get("bot_secret", "")
                # Only rows that actually carry AI Bot credentials get a client.
                if bot_id and bot_secret:
                    await self.start_client(
                        config.agent_id, bot_id, bot_secret,
                        stop_existing=False,
                    )
                    started += 1
        logger.info(f"[WeCom Stream] Started {started} WeCom AI Bot client(s)")

    def status(self) -> dict:
        """Return status of all active WebSocket clients.

        Returns:
            Mapping of agent id (str) -> True while the client task is
            still running, False once it has finished.
        """
        return {
            str(aid): not self._tasks[aid].done()
            for aid in self._tasks
        }
# ── Message processing helper ──
async def _process_wecom_stream_message(
    agent_id: uuid.UUID,
    sender_id: str,
    user_text: str,
    chat_id: str = "",
    is_group: bool = False,
) -> str:
    """Process a WeCom message through the LLM pipeline and return the reply text.

    Loads the agent, resolves the platform user, finds/creates the channel
    session, persists both sides of the exchange, and logs the activity.
    """
    from datetime import datetime, timezone
    from sqlalchemy import select as _select
    from app.database import async_session
    from app.models.agent import Agent as AgentModel
    from app.models.audit import ChatMessage
    from app.services.channel_session import find_or_create_channel_session
    from app.services.channel_user_service import channel_user_service
    from app.api.feishu import _call_agent_llm

    async with async_session() as db:
        # Fetch the agent row; reply with a plain string if it is gone.
        agent = (
            await db.execute(_select(AgentModel).where(AgentModel.id == agent_id))
        ).scalar_one_or_none()
        if not agent:
            logger.warning(f"[WeCom Stream] Agent {agent_id} not found")
            return "Agent not found"

        from app.models.agent import DEFAULT_CONTEXT_WINDOW_SIZE
        window = agent.context_window_size or DEFAULT_CONTEXT_WINDOW_SIZE

        # Group detection relies on chatid presence, not chattype (SDK quirk):
        # a message counts as group chat only when both flags agree.
        group_chat = bool(is_group and chat_id)
        conv_id = f"wecom_group_{chat_id}" if group_chat else f"wecom_p2p_{sender_id}"

        # Resolve or create the platform user via the unified channel user
        # service, which handles the User/Identity model relationship
        # (email/username/password_hash are AssociationProxy fields and
        # cannot be set directly in the UserModel constructor).
        platform_user = await channel_user_service.resolve_channel_user(
            db=db,
            agent=agent,
            channel_type="wecom",
            external_user_id=sender_id,
            extra_info={"display_name": f"WeCom {sender_id[:8]}"},
        )
        platform_user_id = platform_user.id

        # Group sessions belong to the agent creator; p2p to the sender.
        sess = await find_or_create_channel_session(
            db=db,
            agent_id=agent_id,
            user_id=agent.creator_id if group_chat else platform_user_id,
            external_conv_id=conv_id,
            source_channel="wecom",
            first_message_title=user_text,
            is_group=group_chat,
            group_name=f"WeCom Group {chat_id[:8]}" if group_chat else None,
        )
        session_conv_id = str(sess.id)

        # Most-recent `window` messages, flipped back to chronological order.
        rows = await db.execute(
            _select(ChatMessage)
            .where(ChatMessage.agent_id == agent_id, ChatMessage.conversation_id == session_conv_id)
            .order_by(ChatMessage.created_at.desc())
            .limit(window)
        )
        history = [{"role": m.role, "content": m.content} for m in rows.scalars().all()]
        history.reverse()

        # Persist the inbound user message before invoking the LLM.
        db.add(ChatMessage(
            agent_id=agent_id, user_id=platform_user_id,
            role="user", content=user_text,
            conversation_id=session_conv_id,
        ))
        sess.last_message_at = datetime.now(timezone.utc)
        await db.commit()

        reply_text = await _call_agent_llm(
            db, agent_id, user_text,
            history=history, user_id=platform_user_id,
        )
        logger.info(f"[WeCom Stream] LLM reply: {reply_text[:100]}")

        # Persist the assistant reply.
        db.add(ChatMessage(
            agent_id=agent_id, user_id=platform_user_id,
            role="assistant", content=reply_text,
            conversation_id=session_conv_id,
        ))
        sess.last_message_at = datetime.now(timezone.utc)
        await db.commit()

        # Record the exchange in the activity log (truncated for storage).
        from app.services.activity_logger import log_activity
        await log_activity(
            agent_id, "chat_reply",
            f"Replied to WeCom message: {reply_text[:80]}",
            detail={"channel": "wecom", "user_text": user_text[:200], "reply": reply_text[:500]},
        )
        return reply_text
# Module-level singleton shared by the rest of the application.
wecom_stream_manager = WeComStreamManager()