206 lines
8.3 KiB
Python
206 lines
8.3 KiB
Python
"""DingTalk Stream Connection Manager.
|
|
|
|
Manages WebSocket-based Stream connections for DingTalk bots, similar to feishu_ws.py.
|
|
Uses the dingtalk-stream SDK to receive bot messages via persistent connections.
|
|
"""
|
|
|
|
import asyncio
|
|
import threading
|
|
import uuid
|
|
from typing import Dict
|
|
|
|
from loguru import logger
|
|
from sqlalchemy import select
|
|
|
|
from app.database import async_session
|
|
from app.models.channel_config import ChannelConfig
|
|
|
|
|
|
class DingTalkStreamManager:
|
|
"""Manages DingTalk Stream clients for all agents."""
|
|
|
|
def __init__(self):
|
|
self._threads: Dict[uuid.UUID, threading.Thread] = {}
|
|
self._stop_events: Dict[uuid.UUID, threading.Event] = {}
|
|
self._main_loop: asyncio.AbstractEventLoop | None = None
|
|
|
|
async def start_client(
|
|
self,
|
|
agent_id: uuid.UUID,
|
|
app_key: str,
|
|
app_secret: str,
|
|
stop_existing: bool = True,
|
|
):
|
|
"""Start a DingTalk Stream client for a specific agent."""
|
|
if not app_key or not app_secret:
|
|
logger.warning(f"[DingTalk Stream] Missing credentials for {agent_id}, skipping")
|
|
return
|
|
|
|
logger.info(f"[DingTalk Stream] Starting client for agent {agent_id} (AppKey: {app_key[:8]}...)")
|
|
|
|
# Capture the main event loop so threads can dispatch coroutines back
|
|
if self._main_loop is None:
|
|
self._main_loop = asyncio.get_running_loop()
|
|
|
|
# Stop existing client if any
|
|
if stop_existing:
|
|
await self.stop_client(agent_id)
|
|
|
|
stop_event = threading.Event()
|
|
self._stop_events[agent_id] = stop_event
|
|
|
|
# Run Stream client in a separate thread (SDK uses its own event loop)
|
|
thread = threading.Thread(
|
|
target=self._run_client_thread,
|
|
args=(agent_id, app_key, app_secret, stop_event),
|
|
name=f"dingtalk-stream-{str(agent_id)[:8]}",
|
|
daemon=True,
|
|
)
|
|
self._threads[agent_id] = thread
|
|
thread.start()
|
|
logger.info(f"[DingTalk Stream] Client thread started for agent {agent_id}")
|
|
|
|
def _run_client_thread(
|
|
self,
|
|
agent_id: uuid.UUID,
|
|
app_key: str,
|
|
app_secret: str,
|
|
stop_event: threading.Event,
|
|
):
|
|
"""Run the DingTalk Stream client in a blocking thread."""
|
|
try:
|
|
import dingtalk_stream
|
|
|
|
# Reference to manager's main loop for async dispatch
|
|
main_loop = self._main_loop
|
|
|
|
class ClawithChatbotHandler(dingtalk_stream.ChatbotHandler):
|
|
"""Custom handler that dispatches messages to the Clawith LLM pipeline."""
|
|
|
|
async def process(self, callback: dingtalk_stream.CallbackMessage):
|
|
"""Handle incoming bot message from DingTalk Stream.
|
|
|
|
NOTE: The SDK invokes this method in the thread's own asyncio loop,
|
|
so we must dispatch to the main FastAPI loop for DB + LLM work.
|
|
"""
|
|
try:
|
|
# Parse the raw data into a ChatbotMessage via class method
|
|
incoming = dingtalk_stream.ChatbotMessage.from_dict(callback.data)
|
|
|
|
# Extract text content
|
|
text_list = incoming.get_text_list()
|
|
user_text = " ".join(text_list).strip() if text_list else ""
|
|
|
|
if not user_text:
|
|
return dingtalk_stream.AckMessage.STATUS_OK, "empty message"
|
|
|
|
sender_staff_id = incoming.sender_staff_id or incoming.sender_id or ""
|
|
conversation_id = incoming.conversation_id or ""
|
|
conversation_type = incoming.conversation_type or "1"
|
|
session_webhook = incoming.session_webhook or ""
|
|
|
|
logger.info(
|
|
f"[DingTalk Stream] Message from [{incoming.sender_nick}]{sender_staff_id}: {user_text[:80]}"
|
|
)
|
|
|
|
# Dispatch to the main FastAPI event loop for DB + LLM processing
|
|
from app.api.dingtalk import process_dingtalk_message
|
|
|
|
if main_loop and main_loop.is_running():
|
|
future = asyncio.run_coroutine_threadsafe(
|
|
process_dingtalk_message(
|
|
agent_id=agent_id,
|
|
sender_staff_id=sender_staff_id,
|
|
user_text=user_text,
|
|
conversation_id=conversation_id,
|
|
conversation_type=conversation_type,
|
|
session_webhook=session_webhook,
|
|
),
|
|
main_loop,
|
|
)
|
|
# Wait for result (with timeout)
|
|
try:
|
|
future.result(timeout=120)
|
|
except Exception as e:
|
|
logger.error(f"[DingTalk Stream] LLM processing error: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
else:
|
|
logger.warning("[DingTalk Stream] Main loop not available for dispatch")
|
|
|
|
return dingtalk_stream.AckMessage.STATUS_OK, "ok"
|
|
except Exception as e:
|
|
logger.error(f"[DingTalk Stream] Error in message handler: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return dingtalk_stream.AckMessage.STATUS_SYSTEM_EXCEPTION, str(e)
|
|
|
|
credential = dingtalk_stream.Credential(client_id=app_key, client_secret=app_secret)
|
|
client = dingtalk_stream.DingTalkStreamClient(credential=credential)
|
|
client.register_callback_handler(
|
|
dingtalk_stream.chatbot.ChatbotMessage.TOPIC,
|
|
ClawithChatbotHandler(),
|
|
)
|
|
|
|
logger.info(f"[DingTalk Stream] Connecting for agent {agent_id}...")
|
|
# start_forever() blocks until disconnected
|
|
client.start_forever()
|
|
|
|
except ImportError:
|
|
logger.warning(
|
|
"[DingTalk Stream] dingtalk-stream package not installed. "
|
|
"Install with: pip install dingtalk-stream"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"[DingTalk Stream] Client error for {agent_id}: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
finally:
|
|
self._threads.pop(agent_id, None)
|
|
self._stop_events.pop(agent_id, None)
|
|
logger.info(f"[DingTalk Stream] Client stopped for agent {agent_id}")
|
|
|
|
async def stop_client(self, agent_id: uuid.UUID):
|
|
"""Stop a running Stream client for an agent."""
|
|
stop_event = self._stop_events.pop(agent_id, None)
|
|
if stop_event:
|
|
stop_event.set()
|
|
thread = self._threads.pop(agent_id, None)
|
|
if thread and thread.is_alive():
|
|
logger.info(f"[DingTalk Stream] Stopping client for agent {agent_id}")
|
|
|
|
async def start_all(self):
|
|
"""Start Stream clients for all configured DingTalk agents."""
|
|
logger.info("[DingTalk Stream] Initializing all active DingTalk channels...")
|
|
async with async_session() as db:
|
|
result = await db.execute(
|
|
select(ChannelConfig).where(
|
|
ChannelConfig.is_configured == True,
|
|
ChannelConfig.channel_type == "dingtalk",
|
|
)
|
|
)
|
|
configs = result.scalars().all()
|
|
|
|
logger.info(f"[DingTalk Stream] Found {len(configs)} configured DingTalk channel(s)")
|
|
|
|
for config in configs:
|
|
if config.app_id and config.app_secret:
|
|
await self.start_client(
|
|
config.agent_id, config.app_id, config.app_secret,
|
|
stop_existing=False,
|
|
)
|
|
else:
|
|
logger.warning(
|
|
f"[DingTalk Stream] Skipping agent {config.agent_id}: missing credentials"
|
|
)
|
|
|
|
def status(self) -> dict:
|
|
"""Return status of all active Stream clients."""
|
|
return {
|
|
str(aid): self._threads[aid].is_alive()
|
|
for aid in self._threads
|
|
}
|
|
|
|
|
|
dingtalk_stream_manager = DingTalkStreamManager()
|