Clawith/backend/app/services/dingtalk_stream.py

206 lines
8.3 KiB
Python

"""DingTalk Stream Connection Manager.
Manages WebSocket-based Stream connections for DingTalk bots, similar to feishu_ws.py.
Uses the dingtalk-stream SDK to receive bot messages via persistent connections.
"""
import asyncio
import threading
import uuid
from typing import Dict
from loguru import logger
from sqlalchemy import select
from app.database import async_session
from app.models.channel_config import ChannelConfig
class DingTalkStreamManager:
"""Manages DingTalk Stream clients for all agents."""
def __init__(self):
self._threads: Dict[uuid.UUID, threading.Thread] = {}
self._stop_events: Dict[uuid.UUID, threading.Event] = {}
self._main_loop: asyncio.AbstractEventLoop | None = None
async def start_client(
self,
agent_id: uuid.UUID,
app_key: str,
app_secret: str,
stop_existing: bool = True,
):
"""Start a DingTalk Stream client for a specific agent."""
if not app_key or not app_secret:
logger.warning(f"[DingTalk Stream] Missing credentials for {agent_id}, skipping")
return
logger.info(f"[DingTalk Stream] Starting client for agent {agent_id} (AppKey: {app_key[:8]}...)")
# Capture the main event loop so threads can dispatch coroutines back
if self._main_loop is None:
self._main_loop = asyncio.get_running_loop()
# Stop existing client if any
if stop_existing:
await self.stop_client(agent_id)
stop_event = threading.Event()
self._stop_events[agent_id] = stop_event
# Run Stream client in a separate thread (SDK uses its own event loop)
thread = threading.Thread(
target=self._run_client_thread,
args=(agent_id, app_key, app_secret, stop_event),
name=f"dingtalk-stream-{str(agent_id)[:8]}",
daemon=True,
)
self._threads[agent_id] = thread
thread.start()
logger.info(f"[DingTalk Stream] Client thread started for agent {agent_id}")
def _run_client_thread(
self,
agent_id: uuid.UUID,
app_key: str,
app_secret: str,
stop_event: threading.Event,
):
"""Run the DingTalk Stream client in a blocking thread."""
try:
import dingtalk_stream
# Reference to manager's main loop for async dispatch
main_loop = self._main_loop
class ClawithChatbotHandler(dingtalk_stream.ChatbotHandler):
"""Custom handler that dispatches messages to the Clawith LLM pipeline."""
async def process(self, callback: dingtalk_stream.CallbackMessage):
"""Handle incoming bot message from DingTalk Stream.
NOTE: The SDK invokes this method in the thread's own asyncio loop,
so we must dispatch to the main FastAPI loop for DB + LLM work.
"""
try:
# Parse the raw data into a ChatbotMessage via class method
incoming = dingtalk_stream.ChatbotMessage.from_dict(callback.data)
# Extract text content
text_list = incoming.get_text_list()
user_text = " ".join(text_list).strip() if text_list else ""
if not user_text:
return dingtalk_stream.AckMessage.STATUS_OK, "empty message"
sender_staff_id = incoming.sender_staff_id or incoming.sender_id or ""
conversation_id = incoming.conversation_id or ""
conversation_type = incoming.conversation_type or "1"
session_webhook = incoming.session_webhook or ""
logger.info(
f"[DingTalk Stream] Message from [{incoming.sender_nick}]{sender_staff_id}: {user_text[:80]}"
)
# Dispatch to the main FastAPI event loop for DB + LLM processing
from app.api.dingtalk import process_dingtalk_message
if main_loop and main_loop.is_running():
future = asyncio.run_coroutine_threadsafe(
process_dingtalk_message(
agent_id=agent_id,
sender_staff_id=sender_staff_id,
user_text=user_text,
conversation_id=conversation_id,
conversation_type=conversation_type,
session_webhook=session_webhook,
),
main_loop,
)
# Wait for result (with timeout)
try:
future.result(timeout=120)
except Exception as e:
logger.error(f"[DingTalk Stream] LLM processing error: {e}")
import traceback
traceback.print_exc()
else:
logger.warning("[DingTalk Stream] Main loop not available for dispatch")
return dingtalk_stream.AckMessage.STATUS_OK, "ok"
except Exception as e:
logger.error(f"[DingTalk Stream] Error in message handler: {e}")
import traceback
traceback.print_exc()
return dingtalk_stream.AckMessage.STATUS_SYSTEM_EXCEPTION, str(e)
credential = dingtalk_stream.Credential(client_id=app_key, client_secret=app_secret)
client = dingtalk_stream.DingTalkStreamClient(credential=credential)
client.register_callback_handler(
dingtalk_stream.chatbot.ChatbotMessage.TOPIC,
ClawithChatbotHandler(),
)
logger.info(f"[DingTalk Stream] Connecting for agent {agent_id}...")
# start_forever() blocks until disconnected
client.start_forever()
except ImportError:
logger.warning(
"[DingTalk Stream] dingtalk-stream package not installed. "
"Install with: pip install dingtalk-stream"
)
except Exception as e:
logger.error(f"[DingTalk Stream] Client error for {agent_id}: {e}")
import traceback
traceback.print_exc()
finally:
self._threads.pop(agent_id, None)
self._stop_events.pop(agent_id, None)
logger.info(f"[DingTalk Stream] Client stopped for agent {agent_id}")
async def stop_client(self, agent_id: uuid.UUID):
"""Stop a running Stream client for an agent."""
stop_event = self._stop_events.pop(agent_id, None)
if stop_event:
stop_event.set()
thread = self._threads.pop(agent_id, None)
if thread and thread.is_alive():
logger.info(f"[DingTalk Stream] Stopping client for agent {agent_id}")
async def start_all(self):
"""Start Stream clients for all configured DingTalk agents."""
logger.info("[DingTalk Stream] Initializing all active DingTalk channels...")
async with async_session() as db:
result = await db.execute(
select(ChannelConfig).where(
ChannelConfig.is_configured == True,
ChannelConfig.channel_type == "dingtalk",
)
)
configs = result.scalars().all()
logger.info(f"[DingTalk Stream] Found {len(configs)} configured DingTalk channel(s)")
for config in configs:
if config.app_id and config.app_secret:
await self.start_client(
config.agent_id, config.app_id, config.app_secret,
stop_existing=False,
)
else:
logger.warning(
f"[DingTalk Stream] Skipping agent {config.agent_id}: missing credentials"
)
def status(self) -> dict:
"""Return status of all active Stream clients."""
return {
str(aid): self._threads[aid].is_alive()
for aid in self._threads
}
dingtalk_stream_manager = DingTalkStreamManager()