Clawith/backend/app/services/enterprise_sync.py

103 lines
3.4 KiB
Python

"""Enterprise information synchronization service.
Uses Redis Pub/Sub to notify online Agent containers when enterprise info changes.
Agents pull latest data based on their roles and write to local enterprise_info/ directory.
"""
import json
import uuid
from pathlib import Path
from loguru import logger
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.core.events import publish_event
from app.models.agent import Agent
from app.models.audit import EnterpriseInfo
settings = get_settings()
# Redis channel for enterprise info updates
ENTERPRISE_INFO_CHANNEL = "enterprise_info_updated"
class EnterpriseSyncService:
"""Synchronize enterprise information to all online Agent containers."""
async def update_enterprise_info(
self, db: AsyncSession, info_type: str, content: dict,
visible_roles: list[str], updated_by: uuid.UUID
) -> EnterpriseInfo:
"""Update enterprise info in database and notify all agents."""
result = await db.execute(
select(EnterpriseInfo).where(EnterpriseInfo.info_type == info_type)
)
info = result.scalar_one_or_none()
if info:
info.content = content
info.visible_roles = visible_roles
info.version += 1
info.updated_by = updated_by
else:
info = EnterpriseInfo(
info_type=info_type,
content=content,
visible_roles=visible_roles,
updated_by=updated_by,
)
db.add(info)
await db.flush()
# Publish update event
await publish_event(ENTERPRISE_INFO_CHANNEL, {
"info_type": info_type,
"version": info.version,
"visible_roles": visible_roles,
})
logger.info(f"Published enterprise_info update: {info_type} v{info.version}")
return info
async def sync_to_agent(self, db: AsyncSession, agent_id: uuid.UUID, agent_role: str = "") -> None:
"""Pull enterprise info from DB and write to agent's enterprise_info/ directory.
Filters by visible_roles — if empty, all roles can see it.
"""
agent_dir = Path(settings.AGENT_DATA_DIR) / str(agent_id) / "enterprise_info"
agent_dir.mkdir(parents=True, exist_ok=True)
result = await db.execute(select(EnterpriseInfo))
all_info = result.scalars().all()
for info in all_info:
# Filter by role visibility
if info.visible_roles and agent_role and agent_role not in info.visible_roles:
continue
file_path = agent_dir / f"{info.info_type}.json"
file_path.write_text(json.dumps({
"type": info.info_type,
"version": info.version,
"content": info.content,
}, ensure_ascii=False, indent=2))
logger.info(f"Synced enterprise info to agent {agent_id}")
async def sync_to_all_agents(self, db: AsyncSession) -> int:
"""Sync enterprise info to all running agents. Returns count."""
result = await db.execute(select(Agent).where(Agent.status == "running"))
agents = result.scalars().all()
for agent in agents:
await self.sync_to_agent(db, agent.id, agent.role_description)
logger.info(f"Synced enterprise info to {len(agents)} agents")
return len(agents)
enterprise_sync_service = EnterpriseSyncService()