Clawith/backend/app/api/schedules.py

236 lines
7.9 KiB
Python

"""Schedule API — CRUD for agent cron jobs."""
import uuid
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, Field
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.permissions import check_agent_access, is_agent_creator, is_agent_expired
from app.core.security import get_current_user, require_role
from app.database import get_db
from app.models.schedule import AgentSchedule
from app.models.user import User
from app.services.scheduler import compute_next_run
router = APIRouter(prefix="/agents/{agent_id}/schedules", tags=["schedules"])
class ScheduleCreate(BaseModel):
name: str = Field(min_length=1, max_length=200)
instruction: str = Field(default='', max_length=5000)
cron_expr: str = Field(min_length=1, max_length=100)
is_enabled: bool = True
class ScheduleUpdate(BaseModel):
name: str | None = None
instruction: str | None = None
cron_expr: str | None = None
is_enabled: bool | None = None
class ScheduleOut(BaseModel):
id: uuid.UUID
agent_id: uuid.UUID
name: str
instruction: str
cron_expr: str
is_enabled: bool
last_run_at: datetime | None = None
next_run_at: datetime | None = None
run_count: int
created_by: uuid.UUID | None = None
creator_username: str | None = None
created_at: datetime | None = None
model_config = {"from_attributes": True}
@router.get("/", response_model=list[ScheduleOut])
async def list_schedules(
agent_id: uuid.UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""List all schedules for an agent."""
await check_agent_access(db, current_user, agent_id)
result = await db.execute(
select(AgentSchedule)
.where(AgentSchedule.agent_id == agent_id)
.order_by(AgentSchedule.created_at.desc())
)
schedules = result.scalars().all()
# Batch-load creator usernames
creator_ids = {s.created_by for s in schedules if s.created_by}
creator_map = {}
if creator_ids:
users_result = await db.execute(select(User).where(User.id.in_(creator_ids)))
creator_map = {u.id: u.username for u in users_result.scalars().all()}
out_list = []
for s in schedules:
s_out = ScheduleOut.model_validate(s)
s_out.creator_username = creator_map.get(s.created_by)
out_list.append(s_out)
return out_list
@router.post("/", response_model=ScheduleOut, status_code=status.HTTP_201_CREATED)
async def create_schedule(
agent_id: uuid.UUID,
data: ScheduleCreate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Create a new schedule for an agent."""
agent, _access = await check_agent_access(db, current_user, agent_id)
if not is_agent_creator(current_user, agent):
raise HTTPException(status_code=403, detail="Only creator can manage schedules")
# Validate cron expression
next_run = compute_next_run(data.cron_expr)
if not next_run:
raise HTTPException(status_code=400, detail=f"Invalid cron expression: {data.cron_expr}")
sched = AgentSchedule(
agent_id=agent_id,
name=data.name,
instruction=data.instruction,
cron_expr=data.cron_expr,
is_enabled=data.is_enabled,
next_run_at=next_run if data.is_enabled else None,
created_by=current_user.id,
)
db.add(sched)
await db.flush()
return ScheduleOut.model_validate(sched)
@router.patch("/{schedule_id}", response_model=ScheduleOut)
async def update_schedule(
agent_id: uuid.UUID,
schedule_id: uuid.UUID,
data: ScheduleUpdate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Update a schedule."""
agent, _access = await check_agent_access(db, current_user, agent_id)
if not is_agent_creator(current_user, agent):
raise HTTPException(status_code=403, detail="Only creator can manage schedules")
result = await db.execute(
select(AgentSchedule).where(AgentSchedule.id == schedule_id, AgentSchedule.agent_id == agent_id)
)
sched = result.scalar_one_or_none()
if not sched:
raise HTTPException(status_code=404, detail="Schedule not found")
updates = data.model_dump(exclude_unset=True)
for field, value in updates.items():
setattr(sched, field, value)
# Recompute next_run if cron or enabled changed
if "cron_expr" in updates or "is_enabled" in updates:
if sched.is_enabled:
sched.next_run_at = compute_next_run(sched.cron_expr)
else:
sched.next_run_at = None
await db.flush()
return ScheduleOut.model_validate(sched)
@router.delete("/{schedule_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_schedule(
agent_id: uuid.UUID,
schedule_id: uuid.UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Delete a schedule."""
agent, _access = await check_agent_access(db, current_user, agent_id)
if not is_agent_creator(current_user, agent):
raise HTTPException(status_code=403, detail="Only creator can manage schedules")
result = await db.execute(
select(AgentSchedule).where(AgentSchedule.id == schedule_id, AgentSchedule.agent_id == agent_id)
)
sched = result.scalar_one_or_none()
if not sched:
raise HTTPException(status_code=404, detail="Schedule not found")
await db.delete(sched)
await db.flush()
@router.post("/{schedule_id}/run")
async def trigger_schedule(
agent_id: uuid.UUID,
schedule_id: uuid.UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Manually trigger a schedule execution."""
agent, _access = await check_agent_access(db, current_user, agent_id)
if is_agent_expired(agent):
raise HTTPException(status_code=403, detail="Agent has expired and cannot be triggered.")
result = await db.execute(
select(AgentSchedule).where(AgentSchedule.id == schedule_id, AgentSchedule.agent_id == agent_id)
)
sched = result.scalar_one_or_none()
if not sched:
raise HTTPException(status_code=404, detail="Schedule not found")
# Fire in background
import asyncio
from app.services.scheduler import _execute_schedule
asyncio.create_task(_execute_schedule(sched.id, sched.agent_id, sched.instruction))
# Update tracking
sched.last_run_at = datetime.now(timezone.utc)
sched.run_count = (sched.run_count or 0) + 1
await db.flush()
return {"status": "triggered", "schedule_id": str(schedule_id)}
@router.get("/{schedule_id}/history")
async def get_schedule_history(
agent_id: uuid.UUID,
schedule_id: uuid.UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Get execution history for a schedule from activity logs."""
await check_agent_access(db, current_user, agent_id)
from app.models.activity_log import AgentActivityLog
result = await db.execute(
select(AgentActivityLog)
.where(
AgentActivityLog.agent_id == agent_id,
AgentActivityLog.action_type == "schedule_run",
)
.order_by(AgentActivityLog.created_at.desc())
)
logs = result.scalars().all()
# Filter by schedule_id in detail_json
history = []
for log in logs:
detail = log.detail_json or {}
if detail.get("schedule_id") == str(schedule_id):
history.append({
"id": str(log.id),
"created_at": log.created_at.isoformat() if log.created_at else None,
"summary": log.summary,
"instruction": detail.get("instruction", ""),
"reply": detail.get("reply", ""),
})
if len(history) >= 20:
break
return history