Clawith/backend/app/api/agent_credentials.py

245 lines
8.5 KiB
Python

"""Agent Credentials CRUD API routes.
Provides endpoints for managing encrypted login credentials and cookies
per agent. Sensitive fields (cookies_json, password) are encrypted at rest
using AES-256-CBC and are NEVER returned in API responses.
"""
import json
import uuid
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.core.permissions import check_agent_access
from app.core.security import encrypt_data, decrypt_data, get_current_user
from app.database import get_db
from app.models.agent_credential import AgentCredential
from app.models.user import User
from app.schemas.agent_credential import (
AgentCredentialCreate,
AgentCredentialResponse,
AgentCredentialUpdate,
)
router = APIRouter(prefix="/agents/{agent_id}/credentials", tags=["agent-credentials"])
def _to_response(cred: AgentCredential) -> dict:
"""Convert an AgentCredential ORM object to a safe response dict.
Decrypts username for display but NEVER exposes password or cookies_json.
Uses has_cookies / has_password boolean flags instead.
"""
settings = get_settings()
# Decrypt username for display (safe to show)
decrypted_username = None
if cred.username:
try:
decrypted_username = decrypt_data(cred.username, settings.SECRET_KEY)
except Exception:
decrypted_username = "(decryption failed)"
return {
"id": cred.id,
"agent_id": cred.agent_id,
"credential_type": cred.credential_type,
"platform": cred.platform,
"display_name": cred.display_name or "",
"username": decrypted_username,
"login_url": cred.login_url,
"status": cred.status,
"cookies_updated_at": cred.cookies_updated_at,
"last_login_at": cred.last_login_at,
"last_injected_at": cred.last_injected_at,
"has_cookies": bool(cred.cookies_json),
"has_password": bool(cred.password),
"created_at": cred.created_at,
"updated_at": cred.updated_at,
}
@router.get("/")
async def list_credentials(
agent_id: uuid.UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""List all credentials for an agent (sensitive data excluded)."""
# Verify the user has manage-level access to this agent
_agent, access_level = await check_agent_access(db, current_user, agent_id)
if access_level not in ("manage",) and current_user.role not in ("platform_admin", "org_admin"):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Manage access required to view credentials",
)
result = await db.execute(
select(AgentCredential)
.where(AgentCredential.agent_id == agent_id)
.order_by(AgentCredential.created_at.desc())
)
credentials = result.scalars().all()
return [_to_response(c) for c in credentials]
@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_credential(
agent_id: uuid.UUID,
data: AgentCredentialCreate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Create a new credential for an agent.
Sensitive fields (username, password, cookies_json) are encrypted before storage.
"""
_agent, access_level = await check_agent_access(db, current_user, agent_id)
if access_level not in ("manage",) and current_user.role not in ("platform_admin", "org_admin"):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Manage access required to create credentials",
)
settings = get_settings()
# Validate cookies_json format if provided
if data.cookies_json:
try:
parsed = json.loads(data.cookies_json)
if not isinstance(parsed, list):
raise ValueError("cookies_json must be a JSON array")
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid cookies_json format: {e}",
)
cred = AgentCredential(
agent_id=agent_id,
credential_type=data.credential_type,
platform=data.platform,
display_name=data.display_name or "",
login_url=data.login_url,
status="active",
)
# Encrypt sensitive fields
if data.username:
cred.username = encrypt_data(data.username, settings.SECRET_KEY)
if data.password:
cred.password = encrypt_data(data.password, settings.SECRET_KEY)
if data.cookies_json:
cred.cookies_json = encrypt_data(data.cookies_json, settings.SECRET_KEY)
cred.cookies_updated_at = datetime.now(timezone.utc)
db.add(cred)
await db.commit()
await db.refresh(cred)
return _to_response(cred)
@router.put("/{credential_id}")
async def update_credential(
agent_id: uuid.UUID,
credential_id: uuid.UUID,
data: AgentCredentialUpdate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Update an existing credential.
Only provided fields are updated. Sensitive fields are re-encrypted.
If cookies_json is updated, status is reset to 'active'.
"""
_agent, access_level = await check_agent_access(db, current_user, agent_id)
if access_level not in ("manage",) and current_user.role not in ("platform_admin", "org_admin"):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Manage access required to update credentials",
)
result = await db.execute(
select(AgentCredential).where(
AgentCredential.id == credential_id,
AgentCredential.agent_id == agent_id,
)
)
cred = result.scalar_one_or_none()
if not cred:
raise HTTPException(status_code=404, detail="Credential not found")
settings = get_settings()
update_data = data.model_dump(exclude_unset=True)
# Handle plaintext fields
for field in ("credential_type", "platform", "display_name", "login_url", "status"):
if field in update_data:
setattr(cred, field, update_data[field])
# Handle encrypted fields
if "username" in update_data:
cred.username = encrypt_data(update_data["username"], settings.SECRET_KEY) if update_data["username"] else None
if "password" in update_data:
cred.password = encrypt_data(update_data["password"], settings.SECRET_KEY) if update_data["password"] else None
if "cookies_json" in update_data:
if update_data["cookies_json"]:
# Validate JSON format
try:
parsed = json.loads(update_data["cookies_json"])
if not isinstance(parsed, list):
raise ValueError("cookies_json must be a JSON array")
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid cookies_json format: {e}",
)
cred.cookies_json = encrypt_data(update_data["cookies_json"], settings.SECRET_KEY)
cred.cookies_updated_at = datetime.now(timezone.utc)
# Reset status to active when cookies are updated
cred.status = "active"
else:
cred.cookies_json = None
cred.cookies_updated_at = None
await db.commit()
await db.refresh(cred)
return _to_response(cred)
@router.delete("/{credential_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_credential(
agent_id: uuid.UUID,
credential_id: uuid.UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Delete a credential."""
_agent, access_level = await check_agent_access(db, current_user, agent_id)
if access_level not in ("manage",) and current_user.role not in ("platform_admin", "org_admin"):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Manage access required to delete credentials",
)
result = await db.execute(
select(AgentCredential).where(
AgentCredential.id == credential_id,
AgentCredential.agent_id == agent_id,
)
)
cred = result.scalar_one_or_none()
if not cred:
raise HTTPException(status_code=404, detail="Credential not found")
await db.delete(cred)
await db.commit()