305 lines
13 KiB
Python
305 lines
13 KiB
Python
"""Agent lifecycle manager — Docker container management for OpenClaw Gateway instances."""
|
|
|
|
import json
|
|
import shutil
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import docker
|
|
from docker.errors import DockerException, NotFound
|
|
from loguru import logger
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.config import get_settings
|
|
from app.models.agent import Agent
|
|
from app.models.llm import LLMModel
|
|
|
|
settings = get_settings()
|
|
|
|
|
|
class AgentManager:
|
|
"""Manage OpenClaw Gateway Docker containers for digital employees."""
|
|
|
|
def __init__(self):
|
|
try:
|
|
self.docker_client = docker.from_env()
|
|
except DockerException:
|
|
logger.warning("Docker not available — agent containers will not be managed")
|
|
self.docker_client = None
|
|
|
|
def _agent_dir(self, agent_id: uuid.UUID) -> Path:
|
|
return Path(settings.AGENT_DATA_DIR) / str(agent_id)
|
|
|
|
def _template_dir(self) -> Path:
|
|
return Path(settings.AGENT_TEMPLATE_DIR)
|
|
|
|
async def initialize_agent_files(self, db: AsyncSession, agent: Agent,
|
|
personality: str = "", boundaries: str = "") -> None:
|
|
"""Copy template files and customize for this agent."""
|
|
agent_dir = self._agent_dir(agent.id)
|
|
template_dir = self._template_dir()
|
|
|
|
if agent_dir.exists():
|
|
logger.warning(f"Agent dir already exists: {agent_dir}")
|
|
return
|
|
|
|
if template_dir.exists():
|
|
# Copy template
|
|
shutil.copytree(str(template_dir), str(agent_dir))
|
|
else:
|
|
# No template dir (local dev) — create minimal workspace structure
|
|
logger.info(f"Template dir not found ({template_dir}), creating minimal workspace")
|
|
agent_dir.mkdir(parents=True, exist_ok=True)
|
|
(agent_dir / "workspace").mkdir(exist_ok=True)
|
|
(agent_dir / "workspace" / "knowledge_base").mkdir(exist_ok=True)
|
|
(agent_dir / "memory").mkdir(exist_ok=True)
|
|
(agent_dir / "skills").mkdir(exist_ok=True)
|
|
(agent_dir / "tasks.json").write_text("[]", encoding="utf-8")
|
|
|
|
# Customize soul.md
|
|
soul_path = agent_dir / "soul.md"
|
|
# Get creator name
|
|
from app.models.user import User
|
|
result = await db.execute(select(User).where(User.id == agent.creator_id))
|
|
creator = result.scalar_one_or_none()
|
|
creator_name = creator.display_name if creator else "Unknown"
|
|
|
|
soul_content = f"# Personality\n\nI'm {agent.name}, {agent.role_description or 'a digital assistant'}.\n"
|
|
if soul_path.exists():
|
|
# template_content = soul_path.read_text() # 读取 soul.md 内容
|
|
template_content = soul_path.read_text(encoding="utf-8") # 读取 soul.md 内容
|
|
soul_content = template_content.replace("{{agent_name}}", agent.name)
|
|
soul_content = soul_content.replace("{{role_description}}", agent.role_description or "通用助手")
|
|
soul_content = soul_content.replace("{{creator_name}}", creator_name)
|
|
soul_content = soul_content.replace("{{created_at}}", datetime.now(timezone.utc).strftime("%Y-%m-%d"))
|
|
|
|
soul_path.write_text(soul_content, encoding="utf-8") # 保存 soul.md
|
|
|
|
# Helper function to replace or append sections
|
|
def replace_or_append_section(content: str, section_name: str, section_content: str) -> str:
|
|
"""Replace existing ## SectionName or append if not found."""
|
|
if not section_content:
|
|
return content
|
|
|
|
# Pattern to match existing section (case-insensitive header)
|
|
import re
|
|
pattern = rf"^##\s+{re.escape(section_name)}\s*$"
|
|
lines = content.split('\n')
|
|
|
|
# Find the section header
|
|
for i, line in enumerate(lines):
|
|
if re.match(pattern, line.strip(), re.IGNORECASE):
|
|
# Found existing section - replace until next ## header or end
|
|
section_start = i
|
|
section_end = len(lines)
|
|
for j in range(i + 1, len(lines)):
|
|
if lines[j].strip().startswith('## '):
|
|
section_end = j
|
|
break
|
|
|
|
# Replace the section content (with trailing newline for proper spacing)
|
|
new_section = f"## {section_name}\n{section_content}\n"
|
|
lines = lines[:section_start] + [new_section] + lines[section_end:]
|
|
return '\n'.join(lines)
|
|
|
|
# Section not found - append at the end
|
|
return content + f"\n## {section_name}\n{section_content}\n"
|
|
|
|
# Use the helper to replace or append Personality and Boundaries
|
|
soul_content = replace_or_append_section(soul_content, "Personality", personality)
|
|
soul_content = replace_or_append_section(soul_content, "Boundaries", boundaries)
|
|
|
|
soul_path.write_text(soul_content, encoding="utf-8")
|
|
|
|
# Ensure memory.md exists
|
|
mem_path = agent_dir / "memory" / "memory.md"
|
|
if not mem_path.exists():
|
|
mem_path.write_text("# Memory\n\n_Record important information and knowledge here._\n", encoding="utf-8")
|
|
|
|
# Ensure reflections.md exists — copy from central template
|
|
refl_path = agent_dir / "memory" / "reflections.md"
|
|
if not refl_path.exists():
|
|
refl_template = Path(__file__).parent.parent / "templates" / "reflections.md"
|
|
refl_content = refl_template.read_text(encoding="utf-8") if refl_template.exists() else "# Reflections Journal\n"
|
|
refl_path.write_text(refl_content, encoding="utf-8")
|
|
|
|
# Ensure HEARTBEAT.md exists — copy from central template
|
|
hb_path = agent_dir / "HEARTBEAT.md"
|
|
if not hb_path.exists():
|
|
hb_template = Path(__file__).parent.parent / "templates" / "HEARTBEAT.md"
|
|
hb_content = hb_template.read_text(encoding="utf-8") if hb_template.exists() else "# Heartbeat Instructions\n"
|
|
hb_path.write_text(hb_content, encoding="utf-8")
|
|
|
|
# Customize state.json
|
|
state_path = agent_dir / "state.json"
|
|
if state_path.exists():
|
|
state = json.loads(state_path.read_text())
|
|
state["agent_id"] = str(agent.id)
|
|
state["name"] = agent.name
|
|
state_path.write_text(json.dumps(state, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
|
|
logger.info(f"Initialized agent files at {agent_dir}")
|
|
|
|
def _generate_openclaw_config(self, agent: Agent, model: LLMModel | None) -> dict:
|
|
"""Generate openclaw.json config for the agent container."""
|
|
config = {
|
|
"agent": {
|
|
"model": f"{model.provider}/{model.model}" if model else "anthropic/claude-sonnet-4-5",
|
|
},
|
|
"agents": {
|
|
"defaults": {
|
|
"workspace": "/home/node/.openclaw/workspace",
|
|
},
|
|
},
|
|
}
|
|
|
|
if model and model.api_key_encrypted:
|
|
config["env"] = {
|
|
f"{model.provider.upper()}_API_KEY": model.api_key_encrypted,
|
|
}
|
|
|
|
return config
|
|
|
|
async def start_container(self, db: AsyncSession, agent: Agent) -> str | None:
|
|
"""Start an OpenClaw Gateway Docker container for the agent.
|
|
|
|
Returns container_id or None if Docker not available.
|
|
"""
|
|
if not self.docker_client:
|
|
logger.info("Docker not available, skipping container start")
|
|
agent.status = "idle"
|
|
agent.last_active_at = datetime.now(timezone.utc)
|
|
return None
|
|
|
|
agent_dir = self._agent_dir(agent.id)
|
|
|
|
# Get model config
|
|
model = None
|
|
if agent.primary_model_id:
|
|
result = await db.execute(select(LLMModel).where(LLMModel.id == agent.primary_model_id))
|
|
model = result.scalar_one_or_none()
|
|
|
|
# Generate OpenClaw config
|
|
config = self._generate_openclaw_config(agent, model)
|
|
config_dir = agent_dir / ".openclaw"
|
|
config_dir.mkdir(parents=True, exist_ok=True)
|
|
(config_dir / "openclaw.json").write_text(json.dumps(config, indent=2), encoding="utf-8")
|
|
|
|
# Create workspace symlink
|
|
workspace_dir = config_dir / "workspace"
|
|
if not workspace_dir.exists():
|
|
workspace_dir.symlink_to(agent_dir / "workspace")
|
|
|
|
# Assign a unique port
|
|
container_port = 18789 + hash(str(agent.id)) % 10000
|
|
|
|
try:
|
|
container = self.docker_client.containers.run(
|
|
settings.OPENCLAW_IMAGE,
|
|
detach=True,
|
|
name=f"clawith-agent-{str(agent.id)[:8]}",
|
|
network=settings.DOCKER_NETWORK,
|
|
ports={f"{settings.OPENCLAW_GATEWAY_PORT}/tcp": container_port},
|
|
volumes={
|
|
str(agent_dir): {"bind": "/home/node/.openclaw", "mode": "rw"},
|
|
},
|
|
environment={
|
|
"OPENCLAW_GATEWAY_TOKEN": str(uuid.uuid4()),
|
|
},
|
|
restart_policy={"Name": "unless-stopped"},
|
|
labels={
|
|
"clawith.agent_id": str(agent.id),
|
|
"clawith.agent_name": agent.name,
|
|
},
|
|
)
|
|
|
|
agent.container_id = container.id
|
|
agent.container_port = container_port
|
|
agent.status = "running"
|
|
agent.last_active_at = datetime.now(timezone.utc)
|
|
|
|
logger.info(f"Started container {container.id[:12]} for agent {agent.name} on port {container_port}")
|
|
return container.id
|
|
|
|
except DockerException as e:
|
|
logger.error(f"Failed to start container for agent {agent.name}: {e}")
|
|
agent.status = "error"
|
|
return None
|
|
|
|
async def stop_container(self, agent: Agent) -> bool:
|
|
"""Stop the agent's Docker container."""
|
|
if not self.docker_client or not agent.container_id:
|
|
agent.status = "stopped"
|
|
return True
|
|
|
|
try:
|
|
container = self.docker_client.containers.get(agent.container_id)
|
|
container.stop(timeout=10)
|
|
agent.status = "stopped"
|
|
logger.info(f"Stopped container {agent.container_id[:12]} for agent {agent.name}")
|
|
return True
|
|
except NotFound:
|
|
agent.status = "stopped"
|
|
agent.container_id = None
|
|
return True
|
|
except DockerException as e:
|
|
logger.error(f"Failed to stop container: {e}")
|
|
return False
|
|
|
|
async def remove_container(self, agent: Agent) -> bool:
|
|
"""Stop and remove the agent's Docker container."""
|
|
if not self.docker_client or not agent.container_id:
|
|
return True
|
|
|
|
try:
|
|
container = self.docker_client.containers.get(agent.container_id)
|
|
container.stop(timeout=10)
|
|
container.remove()
|
|
agent.container_id = None
|
|
agent.container_port = None
|
|
logger.info(f"Removed container for agent {agent.name}")
|
|
return True
|
|
except NotFound:
|
|
agent.container_id = None
|
|
return True
|
|
except DockerException as e:
|
|
logger.error(f"Failed to remove container: {e}")
|
|
return False
|
|
|
|
async def archive_agent_files(self, agent_id: uuid.UUID) -> Path:
|
|
"""Archive agent files to a backup location and return the archive directory."""
|
|
agent_dir = self._agent_dir(agent_id)
|
|
archive_dir = Path(settings.AGENT_DATA_DIR) / "_archived"
|
|
archive_dir.mkdir(parents=True, exist_ok=True)
|
|
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
|
|
dest = archive_dir / f"{agent_id}_{timestamp}"
|
|
if agent_dir.exists():
|
|
shutil.move(str(agent_dir), str(dest))
|
|
logger.info(f"Archived agent files to {dest}")
|
|
else:
|
|
dest.mkdir(parents=True, exist_ok=True)
|
|
return dest
|
|
|
|
def get_container_status(self, agent: Agent) -> dict:
|
|
"""Get real-time container status."""
|
|
if not self.docker_client or not agent.container_id:
|
|
return {"running": False, "status": agent.status}
|
|
|
|
try:
|
|
container = self.docker_client.containers.get(agent.container_id)
|
|
return {
|
|
"running": container.status == "running",
|
|
"status": container.status,
|
|
"ports": container.ports,
|
|
"created": container.attrs.get("Created", ""),
|
|
}
|
|
except NotFound:
|
|
return {"running": False, "status": "not_found"}
|
|
except DockerException:
|
|
return {"running": False, "status": "error"}
|
|
|
|
|
|
agent_manager = AgentManager()
|