400 lines
16 KiB
Python
400 lines
16 KiB
Python
"""Supervision reminder service — periodically sends reminders for supervision tasks.

Checks all supervision-type tasks that are still pending or in progress and reminds
the target according to the task's configured reminder schedule. The target is
either another agent (reminded via an agent-to-agent chat message) or a member
(reminded via Feishu).

Schedules are either a JSON object (see "Schedule JSON format" below) or one of the
legacy presets: daily, every_2_days, every_3_days, weekly.

Runs as a background task inside the FastAPI process.
"""
|
||
|
||
import asyncio
|
||
from datetime import datetime, timezone, timedelta
|
||
|
||
from loguru import logger
|
||
from sqlalchemy import select
|
||
|
||
from app.database import async_session
|
||
from app.models.task import Task, TaskLog
|
||
from app.models.agent import Agent
|
||
|
||
# Schedule JSON format:
|
||
# {"freq": "daily"|"weekly", "interval": N, "time": "HH:MM", "weekdays": [0-6]}
|
||
# weekdays: 0=Sun, 1=Mon, ..., 6=Sat
|
||
|
||
|
||
def _parse_schedule(remind_schedule: str) -> dict | None:
|
||
"""Parse remind_schedule — supports JSON format or legacy simple presets."""
|
||
import json
|
||
if not remind_schedule:
|
||
return None
|
||
try:
|
||
sched = json.loads(remind_schedule)
|
||
if isinstance(sched, dict) and "freq" in sched:
|
||
return sched
|
||
except (json.JSONDecodeError, TypeError):
|
||
pass
|
||
# Legacy simple preset fallback
|
||
legacy_map = {
|
||
"daily": {"freq": "daily", "interval": 1, "time": "09:00"},
|
||
"every_2_days": {"freq": "daily", "interval": 2, "time": "09:00"},
|
||
"every_3_days": {"freq": "daily", "interval": 3, "time": "09:00"},
|
||
"weekly": {"freq": "weekly", "interval": 1, "time": "09:00", "weekdays": [1, 2, 3, 4, 5]},
|
||
}
|
||
return legacy_map.get(remind_schedule)
|
||
|
||
|
||
def _is_reminder_due(remind_schedule: str, last_reminded_at: datetime | None, now_utc: datetime) -> bool:
    """Check if a reminder is due based on the schedule config.

    A reminder is due when all of the following hold:

    1. ``now_utc`` lies within 60 minutes after the scheduled ``HH:MM`` slot
       (the slot of the previous day is considered too, so a window that
       crosses midnight still matches);
    2. for ``freq == "weekly"``, the slot's weekday is one of ``weekdays``
       (0=Sun ... 6=Sat);
    3. no reminder was sent yet, or at least ``interval`` days minus a 2-hour
       tolerance have elapsed since ``last_reminded_at``.

    Args:
        remind_schedule: JSON schedule string or legacy preset name.
        last_reminded_at: Time of the previous reminder; a naive value is
            treated as UTC. None means no reminder has been sent yet.
        now_utc: Current time supplied by the tick loop; a naive value is
            treated as UTC. Hour/minute checks are done on this value as-is.

    Returns:
        True if a reminder should be sent now, False otherwise (including
        when the schedule cannot be parsed).
    """
    sched = _parse_schedule(remind_schedule)
    if not sched:
        return False

    freq = sched.get("freq", "daily")

    # A missing, non-numeric, zero or negative interval would either raise or
    # make the reminder fire on every tick inside the window — fall back to 1 day.
    try:
        interval = int(sched.get("interval", 1))
    except (TypeError, ValueError):
        interval = 1
    if interval < 1:
        interval = 1

    # Parse target hour/minute; malformed or out-of-range values use 09:00.
    try:
        th, tm = map(int, str(sched.get("time", "09:00")).split(":"))
        if not (0 <= th <= 23 and 0 <= tm <= 59):
            raise ValueError(f"time out of range: {th}:{tm}")
    except (TypeError, ValueError):
        th, tm = 9, 0

    # For now, we use UTC for the hour/minute check.
    # In the future, we should load agent.timezone and convert now_utc.
    if now_utc.tzinfo is None:
        now_utc = now_utc.replace(tzinfo=timezone.utc)

    # Find the scheduled slot whose 60-minute window contains "now". Checking
    # yesterday's slot as well handles windows that cross midnight (e.g. 23:30).
    window = timedelta(minutes=60)
    today_slot = now_utc.replace(hour=th, minute=tm, second=0, microsecond=0)
    slot = None
    for candidate in (today_slot, today_slot - timedelta(days=1)):
        if timedelta(0) <= now_utc - candidate < window:
            slot = candidate
            break
    if slot is None:
        return False

    # Weekly: check if the slot's day is a selected weekday
    if freq == "weekly":
        weekdays = sched.get("weekdays")
        if weekdays is None:
            weekdays = [1, 2, 3, 4, 5]
        # Python: Monday=0, Sunday=6 → convert to our format: Sunday=0, Monday=1, ...
        our_weekday = (slot.weekday() + 1) % 7
        if our_weekday not in weekdays:
            return False

    # Check interval since last reminder
    if last_reminded_at is None:
        return True

    # Ensure both are timezone-aware for comparison
    if last_reminded_at.tzinfo is None:
        last_reminded_at = last_reminded_at.replace(tzinfo=timezone.utc)

    # NOTE(review): interval is counted in days even when freq == "weekly" —
    # confirm this is the intended meaning.
    min_interval = timedelta(days=interval) - timedelta(hours=2)  # tolerance
    return now_utc - last_reminded_at >= min_interval
|
||
|
||
|
||
async def _get_agent_reply(target_agent, message: str, db) -> str | None:
    """Ask the target agent's LLM to answer a supervision reminder.

    Resolves the agent's primary model (or its fallback model when no primary
    is set), builds the agent's system context and requests one completion
    capped at 512 tokens.

    Args:
        target_agent: Agent whose model and context are used for the reply.
        message: Reminder text, sent to the model as the user message.
        db: Async database session used to load the model record.

    Returns:
        The stripped reply text, or None when no model or base URL can be
        resolved, the reply is empty, or the completion call fails (errors
        from the completion call are logged instead of raised).
    """
    from app.models.llm import LLMModel
    from app.services.agent_context import build_agent_context
    from app.services.llm_utils import (
        get_provider_base_url,
        create_llm_client,
        LLMMessage,
        LLMError,
    )

    chosen_model_id = target_agent.primary_model_id or target_agent.fallback_model_id
    if not chosen_model_id:
        return None

    from sqlalchemy import select as _select

    lookup = await db.execute(_select(LLMModel).where(LLMModel.id == chosen_model_id))
    llm_model = lookup.scalar_one_or_none()
    if not llm_model:
        return None

    endpoint = get_provider_base_url(llm_model.provider, llm_model.base_url)
    if not endpoint:
        return None

    static_prompt, dynamic_prompt = await build_agent_context(
        target_agent.id,
        target_agent.name,
        target_agent.role_description or "",
    )
    conversation = [
        LLMMessage(role="system", content=static_prompt, dynamic_content=dynamic_prompt),
        LLMMessage(role="user", content=message),
    ]

    # NOTE(review): the stored `api_key_encrypted` value is passed through
    # unchanged — presumably the client or a model property decrypts it; confirm.
    client = create_llm_client(
        provider=llm_model.provider,
        api_key=llm_model.api_key_encrypted,
        model=llm_model.model,
        base_url=endpoint,
        # Use the per-model timeout when the attribute exists and is set, else 60s.
        timeout=float(getattr(llm_model, 'request_timeout', None) or 60.0),
    )

    reply_text = None
    try:
        completion = await client.complete(
            messages=conversation,
            temperature=llm_model.temperature,
            max_tokens=512,
        )
        # Empty / whitespace-only output counts as "no reply".
        reply_text = (completion.content or "").strip() or None
    except LLMError as exc:
        logger.error(f"_get_agent_reply LLM error: {exc}")
    except Exception as exc:
        logger.error(f"_get_agent_reply LLM call failed: {exc}")
    finally:
        # Always release the client, whether the call succeeded or not.
        await client.close()
    return reply_text
|
||
|
||
|
||
async def _send_supervision_reminder(task: Task, agent_name: str):
    """Send a single supervision reminder. Target can be an Agent or a Member.

    Delivery strategy:

    1. If an Agent named ``task.supervision_target_name`` exists, the reminder
       is stored as a chat message in the agent-to-agent ChatSession (created
       on demand) and the target agent's LLM is asked for a reply, which is
       stored as well.
    2. Otherwise the sender agent's relationships are searched for a member
       with that name, and the reminder is sent through the sender's Feishu
       channel when the member has an email or phone.

    The outcome is recorded in a TaskLog row and an AgentActivityLog row.
    Every exception is caught and logged; nothing is raised to the caller.

    Args:
        task: The supervision task to remind about.
        agent_name: Display name of the agent that owns the task (the sender).
    """
    try:
        from app.models.agent import Agent
        from app.models.org import AgentRelationship
        from app.models.channel_config import ChannelConfig
        from app.models.activity_log import AgentActivityLog
        from app.services.feishu_service import feishu_service
        from sqlalchemy.orm import selectinload
        import json as _json

        target_name = task.supervision_target_name
        if not target_name:
            logger.warning(f"Supervision task {task.id} has no target name")
            return

        # A naive created_at cannot be subtracted from an aware "now";
        # treat it as UTC instead of failing the whole reminder.
        created_at = task.created_at
        if created_at is not None and created_at.tzinfo is None:
            created_at = created_at.replace(tzinfo=timezone.utc)

        reminder_msg = (
            f"📋 督办提醒 — 来自 {agent_name}\n\n"
            f"事项:{task.title}\n"
        )
        if task.description:
            reminder_msg += f"说明:{task.description}\n"
        if created_at is not None:
            days_since = (datetime.now(timezone.utc) - created_at).days
            reminder_msg += f"创建于:{days_since} 天前\n"
        if task.due_date:
            reminder_msg += f"截止日期:{task.due_date.strftime('%Y-%m-%d')}\n"
        reminder_msg += "\n请及时处理,谢谢!"

        async with async_session() as db:
            sent = False
            send_method = ""
            target_member = None

            # 1. Try to find target as an Agent
            agent_result = await db.execute(
                select(Agent).where(Agent.name == target_name)
            )
            target_agent = agent_result.scalar_one_or_none()

            if target_agent:
                # Send agent-to-agent message via ChatSession + ChatMessage
                from app.models.audit import ChatMessage
                from app.models.chat_session import ChatSession
                from app.models.participant import Participant

                # Participants for the sender and the target agent
                src_part_r = await db.execute(
                    select(Participant).where(Participant.type == "agent", Participant.ref_id == task.agent_id)
                )
                src_part = src_part_r.scalar_one_or_none()
                tgt_part_r = await db.execute(
                    select(Participant).where(Participant.type == "agent", Participant.ref_id == target_agent.id)
                )
                tgt_part = tgt_part_r.scalar_one_or_none()

                # The sender's creator owns the session/messages; fall back to
                # the agent id when the sender agent row cannot be loaded.
                src_agent_r = await db.execute(select(Agent).where(Agent.id == task.agent_id))
                src_agent = src_agent_r.scalar_one_or_none()
                owner_id = src_agent.creator_id if src_agent else task.agent_id

                # Find or create ChatSession. Ordering the two ids by their
                # string form gives one session per agent pair regardless of
                # which side sends.
                session_agent_id = min(task.agent_id, target_agent.id, key=str)
                session_peer_id = max(task.agent_id, target_agent.id, key=str)
                sess_r = await db.execute(
                    select(ChatSession).where(
                        ChatSession.agent_id == session_agent_id,
                        ChatSession.peer_agent_id == session_peer_id,
                        ChatSession.source_channel == "agent",
                    )
                )
                chat_session = sess_r.scalar_one_or_none()
                if not chat_session:
                    chat_session = ChatSession(
                        agent_id=session_agent_id,
                        user_id=owner_id,
                        title=f"{agent_name} ↔ {target_agent.name}",
                        source_channel="agent",
                        participant_id=src_part.id if src_part else None,
                        peer_agent_id=session_peer_id,
                    )
                    db.add(chat_session)
                    # Flush so the new session gets its id before it is referenced.
                    await db.flush()

                session_id = str(chat_session.id)

                # Save reminder message
                db.add(ChatMessage(
                    agent_id=session_agent_id, user_id=owner_id,
                    role="user", content=reminder_msg,
                    conversation_id=session_id,
                    participant_id=src_part.id if src_part else None,
                ))
                await db.flush()
                chat_session.last_message_at = datetime.now(timezone.utc)
                sent = True
                send_method = "agent消息"

                # Trigger target agent's LLM to generate a reply (best effort:
                # a failed reply does not undo the delivered reminder).
                try:
                    reply = await _get_agent_reply(target_agent, reminder_msg, db)
                    if reply:
                        db.add(ChatMessage(
                            agent_id=session_agent_id, user_id=owner_id,
                            role="assistant", content=reply,
                            conversation_id=session_id,
                            participant_id=tgt_part.id if tgt_part else None,
                        ))
                        send_method = f"agent消息+回复({reply[:40]})"
                        logger.info(f"📋 Target agent {target_agent.name} replied: {reply[:80]}")
                except Exception as e:
                    logger.warning(f"Target agent reply failed: {e}")
            else:
                # 2. Fallback: find target as a Member in relationships
                rel_result = await db.execute(
                    select(AgentRelationship)
                    .where(AgentRelationship.agent_id == task.agent_id)
                    .options(selectinload(AgentRelationship.member))
                )
                rels = rel_result.scalars().all()
                for r in rels:
                    if r.member and r.member.name == target_name:
                        target_member = r.member
                        break

                if target_member:
                    # Try Feishu
                    config_r = await db.execute(
                        select(ChannelConfig).where(
                            ChannelConfig.agent_id == task.agent_id,
                            ChannelConfig.channel_type == "feishu",
                        )
                    )
                    config = config_r.scalar_one_or_none()
                    if config and (target_member.email or target_member.phone):
                        try:
                            resolved = await feishu_service.resolve_open_id(
                                config.app_id, config.app_secret,
                                email=target_member.email, mobile=target_member.phone,
                            )
                            if resolved:
                                content = _json.dumps({"text": reminder_msg}, ensure_ascii=False)
                                resp = await feishu_service.send_message(
                                    config.app_id, config.app_secret,
                                    receive_id=resolved, msg_type="text",
                                    content=content, receive_id_type="open_id",
                                )
                                if resp.get("code") == 0:
                                    sent = True
                                    send_method = "飞书"
                        except Exception as e:
                            # Still best effort, but leave a trace instead of
                            # swallowing the failure silently.
                            logger.warning(f"Feishu reminder to {target_name} failed: {e}")

            # Log result to TaskLog
            if sent:
                log = TaskLog(task_id=task.id, content=f"✅ 已向 {target_name} 发送督办提醒({send_method})")
            elif target_agent or target_member:
                # Target exists but no channel delivered the message.
                log = TaskLog(task_id=task.id, content=f"📋 督办提醒已触发,目标:{target_name}")
            else:
                # Neither an agent nor a related member carries this name.
                log = TaskLog(task_id=task.id, content=f"⚠️ 提醒失败:未找到联系人 '{target_name}'")
            db.add(log)

            # Log to AgentActivityLog for Activity tab visibility
            activity = AgentActivityLog(
                agent_id=task.agent_id,
                action_type="schedule_run",
                summary=f"📋 督办提醒:{task.title} → {target_name}" + (f"({send_method}已发送)" if sent else ""),
                detail_json={"task_id": str(task.id), "target": target_name, "sent": sent},
                related_id=task.id,
            )
            db.add(activity)
            await db.commit()

        logger.info(f"📋 Supervision reminder for '{task.title}' -> {target_name}, sent={sent}")

    except Exception as e:
        logger.exception(f"Supervision reminder error for task {task.id}: {e}")
|
||
|
||
|
||
async def _supervision_tick():
    """One tick: check all supervision tasks and send due reminders.

    Loads every supervision task that is ``pending`` or ``doing`` and has a
    reminder schedule, takes the creation time of the task's most recent
    TaskLog row as the time of the last reminder, and sends a reminder for
    each task whose schedule says one is due. A failure while handling one
    task is logged and does not stop the remaining tasks; a failure of the
    tick as a whole is logged and written to the audit log.
    """
    logger.info("[supervision] tick running...")
    from app.services.audit_logger import write_audit_log

    try:
        now = datetime.now(timezone.utc)

        async with async_session() as db:
            # Find active supervision tasks
            result = await db.execute(
                select(Task, Agent.name).join(Agent, Agent.id == Task.agent_id).where(
                    Task.type == "supervision",
                    Task.status.in_(["pending", "doing"]),
                    Task.remind_schedule.isnot(None),
                )
            )
            rows = result.all()
            logger.info(f"[supervision] found {len(rows)} supervision tasks")

            await write_audit_log("supervision_tick", {"tasks_found": len(rows)})

            for task, agent_name in rows:
                try:
                    # Get last reminder log for this task.
                    # NOTE(review): this picks the newest TaskLog of any kind,
                    # not only reminder entries — confirm that is intended.
                    log_result = await db.execute(
                        select(TaskLog)
                        .where(TaskLog.task_id == task.id)
                        .order_by(TaskLog.created_at.desc())
                        .limit(1)
                    )
                    last_log = log_result.scalar_one_or_none()
                    last_reminded = last_log.created_at if last_log else None

                    if _is_reminder_due(task.remind_schedule, last_reminded, now):
                        logger.info(f"[supervision] FIRING reminder for '{task.title}' -> {task.supervision_target_name}")
                        await write_audit_log(
                            "supervision_fire",
                            {"task_id": str(task.id), "title": task.title, "target": task.supervision_target_name},
                            agent_id=task.agent_id,
                        )
                        await _send_supervision_reminder(task, agent_name)

                except Exception as e:
                    # Keep going with the other tasks; include the traceback.
                    logger.exception(f"Error checking supervision task {task.id}: {e}")

    except Exception as e:
        logger.exception(f"Supervision tick error: {e}")
        # The audit write itself may fail (e.g. for the same reason the tick
        # failed); it must not escape and take down the caller's loop.
        try:
            await write_audit_log("supervision_error", {"error": str(e)[:300]})
        except Exception as audit_err:
            logger.error(f"Supervision tick: failed to write error audit log: {audit_err}")
|
||
|
||
|
||
async def start_supervision_reminder():
    """Start the background supervision reminder loop. Call from FastAPI startup.

    Runs one supervision tick, sleeps 60 seconds, and repeats forever. An
    exception escaping a tick is logged and the loop continues with the next
    cycle; task cancellation is not intercepted, so the loop can still be
    stopped by cancelling the task.
    """
    logger.info("📋 [supervision] Reminder service started (60s tick)")
    while True:
        try:
            await _supervision_tick()
        except Exception as e:
            # asyncio.CancelledError derives from BaseException, so cancellation
            # still propagates; everything else must not end the service.
            logger.exception(f"[supervision] tick crashed, retrying next cycle: {e}")
        await asyncio.sleep(60)
|