"""Feishu OAuth and Channel API routes."""
|
||
|
||
import asyncio
|
||
import time
|
||
import uuid
|
||
from collections.abc import Awaitable, Callable
|
||
|
||
from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
|
||
from loguru import logger
|
||
from sqlalchemy import select, or_
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.core.permissions import check_agent_access, is_agent_creator, is_agent_expired
|
||
from app.core.security import get_current_user
|
||
from app.database import get_db
|
||
from app.models.channel_config import ChannelConfig
|
||
from app.models.user import User
|
||
from app.models.identity import IdentityProvider
|
||
from app.schemas.schemas import ChannelConfigCreate, ChannelConfigOut, TokenResponse, UserOut
|
||
from app.services.feishu_service import feishu_service
|
||
|
||

router = APIRouter(tags=["feishu"])

# Default LLM timeout for the Feishu channel (fallback when the model has no request_timeout set).
# The per-model request_timeout field takes precedence — see _get_llm_timeout().
_LLM_TIMEOUT_SECONDS_DEFAULT = 180.0

# Number of tool status lines to keep visible in the Feishu card.
# Shows the last N non-running lines plus any active "running" entry.
_TOOL_STATUS_KEEP_LINES = 20

def _get_llm_timeout(model) -> float:
    """Get the effective LLM timeout for the Feishu channel.

    Prefer the model-level request_timeout so each model can have its own
    budget (local vLLM may need 300 s, cloud APIs often need only 60 s).
    Falls back to _LLM_TIMEOUT_SECONDS_DEFAULT when the field is absent or zero.
    """
    timeout = getattr(model, "request_timeout", None)
    if timeout and float(timeout) > 0:
        return float(timeout)
    return _LLM_TIMEOUT_SECONDS_DEFAULT
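
# Illustrative behavior of _get_llm_timeout (module default is 180.0; values made up):
#   model.request_timeout = 300     -> 300.0
#   model.request_timeout = 0/None  -> 180.0 (default)
#   attribute missing               -> 180.0 (default)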

class _SerialPatchQueue:
    """Serialize patch requests for one Feishu message to prevent out-of-order overwrite."""

    def __init__(self):
        self._tail: asyncio.Task | None = None

    def enqueue(self, job_factory: Callable[[], Awaitable[None]]) -> None:
        prev = self._tail

        async def _runner():
            if prev:
                try:
                    await prev
                except Exception as e:
                    logger.warning(f"[Feishu] Previous patch job failed before next job: {e}")
            await job_factory()

        self._tail = asyncio.create_task(_runner())

    async def drain(self) -> None:
        if self._tail:
            await self._tail
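
# Illustrative use of _SerialPatchQueue (a sketch only; `update_card` is a made-up
# coroutine standing in for the real patch jobs enqueued further below):
#
#     queue = _SerialPatchQueue()
#     queue.enqueue(lambda: update_card("draft 1"))  # factory returns an awaitable
#     queue.enqueue(lambda: update_card("draft 2"))  # starts only after draft 1 settles
#     await queue.drain()                            # wait for the whole chain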

# ─── OAuth ──────────────────────────────────────────────

from fastapi.responses import HTMLResponse
@router.get("/auth/feishu/callback")
|
||
@router.post("/auth/feishu/callback", response_model=TokenResponse)
|
||
async def feishu_oauth_callback(
|
||
code: str,
|
||
state: str = None,
|
||
db: AsyncSession = Depends(get_db)
|
||
):
|
||
"""Handle Feishu OAuth callback — exchange code for user session."""
|
||
# Parse state if it's a UUID (session ID) or other context
|
||
from app.models.identity import SSOScanSession
|
||
tenant_id = None
|
||
if state:
|
||
try:
|
||
sid = uuid.UUID(state)
|
||
s_res = await db.execute(select(SSOScanSession).where(SSOScanSession.id == sid))
|
||
session = s_res.scalar_one_or_none()
|
||
if session:
|
||
tenant_id = session.tenant_id
|
||
except (ValueError, AttributeError):
|
||
pass
|
||
|
||
try:
|
||
# Use FeishuAuthProvider instead of legacy feishu_service
|
||
from app.services.auth_provider import FeishuAuthProvider
|
||
from app.models.identity import IdentityProvider
|
||
from app.config import get_settings
|
||
|
||
# Get Feishu credentials from settings
|
||
settings = get_settings()
|
||
feishu_config = {
|
||
"app_id": settings.FEISHU_APP_ID,
|
||
"app_secret": settings.FEISHU_APP_SECRET,
|
||
}
|
||
|
||
# Get or create provider via auth provider
|
||
provider = None
|
||
if tenant_id:
|
||
result = await db.execute(
|
||
select(IdentityProvider).where(
|
||
IdentityProvider.provider_type == "feishu",
|
||
IdentityProvider.tenant_id == tenant_id
|
||
)
|
||
)
|
||
provider = result.scalar_one_or_none()
|
||
|
||
auth_provider = FeishuAuthProvider(provider=provider, config=feishu_config)
|
||
|
||
# Ensure provider exists (will create if not)
|
||
await auth_provider._ensure_provider(db, tenant_id)
|
||
provider = auth_provider.provider
|
||
|
||
# Exchange code for user info
|
||
token_data = await auth_provider.exchange_code_for_token(code)
|
||
access_token = token_data.get("access_token", "")
|
||
user_info = await auth_provider.get_user_info(access_token)
|
||
|
||
# Find or create user
|
||
user, is_new = await auth_provider.find_or_create_user(db, user_info, tenant_id=tenant_id)
|
||
|
||
# Generate JWT token
|
||
from app.core.security import create_access_token
|
||
token = create_access_token(str(user.id), user.role)
|
||
|
||
except Exception as e:
|
||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Feishu auth failed: {e}")
|
||
|
||
# If this is an SSO session, store result and redirect to frontend completion
|
||
if state:
|
||
try:
|
||
sid = uuid.UUID(state)
|
||
s_res = await db.execute(select(SSOScanSession).where(SSOScanSession.id == sid))
|
||
session = s_res.scalar_one_or_none()
|
||
if session:
|
||
session.status = "authorized"
|
||
session.provider_type = "feishu"
|
||
session.user_id = user.id
|
||
session.access_token = token
|
||
session.error_msg = None
|
||
await db.commit()
|
||
return HTMLResponse(
|
||
f"""<html><head><meta charset="utf-8" /></head>
|
||
<body style="font-family: sans-serif; padding: 24px;">
|
||
<div>SSO login successful. Redirecting...</div>
|
||
<script>window.location.href = "/sso/entry?sid={sid}&complete=1";</script>
|
||
</body></html>"""
|
||
)
|
||
except Exception as e:
|
||
logger.exception("Failed to update SSO session (feishu) %s", e)
|
||
|
||
return TokenResponse(access_token=token, user=UserOut.model_validate(user))
|
||
|
||
|
||

# ─── Channel Config (per-agent Feishu bot) ──────────────

@router.post("/agents/{agent_id}/channel", response_model=ChannelConfigOut, status_code=status.HTTP_201_CREATED)
async def configure_channel(
    agent_id: uuid.UUID,
    data: ChannelConfigCreate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Configure Feishu bot credentials for a digital employee (wizard step 5)."""
    agent, _access = await check_agent_access(db, current_user, agent_id)
    if not is_agent_creator(current_user, agent):
        raise HTTPException(status_code=403, detail="Only creator can configure channel")

    # Check for an existing config
    result = await db.execute(select(ChannelConfig).where(
        ChannelConfig.agent_id == agent_id,
        ChannelConfig.channel_type == "feishu",
    ))
    existing = result.scalar_one_or_none()
    if existing:
        existing.app_id = data.app_id
        existing.app_secret = data.app_secret
        existing.encrypt_key = data.encrypt_key
        existing.verification_token = data.verification_token
        existing.extra_config = data.extra_config or {}
        existing.is_configured = True
        await db.flush()

        # Start/stop the WS client in the background
        from app.services.feishu_ws import feishu_ws_manager
        mode = existing.extra_config.get("connection_mode", "webhook")
        if mode == "websocket":
            asyncio.create_task(feishu_ws_manager.start_client(agent_id, existing.app_id, existing.app_secret))
        else:
            asyncio.create_task(feishu_ws_manager.stop_client(agent_id))

        return ChannelConfigOut.model_validate(existing)

    config = ChannelConfig(
        agent_id=agent_id,
        channel_type=data.channel_type,
        app_id=data.app_id,
        app_secret=data.app_secret,
        encrypt_key=data.encrypt_key,
        verification_token=data.verification_token,
        extra_config=data.extra_config or {},
        is_configured=True,
    )
    db.add(config)
    await db.flush()

    # Start the WS client in the background
    from app.services.feishu_ws import feishu_ws_manager
    mode = config.extra_config.get("connection_mode", "webhook")
    if mode == "websocket":
        asyncio.create_task(feishu_ws_manager.start_client(agent_id, config.app_id, config.app_secret))

    return ChannelConfigOut.model_validate(config)
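
# Illustrative request body for configure_channel above (field names taken from the
# ChannelConfigCreate usage in the handler; the values are made up):
#
#     POST /agents/{agent_id}/channel
#     {
#         "channel_type": "feishu",
#         "app_id": "cli_a1b2c3",
#         "app_secret": "***",
#         "encrypt_key": "...",
#         "verification_token": "...",
#         "extra_config": {"connection_mode": "websocket"}
#     }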
@router.get("/agents/{agent_id}/channel", response_model=ChannelConfigOut)
|
||
async def get_channel_config(
|
||
agent_id: uuid.UUID,
|
||
current_user: User = Depends(get_current_user),
|
||
db: AsyncSession = Depends(get_db),
|
||
):
|
||
"""Get Feishu channel configuration for an agent."""
|
||
await check_agent_access(db, current_user, agent_id)
|
||
result = await db.execute(select(ChannelConfig).where(
|
||
ChannelConfig.agent_id == agent_id,
|
||
ChannelConfig.channel_type == "feishu",
|
||
))
|
||
config = result.scalar_one_or_none()
|
||
if not config:
|
||
raise HTTPException(status_code=404, detail="Channel not configured")
|
||
return ChannelConfigOut.model_validate(config)
|
||
|
||
|
||
@router.get("/agents/{agent_id}/channel/webhook-url")
|
||
async def get_webhook_url(agent_id: uuid.UUID, request: Request, db: AsyncSession = Depends(get_db)):
|
||
"""Get the webhook URL for this agent's Feishu bot."""
|
||
from app.services.platform_service import platform_service
|
||
public_base = await platform_service.get_public_base_url(db, request)
|
||
return {"webhook_url": f"{public_base}/api/channel/feishu/{agent_id}/webhook"}
|
||
|
||
|
||

@router.delete("/agents/{agent_id}/channel", status_code=status.HTTP_204_NO_CONTENT)
async def delete_channel_config(
    agent_id: uuid.UUID,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Remove the Feishu bot configuration for an agent."""
    agent, _access = await check_agent_access(db, current_user, agent_id)
    if not is_agent_creator(current_user, agent):
        raise HTTPException(status_code=403, detail="Only creator can remove channel")
    result = await db.execute(select(ChannelConfig).where(
        ChannelConfig.agent_id == agent_id,
        ChannelConfig.channel_type == "feishu",
    ))
    config = result.scalar_one_or_none()
    if not config:
        raise HTTPException(status_code=404, detail="Channel not configured")
    await db.delete(config)

# ─── Feishu Event Webhook ───────────────────────────────

# Simple in-memory dedup to avoid processing retried events
_processed_events: set[str] = set()


@router.post("/channel/feishu/{agent_id}/webhook")
async def feishu_event_webhook(
    agent_id: uuid.UUID,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Handle a Feishu event callback for a specific agent's bot."""
    body = await request.json()

    # Handle the verification challenge
    if "challenge" in body:
        return {"challenge": body["challenge"]}

    return await process_feishu_event(agent_id, body, db)
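
# Illustrative URL-verification exchange handled by the "challenge" branch above
# (typical shape for Feishu event subscriptions; all values made up):
#   request body:  {"type": "url_verification", "challenge": "ajls384kdj...", "token": "..."}
#   response:      {"challenge": "ajls384kdj..."}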

async def process_feishu_event(agent_id: uuid.UUID, body: dict, db: AsyncSession):
    """Core logic to process Feishu events from both the webhook and the WS client."""
    import json as _json
    logger.info(f"[Feishu] Event processing for {agent_id}: event_type={body.get('header', {}).get('event_type', 'N/A')}")

    # Deduplicate — Feishu retries on slow responses.
    # Only mark as processed AFTER successful handling so retries work on crash.
    event_id = body.get("header", {}).get("event_id", "")
    if event_id in _processed_events:
        return {"code": 0, "msg": "already processed"}

    # Get the channel config — filter by feishu since an agent can have multiple channels
    result = await db.execute(
        select(ChannelConfig).where(
            ChannelConfig.agent_id == agent_id,
            ChannelConfig.channel_type == "feishu",
        )
    )
    config = result.scalar_one_or_none()
    if not config:
        return {"code": 1, "msg": "Channel not found"}

    # Mark the event as processed once the config is loaded successfully
    if event_id:
        _processed_events.add(event_id)
        # Keep the set bounded
        if len(_processed_events) > 1000:
            _processed_events.clear()

    # Handle events
    event = body.get("event", {})
    event_type = body.get("header", {}).get("event_type", "")

    if event_type == "im.message.receive_v1":
        message = event.get("message", {})
        sender = event.get("sender", {}).get("sender_id", {})
        sender_open_id = sender.get("open_id", "")
        sender_user_id_from_event = sender.get("user_id", "")  # tenant-stable ID, available directly in the event body
        msg_type = message.get("message_type", "text")
        chat_type = message.get("chat_type", "p2p")  # p2p or group
        chat_id = message.get("chat_id", "")

        logger.info(f"[Feishu] Received {msg_type} message, chat_type={chat_type}, from={sender_open_id}")

        # ── Normalize post (rich text) → extract text + schedule image downloads ──
        if msg_type == "post":
            import json as _json_post
            _post_body = _json_post.loads(message.get("content", "{}"))
            # Feishu post content: {"title": "...", "content": [[{"tag": "text", "text": "..."}, ...], ...]}
            # The content may be nested under a locale key like "zh_cn"
            _paragraphs = _post_body.get("content", [])
            if not _paragraphs:
                # Try locale keys (zh_cn, en_us, etc.)
                for _locale_key, _locale_val in _post_body.items():
                    if isinstance(_locale_val, dict) and "content" in _locale_val:
                        _paragraphs = _locale_val["content"]
                        break
            _text_parts = []
            _post_image_keys = []
            for _para in _paragraphs:
                _line_parts = []
                for _elem in _para:
                    _tag = _elem.get("tag")
                    if _tag == "text":
                        _line_parts.append(_elem.get("text", ""))
                    elif _tag == "a":
                        _href = _elem.get("href", "")
                        _link_text = _elem.get("text", "")
                        _line_parts.append(f"{_link_text} ({_href})" if _href else _link_text)
                    elif _tag == "img":
                        _ik = _elem.get("image_key", "")
                        if _ik:
                            _post_image_keys.append(_ik)
                if _line_parts:
                    _text_parts.append("".join(_line_parts))
            _extracted_text = "\n".join(_text_parts).strip()
            # Download images and embed them as base64 for vision-capable models
            _image_markers = []
            if _post_image_keys:
                import base64 as _b64
                _msg_id = message.get("message_id", "")
                from pathlib import Path as _PostPath
                from app.config import get_settings as _post_gs
                _post_settings = _post_gs()
                _upload_dir = _PostPath(_post_settings.AGENT_DATA_DIR) / str(agent_id) / "workspace" / "uploads"
                _upload_dir.mkdir(parents=True, exist_ok=True)
                for _ik in _post_image_keys:
                    try:
                        _img_bytes = await feishu_service.download_message_resource(
                            config.app_id, config.app_secret, _msg_id, _ik, "image"
                        )
                        # Save to the workspace
                        _save_path = _upload_dir / f"image_{_ik[-8:]}.jpg"
                        _save_path.write_bytes(_img_bytes)
                        logger.info(f"[Feishu] Saved post image to {_save_path} ({len(_img_bytes)} bytes)")
                        # Embed as a base64 marker for vision models
                        _b64_data = _b64.b64encode(_img_bytes).decode("ascii")
                        _image_markers.append(f"[image_data:data:image/jpeg;base64,{_b64_data}]")
                    except Exception as _dl_err:
                        logger.error(f"[Feishu] Failed to download post image {_ik}: {_dl_err}")
            # Build the final text with embedded images
            if not _extracted_text and _image_markers:
                _extracted_text = "[用户发送了图片,请看图片内容]"
            _final_content = _extracted_text
            if _image_markers:
                _final_content += "\n" + "\n".join(_image_markers)
            # Rewrite as a text message so the existing handler processes it
            message["content"] = _json_post.dumps({"text": _final_content})
            msg_type = "text"
            logger.info(f"[Feishu] Normalized post → text='{_extracted_text[:100]}', images={len(_image_markers)}")
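
        # Illustrative normalization performed above (structure follows the parsing
        # code; keys/values made up):
        #   post content: {"zh_cn": {"content": [[{"tag": "text", "text": "见图"},
        #                                         {"tag": "img", "image_key": "img_v3_xxx"}]]}}
        #   rewritten as: {"text": "见图\n[image_data:data:image/jpeg;base64,...]"}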

        if msg_type in ("file", "image"):
            asyncio.create_task(_handle_feishu_file(db, agent_id, config, message, sender_open_id, chat_type, chat_id))
            return {"code": 0, "msg": "ok"}

        if msg_type == "text":
            import json
            import re
            content = json.loads(message.get("content", "{}"))
            user_text = content.get("text", "")

            # Strip @mention tags (e.g. @_user_1) from group messages
            user_text = re.sub(r'@_user_\d+', '', user_text).strip()

            if not user_text:
                return {"code": 0, "msg": "empty message after stripping mentions"}

            # Detect task creation intent
            task_match = re.search(
                r'(?:创建|新建|添加|建一个|帮我建)(?:一个)?(?:任务|待办|todo)[,,::\s]*(.+)',
                user_text, re.IGNORECASE
            )

            # Determine the conversation_id for history isolation.
            # Group chats: use chat_id; P2P chats: prefer user_id (tenant-stable).
            if chat_type == "group" and chat_id:
                conv_id = f"feishu_group_{chat_id}"
            else:
                conv_id = f"feishu_p2p_{sender_user_id_from_event or sender_open_id}"
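
            # Illustrative conv_id values produced above (IDs made up):
            #   group chat -> "feishu_group_oc_a1b2c3d4"
            #   P2P chat   -> "feishu_p2p_8d3f2a1c"  (user_id preferred, open_id fallback)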

            # Load recent conversation history via the session (a session UUID may already exist)
            from app.models.audit import ChatMessage
            from app.models.agent import Agent as AgentModel
            from app.services.channel_session import find_or_create_channel_session
            agent_r = await db.execute(select(AgentModel).where(AgentModel.id == agent_id))
            agent_obj = agent_r.scalar_one_or_none()
            creator_id = agent_obj.creator_id if agent_obj else agent_id
            from app.models.agent import DEFAULT_CONTEXT_WINDOW_SIZE
            ctx_size = (agent_obj.context_window_size or DEFAULT_CONTEXT_WINDOW_SIZE) if agent_obj else DEFAULT_CONTEXT_WINDOW_SIZE

            # Pre-resolve the session so the history lookup uses the UUID (the session is created later if new)
            from app.models.chat_session import ChatSession as _ChatSession
            _pre_sess_r = await db.execute(
                select(_ChatSession).where(
                    _ChatSession.agent_id == agent_id,
                    _ChatSession.external_conv_id == conv_id,
                )
            )
            _pre_sess = _pre_sess_r.scalar_one_or_none()
            _history_conv_id = str(_pre_sess.id) if _pre_sess else conv_id
            history_result = await db.execute(
                select(ChatMessage)
                .where(ChatMessage.agent_id == agent_id, ChatMessage.conversation_id == _history_conv_id)
                .order_by(ChatMessage.created_at.desc())
                .limit(ctx_size)
            )
            history_msgs = history_result.scalars().all()
            history = [{"role": m.role, "content": m.content} for m in reversed(history_msgs)]

            # --- Resolve Feishu sender identity & find/create the platform user ---
            import uuid as _uuid
            import httpx as _httpx

            sender_name = ""
            sender_user_id_feishu = sender_user_id_from_event  # tenant-level user_id, pre-filled from the event body
            extra_info: dict | None = None

            try:
                async with _httpx.AsyncClient() as _client:
                    _tok_resp = await _client.post(
                        "https://open.feishu.cn/open-apis/auth/v3/app_access_token/internal",
                        json={"app_id": config.app_id, "app_secret": config.app_secret},
                    )
                    _app_token = _tok_resp.json().get("app_access_token", "")
                    if _app_token:
                        _user_resp = await _client.get(
                            f"https://open.feishu.cn/open-apis/contact/v3/users/{sender_open_id}",
                            params={"user_id_type": "open_id"},
                            headers={"Authorization": f"Bearer {_app_token}"},
                        )
                        _user_data = _user_resp.json()
                        logger.info(f"[Feishu] Sender resolve: code={_user_data.get('code')}, msg={_user_data.get('msg', '')}")
                        if _user_data.get("code") == 0:
                            _user_info = _user_data.get("data", {}).get("user", {})
                            sender_name = _user_info.get("name", "")
                            sender_user_id_feishu = _user_info.get("user_id", "")
                            sender_email = _user_info.get("email", "") or _user_info.get("enterprise_email", "")
                            # The Feishu contact API returns 'avatar' as a dict
                            # (keys: avatar_240, avatar_640, avatar_origin), NOT a plain URL.
                            # We must extract a string to avoid a DataError when writing to the DB.
                            _raw_avatar = _user_info.get("avatar")
                            if isinstance(_raw_avatar, dict):
                                _avatar_url = (
                                    _raw_avatar.get("avatar_240")
                                    or _raw_avatar.get("avatar_640")
                                    or _raw_avatar.get("avatar_origin")
                                    or ""
                                )
                            else:
                                _avatar_url = _raw_avatar or ""
                            extra_info = {
                                "name": sender_name,
                                "email": sender_email,
                                "mobile": _user_info.get("mobile"),
                                "avatar_url": _avatar_url,
                                "unionid": _user_info.get("user_id"),  # tenant-level user_id
                                "open_id": sender_open_id,
                            }
                            logger.info(f"[Feishu] Resolved sender: {sender_name} (user_id={sender_user_id_feishu})")
                            # Cache sender info so feishu_user_search can find them by name
                            if sender_name and sender_open_id:
                                try:
                                    import pathlib as _pl, json as _cj, time as _ct
                                    _safe_id = str(agent_id).replace("..", "").replace("/", "")
                                    _cache = _pl.Path(f"/data/workspaces/{_safe_id}/feishu_contacts_cache.json")
                                    _cache.parent.mkdir(parents=True, exist_ok=True)
                                    _existing = {}
                                    if _cache.exists():
                                        try:
                                            _existing = _cj.loads(_cache.read_text())
                                        except Exception:
                                            pass
                                    # Key by user_id when available (tenant-stable), falling back to open_id
                                    _users = {}
                                    for _u in _existing.get("users", []):
                                        _key = _u.get("user_id") or _u.get("open_id", "")
                                        _users[_key] = _u
                                    _cache_key = sender_user_id_feishu or sender_open_id
                                    _users[_cache_key] = {
                                        "open_id": sender_open_id,
                                        "name": sender_name,
                                        "email": sender_email,
                                        "user_id": sender_user_id_feishu,
                                    }
                                    _cache.write_text(_cj.dumps(
                                        {"ts": _ct.time(), "users": list(_users.values())},
                                        ensure_ascii=False,
                                    ), encoding="utf-8")
                                    import os as _os
                                    _os.chmod(str(_cache), 0o600)
                                except Exception as _ce:
                                    logger.error(f"[Feishu] Cache write failed: {_ce}")
            except Exception as e:
                logger.error(f"[Feishu] Failed to resolve sender: {e}")
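
            # Illustrative feishu_contacts_cache.json written above (values made up):
            #   {"ts": 1700000000.0,
            #    "users": [{"open_id": "ou_xxx", "name": "张三",
            #               "email": "zhangsan@example.com", "user_id": "3g2b1a"}]}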

            # Resolve the channel user via the unified service (uses OrgMember + SSO patterns)
            from app.services.channel_user_service import channel_user_service
            platform_user = await channel_user_service.resolve_channel_user(
                db=db,
                agent=agent_obj,
                channel_type="feishu",
                external_user_id=sender_open_id,
                extra_info=extra_info,
            )
            platform_user_id = platform_user.id

            # ── Find-or-create a ChatSession via external_conv_id (DB-based, no cache needed) ──
            from datetime import datetime as _dt, timezone as _tz
            _is_group = (chat_type == "group")
            _sess = await find_or_create_channel_session(
                db=db,
                agent_id=agent_id,
                user_id=platform_user_id if not _is_group else creator_id,
                external_conv_id=conv_id,
                source_channel="feishu",
                first_message_title=user_text,
                is_group=_is_group,
                group_name=f"Feishu Group {chat_id[:8]}" if _is_group else None,
            )
            session_conv_id = str(_sess.id)

            # Save the user message
            db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="user", content=user_text, conversation_id=session_conv_id))
            _sess.last_message_at = _dt.now(_tz.utc)
            await db.commit()

            # Prepend the sender identity so the agent knows who is talking
            llm_user_text = user_text
            if sender_name:
                id_part = f" (ID: {sender_user_id_feishu})" if sender_user_id_feishu else ""
                llm_user_text = f"[发送者: {sender_name}{id_part}] {user_text}"

            # ── Inject recent uploaded file context ──────────────────────────
            # Check the uploads directory for recently modified files (within 30 min).
            # This is more reliable than scanning DB history, because the file save
            # to disk always succeeds even if the DB transaction fails.
            try:
                import time as _time
                import pathlib as _pl
                from app.config import get_settings as _gs
                _upload_dir = _pl.Path(_gs().AGENT_DATA_DIR) / str(agent_id) / "workspace" / "uploads"
                _recent_file_path = None
                if _upload_dir.exists() and "uploads/" not in user_text and "workspace/" not in user_text:
                    _now = _time.time()
                    _candidates = sorted(
                        _upload_dir.iterdir(),
                        key=lambda p: p.stat().st_mtime,
                        reverse=True,
                    )
                    for _fp in _candidates:
                        if _fp.is_file() and (_now - _fp.stat().st_mtime) < 1800:  # 30 min
                            _recent_file_path = f"uploads/{_fp.name}"
                            break
                if _recent_file_path:
                    # _recent_file_path is relative to the uploads dir; the agent workspace root is
                    # AGENT_DATA_DIR/{agent_id}/, so the correct relative path is workspace/uploads/...
                    _ws_rel_path = f"workspace/{_recent_file_path}"
                    llm_user_text = (
                        llm_user_text
                        + f"\n\n[系统提示:用户刚上传了文件,路径为工作区 `{_ws_rel_path}`。"
                        f"如果用户的指令涉及这篇文章、这个文件、这份文档等,"
                        f"请立即调用 read_document(path=\"{_ws_rel_path}\") 读取内容,不要先用 list_files 验证,直接读取即可。]"
                    )
                    logger.info(f"[Feishu] Injected recent file hint: {_ws_rel_path}")
            except Exception as _fe:
                logger.error(f"[Feishu] File injection error: {_fe}")

            # Set the sender open_id contextvar so the calendar tool can auto-invite the requester
            from app.services.agent_tools import channel_feishu_sender_open_id as _cfso
            _cfso_token = _cfso.set(sender_open_id)

            # Set the channel_file_sender contextvar so the agent can send files back via Feishu
            from app.services.agent_tools import channel_file_sender as _cfs
            _reply_to_id = chat_id if chat_type == "group" else sender_open_id
            _rid_type = "chat_id" if chat_type == "group" else "open_id"

            async def _feishu_file_sender(file_path, msg: str = ""):
                try:
                    await feishu_service.upload_and_send_file(
                        config.app_id, config.app_secret,
                        _reply_to_id, file_path,
                        receive_id_type=_rid_type,
                        accompany_msg=msg,
                    )
                except Exception as _upload_err:
                    # Fallback: send a download link when upload permission is not granted
                    from pathlib import Path as _P
                    from app.config import get_settings as _gs_fallback
                    _fs = _gs_fallback()
                    _base_url = getattr(_fs, 'BASE_URL', '').rstrip('/') or ''
                    _fp = _P(file_path)
                    _ws_root = _P(_fs.AGENT_DATA_DIR)
                    try:
                        _rel = str(_fp.relative_to(_ws_root / str(agent_id)))
                    except ValueError:
                        _rel = _fp.name
                    _fallback_parts = []
                    if msg:
                        _fallback_parts.append(msg)
                    if _base_url:
                        _dl_url = f"{_base_url}/api/agents/{agent_id}/files/download?path={_rel}"
                        _fallback_parts.append(f"📎 {_fp.name}\n🔗 {_dl_url}")
                    _fallback_parts.append(
                        f"⚠️ 文件直接发送失败({_upload_err})\n"
                        "如需 Agent 直接发飞书文件,请在飞书开放平台为应用开启 "
                        "`im:resource`(即 `im:resource:upload`)权限并发布版本。"
                    )
                    await feishu_service.send_message(
                        config.app_id, config.app_secret,
                        _reply_to_id, "text",
                        _json.dumps({"text": "\n\n".join(_fallback_parts)}),
                        receive_id_type=_rid_type,
                    )

            _cfs_token = _cfs.set(_feishu_file_sender)

            # Set up the streaming response via CardKit (primary) or IM patch (fallback)
            import json as _json_card

            cardkit_card_id: str | None = None
            cardkit_sequence: int = 0
            msg_id_for_patch: str | None = None

            _reply_target = chat_id if chat_type == "group" and chat_id else sender_open_id
            _rid_type = "chat_id" if chat_type == "group" and chat_id else "open_id"

            init_card = {
                "schema": "2.0",
                "config": {
                    "streaming_mode": True,
                    "locales": ["zh_cn", "en_us"],
                    "summary": {"content": "思考中..."},
                },
                "body": {
                    "elements": [
                        {"tag": "markdown", "content": "", "text_align": "left", "text_size": "normal_v2", "element_id": "streaming_content"},
                        {"tag": "markdown", "content": " ", "icon": {"tag": "custom_icon", "img_key": "img_v3_02vb_496bec09-4b43-4773-ad6b-0cdd103cd2bg", "size": "16px 16px"}, "element_id": "loading_icon"},
                    ]
                },
            }

            try:
                cardkit_card_id = await feishu_service.create_card_entity(
                    config.app_id, config.app_secret, init_card
                )
                cardkit_sequence = 1
                await feishu_service.send_card_by_card_id(
                    config.app_id, config.app_secret, _reply_target, cardkit_card_id,
                    receive_id_type=_rid_type,
                )
                logger.info(f"[Feishu] CardKit card created and sent: card_id={cardkit_card_id}")
            except Exception as e:
                logger.warning(f"[Feishu] CardKit flow failed, falling back to IM patch: {e}")
                cardkit_card_id = None
                init_card_fallback = {
                    "config": {"update_multi": True},
                    "header": {"template": "blue", "title": {"content": "思考中...", "tag": "plain_text"}},
                    "elements": [{"tag": "markdown", "content": "..."}],
                }
                try:
                    init_resp = await feishu_service.send_message(
                        config.app_id, config.app_secret, _reply_target, "interactive",
                        _json_card.dumps(init_card_fallback), receive_id_type=_rid_type, stage="stream_init_card",
                    )
                    msg_id_for_patch = init_resp.get("data", {}).get("message_id")
                except Exception as e2:
                    logger.error(f"[Feishu] Fallback init card also failed: {e2}")
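
            # Transport decision recap: when create_card_entity succeeds we stream via
            # CardKit (cardkit_card_id set); otherwise we PATCH an ordinary interactive
            # message (msg_id_for_patch set); if both setups failed, the final reply is
            # sent later as a single plain-text message.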

            _stream_buffer = []
            _thinking_buffer = []
            _last_flush_time = time.time()
            _FLUSH_INTERVAL_CARDKIT = 0.5
            _FLUSH_INTERVAL_PATCH = 1.0
            _agent_name = agent_obj.name if agent_obj else "AI 回复"
            _tool_status_running: dict[str, str] = {}
            _tool_status_done: list[str] = []
            _patch_queue = _SerialPatchQueue()
            _heartbeat_task: asyncio.Task | None = None
            _llm_done = False
            _last_flushed_hash: int = 0
            _last_flushed_text: str = ""
            _flush_lock = asyncio.Lock()

            def _build_card(
                answer_text: str,
                thinking_text: str = "",
                streaming: bool = False,
                tool_status_lines: list[str] | None = None,
                agent_name: str | None = None,
            ) -> dict:
                """Build a Feishu interactive card for streaming replies.

                Args:
                    answer_text: Main reply text (may be partial during streaming).
                    thinking_text: Reasoning/thinking content shown in a collapsed section.
                    streaming: If True, appends a cursor glyph to indicate in-progress output.
                    tool_status_lines: Override list of status lines for callers that maintain
                        their own done-list; pass None to use the default text-streaming state.
                    agent_name: Override the default _agent_name.
                """
                _name = agent_name if agent_name is not None else _agent_name

                elements = []

                # Tool status section.
                # For the primary text-streaming path we use the split running/done dicts;
                # callers may pass an explicit list as an override.
                if tool_status_lines is not None:
                    # Caller-supplied override: plain list, no split needed.
                    if tool_status_lines:
                        elements.append({
                            "tag": "markdown",
                            "content": "\n".join(tool_status_lines[-_TOOL_STATUS_KEEP_LINES:]),
                        })
                        elements.append({"tag": "hr"})
                else:
                    # Primary text-streaming path: show done history + any still-running tools.
                    # _tool_status_running entries are removed when the tool completes,
                    # so only genuinely in-flight tools appear here.
                    done_visible = _tool_status_done[-_TOOL_STATUS_KEEP_LINES:]
                    running_visible = list(_tool_status_running.values())
                    all_visible = done_visible + running_visible
                    if all_visible:
                        elements.append({
                            "tag": "markdown",
                            "content": "\n".join(all_visible),
                        })
                        elements.append({"tag": "hr"})

                # Thinking section: collapsed grey block
                if thinking_text:
                    think_preview = thinking_text[:200].replace("\n", " ")
                    elements.append({
                        "tag": "markdown",
                        "content": f"<font color='grey'>💭 **Thinking**\n{think_preview}{'...' if len(thinking_text) > 200 else ''}</font>",
                    })
                    elements.append({"tag": "hr"})

                body = answer_text + ("▌" if streaming and answer_text else ("..." if streaming else ""))
                elements.append({"tag": "markdown", "content": body or "..."})
                return {
                    "config": {"update_multi": True},
                    "header": {
                        "template": "blue",
                        "title": {"content": _name, "tag": "plain_text"},
                    },
                    "elements": elements,
                }
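
            # Illustrative element order produced by _build_card (contents made up):
            #   "✅ Tool done: `web_search`\n⏳ Tool running: `read_document`"
            #   ── hr ──
            #   "<font color='grey'>💭 **Thinking** ...</font>"  (first 200 chars)
            #   ── hr ──
            #   "Partial answer▌"  (cursor appended while streaming)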

            def _build_final_cardkit_card(answer_text: str, thinking_text: str = "") -> dict:
                elements = []
                if thinking_text:
                    elements.append({
                        "tag": "collapsible_panel",
                        "expanded": False,
                        "header": {
                            "title": {"tag": "markdown", "content": f"💭 Thinking... ({len(thinking_text)} chars)"},
                            "vertical_align": "center",
                            "icon": {"tag": "standard_icon", "token": "down-small-ccm_outlined", "size": "16px 16px"},
                            "icon_position": "follow_text",
                            "icon_expanded_angle": -180,
                        },
                        "border": {"color": "grey", "corner_radius": "5px"},
                        "elements": [{"tag": "markdown", "content": thinking_text, "text_size": "notation"}],
                    })
                elements.append({"tag": "markdown", "content": answer_text or "..."})
                return {
                    "schema": "2.0",
                    "config": {"wide_screen_mode": True, "update_multi": True},
                    "body": {"elements": elements},
                }

            async def _queue_patch_card(card: dict, stage: str) -> None:
                if not msg_id_for_patch:
                    return
                payload = _json_card.dumps(card)

                async def _job():
                    try:
                        await feishu_service.patch_message(
                            config.app_id,
                            config.app_secret,
                            msg_id_for_patch,
                            payload,
                            stage=stage,
                        )
                    except Exception as e:
                        logger.warning(f"[Feishu] Patch failed (stage={stage}, message_id={msg_id_for_patch}): {e}")

                _patch_queue.enqueue(_job)

            async def _flush_stream(reason: str, force: bool = False):
                nonlocal _last_flush_time, _last_flushed_hash, cardkit_sequence, _last_flushed_text
                if not cardkit_card_id and not msg_id_for_patch:
                    return
                async with _flush_lock:
                    logger.debug(f"[Feishu] flush({reason}): seq={cardkit_sequence}")
                    now = time.time()
                    flush_interval = _FLUSH_INTERVAL_CARDKIT if cardkit_card_id else _FLUSH_INTERVAL_PATCH
                    if not force and now - _last_flush_time < flush_interval:
                        return
                    accumulated = "".join(_stream_buffer)
                    if cardkit_card_id:
                        # Build composite content: tool status lines + answer text.
                        # This mirrors the IM patch path where _build_card() includes the
                        # tool status section, so CardKit users also see which tools are
                        # running or completed during the LLM turn.
                        done_visible = _tool_status_done[-_TOOL_STATUS_KEEP_LINES:]
                        running_visible = list(_tool_status_running.values())
                        all_tool_lines = done_visible + running_visible
                        if all_tool_lines:
                            tool_section = "\n".join(all_tool_lines)
                            cardkit_text = f"{tool_section}\n---\n{accumulated}" if accumulated else tool_section
                        else:
                            cardkit_text = accumulated
                        if cardkit_text != _last_flushed_text:
                            cardkit_sequence += 1
                            try:
                                await asyncio.wait_for(
                                    feishu_service.stream_card_content(
                                        config.app_id, config.app_secret,
                                        cardkit_card_id, "streaming_content",
                                        cardkit_text, cardkit_sequence,
                                    ),
                                    timeout=5.0,
                                )
                                _last_flushed_text = cardkit_text
                            except asyncio.TimeoutError:
                                logger.warning(f"[Feishu] CardKit stream timed out, seq={cardkit_sequence}")
                            except Exception as e:
                                logger.warning(f"[Feishu] CardKit stream failed: {e}")
                    elif msg_id_for_patch:
                        card = _build_card(accumulated, "".join(_thinking_buffer), streaming=True)
                        current_hash = hash(accumulated + "".join(_thinking_buffer) + str(_tool_status_done) + str(list(_tool_status_running.values())))
                        if reason == "heartbeat" and current_hash == _last_flushed_hash:
                            return
                        _last_flushed_hash = current_hash
                        await _queue_patch_card(card, stage=f"stream_{reason}")
                    _last_flush_time = now
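
            # Illustrative composite CardKit text assembled above (lines made up):
            #   ✅ Tool done: `web_search`
            #   ⏳ Tool running: `read_document`
            #   ---
            #   Here is what I found so far...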

            async def _ws_on_chunk(text: str):
                if not cardkit_card_id and not msg_id_for_patch:
                    return
                _stream_buffer.append(text)
                await _flush_stream("chunk")

            async def _ws_on_thinking(text: str):
                if not cardkit_card_id and not msg_id_for_patch:
                    return
                _thinking_buffer.append(text)
                await _flush_stream("thinking")

            async def _ws_on_tool_call(evt: dict):
                """Receive tool call status events and update the card's progress section.

                Uses the tool's call_id as the dict key so each tool shows only its
                latest state. When a tool completes, the "running" entry is removed from
                _tool_status_running and a "done" line is appended to _tool_status_done,
                ensuring finished tools never linger as ⏳ in the card.
                """
                tool_name = evt.get("name") or "unknown_tool"
                # Use call_id when available (unique per invocation); fall back to the name.
                call_id = evt.get("call_id") or tool_name
                status = (evt.get("status") or "").lower()
                if status == "running":
                    # Register as in-flight; will be removed when "done" arrives.
                    _tool_status_running[call_id] = f"⏳ Tool running: `{tool_name}`"
                elif status == "done":
                    # Remove from the running dict so the ⏳ icon disappears immediately.
                    _tool_status_running.pop(call_id, None)
                    _tool_status_done.append(f"✅ Tool done: `{tool_name}`")
                else:
                    _tool_status_running.pop(call_id, None)
                    _tool_status_done.append(f"ℹ️ Tool update: `{tool_name}` ({status or 'unknown'})")
                await _flush_stream("tool")

            async def _heartbeat():
                while not _llm_done:
                    await asyncio.sleep(_FLUSH_INTERVAL_CARDKIT if cardkit_card_id else _FLUSH_INTERVAL_PATCH)
                    await _flush_stream("heartbeat")

            if cardkit_card_id or msg_id_for_patch:
                _heartbeat_task = asyncio.create_task(_heartbeat())
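
            # Illustrative tool events consumed by _ws_on_tool_call (shape inferred from
            # the .get() calls above; values made up):
            #   {"name": "web_search", "call_id": "call_1", "status": "running"}
            #   {"name": "web_search", "call_id": "call_1", "status": "done"}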

            # Call the LLM with history and a streaming callback
            try:
                reply_text = await _call_agent_llm(
                    db,
                    agent_id,
                    llm_user_text,
                    history=history,
                    user_id=platform_user_id,
                    on_chunk=_ws_on_chunk,
                    on_thinking=_ws_on_thinking,
                    on_tool_call=_ws_on_tool_call,
                )
            finally:
                _llm_done = True
                if _heartbeat_task:
                    _heartbeat_task.cancel()
                    try:
                        await _heartbeat_task
                    except (Exception, asyncio.CancelledError):
                        pass
                _cfs.reset(_cfs_token)
                _cfso.reset(_cfso_token)
            logger.info(f"[Feishu] LLM reply: {reply_text[:100]}")

            # Send the final card update or fallback text
            if cardkit_card_id:
                try:
                    cardkit_sequence += 1
                    await asyncio.wait_for(
                        feishu_service.set_card_streaming_mode(
                            config.app_id, config.app_secret,
                            cardkit_card_id, 0, cardkit_sequence,
                        ),
                        timeout=10.0,
                    )
                    cardkit_sequence += 1
                    final_card = _build_final_cardkit_card(reply_text, "".join(_thinking_buffer))
                    await asyncio.wait_for(
                        feishu_service.update_cardkit_card(
                            config.app_id, config.app_secret,
                            cardkit_card_id, final_card, cardkit_sequence,
                        ),
                        timeout=10.0,
                    )
                except Exception as e:
                    logger.error(f"[Feishu] CardKit final update failed: {e}")
                    try:
                        await feishu_service.send_message(
                            config.app_id, config.app_secret, _reply_target, "text",
                            _json.dumps({"text": reply_text}), receive_id_type=_rid_type,
                            stage="stream_final_fallback_text",
                        )
                    except Exception as e2:
                        logger.error(f"[Feishu] CardKit fallback text also failed: {e2}")
            elif msg_id_for_patch:
                try:
                    await _patch_queue.drain()
                except Exception as e:
                    logger.warning(f"[Feishu] Drain patch queue failed before final patch: {e}")
                final_card = _build_card(
                    reply_text,
                    "".join(_thinking_buffer),
                    streaming=False,
                )
                try:
                    await feishu_service.patch_message(
                        config.app_id,
                        config.app_secret,
                        msg_id_for_patch,
                        _json_card.dumps(final_card),
                        stage="stream_final",
                    )
                except Exception as e:
                    logger.error(f"[Feishu] Final card patch failed: {e}")
                    try:
                        await feishu_service.send_message(
                            config.app_id, config.app_secret, _reply_target, "text",
                            _json.dumps({"text": reply_text}), receive_id_type=_rid_type,
                            stage="stream_final_fallback_text",
                        )
                    except Exception as e2:
                        logger.error(f"[Feishu] Fallback text also failed: {e2}")
            else:
                try:
                    await feishu_service.send_message(
                        config.app_id, config.app_secret, _reply_target, "text",
                        _json.dumps({"text": reply_text}), receive_id_type=_rid_type,
                        stage="stream_no_card_fallback_text",
                    )
                except Exception as e:
                    logger.error(f"[Feishu] Failed to send fallback message: {e}")

            # Log activity
            from app.services.activity_logger import log_activity
            await log_activity(agent_id, "chat_reply", f"回复了飞书消息: {reply_text[:80]}", detail={"channel": "feishu", "user_text": user_text[:200], "reply": reply_text[:500]})

            # If task creation intent was detected, create a real Task record
            if task_match:
                task_title = task_match.group(1).strip()
                if task_title:
                    try:
                        from app.models.task import Task as TaskModel
                        from app.services.task_executor import execute_task

                        # Find the agent's creator to use as the task creator
                        agent_r = await db.execute(select(AgentModel).where(AgentModel.id == agent_id))
                        agent_obj = agent_r.scalar_one_or_none()
                        creator_id = agent_obj.creator_id if agent_obj else agent_id

                        task_obj = TaskModel(
                            agent_id=agent_id,
                            title=task_title,
                            created_by=creator_id,
                            status="pending",
                            priority="medium",
                        )
                        db.add(task_obj)
                        await db.commit()
                        await db.refresh(task_obj)
                        asyncio.create_task(execute_task(task_obj.id, agent_id))
                        reply_text += f"\n\n📋 已同步创建任务到任务面板:【{task_title}】"
                        logger.info(f"[Feishu] Created task: {task_title}")
                    except Exception as e:
                        logger.error(f"[Feishu] Failed to create task: {e}")

            # Save the assistant reply to history (use platform_user_id so messages stay in one session)
            db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="assistant", content=reply_text, conversation_id=session_conv_id))
            _sess.last_message_at = _dt.now(_tz.utc)
            await db.commit()

    return {"code": 0, "msg": "ok"}

IMPORT_RE = None  # lazy sentinel

# Randomized acknowledgements sent when a (non-image) file is received; kept in
# Chinese because they are user-facing Feishu messages (roughly: "Got your file,
# what would you like me to do with it?").
_FILE_ACK_MESSAGES = [
    "收到你的文件,请问有什么需要帮忙的?",
    "文件收到了!你想让我怎么处理它?",
    "好的,我已经收到这份文件,请告诉我你的需求~",
    "已收到文件,随时准备好为你处理!",
    "收到!请问希望我对这份文件做什么?",
]

async def _handle_feishu_file(db, agent_id, config, message, sender_open_id, chat_type, chat_id):
    """Handle incoming file or image messages from Feishu (runs as a background task)."""
    import asyncio, random, json
    from pathlib import Path
    from app.config import get_settings
    from app.models.audit import ChatMessage
    from app.models.agent import Agent as AgentModel
    from app.models.user import User as UserModel
    from app.services.channel_session import find_or_create_channel_session
    from app.core.security import hash_password
    from app.database import async_session as _async_session
    from datetime import datetime as _dt, timezone as _tz
    import uuid as _uuid
    from sqlalchemy import select as _select

    msg_type = message.get("message_type", "file")
    message_id = message.get("message_id", "")
    content = json.loads(message.get("content", "{}"))

    # Extract the file key and name
    if msg_type == "image":
        file_key = content.get("image_key", "")
        filename = f"image_{file_key[-8:]}.jpg" if file_key else "image.jpg"
        res_type = "image"
    else:
        file_key = content.get("file_key", "")
        filename = content.get("file_name") or f"file_{file_key[-8:]}.bin"
        res_type = "file"

    if not file_key:
        logger.warning(f"[Feishu] No file_key in {msg_type} message")
        return

    # Resolve the workspace upload dir
    settings = get_settings()
    upload_dir = Path(settings.AGENT_DATA_DIR) / str(agent_id) / "workspace" / "uploads"
    upload_dir.mkdir(parents=True, exist_ok=True)
    save_path = upload_dir / filename

    # Download the file
    try:
        file_bytes = await feishu_service.download_message_resource(
            config.app_id, config.app_secret, message_id, file_key, res_type
        )
        save_path.write_bytes(file_bytes)
        logger.info(f"[Feishu] Saved {msg_type} to {save_path} ({len(file_bytes)} bytes)")
    except Exception as e:
        logger.error(f"[Feishu] Failed to download {msg_type}: {e}")
        err_tip = "抱歉,文件下载失败。可能原因:机器人缺少 `im:resource` 权限(文件读取)。\n请在飞书开放平台 → 权限管理 → 批量导入权限 JSON → 重新发布机器人版本后重试。"
        try:
            import json as _j
            if chat_type == "group" and chat_id:
                await feishu_service.send_message(config.app_id, config.app_secret, chat_id, "text", _j.dumps({"text": err_tip}), receive_id_type="chat_id")
            else:
                await feishu_service.send_message(config.app_id, config.app_secret, sender_open_id, "text", _j.dumps({"text": err_tip}))
        except Exception as e2:
            logger.error(f"[Feishu] Also failed to send error tip: {e2}")
        return
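
    # Illustrative message "content" payloads parsed above (keys match the .get()
    # calls; values made up):
    #   image: {"image_key": "img_v3_xxx"}
    #   file:  {"file_key": "file_v3_xxx", "file_name": "report.docx"}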

    # Resolve the platform user and session using a fresh db session
    # (note: this deliberately shadows the `db` parameter for the rest of the handler)
    async with _async_session() as db:
        agent_r = await db.execute(_select(AgentModel).where(AgentModel.id == agent_id))
        agent_obj = agent_r.scalar_one_or_none()

        # Resolve the sender's Feishu user_id (more stable than open_id)
        sender_user_id_feishu = ""
        extra_info: dict | None = None
        try:
            import httpx as _hx
            async with _hx.AsyncClient() as _fc:
                _tr = await _fc.post(
                    "https://open.feishu.cn/open-apis/auth/v3/app_access_token/internal",
                    json={"app_id": config.app_id, "app_secret": config.app_secret},
                )
                _at = _tr.json().get("app_access_token", "")
                if _at:
                    _ur = await _fc.get(
                        f"https://open.feishu.cn/open-apis/contact/v3/users/{sender_open_id}",
                        params={"user_id_type": "open_id"},
                        headers={"Authorization": f"Bearer {_at}"},
                    )
                    _ud = _ur.json()
                    if _ud.get("code") == 0:
                        _user_info = _ud.get("data", {}).get("user", {})
                        sender_user_id_feishu = _user_info.get("user_id", "")
                        # The Feishu contact API returns 'avatar' as a dict
                        # (keys: avatar_240, avatar_640, avatar_origin), NOT a plain URL.
                        _raw_avatar = _user_info.get("avatar")
                        if isinstance(_raw_avatar, dict):
                            _avatar_url = (
                                _raw_avatar.get("avatar_240")
                                or _raw_avatar.get("avatar_640")
                                or _raw_avatar.get("avatar_origin")
                                or ""
                            )
                        else:
                            _avatar_url = _raw_avatar or ""
                        extra_info = {
                            "name": _user_info.get("name"),
                            "avatar_url": _avatar_url,
                            "email": _user_info.get("email"),
                            "mobile": _user_info.get("mobile"),
                            "unionid": _user_info.get("user_id"),
                            "open_id": sender_open_id,
                        }
        except Exception:
            pass

        # Resolve the channel user via the unified service (uses OrgMember + SSO patterns)
        from app.services.channel_user_service import channel_user_service
        platform_user = await channel_user_service.resolve_channel_user(
            db=db,
            agent=agent_obj,
            channel_type="feishu",
            external_user_id=sender_open_id,
            extra_info=extra_info,
        )
        platform_user_id = platform_user.id

        # Conv ID — prefer user_id for session continuity
        if chat_type == "group" and chat_id:
            conv_id = f"feishu_group_{chat_id}"
        else:
            conv_id = f"feishu_p2p_{sender_user_id_feishu or sender_open_id}"

        # Find-or-create the session
        _is_group_file = (chat_type == "group")
        # For group file sessions, use the agent creator as a placeholder user_id
        _file_user_id = platform_user_id
        if _is_group_file:
            _ag_r = await db.execute(_select(AgentModel).where(AgentModel.id == agent_id))
            _ag_obj = _ag_r.scalar_one_or_none()
            _file_user_id = _ag_obj.creator_id if _ag_obj else platform_user_id
        _sess = await find_or_create_channel_session(
            db=db, agent_id=agent_id, user_id=_file_user_id,
            external_conv_id=conv_id, source_channel="feishu",
            first_message_title=f"[文件] {filename}",
            is_group=_is_group_file,
            group_name=f"Feishu Group {chat_id[:8]}" if _is_group_file else None,
        )
        session_conv_id = str(_sess.id)

        # Build the LLM-facing user message — include a base64 marker for images so vision models can see them
        if msg_type == "image":
            import base64 as _b64_img
            _b64_data = _b64_img.b64encode(file_bytes).decode("ascii")
            _image_marker = f"[image_data:data:image/jpeg;base64,{_b64_data}]"
            user_msg_content = f"[用户发送了图片]\n{_image_marker}"
        else:
            user_msg_content = f"[file:{filename}]"
        # Store only the compact [file:...] marker in the DB (both branches resolve to
        # it); the base64 marker above is passed to the LLM but kept out of history.
        db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="user",
                           content=f"[file:{filename}]",
                           conversation_id=session_conv_id))
        _sess.last_message_at = _dt.now(_tz.utc)

        # Load conversation history for LLM context
        from app.models.agent import DEFAULT_CONTEXT_WINDOW_SIZE
        ctx_size = (agent_obj.context_window_size or DEFAULT_CONTEXT_WINDOW_SIZE) if agent_obj else DEFAULT_CONTEXT_WINDOW_SIZE
        _hist_r = await db.execute(
            _select(ChatMessage)
            .where(ChatMessage.agent_id == agent_id, ChatMessage.conversation_id == session_conv_id)
            .order_by(ChatMessage.created_at.desc())
            .limit(ctx_size)
        )
        _history = [{"role": m.role, "content": m.content} for m in reversed(_hist_r.scalars().all())]

        await db.commit()

        # For images: call the LLM so vision models can actually see the image
        if msg_type == "image":
            import json as _json_card_img

            # Send the initial loading card
            _reply_to = chat_id if chat_type == "group" else sender_open_id
            _rid_type = "chat_id" if chat_type == "group" else "open_id"
            _agent_name = agent_obj.name if agent_obj else "AI"
            _init_card = {
                "config": {"update_multi": True},
                "header": {"template": "blue", "title": {"content": "识别图片中...", "tag": "plain_text"}},
                "elements": [{"tag": "markdown", "content": "..."}],
            }
            _patch_msg_id = None
            try:
                _init_resp = await feishu_service.send_message(
                    config.app_id, config.app_secret, _reply_to, "interactive",
                    _json_card_img.dumps(_init_card), receive_id_type=_rid_type, stage="image_stream_init_card"
                )
                _patch_msg_id = _init_resp.get("data", {}).get("message_id")
            except Exception as _e_init:
                logger.error(f"[Feishu] Failed to send init card for image: {_e_init}")

            _img_stream_buf = []
            _img_last_flush = time.time()
            _img_flush_interval = 1.0
            _img_patch_queue = _SerialPatchQueue()
            _img_heartbeat_task: asyncio.Task | None = None
            _img_llm_done = False
            _img_last_flushed_hash: int = 0  # Content hash to skip no-op heartbeat patches

            def _build_img_card(answer_text: str, streaming: bool = False) -> dict:
                """Local card builder for the image path.

                The text path's _build_card is a closure inside process_feishu_event
                (it captures that handler's tool-status state) and is not visible
                here, so this mirrors its answer-only layout.
                """
                body = answer_text + ("▌" if streaming and answer_text else ("..." if streaming else ""))
                return {
                    "config": {"update_multi": True},
                    "header": {"template": "blue", "title": {"content": _agent_name, "tag": "plain_text"}},
                    "elements": [{"tag": "markdown", "content": body or "..."}],
                }

            async def _queue_image_patch(_card: dict, _stage: str):
                """Enqueue a serialized PATCH request for the image streaming card."""
                if not _patch_msg_id:
                    return
                _payload = _json_card_img.dumps(_card)

                async def _job():
                    try:
                        await feishu_service.patch_message(
                            config.app_id,
                            config.app_secret,
                            _patch_msg_id,
                            _payload,
                            stage=_stage,
                        )
                    except Exception as _e_patch:
                        logger.warning(f"[Feishu] Image patch failed (stage={_stage}, message_id={_patch_msg_id}): {_e_patch}")

                _img_patch_queue.enqueue(_job)

            async def _flush_image_stream(reason: str, force: bool = False):
                """Build and enqueue an image streaming card update.

                Skips the patch on heartbeat ticks when content has not changed.
                """
                nonlocal _img_last_flush, _img_last_flushed_hash
                now = time.time()
                if not force and now - _img_last_flush < _img_flush_interval:
                    return
                _card = _build_img_card("".join(_img_stream_buf), streaming=True)
                # Skip no-op heartbeat patches when content hasn't changed.
                current_hash = hash("".join(_img_stream_buf))
                if reason == "heartbeat" and current_hash == _img_last_flushed_hash:
                    return
                _img_last_flushed_hash = current_hash
                await _queue_image_patch(_card, _stage=f"image_stream_{reason}")
                _img_last_flush = now

            async def _img_on_chunk(text):
                _img_stream_buf.append(text)
                if _patch_msg_id:
                    await _flush_image_stream("chunk")

            async def _img_heartbeat():
                while not _img_llm_done:
                    await asyncio.sleep(_img_flush_interval)
                    if _patch_msg_id:
                        await _flush_image_stream("heartbeat")

            if _patch_msg_id:
                _img_heartbeat_task = asyncio.create_task(_img_heartbeat())

            # Call the LLM with the image marker — vision models will parse it
            async with _async_session() as _db_img:
                try:
                    reply_text = await _call_agent_llm(
                        _db_img, agent_id, user_msg_content, history=_history,
                        user_id=platform_user_id, on_chunk=_img_on_chunk,
                    )
                finally:
                    _img_llm_done = True
                    if _img_heartbeat_task:
                        _img_heartbeat_task.cancel()
                        try:
                            await _img_heartbeat_task
                        except (Exception, asyncio.CancelledError):
                            # CancelledError is not an Exception subclass on Python 3.8+,
                            # so it must be caught explicitly (matches the text path).
                            pass

            logger.info(f"[Feishu] Image LLM reply: {reply_text[:100]}")

            # Send the final card or fallback text
            if _patch_msg_id:
                try:
                    await _img_patch_queue.drain()
                except Exception as _e_drain:
                    logger.warning(f"[Feishu] Image patch queue drain failed: {_e_drain}")
                # Build the final card via the local builder (keeps the layout
                # consistent with the streaming updates above).
                _final_card = _build_img_card(reply_text or "...", streaming=False)
                await feishu_service.patch_message(
                    config.app_id, config.app_secret, _patch_msg_id, _json_card_img.dumps(_final_card), stage="image_stream_final"
                )
            else:
                try:
                    await feishu_service.send_message(
                        config.app_id, config.app_secret, _reply_to, "text",
                        json.dumps({"text": reply_text}), receive_id_type=_rid_type, stage="image_stream_fallback_text",
                    )
                except Exception as _e_fb:
                    logger.error(f"[Feishu] Failed to send image reply: {_e_fb}")

            # Save the assistant reply in the DB
            async with _async_session() as _db_save:
                _db_save.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="assistant",
                                         content=reply_text, conversation_id=session_conv_id))
                await _db_save.commit()

            # Log activity
            from app.services.activity_logger import log_activity
            await log_activity(agent_id, "chat_reply", f"回复了飞书图片消息: {reply_text[:80]}", detail={"channel": "feishu", "type": "image"})
            return

        # For non-image files: send a simple ack as before
        await asyncio.sleep(random.uniform(1.0, 2.0))

        ack = random.choice(_FILE_ACK_MESSAGES)
        try:
            if chat_type == "group" and chat_id:
                await feishu_service.send_message(
                    config.app_id, config.app_secret, chat_id, "text",
                    json.dumps({"text": ack}), receive_id_type="chat_id",
                )
            else:
                await feishu_service.send_message(
                    config.app_id, config.app_secret, sender_open_id, "text",
                    json.dumps({"text": ack}),
                )
        except Exception as e:
            logger.error(f"[Feishu] Failed to send ack: {e}")

        # Store the ack in the DB
        async with _async_session() as db2:
            db2.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="assistant",
                                content=ack, conversation_id=session_conv_id))
            await db2.commit()

async def _download_post_images(agent_id, config, message_id, image_keys):
    """Download images embedded in a Feishu post message to the agent's workspace.

    Note: the post branch in process_feishu_event currently inlines this same
    download logic (so it can also build base64 markers); this helper is kept
    as a standalone utility.
    """
    from pathlib import Path
    from app.config import get_settings
    settings = get_settings()
    upload_dir = Path(settings.AGENT_DATA_DIR) / str(agent_id) / "workspace" / "uploads"
    upload_dir.mkdir(parents=True, exist_ok=True)

    for ik in image_keys:
        try:
            file_bytes = await feishu_service.download_message_resource(
                config.app_id, config.app_secret, message_id, ik, "image"
            )
            save_path = upload_dir / f"image_{ik[-8:]}.jpg"
            save_path.write_bytes(file_bytes)
            logger.info(f"[Feishu] Saved post image to {save_path} ({len(file_bytes)} bytes)")
        except Exception as e:
            logger.error(f"[Feishu] Failed to download post image {ik}: {e}")

async def _call_agent_llm(
    db: AsyncSession,
    agent_id: uuid.UUID,
    user_text: str,
    history: list[dict] | None = None,
    user_id=None,
    on_chunk=None,
    on_thinking=None,
    on_tool_call=None,
) -> str:
    """Call the agent's configured LLM model with conversation history.

    Reuses the same call_llm function as the WebSocket chat endpoint so that
    all providers (OpenRouter, Qwen, etc.) work identically on both channels.
    """
    from app.models.agent import Agent
    from app.models.llm import LLMModel
    from app.api.websocket import call_llm

    # Load the agent and model
    agent_result = await db.execute(select(Agent).where(Agent.id == agent_id))
    agent = agent_result.scalar_one_or_none()
    if not agent:
        return "⚠️ 数字员工未找到"

    if is_agent_expired(agent):
        return "This Agent has expired and is off duty. Please contact your admin to extend its service."

    # Load the primary model (skip if disabled by an admin)
    model = None
    if agent.primary_model_id:
        model_result = await db.execute(select(LLMModel).where(LLMModel.id == agent.primary_model_id))
        model = model_result.scalar_one_or_none()
        if model and not model.enabled:
            logger.info(f"[Channel] Primary model {model.model} is disabled, skipping")
            model = None

    # Load the fallback model (skip if disabled by an admin)
    fallback_model = None
    if agent.fallback_model_id:
        fb_result = await db.execute(select(LLMModel).where(LLMModel.id == agent.fallback_model_id))
        fallback_model = fb_result.scalar_one_or_none()
        if fallback_model and not fallback_model.enabled:
            logger.info(f"[Channel] Fallback model {fallback_model.model} is disabled, skipping")
            fallback_model = None

    # Config-level fallback: primary missing -> use the fallback
    if not model and fallback_model:
        model = fallback_model
        fallback_model = None
        logger.warning(f"[Channel] Primary model unavailable, using fallback: {model.model}")

    if not model:
        return f"⚠️ {agent.name} 未配置 LLM 模型,请在管理后台设置。"

    # Build conversation messages (without the system prompt — call_llm adds it)
    messages: list[dict] = []
    from app.models.agent import DEFAULT_CONTEXT_WINDOW_SIZE
    ctx_size = agent.context_window_size or DEFAULT_CONTEXT_WINDOW_SIZE
    if history:
        messages.extend(history[-ctx_size:])
    messages.append({"role": "user", "content": user_text})

    # Use the actual user_id so the system prompt knows who it's chatting with
    effective_user_id = user_id or agent_id

    # Determine the effective timeout: prefer the model-level setting, else the module default.
    _timeout = _get_llm_timeout(model)

    try:
        reply = await asyncio.wait_for(
            call_llm(
                model,
                messages,
                agent.name,
                agent.role_description or "",
                agent_id=agent_id,
                user_id=effective_user_id,
                supports_vision=getattr(model, 'supports_vision', False),
                on_chunk=on_chunk,
                on_thinking=on_thinking,
                on_tool_call=on_tool_call,
            ),
            timeout=_timeout,
        )
        return reply
    except asyncio.TimeoutError:
        logger.error(
            f"[LLM] Call timed out after {_timeout}s "
            f"(agent_id={agent_id}, model={getattr(model, 'model', 'unknown')})"
        )
        if fallback_model:
            # Use the fallback model's own timeout budget.
            _fb_timeout = _get_llm_timeout(fallback_model)
            logger.info(f"[LLM] Retrying timed-out request with fallback model: {fallback_model.model} (timeout={_fb_timeout}s)")
            try:
                reply = await asyncio.wait_for(
                    call_llm(
                        fallback_model,
                        messages,
                        agent.name,
                        agent.role_description or "",
                        agent_id=agent_id,
                        user_id=effective_user_id,
                        supports_vision=getattr(fallback_model, 'supports_vision', False),
                        on_chunk=on_chunk,
                        on_thinking=on_thinking,
                        on_tool_call=on_tool_call,
                    ),
                    timeout=_fb_timeout,
                )
                return reply
            except asyncio.TimeoutError:
                logger.error(
                    f"[LLM] Fallback call also timed out after {_fb_timeout}s "
                    f"(agent_id={agent_id}, model={getattr(fallback_model, 'model', 'unknown')})"
                )
                return f"⚠️ Model response timed out (>{int(_fb_timeout)}s). Please retry or shorten your request."
            except Exception as e2:
                import traceback
                traceback.print_exc()
                return f"⚠️ Model error: Primary Timeout | Fallback: {str(e2)[:80]}"
        return f"⚠️ Model response timed out (>{int(_timeout)}s). Please retry or shorten your request."
    except Exception as e:
        import traceback
        traceback.print_exc()
        error_msg = str(e) or repr(e)
        logger.error(f"[LLM] Primary model error: {error_msg}")
        # Runtime fallback: the primary model failed -> retry with the fallback model
        if fallback_model:
            logger.info(f"[LLM] Retrying with fallback model: {fallback_model.model}")
            try:
                _fb_timeout = _get_llm_timeout(fallback_model)
                reply = await asyncio.wait_for(
                    call_llm(
                        fallback_model,
                        messages,
                        agent.name,
                        agent.role_description or "",
                        agent_id=agent_id,
                        user_id=effective_user_id,
                        supports_vision=getattr(fallback_model, 'supports_vision', False),
                        on_chunk=on_chunk,
                        on_thinking=on_thinking,
                        on_tool_call=on_tool_call,
                    ),
                    timeout=_fb_timeout,
                )
                return reply
            except asyncio.TimeoutError:
                logger.error(
                    f"[LLM] Fallback call timed out after {_fb_timeout}s "
                    f"(agent_id={agent_id}, model={getattr(fallback_model, 'model', 'unknown')})"
                )
                return f"⚠️ Model error: Primary: {str(e)[:80]} | Fallback Timeout"
            except Exception as e2:
                traceback.print_exc()
                return f"⚠️ Model error: Primary: {str(e)[:80]} | Fallback: {str(e2)[:80]}"
        return f"⚠️ 调用模型出错: {error_msg[:150]}"