233 lines
10 KiB
Python
233 lines
10 KiB
Python
"""Lightweight asyncio scheduler for agent cron jobs.
|
|
|
|
Runs as a background task inside the FastAPI process.
|
|
Every 30 seconds, checks for schedules whose next_run_at <= now
|
|
and executes them by calling the LLM with the schedule's instruction.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
|
|
from croniter import croniter
|
|
from loguru import logger
|
|
from sqlalchemy import select, update
|
|
|
|
|
|
def compute_next_run(cron_expr: str, after: datetime | None = None) -> datetime | None:
|
|
"""Compute the next run time from a cron expression."""
|
|
try:
|
|
base = after or datetime.now(timezone.utc)
|
|
cron = croniter(cron_expr, base)
|
|
return cron.get_next(datetime).replace(tzinfo=timezone.utc)
|
|
except Exception as e:
|
|
logger.error(f"Invalid cron expression '{cron_expr}': {e}")
|
|
return None
|
|
|
|
|
|
async def _execute_schedule(schedule_id: uuid.UUID, agent_id: uuid.UUID, instruction: str):
|
|
"""Execute a single schedule by calling the LLM with the instruction."""
|
|
try:
|
|
from app.database import async_session
|
|
from app.models.agent import Agent
|
|
from app.models.llm import LLMModel
|
|
|
|
async with async_session() as db:
|
|
# Load agent + model
|
|
result = await db.execute(select(Agent).where(Agent.id == agent_id))
|
|
agent = result.scalar_one_or_none()
|
|
if not agent:
|
|
logger.warning(f"Schedule {schedule_id}: agent {agent_id} not found")
|
|
return
|
|
|
|
if agent.status != "running":
|
|
logger.info(f"Schedule {schedule_id}: agent {agent.name} not running, skipping")
|
|
return
|
|
|
|
from app.core.permissions import is_agent_expired
|
|
if is_agent_expired(agent):
|
|
logger.info(f"Schedule {schedule_id}: agent {agent.name} has expired, skipping")
|
|
return
|
|
|
|
model_id = agent.primary_model_id or agent.fallback_model_id
|
|
if not model_id:
|
|
logger.warning(f"Schedule {schedule_id}: agent {agent.name} has no LLM model")
|
|
return
|
|
|
|
model_result = await db.execute(select(LLMModel).where(LLMModel.id == model_id))
|
|
model = model_result.scalar_one_or_none()
|
|
if not model:
|
|
logger.warning(f"Schedule {schedule_id}: LLM model {model_id} not found")
|
|
return
|
|
|
|
# Build context and call LLM
|
|
from app.services.agent_context import build_agent_context
|
|
from app.services.agent_tools import execute_tool, get_agent_tools_for_llm
|
|
from app.services.llm_utils import create_llm_client, get_max_tokens, LLMMessage, LLMError
|
|
|
|
static_prompt, dynamic_prompt = await build_agent_context(agent_id, agent.name, agent.role_description or "")
|
|
|
|
messages = [
|
|
LLMMessage(role="system", content=static_prompt, dynamic_content=dynamic_prompt),
|
|
LLMMessage(role="user", content=f"[自动调度任务] {instruction}"),
|
|
]
|
|
|
|
# Load tools dynamically from DB (respects per-agent config and MCP tools)
|
|
tools_for_llm = await get_agent_tools_for_llm(agent_id)
|
|
|
|
# Create unified LLM client
|
|
try:
|
|
client = create_llm_client(
|
|
provider=model.provider,
|
|
api_key=model.api_key_encrypted,
|
|
model=model.model,
|
|
base_url=model.base_url,
|
|
timeout=float(getattr(model, 'request_timeout', None) or 120.0),
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Schedule {schedule_id}: Failed to create LLM client: {e}")
|
|
return
|
|
|
|
# Tool-calling loop (max 50 rounds for scheduled tasks)
|
|
reply = ""
|
|
for round_i in range(50):
|
|
try:
|
|
response = await client.complete(
|
|
messages=messages,
|
|
tools=tools_for_llm if tools_for_llm else None,
|
|
temperature=model.temperature,
|
|
max_tokens=get_max_tokens(model.provider, model.model, getattr(model, 'max_output_tokens', None)),
|
|
)
|
|
except LLMError as e:
|
|
logger.error(f"Schedule {schedule_id}: LLM error: {e}")
|
|
reply = f"(LLM 错误: {e})"
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"Schedule {schedule_id}: LLM call error: {e}")
|
|
reply = f"(LLM 调用异常: {str(e)[:200]})"
|
|
break
|
|
|
|
if response.tool_calls:
|
|
# Add assistant message with tool calls
|
|
messages.append(LLMMessage(
|
|
role="assistant",
|
|
content=response.content or None,
|
|
tool_calls=[{
|
|
"id": tc["id"],
|
|
"type": "function",
|
|
"function": tc["function"],
|
|
} for tc in response.tool_calls],
|
|
reasoning_content=response.reasoning_content,
|
|
))
|
|
|
|
# Tools that require arguments — if LLM sends empty args, skip and ask to retry
|
|
_TOOLS_REQUIRING_ARGS = {
|
|
"write_file", "read_file", "delete_file", "read_document",
|
|
"send_message_to_agent", "send_feishu_message", "send_email",
|
|
"web_search", "jina_search", "jina_read",
|
|
}
|
|
|
|
for tc in response.tool_calls:
|
|
fn = tc["function"]
|
|
tool_name = fn["name"]
|
|
raw_args = fn.get("arguments", "{}")
|
|
logger.info(f"[Scheduler] Raw arguments for {tool_name} (len={len(raw_args) if raw_args else 0}): {repr(raw_args[:300]) if raw_args else 'None'}")
|
|
try:
|
|
args = json.loads(raw_args) if raw_args else {}
|
|
except json.JSONDecodeError as je:
|
|
logger.warning(f"[Scheduler] JSON parse failed for {tool_name}: {je}. Raw: {repr(raw_args[:200])}")
|
|
args = {}
|
|
|
|
# Guard: if a tool that requires arguments received empty args,
|
|
# return an error to LLM instead of executing
|
|
if not args and tool_name in _TOOLS_REQUIRING_ARGS:
|
|
logger.warning(f"[Scheduler] Empty arguments for {tool_name}, asking LLM to retry")
|
|
messages.append(LLMMessage(
|
|
role="tool",
|
|
tool_call_id=tc["id"],
|
|
content=f"Error: {tool_name} was called with empty arguments. You must provide the required parameters. Please retry with the correct arguments.",
|
|
))
|
|
continue
|
|
|
|
tool_result = await execute_tool(fn["name"], args, agent_id, agent.creator_id)
|
|
messages.append(LLMMessage(
|
|
role="tool",
|
|
tool_call_id=tc["id"],
|
|
content=str(tool_result),
|
|
))
|
|
else:
|
|
reply = response.content or ""
|
|
break
|
|
else:
|
|
reply = "(已达到最大工具调用轮数)"
|
|
|
|
await client.close()
|
|
|
|
# Log activity
|
|
from app.services.activity_logger import log_activity
|
|
await log_activity(
|
|
agent_id, "schedule_run",
|
|
f"定时任务执行: {instruction[:60]}",
|
|
detail={"schedule_id": str(schedule_id), "instruction": instruction, "reply": reply[:500]},
|
|
)
|
|
|
|
logger.info(f"Schedule {schedule_id} executed for agent {agent.name}: {reply[:80]}")
|
|
|
|
except Exception as e:
|
|
logger.exception(f"Schedule {schedule_id} execution error: {e}")
|
|
|
|
|
|
async def _tick():
|
|
"""One scheduler tick: find and execute due schedules."""
|
|
from app.database import async_session
|
|
from app.models.schedule import AgentSchedule
|
|
from app.services.audit_logger import write_audit_log
|
|
|
|
now = datetime.now(timezone.utc)
|
|
|
|
try:
|
|
async with async_session() as db:
|
|
result = await db.execute(
|
|
select(AgentSchedule).where(
|
|
AgentSchedule.is_enabled == True,
|
|
AgentSchedule.next_run_at <= now,
|
|
)
|
|
)
|
|
due_schedules = result.scalars().all()
|
|
|
|
if due_schedules:
|
|
await write_audit_log("schedule_tick", {"due_count": len(due_schedules)})
|
|
|
|
for sched in due_schedules:
|
|
# Update run tracking immediately
|
|
next_run = compute_next_run(sched.cron_expr, now)
|
|
sched.last_run_at = now
|
|
sched.next_run_at = next_run
|
|
sched.run_count = (sched.run_count or 0) + 1
|
|
await db.commit()
|
|
|
|
await write_audit_log(
|
|
"schedule_fire",
|
|
{"schedule_id": str(sched.id), "name": sched.name, "instruction": sched.instruction[:100], "next_run": str(next_run)},
|
|
agent_id=sched.agent_id,
|
|
)
|
|
|
|
# Fire execution in background (don't block ticker)
|
|
asyncio.create_task(
|
|
_execute_schedule(sched.id, sched.agent_id, sched.instruction)
|
|
)
|
|
logger.info(f"Triggered schedule '{sched.name}' (next: {next_run})")
|
|
|
|
except Exception as e:
|
|
logger.exception(f"Scheduler tick error: {e}")
|
|
await write_audit_log("schedule_error", {"error": str(e)[:300]})
|
|
|
|
|
|
async def start_scheduler():
|
|
"""Start the background scheduler loop. Call from FastAPI startup."""
|
|
logger.info("🕐 Agent scheduler started (30s interval)")
|
|
while True:
|
|
await _tick()
|
|
await asyncio.sleep(30)
|