Clawith/backend/app/services/autonomy_service.py

316 lines
13 KiB
Python

"""Autonomy boundary enforcement service.
Implements the three-level autonomy system:
L1 — Auto-execute, notify creator
L2 — Notify creator, auto-execute
L3 — Require explicit approval before execution
"""
import json
import uuid
from datetime import datetime, timezone
from loguru import logger
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.agent import Agent
from app.models.audit import ApprovalRequest, AuditLog
from app.models.channel_config import ChannelConfig
from app.models.user import User
from app.services.feishu_service import feishu_service
class AutonomyService:
"""Enforce autonomy boundaries for agent operations."""
async def check_and_enforce(
self, db: AsyncSession, agent: Agent, action_type: str, details: dict
) -> dict:
"""Check if an action is allowed under the agent's autonomy policy.
Returns:
{
"allowed": True/False,
"level": "L1"/"L2"/"L3",
"approval_id": uuid (if L3),
"message": str,
}
"""
policy = agent.autonomy_policy or {}
level = policy.get(action_type, "L2") # Default to L2
# Log the action regardless of level
audit = AuditLog(
agent_id=agent.id,
action=f"autonomy_check:{action_type}",
details={"level": level, **details},
)
db.add(audit)
if level == "L1":
# Auto-execute, just log
logger.info(f"L1: Auto-executing {action_type} for agent {agent.name}")
return {
"allowed": True,
"level": "L1",
"message": "Auto-executed",
}
elif level == "L2":
# Auto-execute but notify creator
logger.info(f"L2: Executing {action_type} for agent {agent.name} with notification")
await self._notify_creator(db, agent, action_type, details)
return {
"allowed": True,
"level": "L2",
"message": "Executed and creator notified",
}
elif level == "L3":
# Create approval request and block
approval = ApprovalRequest(
agent_id=agent.id,
action_type=action_type,
details=details,
)
db.add(approval)
await db.flush()
logger.info(f"L3: Approval required for {action_type} by agent {agent.name}")
await self._request_approval(db, agent, approval)
return {
"allowed": False,
"level": "L3",
"approval_id": str(approval.id),
"message": "Approval requested from creator",
}
return {"allowed": False, "level": "unknown", "message": "Unknown autonomy level"}
async def resolve_approval(
self, db: AsyncSession, approval_id: uuid.UUID, user: User, action: str
) -> ApprovalRequest:
"""Approve or reject a pending approval request."""
result = await db.execute(
select(ApprovalRequest).where(ApprovalRequest.id == approval_id)
)
approval = result.scalar_one_or_none()
if not approval:
raise ValueError("Approval not found")
if approval.status != "pending":
raise ValueError("Approval already resolved")
# Permission check: only agent creator or platform admin can resolve
agent_result = await db.execute(select(Agent).where(Agent.id == approval.agent_id))
agent = agent_result.scalar_one_or_none()
if agent and agent.creator_id != user.id and user.role != "platform_admin":
raise ValueError("Only the agent creator or platform admin can resolve approvals")
approval.status = "approved" if action == "approve" else "rejected"
approval.resolved_at = datetime.now(timezone.utc)
approval.resolved_by = user.id
# Log
db.add(AuditLog(
user_id=user.id,
agent_id=approval.agent_id,
action=f"approval_{approval.status}",
details={"approval_id": str(approval.id), "action_type": approval.action_type},
))
# Post-processing: execute the approved action
execution_result = None
if approval.status == "approved" and approval.details:
execution_result = await self._execute_approved_action(
approval.agent_id, approval.action_type, approval.details
)
logger.info(f"Post-approval execution for {approval.action_type}: {execution_result}")
# Web notification to agent creator about the result
if agent:
from app.services.notification_service import send_notification
status_label = "approved" if approval.status == "approved" else "rejected"
body_text = json.dumps(approval.details, ensure_ascii=False)[:200]
if execution_result:
body_text = f"Result: {execution_result}"
await send_notification(
db,
user_id=agent.creator_id,
type="approval_resolved",
title=f"[{agent.name}] {approval.action_type}{status_label}",
body=body_text,
link=f"/agents/{agent.id}#approvals",
ref_id=approval.id,
)
# Also notify the user who requested the action (if different from creator)
requested_by = approval.details.get("requested_by") if approval.details else None
if requested_by:
try:
requester_id = uuid.UUID(requested_by)
if requester_id != agent.creator_id:
await send_notification(
db,
user_id=requester_id,
type="approval_resolved",
title=f"[{agent.name}] {approval.action_type}{status_label}",
body=body_text,
link=f"/agents/{agent.id}#activityLog",
ref_id=approval.id,
)
except (ValueError, AttributeError):
pass # Invalid UUID, skip
await db.flush()
return approval
async def _execute_approved_action(
self, agent_id: uuid.UUID, action_type: str, details: dict
) -> str | None:
"""Execute the tool action that was approved.
Reads the tool name and arguments from the approval details,
then directly calls the tool executor (bypassing autonomy check).
"""
tool_name = details.get("tool")
args_raw = details.get("args", "{}")
if not tool_name:
return None
try:
# Parse args — stored as str(dict) so we need ast.literal_eval
import ast
if isinstance(args_raw, str):
try:
arguments = ast.literal_eval(args_raw)
except (ValueError, SyntaxError):
try:
arguments = json.loads(args_raw)
except json.JSONDecodeError:
arguments = {}
else:
arguments = args_raw
# Import and call the tool's direct executor (no autonomy re-check)
from app.services.agent_tools import _execute_tool_direct
result = await _execute_tool_direct(tool_name, arguments, agent_id)
return result
except Exception as e:
logger.error(f"Failed to execute approved action {tool_name}: {e}")
return f"Execution failed: {e}"
async def _notify_creator(self, db: AsyncSession, agent: Agent,
action_type: str, details: dict) -> None:
"""Send L2 notification to agent creator via Feishu + web."""
# Web notification (always)
from app.services.notification_service import send_notification
await send_notification(
db,
user_id=agent.creator_id,
type="autonomy_l2",
title=f"[{agent.name}] executed: {action_type}",
body=json.dumps(details, ensure_ascii=False)[:200],
link=f"/agents/{agent.id}#activityLog",
)
# Try Feishu notification if channel is configured
channel_result = await db.execute(
select(ChannelConfig).where(ChannelConfig.agent_id == agent.id)
)
channel = channel_result.scalars().first()
if channel and channel.app_id and channel.app_secret:
creator_result = await db.execute(
select(User).where(User.id == agent.creator_id)
)
creator = creator_result.scalar_one_or_none()
if creator:
from app.models.identity import IdentityProvider
from app.models.org import OrgMember
provider_r = await db.execute(
select(IdentityProvider).where(
IdentityProvider.provider_type == "feishu",
IdentityProvider.tenant_id == creator.tenant_id,
)
)
provider = provider_r.scalar_one_or_none()
if provider:
member_r = await db.execute(
select(OrgMember).where(
OrgMember.user_id == creator.id,
OrgMember.provider_id == provider.id,
)
)
member = member_r.scalar_one_or_none()
if member and (member.external_id or member.open_id):
receive_id = member.external_id or member.open_id
id_type = "user_id" if member.external_id else "open_id"
await feishu_service.send_message(
channel.app_id, channel.app_secret,
receive_id, "text",
json.dumps({"text": f"[{agent.name}] executed: {action_type}"}),
receive_id_type=id_type,
)
async def _request_approval(self, db: AsyncSession, agent: Agent,
approval: ApprovalRequest) -> None:
"""Send L3 approval request to creator via Feishu card + web notification."""
# Web notification (always)
from app.services.notification_service import send_notification
await send_notification(
db,
user_id=agent.creator_id,
type="approval_pending",
title=f"[{agent.name}] requests approval: {approval.action_type}",
body=json.dumps(approval.details, ensure_ascii=False)[:200],
link=f"/agents/{agent.id}#approvals",
ref_id=approval.id,
)
# Try Feishu notification
channel_result = await db.execute(
select(ChannelConfig).where(ChannelConfig.agent_id == agent.id)
)
channel = channel_result.scalars().first()
if channel and channel.app_id and channel.app_secret:
creator_result = await db.execute(
select(User).where(User.id == agent.creator_id)
)
creator = creator_result.scalar_one_or_none()
if creator:
from app.models.identity import IdentityProvider
from app.models.org import OrgMember
provider_r = await db.execute(
select(IdentityProvider).where(
IdentityProvider.provider_type == "feishu",
IdentityProvider.tenant_id == creator.tenant_id,
)
)
provider = provider_r.scalar_one_or_none()
if provider:
member_r = await db.execute(
select(OrgMember).where(
OrgMember.user_id == creator.id,
OrgMember.provider_id == provider.id,
)
)
member = member_r.scalar_one_or_none()
if member and (member.external_id or member.open_id):
receive_id = member.external_id or member.open_id
await feishu_service.send_approval_card(
channel.app_id, channel.app_secret,
receive_id,
agent.name, approval.action_type,
json.dumps(approval.details, ensure_ascii=False),
str(approval.id),
)
autonomy_service = AutonomyService()