103 lines
3.4 KiB
Python
103 lines
3.4 KiB
Python
"""Enterprise information synchronization service.
|
|
|
|
Uses Redis Pub/Sub to notify online Agent containers when enterprise info changes.
|
|
Agents pull latest data based on their roles and write to local enterprise_info/ directory.
|
|
"""
|
|
|
|
import json
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
from loguru import logger
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.config import get_settings
|
|
from app.core.events import publish_event
|
|
from app.models.agent import Agent
|
|
from app.models.audit import EnterpriseInfo
|
|
|
|
settings = get_settings()
|
|
|
|
# Redis channel for enterprise info updates
|
|
ENTERPRISE_INFO_CHANNEL = "enterprise_info_updated"
|
|
|
|
|
|
class EnterpriseSyncService:
|
|
"""Synchronize enterprise information to all online Agent containers."""
|
|
|
|
async def update_enterprise_info(
|
|
self, db: AsyncSession, info_type: str, content: dict,
|
|
visible_roles: list[str], updated_by: uuid.UUID
|
|
) -> EnterpriseInfo:
|
|
"""Update enterprise info in database and notify all agents."""
|
|
result = await db.execute(
|
|
select(EnterpriseInfo).where(EnterpriseInfo.info_type == info_type)
|
|
)
|
|
info = result.scalar_one_or_none()
|
|
|
|
if info:
|
|
info.content = content
|
|
info.visible_roles = visible_roles
|
|
info.version += 1
|
|
info.updated_by = updated_by
|
|
else:
|
|
info = EnterpriseInfo(
|
|
info_type=info_type,
|
|
content=content,
|
|
visible_roles=visible_roles,
|
|
updated_by=updated_by,
|
|
)
|
|
db.add(info)
|
|
|
|
await db.flush()
|
|
|
|
# Publish update event
|
|
await publish_event(ENTERPRISE_INFO_CHANNEL, {
|
|
"info_type": info_type,
|
|
"version": info.version,
|
|
"visible_roles": visible_roles,
|
|
})
|
|
|
|
logger.info(f"Published enterprise_info update: {info_type} v{info.version}")
|
|
return info
|
|
|
|
async def sync_to_agent(self, db: AsyncSession, agent_id: uuid.UUID, agent_role: str = "") -> None:
|
|
"""Pull enterprise info from DB and write to agent's enterprise_info/ directory.
|
|
|
|
Filters by visible_roles — if empty, all roles can see it.
|
|
"""
|
|
agent_dir = Path(settings.AGENT_DATA_DIR) / str(agent_id) / "enterprise_info"
|
|
agent_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
result = await db.execute(select(EnterpriseInfo))
|
|
all_info = result.scalars().all()
|
|
|
|
for info in all_info:
|
|
# Filter by role visibility
|
|
if info.visible_roles and agent_role and agent_role not in info.visible_roles:
|
|
continue
|
|
|
|
file_path = agent_dir / f"{info.info_type}.json"
|
|
file_path.write_text(json.dumps({
|
|
"type": info.info_type,
|
|
"version": info.version,
|
|
"content": info.content,
|
|
}, ensure_ascii=False, indent=2))
|
|
|
|
logger.info(f"Synced enterprise info to agent {agent_id}")
|
|
|
|
async def sync_to_all_agents(self, db: AsyncSession) -> int:
|
|
"""Sync enterprise info to all running agents. Returns count."""
|
|
result = await db.execute(select(Agent).where(Agent.status == "running"))
|
|
agents = result.scalars().all()
|
|
|
|
for agent in agents:
|
|
await self.sync_to_agent(db, agent.id, agent.role_description)
|
|
|
|
logger.info(f"Synced enterprise info to {len(agents)} agents")
|
|
return len(agents)
|
|
|
|
|
|
enterprise_sync_service = EnterpriseSyncService()
|