"""Clawith Backend — FastAPI Application Entry Point.""" from contextlib import asynccontextmanager from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from loguru import logger from app.config import get_settings from app.core.events import close_redis from app.core.logging_config import configure_logging, intercept_standard_logging from app.core.middleware import TraceIdMiddleware from app.schemas.schemas import HealthResponse settings = get_settings() async def _start_ss_local() -> None: """Start ss-local SOCKS5 proxy for Discord API calls. Tries nodes in priority order.""" import asyncio, json, os, shutil, tempfile if not shutil.which("ss-local"): logger.info("[Proxy] ss-local not found — Discord proxy disabled") return # Load proxy nodes from config file (gitignored, mounted as Docker volume) import json as _json cfg_file = os.environ.get("SS_CONFIG_FILE", "/data/ss-nodes.json") if os.path.exists(cfg_file): # Guard against empty or malformed config file — both produce a clear # warning and a clean exit rather than an unhandled JSONDecodeError. try: raw = open(cfg_file).read().strip() if not raw: logger.warning(f"[Proxy] {cfg_file} exists but is empty — skipping proxy") return nodes = _json.loads(raw) except (json.JSONDecodeError, ValueError) as exc: logger.warning(f"[Proxy] Failed to parse {cfg_file}: {exc} — skipping proxy") return logger.info(f"[Proxy] Loaded {len(nodes)} node(s) from {cfg_file}") elif os.environ.get("SS_SERVER") and os.environ.get("SS_PASSWORD"): nodes = [{"server": os.environ["SS_SERVER"], "port": int(os.environ.get("SS_PORT", "1080")), "password": os.environ["SS_PASSWORD"], "method": os.environ.get("SS_METHOD", "chacha20-ietf-poly1305"), "label": "env"}] else: logger.info(f"[Proxy] {cfg_file} not found and SS_SERVER not set — skipping proxy") return for node in nodes: cfg = {"server": node["server"], "server_port": node["port"], "local_address": "127.0.0.1", "local_port": 1080, "password": node["password"], "method": node["method"], "timeout": 10} tf = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) json.dump(cfg, tf); tf.close() try: proc = await asyncio.create_subprocess_exec( "ss-local", "-c", tf.name, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE) await asyncio.sleep(2) if proc.returncode is None: os.environ["DISCORD_PROXY"] = "socks5h://127.0.0.1:1080" logger.info(f"[Proxy] ss-local → {node['label']} ({node['server']}:{node['port']})") return err = (await proc.stderr.read()).decode()[:120] logger.warning(f"[Proxy] {node['label']} failed: {err}") except Exception as e: logger.error(f"[Proxy] {node['label']} error: {e}") logger.warning("[Proxy] All SS nodes failed — Discord API calls will run without proxy") @asynccontextmanager async def lifespan(app: FastAPI): """Application startup and shutdown events.""" # Configure logging first configure_logging() intercept_standard_logging() logger.info("[startup] Logging configured") import asyncio import sys import os from app.services.trigger_daemon import start_trigger_daemon from app.services.tool_seeder import seed_builtin_tools from app.services.template_seeder import seed_agent_templates from app.services.feishu_ws import feishu_ws_manager from app.services.dingtalk_stream import dingtalk_stream_manager from app.services.wecom_stream import wecom_stream_manager from app.services.discord_gateway import discord_gateway_manager # ── Step 0: Ensure all DB tables exist (idempotent, safe to run on every startup) ── try: from app.database import Base, engine # Import all models so Base.metadata is fully populated import app.models.user # noqa import app.models.agent # noqa import app.models.task # noqa import app.models.llm # noqa import app.models.tool # noqa import app.models.audit # noqa import app.models.skill # noqa import app.models.channel_config # noqa import app.models.schedule # noqa import app.models.plaza # noqa import app.models.activity_log # noqa import app.models.org # noqa import app.models.system_settings # noqa import app.models.invitation_code # noqa import app.models.tenant # noqa import app.models.tenant_setting # noqa import app.models.participant # noqa import app.models.chat_session # noqa import app.models.trigger # noqa import app.models.notification # noqa import app.models.gateway_message # noqa import app.models.agent_credential # noqa import app.models.identity # noqa async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) logger.info("[startup] Database tables ready") except Exception as e: logger.warning(f"[startup] create_all failed: {e}") # Startup: seed data — each step isolated so one failure doesn't block others logger.info("[startup] seeding...") # Seed default company (Tenant) — required before users can register try: from app.models.tenant import Tenant from app.database import async_session as _session from sqlalchemy import select as _select async with _session() as _db: _existing = await _db.execute(_select(Tenant).where(Tenant.slug == "default")) if not _existing.scalar_one_or_none(): _db.add(Tenant(name="Default", slug="default", im_provider="web_only")) await _db.commit() logger.info("[startup] Default company created") except Exception as e: logger.warning(f"[startup] Default company seed failed: {e}") # Migrate old shared enterprise_info/ → enterprise_info_{first_tenant_id}/ try: import shutil from pathlib import Path as _Path from app.config import get_settings as _gs from app.models.tenant import Tenant as _T from app.database import async_session as _ses from sqlalchemy import select as _sel _data_dir = _Path(_gs().AGENT_DATA_DIR) _old_dir = _data_dir / "enterprise_info" if _old_dir.exists() and any(_old_dir.iterdir()): async with _ses() as _db: _first = await _db.execute(_sel(_T).order_by(_T.created_at).limit(1)) _tenant = _first.scalar_one_or_none() if _tenant: _new_dir = _data_dir / f"enterprise_info_{_tenant.id}" if not _new_dir.exists(): shutil.copytree(str(_old_dir), str(_new_dir)) print(f"[startup] ✅ Migrated enterprise_info → enterprise_info_{_tenant.id}", flush=True) else: print(f"[startup] ℹ️ enterprise_info_{_tenant.id} already exists, skipping migration", flush=True) except Exception as e: print(f"[startup] ⚠️ enterprise_info migration failed: {e}", flush=True) try: from app.services.tool_seeder import seed_builtin_tools, clean_orphaned_mcp_tools await seed_builtin_tools() await clean_orphaned_mcp_tools() except Exception as e: logger.warning(f"[startup] Builtin tools seed or cleanup failed: {e}") try: from app.services.tool_seeder import seed_atlassian_rovo_config, get_atlassian_api_key await seed_atlassian_rovo_config() # Auto-import Atlassian Rovo tools if an API key is already configured _rovo_key = await get_atlassian_api_key() if _rovo_key: from app.services.resource_discovery import seed_atlassian_rovo_tools await seed_atlassian_rovo_tools(_rovo_key) except Exception as e: logger.warning(f"[startup] Atlassian tools seed failed: {e}") try: await seed_agent_templates() except Exception as e: logger.warning(f"[startup] Agent templates seed failed: {e}") try: from app.services.skill_seeder import seed_skills, push_default_skills_to_existing_agents await seed_skills() await push_default_skills_to_existing_agents() except Exception as e: logger.warning(f"[startup] Skills seed failed: {e}") try: from app.services.agent_seeder import seed_default_agents await seed_default_agents() except Exception as e: logger.warning(f"[startup] Default agents seed failed: {e}") # Start background tasks (always, even if seeding failed) try: logger.info("[startup] starting background tasks...") from app.services.audit_logger import write_audit_log await write_audit_log("server_startup", {"pid": os.getpid()}) def _bg_task_error(t): """Callback to surface background task exceptions.""" try: exc = t.exception() except asyncio.CancelledError: return if exc: logger.error(f"[startup] Background task {t.get_name()} CRASHED: {exc}") import traceback traceback.print_exception(type(exc), exc, exc.__traceback__) for name, coro in [ ("trigger_daemon", start_trigger_daemon()), ("feishu_ws", feishu_ws_manager.start_all()), ("dingtalk_stream", dingtalk_stream_manager.start_all()), ("wecom_stream", wecom_stream_manager.start_all()), ("discord_gw", discord_gateway_manager.start_all()), ]: task = asyncio.create_task(coro, name=name) task.add_done_callback(_bg_task_error) logger.info(f"[startup] created bg task: {name}") logger.info("[startup] all background tasks created!") except Exception as e: logger.error(f"[startup] Background tasks failed: {e}") import traceback traceback.print_exc() # Start ss-local SOCKS5 proxy for Discord API calls (non-fatal) ss_task = asyncio.create_task(_start_ss_local(), name="ss-local-proxy") ss_task.add_done_callback(_bg_task_error) yield # Shutdown await close_redis() app = FastAPI( title=settings.APP_NAME, version=settings.APP_VERSION, lifespan=lifespan, ) # Add TraceIdMiddleware first so it's executed for all requests app.add_middleware(TraceIdMiddleware) # CORS _cors_origins = settings.CORS_ORIGINS _allow_creds = "*" not in _cors_origins # CORS spec forbids credentials with wildcard app.add_middleware( CORSMiddleware, allow_origins=_cors_origins, allow_credentials=_allow_creds, allow_methods=["*"], allow_headers=["*"], ) # Register API routes from app.api.auth import router as auth_router from app.api.agents import router as agents_router from app.api.tasks import router as tasks_router from app.api.files import router as files_router from app.api.websocket import router as ws_router from app.api.feishu import router as feishu_router from app.api.sso import router as sso_router from app.api.organization import router as org_router from app.api.enterprise import router as enterprise_router from app.api.advanced import router as advanced_router from app.api.upload import router as upload_router from app.api.relationships import router as relationships_router from app.api.files import upload_router as files_upload_router, enterprise_kb_router from app.api.activity import router as activity_router from app.api.messages import router as messages_router from app.api.tenants import router as tenants_router from app.api.schedules import router as schedules_router from app.api.tools import router as tools_router from app.api.plaza import router as plaza_router from app.api.skills import router as skills_router from app.api.users import router as users_router from app.api.chat_sessions import router as chat_sessions_router from app.api.slack import router as slack_router from app.api.discord_bot import router as discord_router from app.api.dingtalk import router as dingtalk_router from app.api.wecom import router as wecom_router from app.api.teams import router as teams_router from app.api.triggers import router as triggers_router from app.api.atlassian import router as atlassian_router from app.api.webhooks import router as webhooks_router from app.api.notification import router as notification_router from app.api.gateway import router as gateway_router from app.api.admin import router as admin_router from app.api.pages import router as pages_router, public_router as pages_public_router from app.api.agent_credentials import router as credentials_router from app.api.agentbay_control import router as agentbay_control_router app.include_router(auth_router, prefix=settings.API_PREFIX) app.include_router(agents_router, prefix=settings.API_PREFIX) app.include_router(tasks_router, prefix=settings.API_PREFIX) app.include_router(files_router, prefix=settings.API_PREFIX) app.include_router(feishu_router, prefix=settings.API_PREFIX) app.include_router(sso_router, prefix=settings.API_PREFIX) app.include_router(org_router, prefix=settings.API_PREFIX) app.include_router(enterprise_router, prefix=settings.API_PREFIX) app.include_router(advanced_router, prefix=settings.API_PREFIX) app.include_router(upload_router, prefix=settings.API_PREFIX) app.include_router(relationships_router, prefix=settings.API_PREFIX) app.include_router(activity_router, prefix=settings.API_PREFIX) app.include_router(messages_router, prefix=settings.API_PREFIX) app.include_router(tenants_router, prefix=settings.API_PREFIX) app.include_router(schedules_router, prefix=settings.API_PREFIX) app.include_router(tools_router, prefix=settings.API_PREFIX) app.include_router(files_upload_router, prefix=settings.API_PREFIX) app.include_router(enterprise_kb_router, prefix=settings.API_PREFIX) app.include_router(skills_router, prefix=settings.API_PREFIX) app.include_router(users_router, prefix=settings.API_PREFIX) app.include_router(slack_router, prefix=settings.API_PREFIX) app.include_router(discord_router, prefix=settings.API_PREFIX) app.include_router(dingtalk_router, prefix=settings.API_PREFIX) app.include_router(wecom_router, prefix=settings.API_PREFIX) app.include_router(teams_router, prefix=settings.API_PREFIX) app.include_router(atlassian_router, prefix=settings.API_PREFIX) app.include_router(triggers_router) app.include_router(chat_sessions_router) app.include_router(plaza_router) app.include_router(notification_router, prefix=settings.API_PREFIX) app.include_router(webhooks_router) # Public endpoint, no API prefix app.include_router(ws_router) app.include_router(gateway_router, prefix=settings.API_PREFIX) app.include_router(admin_router, prefix=settings.API_PREFIX) app.include_router(pages_router, prefix=settings.API_PREFIX) app.include_router(pages_public_router) # Public endpoint for /p/{short_id}, no API prefix app.include_router(credentials_router, prefix=settings.API_PREFIX) app.include_router(agentbay_control_router, prefix=settings.API_PREFIX) @app.get("/api/health", response_model=HealthResponse, tags=["health"]) async def health_check(): """Health check endpoint.""" return HealthResponse(status="ok", version=settings.APP_VERSION) # ── Version endpoint (public, no auth required) ── def _load_version_info() -> dict[str, str]: """Read version + commit hash once at startup.""" import os, subprocess version = "unknown" for candidate in ["../frontend/VERSION", "frontend/VERSION", "VERSION"]: try: version = open(candidate).read().strip() break except FileNotFoundError: continue commit = "" for commit_file in ["../COMMIT", "COMMIT", "../frontend/COMMIT"]: try: commit = open(commit_file).read().strip() break except FileNotFoundError: continue if not commit: try: commit = subprocess.check_output( ["git", "rev-parse", "--short", "HEAD"], stderr=subprocess.DEVNULL, timeout=3, ).decode().strip() except Exception: pass return {"version": version, "commit": commit} _version_cache = _load_version_info() @app.get("/api/version", tags=["system"]) async def get_version(): """Return current Clawith version and commit hash.""" return _version_cache