236 lines
7.9 KiB
Python
236 lines
7.9 KiB
Python
"""Schedule API — CRUD for agent cron jobs."""
|
|
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from pydantic import BaseModel, Field
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.permissions import check_agent_access, is_agent_creator, is_agent_expired
|
|
from app.core.security import get_current_user, require_role
|
|
from app.database import get_db
|
|
from app.models.schedule import AgentSchedule
|
|
from app.models.user import User
|
|
from app.services.scheduler import compute_next_run
|
|
|
|
router = APIRouter(prefix="/agents/{agent_id}/schedules", tags=["schedules"])
|
|
|
|
|
|
class ScheduleCreate(BaseModel):
|
|
name: str = Field(min_length=1, max_length=200)
|
|
instruction: str = Field(default='', max_length=5000)
|
|
cron_expr: str = Field(min_length=1, max_length=100)
|
|
is_enabled: bool = True
|
|
|
|
|
|
class ScheduleUpdate(BaseModel):
|
|
name: str | None = None
|
|
instruction: str | None = None
|
|
cron_expr: str | None = None
|
|
is_enabled: bool | None = None
|
|
|
|
|
|
class ScheduleOut(BaseModel):
|
|
id: uuid.UUID
|
|
agent_id: uuid.UUID
|
|
name: str
|
|
instruction: str
|
|
cron_expr: str
|
|
is_enabled: bool
|
|
last_run_at: datetime | None = None
|
|
next_run_at: datetime | None = None
|
|
run_count: int
|
|
created_by: uuid.UUID | None = None
|
|
creator_username: str | None = None
|
|
created_at: datetime | None = None
|
|
|
|
model_config = {"from_attributes": True}
|
|
|
|
|
|
@router.get("/", response_model=list[ScheduleOut])
|
|
async def list_schedules(
|
|
agent_id: uuid.UUID,
|
|
current_user: User = Depends(get_current_user),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
"""List all schedules for an agent."""
|
|
await check_agent_access(db, current_user, agent_id)
|
|
result = await db.execute(
|
|
select(AgentSchedule)
|
|
.where(AgentSchedule.agent_id == agent_id)
|
|
.order_by(AgentSchedule.created_at.desc())
|
|
)
|
|
schedules = result.scalars().all()
|
|
# Batch-load creator usernames
|
|
creator_ids = {s.created_by for s in schedules if s.created_by}
|
|
creator_map = {}
|
|
if creator_ids:
|
|
users_result = await db.execute(select(User).where(User.id.in_(creator_ids)))
|
|
creator_map = {u.id: u.username for u in users_result.scalars().all()}
|
|
out_list = []
|
|
for s in schedules:
|
|
s_out = ScheduleOut.model_validate(s)
|
|
s_out.creator_username = creator_map.get(s.created_by)
|
|
out_list.append(s_out)
|
|
return out_list
|
|
|
|
|
|
@router.post("/", response_model=ScheduleOut, status_code=status.HTTP_201_CREATED)
|
|
async def create_schedule(
|
|
agent_id: uuid.UUID,
|
|
data: ScheduleCreate,
|
|
current_user: User = Depends(get_current_user),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
"""Create a new schedule for an agent."""
|
|
agent, _access = await check_agent_access(db, current_user, agent_id)
|
|
if not is_agent_creator(current_user, agent):
|
|
raise HTTPException(status_code=403, detail="Only creator can manage schedules")
|
|
|
|
# Validate cron expression
|
|
next_run = compute_next_run(data.cron_expr)
|
|
if not next_run:
|
|
raise HTTPException(status_code=400, detail=f"Invalid cron expression: {data.cron_expr}")
|
|
|
|
sched = AgentSchedule(
|
|
agent_id=agent_id,
|
|
name=data.name,
|
|
instruction=data.instruction,
|
|
cron_expr=data.cron_expr,
|
|
is_enabled=data.is_enabled,
|
|
next_run_at=next_run if data.is_enabled else None,
|
|
created_by=current_user.id,
|
|
)
|
|
db.add(sched)
|
|
await db.flush()
|
|
return ScheduleOut.model_validate(sched)
|
|
|
|
|
|
@router.patch("/{schedule_id}", response_model=ScheduleOut)
|
|
async def update_schedule(
|
|
agent_id: uuid.UUID,
|
|
schedule_id: uuid.UUID,
|
|
data: ScheduleUpdate,
|
|
current_user: User = Depends(get_current_user),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
"""Update a schedule."""
|
|
agent, _access = await check_agent_access(db, current_user, agent_id)
|
|
if not is_agent_creator(current_user, agent):
|
|
raise HTTPException(status_code=403, detail="Only creator can manage schedules")
|
|
|
|
result = await db.execute(
|
|
select(AgentSchedule).where(AgentSchedule.id == schedule_id, AgentSchedule.agent_id == agent_id)
|
|
)
|
|
sched = result.scalar_one_or_none()
|
|
if not sched:
|
|
raise HTTPException(status_code=404, detail="Schedule not found")
|
|
|
|
updates = data.model_dump(exclude_unset=True)
|
|
for field, value in updates.items():
|
|
setattr(sched, field, value)
|
|
|
|
# Recompute next_run if cron or enabled changed
|
|
if "cron_expr" in updates or "is_enabled" in updates:
|
|
if sched.is_enabled:
|
|
sched.next_run_at = compute_next_run(sched.cron_expr)
|
|
else:
|
|
sched.next_run_at = None
|
|
|
|
await db.flush()
|
|
return ScheduleOut.model_validate(sched)
|
|
|
|
|
|
@router.delete("/{schedule_id}", status_code=status.HTTP_204_NO_CONTENT)
|
|
async def delete_schedule(
|
|
agent_id: uuid.UUID,
|
|
schedule_id: uuid.UUID,
|
|
current_user: User = Depends(get_current_user),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
"""Delete a schedule."""
|
|
agent, _access = await check_agent_access(db, current_user, agent_id)
|
|
if not is_agent_creator(current_user, agent):
|
|
raise HTTPException(status_code=403, detail="Only creator can manage schedules")
|
|
|
|
result = await db.execute(
|
|
select(AgentSchedule).where(AgentSchedule.id == schedule_id, AgentSchedule.agent_id == agent_id)
|
|
)
|
|
sched = result.scalar_one_or_none()
|
|
if not sched:
|
|
raise HTTPException(status_code=404, detail="Schedule not found")
|
|
|
|
await db.delete(sched)
|
|
await db.flush()
|
|
|
|
|
|
@router.post("/{schedule_id}/run")
|
|
async def trigger_schedule(
|
|
agent_id: uuid.UUID,
|
|
schedule_id: uuid.UUID,
|
|
current_user: User = Depends(get_current_user),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
"""Manually trigger a schedule execution."""
|
|
agent, _access = await check_agent_access(db, current_user, agent_id)
|
|
if is_agent_expired(agent):
|
|
raise HTTPException(status_code=403, detail="Agent has expired and cannot be triggered.")
|
|
|
|
result = await db.execute(
|
|
select(AgentSchedule).where(AgentSchedule.id == schedule_id, AgentSchedule.agent_id == agent_id)
|
|
)
|
|
sched = result.scalar_one_or_none()
|
|
if not sched:
|
|
raise HTTPException(status_code=404, detail="Schedule not found")
|
|
|
|
# Fire in background
|
|
import asyncio
|
|
from app.services.scheduler import _execute_schedule
|
|
asyncio.create_task(_execute_schedule(sched.id, sched.agent_id, sched.instruction))
|
|
|
|
# Update tracking
|
|
sched.last_run_at = datetime.now(timezone.utc)
|
|
sched.run_count = (sched.run_count or 0) + 1
|
|
await db.flush()
|
|
|
|
return {"status": "triggered", "schedule_id": str(schedule_id)}
|
|
|
|
|
|
@router.get("/{schedule_id}/history")
|
|
async def get_schedule_history(
|
|
agent_id: uuid.UUID,
|
|
schedule_id: uuid.UUID,
|
|
current_user: User = Depends(get_current_user),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
"""Get execution history for a schedule from activity logs."""
|
|
await check_agent_access(db, current_user, agent_id)
|
|
from app.models.activity_log import AgentActivityLog
|
|
result = await db.execute(
|
|
select(AgentActivityLog)
|
|
.where(
|
|
AgentActivityLog.agent_id == agent_id,
|
|
AgentActivityLog.action_type == "schedule_run",
|
|
)
|
|
.order_by(AgentActivityLog.created_at.desc())
|
|
)
|
|
logs = result.scalars().all()
|
|
# Filter by schedule_id in detail_json
|
|
history = []
|
|
for log in logs:
|
|
detail = log.detail_json or {}
|
|
if detail.get("schedule_id") == str(schedule_id):
|
|
history.append({
|
|
"id": str(log.id),
|
|
"created_at": log.created_at.isoformat() if log.created_at else None,
|
|
"summary": log.summary,
|
|
"instruction": detail.get("instruction", ""),
|
|
"reply": detail.get("reply", ""),
|
|
})
|
|
if len(history) >= 20:
|
|
break
|
|
return history
|
|
|