Clawith/backend/app/services/collaboration.py

142 lines
4.8 KiB
Python

"""Agent collaboration service — Agent-to-Agent communication."""
import json
import uuid
from datetime import datetime, timezone
from loguru import logger
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.agent import Agent
from app.models.audit import AuditLog
class CollaborationService:
"""Enable digital employees to collaborate with each other.
Collaboration patterns:
1. Delegate — Agent A sends a task to Agent B
2. Consult — Agent A asks Agent B a question and waits for response
3. Notify — Agent A sends information to Agent B (fire-and-forget)
"""
async def delegate_task(
self, db: AsyncSession, from_agent_id: uuid.UUID,
to_agent_id: uuid.UUID, task_title: str, task_description: str
) -> dict:
"""Agent A delegates a task to Agent B."""
from app.models.task import Task
# Verify both agents exist and are running
from_result = await db.execute(select(Agent).where(Agent.id == from_agent_id))
from_agent = from_result.scalar_one_or_none()
to_result = await db.execute(select(Agent).where(Agent.id == to_agent_id))
to_agent = to_result.scalar_one_or_none()
if not from_agent or not to_agent:
raise ValueError("Agent not found")
if to_agent.status != "running":
raise ValueError(f"Target agent '{to_agent.name}' is not running")
# Create task for target agent
task = Task(
agent_id=to_agent_id,
title=f"[委托自 {from_agent.name}] {task_title}",
description=task_description,
type="todo",
priority="medium",
created_by=from_agent.creator_id,
assignee="self",
)
db.add(task)
# Audit log
db.add(AuditLog(
agent_id=from_agent_id,
action="collaboration:delegate",
details={
"from_agent": str(from_agent_id),
"to_agent": str(to_agent_id),
"task_title": task_title,
},
))
await db.flush()
logger.info(f"Agent {from_agent.name} delegated task to {to_agent.name}: {task_title}")
return {
"task_id": str(task.id),
"from_agent": from_agent.name,
"to_agent": to_agent.name,
"status": "delegated",
}
async def list_collaborators(self, db: AsyncSession, agent_id: uuid.UUID) -> list[dict]:
"""List agents that can collaborate with the given agent.
Returns agents from the same enterprise (same creator's org).
"""
result = await db.execute(select(Agent).where(Agent.id == agent_id))
agent = result.scalar_one_or_none()
if not agent:
return []
# Find agents by same creator or with company-wide permissions
collaborators_result = await db.execute(
select(Agent).where(
Agent.id != agent_id,
Agent.status.in_(["running", "stopped"]),
).order_by(Agent.name)
)
agents = collaborators_result.scalars().all()
return [
{
"id": str(a.id),
"name": a.name,
"role": a.role_description,
"status": a.status,
}
for a in agents
]
async def send_message_between_agents(
self, db: AsyncSession, from_agent_id: uuid.UUID,
to_agent_id: uuid.UUID, message: str, msg_type: str = "notify"
) -> dict:
"""Send an inter-agent message.
msg_type: 'notify' (fire-and-forget) or 'consult' (expects reply)
"""
from_result = await db.execute(select(Agent).where(Agent.id == from_agent_id))
from_agent = from_result.scalar_one_or_none()
# Write message to target agent's workspace
from pathlib import Path
from app.config import get_settings
settings = get_settings()
inbox_dir = Path(settings.AGENT_DATA_DIR) / str(to_agent_id) / "workspace" / "inbox"
inbox_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
msg_file = inbox_dir / f"{timestamp}_{str(from_agent_id)[:8]}.md"
msg_file.write_text(
f"# 来自 {from_agent.name if from_agent else 'Unknown'} 的消息\n"
f"- 类型: {msg_type}\n"
f"- 时间: {datetime.now(timezone.utc).isoformat()}\n\n"
f"{message}\n"
)
db.add(AuditLog(
agent_id=from_agent_id,
action=f"collaboration:{msg_type}",
details={"to_agent": str(to_agent_id), "message_preview": message[:100]},
))
await db.flush()
return {"status": "sent", "type": msg_type}
collaboration_service = CollaborationService()