Clawith/backend/app/services/agent_manager.py

305 lines
13 KiB
Python

"""Agent lifecycle manager — Docker container management for OpenClaw Gateway instances."""
import json
import shutil
import uuid
from datetime import datetime, timezone
from pathlib import Path
import docker
from docker.errors import DockerException, NotFound
from loguru import logger
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.models.agent import Agent
from app.models.llm import LLMModel
settings = get_settings()
class AgentManager:
"""Manage OpenClaw Gateway Docker containers for digital employees."""
def __init__(self):
try:
self.docker_client = docker.from_env()
except DockerException:
logger.warning("Docker not available — agent containers will not be managed")
self.docker_client = None
def _agent_dir(self, agent_id: uuid.UUID) -> Path:
return Path(settings.AGENT_DATA_DIR) / str(agent_id)
def _template_dir(self) -> Path:
return Path(settings.AGENT_TEMPLATE_DIR)
async def initialize_agent_files(self, db: AsyncSession, agent: Agent,
personality: str = "", boundaries: str = "") -> None:
"""Copy template files and customize for this agent."""
agent_dir = self._agent_dir(agent.id)
template_dir = self._template_dir()
if agent_dir.exists():
logger.warning(f"Agent dir already exists: {agent_dir}")
return
if template_dir.exists():
# Copy template
shutil.copytree(str(template_dir), str(agent_dir))
else:
# No template dir (local dev) — create minimal workspace structure
logger.info(f"Template dir not found ({template_dir}), creating minimal workspace")
agent_dir.mkdir(parents=True, exist_ok=True)
(agent_dir / "workspace").mkdir(exist_ok=True)
(agent_dir / "workspace" / "knowledge_base").mkdir(exist_ok=True)
(agent_dir / "memory").mkdir(exist_ok=True)
(agent_dir / "skills").mkdir(exist_ok=True)
(agent_dir / "tasks.json").write_text("[]", encoding="utf-8")
# Customize soul.md
soul_path = agent_dir / "soul.md"
# Get creator name
from app.models.user import User
result = await db.execute(select(User).where(User.id == agent.creator_id))
creator = result.scalar_one_or_none()
creator_name = creator.display_name if creator else "Unknown"
soul_content = f"# Personality\n\nI'm {agent.name}, {agent.role_description or 'a digital assistant'}.\n"
if soul_path.exists():
# template_content = soul_path.read_text() # 读取 soul.md 内容
template_content = soul_path.read_text(encoding="utf-8") # 读取 soul.md 内容
soul_content = template_content.replace("{{agent_name}}", agent.name)
soul_content = soul_content.replace("{{role_description}}", agent.role_description or "通用助手")
soul_content = soul_content.replace("{{creator_name}}", creator_name)
soul_content = soul_content.replace("{{created_at}}", datetime.now(timezone.utc).strftime("%Y-%m-%d"))
soul_path.write_text(soul_content, encoding="utf-8") # 保存 soul.md
# Helper function to replace or append sections
def replace_or_append_section(content: str, section_name: str, section_content: str) -> str:
"""Replace existing ## SectionName or append if not found."""
if not section_content:
return content
# Pattern to match existing section (case-insensitive header)
import re
pattern = rf"^##\s+{re.escape(section_name)}\s*$"
lines = content.split('\n')
# Find the section header
for i, line in enumerate(lines):
if re.match(pattern, line.strip(), re.IGNORECASE):
# Found existing section - replace until next ## header or end
section_start = i
section_end = len(lines)
for j in range(i + 1, len(lines)):
if lines[j].strip().startswith('## '):
section_end = j
break
# Replace the section content (with trailing newline for proper spacing)
new_section = f"## {section_name}\n{section_content}\n"
lines = lines[:section_start] + [new_section] + lines[section_end:]
return '\n'.join(lines)
# Section not found - append at the end
return content + f"\n## {section_name}\n{section_content}\n"
# Use the helper to replace or append Personality and Boundaries
soul_content = replace_or_append_section(soul_content, "Personality", personality)
soul_content = replace_or_append_section(soul_content, "Boundaries", boundaries)
soul_path.write_text(soul_content, encoding="utf-8")
# Ensure memory.md exists
mem_path = agent_dir / "memory" / "memory.md"
if not mem_path.exists():
mem_path.write_text("# Memory\n\n_Record important information and knowledge here._\n", encoding="utf-8")
# Ensure reflections.md exists — copy from central template
refl_path = agent_dir / "memory" / "reflections.md"
if not refl_path.exists():
refl_template = Path(__file__).parent.parent / "templates" / "reflections.md"
refl_content = refl_template.read_text(encoding="utf-8") if refl_template.exists() else "# Reflections Journal\n"
refl_path.write_text(refl_content, encoding="utf-8")
# Ensure HEARTBEAT.md exists — copy from central template
hb_path = agent_dir / "HEARTBEAT.md"
if not hb_path.exists():
hb_template = Path(__file__).parent.parent / "templates" / "HEARTBEAT.md"
hb_content = hb_template.read_text(encoding="utf-8") if hb_template.exists() else "# Heartbeat Instructions\n"
hb_path.write_text(hb_content, encoding="utf-8")
# Customize state.json
state_path = agent_dir / "state.json"
if state_path.exists():
state = json.loads(state_path.read_text())
state["agent_id"] = str(agent.id)
state["name"] = agent.name
state_path.write_text(json.dumps(state, ensure_ascii=False, indent=2), encoding="utf-8")
logger.info(f"Initialized agent files at {agent_dir}")
def _generate_openclaw_config(self, agent: Agent, model: LLMModel | None) -> dict:
"""Generate openclaw.json config for the agent container."""
config = {
"agent": {
"model": f"{model.provider}/{model.model}" if model else "anthropic/claude-sonnet-4-5",
},
"agents": {
"defaults": {
"workspace": "/home/node/.openclaw/workspace",
},
},
}
if model and model.api_key_encrypted:
config["env"] = {
f"{model.provider.upper()}_API_KEY": model.api_key_encrypted,
}
return config
async def start_container(self, db: AsyncSession, agent: Agent) -> str | None:
"""Start an OpenClaw Gateway Docker container for the agent.
Returns container_id or None if Docker not available.
"""
if not self.docker_client:
logger.info("Docker not available, skipping container start")
agent.status = "idle"
agent.last_active_at = datetime.now(timezone.utc)
return None
agent_dir = self._agent_dir(agent.id)
# Get model config
model = None
if agent.primary_model_id:
result = await db.execute(select(LLMModel).where(LLMModel.id == agent.primary_model_id))
model = result.scalar_one_or_none()
# Generate OpenClaw config
config = self._generate_openclaw_config(agent, model)
config_dir = agent_dir / ".openclaw"
config_dir.mkdir(parents=True, exist_ok=True)
(config_dir / "openclaw.json").write_text(json.dumps(config, indent=2), encoding="utf-8")
# Create workspace symlink
workspace_dir = config_dir / "workspace"
if not workspace_dir.exists():
workspace_dir.symlink_to(agent_dir / "workspace")
# Assign a unique port
container_port = 18789 + hash(str(agent.id)) % 10000
try:
container = self.docker_client.containers.run(
settings.OPENCLAW_IMAGE,
detach=True,
name=f"clawith-agent-{str(agent.id)[:8]}",
network=settings.DOCKER_NETWORK,
ports={f"{settings.OPENCLAW_GATEWAY_PORT}/tcp": container_port},
volumes={
str(agent_dir): {"bind": "/home/node/.openclaw", "mode": "rw"},
},
environment={
"OPENCLAW_GATEWAY_TOKEN": str(uuid.uuid4()),
},
restart_policy={"Name": "unless-stopped"},
labels={
"clawith.agent_id": str(agent.id),
"clawith.agent_name": agent.name,
},
)
agent.container_id = container.id
agent.container_port = container_port
agent.status = "running"
agent.last_active_at = datetime.now(timezone.utc)
logger.info(f"Started container {container.id[:12]} for agent {agent.name} on port {container_port}")
return container.id
except DockerException as e:
logger.error(f"Failed to start container for agent {agent.name}: {e}")
agent.status = "error"
return None
async def stop_container(self, agent: Agent) -> bool:
"""Stop the agent's Docker container."""
if not self.docker_client or not agent.container_id:
agent.status = "stopped"
return True
try:
container = self.docker_client.containers.get(agent.container_id)
container.stop(timeout=10)
agent.status = "stopped"
logger.info(f"Stopped container {agent.container_id[:12]} for agent {agent.name}")
return True
except NotFound:
agent.status = "stopped"
agent.container_id = None
return True
except DockerException as e:
logger.error(f"Failed to stop container: {e}")
return False
async def remove_container(self, agent: Agent) -> bool:
"""Stop and remove the agent's Docker container."""
if not self.docker_client or not agent.container_id:
return True
try:
container = self.docker_client.containers.get(agent.container_id)
container.stop(timeout=10)
container.remove()
agent.container_id = None
agent.container_port = None
logger.info(f"Removed container for agent {agent.name}")
return True
except NotFound:
agent.container_id = None
return True
except DockerException as e:
logger.error(f"Failed to remove container: {e}")
return False
async def archive_agent_files(self, agent_id: uuid.UUID) -> Path:
"""Archive agent files to a backup location and return the archive directory."""
agent_dir = self._agent_dir(agent_id)
archive_dir = Path(settings.AGENT_DATA_DIR) / "_archived"
archive_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
dest = archive_dir / f"{agent_id}_{timestamp}"
if agent_dir.exists():
shutil.move(str(agent_dir), str(dest))
logger.info(f"Archived agent files to {dest}")
else:
dest.mkdir(parents=True, exist_ok=True)
return dest
def get_container_status(self, agent: Agent) -> dict:
"""Get real-time container status."""
if not self.docker_client or not agent.container_id:
return {"running": False, "status": agent.status}
try:
container = self.docker_client.containers.get(agent.container_id)
return {
"running": container.status == "running",
"status": container.status,
"ports": container.ports,
"created": container.attrs.get("Created", ""),
}
except NotFound:
return {"running": False, "status": "not_found"}
except DockerException:
return {"running": False, "status": "error"}
agent_manager = AgentManager()