Clawith/backend/app/api/admin.py

559 lines
19 KiB
Python

"""Platform Admin company management API.
Provides endpoints for platform admins to manage companies, view stats,
and control platform-level settings.
"""
import secrets
import uuid
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from sqlalchemy import func as sqla_func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.security import require_role
from app.database import get_db
from app.models.agent import Agent
from app.models.invitation_code import InvitationCode
from app.models.system_settings import SystemSetting
from app.models.tenant import Tenant
from app.models.user import User, Identity
router = APIRouter(prefix="/admin", tags=["admin"])
# ─── Schemas ────────────────────────────────────────────
class CompanyStats(BaseModel):
id: uuid.UUID
name: str
slug: str
is_active: bool
sso_enabled: bool = False
sso_domain: str | None = None
created_at: datetime | None = None
user_count: int = 0
agent_count: int = 0
agent_running_count: int = 0
total_tokens: int = 0
org_admin_email: str | None = None
class CompanyCreateRequest(BaseModel):
name: str = Field(min_length=1, max_length=200)
class CompanyCreateResponse(BaseModel):
company: CompanyStats
admin_invitation_code: str
class PlatformSettingsOut(BaseModel):
allow_self_create_company: bool = True
invitation_code_enabled: bool = False
class PlatformSettingsUpdate(BaseModel):
allow_self_create_company: bool | None = None
invitation_code_enabled: bool | None = None
# ─── Company Management ────────────────────────────────
@router.get("/companies", response_model=list[CompanyStats])
async def list_companies(
current_user: User = Depends(require_role("platform_admin")),
db: AsyncSession = Depends(get_db),
):
"""List all companies with stats."""
tenants = await db.execute(select(Tenant).order_by(Tenant.created_at.desc()))
result = []
for tenant in tenants.scalars().all():
tid = tenant.id
# User count
uc = await db.execute(
select(sqla_func.count()).select_from(User).where(User.tenant_id == tid)
)
user_count = uc.scalar() or 0
# Agent count
ac = await db.execute(
select(sqla_func.count()).select_from(Agent).where(Agent.tenant_id == tid)
)
agent_count = ac.scalar() or 0
# Running agents
rc = await db.execute(
select(sqla_func.count()).select_from(Agent).where(
Agent.tenant_id == tid, Agent.status == "running"
)
)
agent_running = rc.scalar() or 0
# Total tokens
tc = await db.execute(
select(sqla_func.coalesce(sqla_func.sum(Agent.tokens_used_total), 0)).where(
Agent.tenant_id == tid
)
)
total_tokens = tc.scalar() or 0
# Org Admin Email (first found if multiple)
admin_q = await db.execute(
select(Identity.email)
.join(User, Identity.id == User.identity_id)
.where(User.tenant_id == tid, User.role == "org_admin")
.order_by(User.created_at.asc())
.limit(1)
)
org_admin_email = admin_q.scalar()
result.append(CompanyStats(
id=tenant.id,
name=tenant.name,
slug=tenant.slug,
is_active=tenant.is_active,
sso_enabled=tenant.sso_enabled,
sso_domain=tenant.sso_domain,
created_at=tenant.created_at,
user_count=user_count,
agent_count=agent_count,
agent_running_count=agent_running,
total_tokens=total_tokens,
org_admin_email=org_admin_email,
))
return result
@router.post("/companies", response_model=CompanyCreateResponse, status_code=201)
async def create_company(
data: CompanyCreateRequest,
current_user: User = Depends(require_role("platform_admin")),
db: AsyncSession = Depends(get_db),
):
"""Create a new company and generate an admin invitation code (max_uses=1)."""
import re
slug = re.sub(r"[^a-z0-9]+", "-", data.name.lower().strip()).strip("-")[:40]
if not slug:
slug = "company"
slug = f"{slug}-{secrets.token_hex(3)}"
tenant = Tenant(name=data.name, slug=slug, im_provider="web_only")
db.add(tenant)
await db.flush()
# Generate admin invitation code (single-use)
code_str = secrets.token_urlsafe(12)[:16].upper()
invite = InvitationCode(
code=code_str,
tenant_id=tenant.id,
max_uses=1,
created_by=current_user.id,
)
db.add(invite)
await db.flush()
return CompanyCreateResponse(
company=CompanyStats(
id=tenant.id,
name=tenant.name,
slug=tenant.slug,
is_active=tenant.is_active,
created_at=tenant.created_at,
),
admin_invitation_code=code_str,
)
@router.put("/companies/{company_id}/toggle")
async def toggle_company(
company_id: uuid.UUID,
current_user: User = Depends(require_role("platform_admin")),
db: AsyncSession = Depends(get_db),
):
"""Enable or disable a company."""
result = await db.execute(select(Tenant).where(Tenant.id == company_id))
tenant = result.scalar_one_or_none()
if not tenant:
raise HTTPException(status_code=404, detail="Company not found")
new_state = not tenant.is_active
tenant.is_active = new_state
# When disabling: pause all running agents
if not new_state:
agents = await db.execute(
select(Agent).where(Agent.tenant_id == company_id, Agent.status == "running")
)
for agent in agents.scalars().all():
agent.status = "paused"
await db.flush()
return {"ok": True, "is_active": new_state}
# ─── Platform Metrics Dashboard ─────────────────────────
from typing import Any
from fastapi import Query
@router.get("/metrics/timeseries", response_model=list[dict[str, Any]])
async def get_platform_timeseries(
start_date: datetime,
end_date: datetime,
current_user: User = Depends(require_role("platform_admin")),
db: AsyncSession = Depends(get_db),
):
"""Get daily platform metrics within a date range.
Returns per-day: companies, users, tokens (existing) +
sessions, DAU, WAU, MAU (new).
"""
from app.models.activity_log import DailyTokenUsage
from app.models.chat_session import ChatSession
from sqlalchemy import cast, Date, text
from datetime import timedelta
# 1. New Companies per day
companies_q = await db.execute(
select(
cast(Tenant.created_at, Date).label('d'),
sqla_func.count().label('c')
).where(
Tenant.created_at >= start_date,
Tenant.created_at <= end_date
).group_by('d')
)
companies_by_day = {row.d: row.c for row in companies_q.all()}
# 2. New Users per day
users_q = await db.execute(
select(
cast(User.created_at, Date).label('d'),
sqla_func.count().label('c')
).where(
User.created_at >= start_date,
User.created_at <= end_date
).group_by('d')
)
users_by_day = {row.d: row.c for row in users_q.all()}
# 3. Tokens consumed per day
tokens_q = await db.execute(
select(
cast(DailyTokenUsage.date, Date).label('d'),
sqla_func.sum(DailyTokenUsage.tokens_used).label('c')
).where(
DailyTokenUsage.date >= start_date,
DailyTokenUsage.date <= end_date
).group_by('d')
)
tokens_by_day = {row.d: row.c for row in tokens_q.all()}
# 4. New Sessions per day (DAU = distinct users with sessions that day)
sessions_q = await db.execute(
select(
cast(ChatSession.created_at, Date).label('d'),
sqla_func.count().label('sessions'),
sqla_func.count(sqla_func.distinct(ChatSession.user_id)).label('dau'),
).where(
ChatSession.created_at >= start_date,
ChatSession.created_at <= end_date
).group_by('d')
)
sessions_by_day = {}
dau_by_day = {}
for row in sessions_q.all():
sessions_by_day[row.d] = row.sessions
dau_by_day[row.d] = row.dau
# 5. WAU/MAU: for each day, count distinct users in rolling 7/30-day window.
# Use a single SQL query with window functions for efficiency.
wau_mau_q = await db.execute(text("""
WITH daily_users AS (
SELECT DISTINCT
DATE(created_at) AS d,
user_id
FROM chat_sessions
WHERE created_at >= CAST(:range_start AS timestamptz)
AND created_at <= CAST(:range_end AS timestamptz)
),
day_series AS (
SELECT CAST(generate_series(
CAST(:series_start AS date),
CAST(:series_end AS date),
CAST('1 day' AS interval)
) AS date) AS d
)
SELECT
ds.d,
(SELECT COUNT(DISTINCT du.user_id) FROM daily_users du
WHERE du.d BETWEEN ds.d - 6 AND ds.d) AS wau,
(SELECT COUNT(DISTINCT du.user_id) FROM daily_users du
WHERE du.d BETWEEN ds.d - 29 AND ds.d) AS mau
FROM day_series ds
ORDER BY ds.d
"""), {
"range_start": start_date - timedelta(days=30),
"range_end": end_date,
"series_start": start_date.date(),
"series_end": end_date.date(),
})
wau_by_day = {}
mau_by_day = {}
for row in wau_mau_q.all():
wau_by_day[row[0]] = row[1]
mau_by_day[row[0]] = row[2]
# Generate date range list with cumulative totals
result = []
current_d = start_date.date()
end_d = end_date.date()
# Cumulative totals up to start_date
total_companies = (await db.execute(select(sqla_func.count()).select_from(Tenant).where(Tenant.created_at < start_date))).scalar() or 0
total_users = (await db.execute(select(sqla_func.count()).select_from(User).where(User.created_at < start_date))).scalar() or 0
total_tokens = (await db.execute(select(sqla_func.coalesce(sqla_func.sum(Agent.tokens_used_total), 0)).where(Agent.created_at < start_date))).scalar() or 0
total_sessions = (await db.execute(select(sqla_func.count()).select_from(ChatSession).where(ChatSession.created_at < start_date))).scalar() or 0
while current_d <= end_d:
nc = companies_by_day.get(current_d, 0)
nu = users_by_day.get(current_d, 0)
nt = tokens_by_day.get(current_d, 0)
ns = sessions_by_day.get(current_d, 0)
total_companies += nc
total_users += nu
total_tokens += nt
total_sessions += ns
result.append({
"date": current_d.isoformat(),
"new_companies": nc,
"total_companies": total_companies,
"new_users": nu,
"total_users": total_users,
"new_tokens": nt,
"total_tokens": total_tokens,
# New metrics
"new_sessions": ns,
"total_sessions": total_sessions,
"dau": dau_by_day.get(current_d, 0),
"wau": wau_by_day.get(current_d, 0),
"mau": mau_by_day.get(current_d, 0),
})
current_d += timedelta(days=1)
return result
@router.get("/metrics/leaderboards")
async def get_platform_leaderboards(
current_user: User = Depends(require_role("platform_admin")),
db: AsyncSession = Depends(get_db),
):
"""Get Top 20 token consuming companies and agents."""
# Top 20 Companies by total tokens
top_companies_q = await db.execute(
select(Tenant.name, sqla_func.coalesce(sqla_func.sum(Agent.tokens_used_total), 0).label('total'))
.join(Agent, Agent.tenant_id == Tenant.id)
.group_by(Tenant.id)
.order_by(sqla_func.sum(Agent.tokens_used_total).desc())
.limit(20)
)
top_companies = [{"name": row.name, "tokens": row.total} for row in top_companies_q.all()]
# Top 20 Agents by total tokens
top_agents_q = await db.execute(
select(Agent.name, Tenant.name.label('tenant_name'), Agent.tokens_used_total)
.join(Tenant, Tenant.id == Agent.tenant_id)
.order_by(Agent.tokens_used_total.desc())
.limit(20)
)
top_agents = [{"name": row.name, "company": row.tenant_name, "tokens": row.tokens_used_total} for row in top_agents_q.all()]
return {
"top_companies": top_companies,
"top_agents": top_agents
}
@router.get("/metrics/enhanced")
async def get_enhanced_metrics(
current_user: User = Depends(require_role("platform_admin")),
db: AsyncSession = Depends(get_db),
):
"""Enhanced platform metrics: retention, avg tokens/session,
channel distribution, tool categories, and churn warnings.
"""
from app.models.chat_session import ChatSession
from app.models.tool import Tool, AgentTool
from sqlalchemy import text
from datetime import timedelta
now = datetime.utcnow()
# ── 1. Average tokens per session (last 30 days) ──
# Sum of daily_token_usage / count of chat_sessions in last 30 days
thirty_days_ago = now - timedelta(days=30)
from app.models.activity_log import DailyTokenUsage
total_tok_30d = (await db.execute(
select(sqla_func.coalesce(sqla_func.sum(DailyTokenUsage.tokens_used), 0))
.where(DailyTokenUsage.date >= thirty_days_ago)
)).scalar() or 0
total_sess_30d = (await db.execute(
select(sqla_func.count())
.select_from(ChatSession)
.where(ChatSession.created_at >= thirty_days_ago)
)).scalar() or 1 # avoid div by zero
avg_tokens_per_session = round(total_tok_30d / max(total_sess_30d, 1))
# ── 2. 7-Day Retention Rate (excluding companies <14 days old) ──
# Last week = 14..7 days ago, This week = 7..0 days ago
retention_q = await db.execute(text("""
WITH established AS (
SELECT id FROM tenants WHERE created_at < NOW() - INTERVAL '14 days'
),
last_week_active AS (
SELECT DISTINCT a.tenant_id
FROM chat_sessions cs
JOIN agents a ON a.id = cs.agent_id
WHERE cs.created_at BETWEEN NOW() - INTERVAL '14 days' AND NOW() - INTERVAL '7 days'
AND a.tenant_id IN (SELECT id FROM established)
),
this_week_active AS (
SELECT DISTINCT a.tenant_id
FROM chat_sessions cs
JOIN agents a ON a.id = cs.agent_id
WHERE cs.created_at > NOW() - INTERVAL '7 days'
AND a.tenant_id IN (SELECT id FROM established)
)
SELECT
COUNT(DISTINCT lw.tenant_id) AS last_week_total,
COUNT(DISTINCT lw.tenant_id) FILTER (
WHERE lw.tenant_id IN (SELECT tenant_id FROM this_week_active)
) AS retained
FROM last_week_active lw
"""))
ret_row = retention_q.first()
last_week_total = ret_row[0] if ret_row else 0
retained = ret_row[1] if ret_row else 0
retention_rate = round(retained * 100.0 / max(last_week_total, 1), 1)
# ── 3. Channel Distribution (last 30 days) ──
channel_q = await db.execute(
select(
ChatSession.source_channel,
sqla_func.count().label('count')
).where(
ChatSession.created_at >= thirty_days_ago
).group_by(ChatSession.source_channel)
.order_by(sqla_func.count().desc())
)
channel_distribution = [
{"channel": row.source_channel, "count": row.count}
for row in channel_q.all()
]
# ── 4. Top 10 Tool Categories ──
# Count enabled agent_tools grouped by tool category
tool_q = await db.execute(
select(
Tool.category,
sqla_func.count().label('count')
).join(AgentTool, AgentTool.tool_id == Tool.id)
.where(AgentTool.enabled == True) # noqa: E712
.group_by(Tool.category)
.order_by(sqla_func.count().desc())
.limit(10)
)
tool_category_top10 = [
{"category": row.category or "uncategorized", "count": row.count}
for row in tool_q.all()
]
# ── 5. Churn Warnings (>10M tokens, 14+ days inactive) ──
churn_q = await db.execute(text("""
SELECT
t.name,
SUM(a.tokens_used_total) AS total_tokens,
MAX(cs.created_at) AS last_active,
EXTRACT(DAY FROM NOW() - MAX(cs.created_at))::int AS days_inactive
FROM tenants t
JOIN agents a ON a.tenant_id = t.id
LEFT JOIN chat_sessions cs ON cs.agent_id = a.id
GROUP BY t.id, t.name
HAVING SUM(a.tokens_used_total) > 10000000
AND (
MAX(cs.created_at) IS NULL
OR MAX(cs.created_at) < NOW() - INTERVAL '14 days'
)
ORDER BY SUM(a.tokens_used_total) DESC
"""))
churn_warnings = []
for row in churn_q.all():
churn_warnings.append({
"name": row[0],
"total_tokens": row[1],
"last_active": row[2].isoformat() if row[2] else None,
"days_inactive": row[3] if row[3] else None,
})
return {
"avg_tokens_per_session_30d": avg_tokens_per_session,
"retention_rate_7d": retention_rate,
"last_week_active_companies": last_week_total,
"retained_companies": retained,
"channel_distribution": channel_distribution,
"tool_category_top10": tool_category_top10,
"churn_warnings": churn_warnings,
}
# ─── Platform Settings ─────────────────────────────────
@router.get("/platform-settings", response_model=PlatformSettingsOut)
async def get_platform_settings(
current_user: User = Depends(require_role("platform_admin")),
db: AsyncSession = Depends(get_db),
):
"""Get platform-level settings."""
settings: dict[str, bool] = {}
for key, default in [
("allow_self_create_company", True),
("invitation_code_enabled", False),
]:
r = await db.execute(select(SystemSetting).where(SystemSetting.key == key))
s = r.scalar_one_or_none()
settings[key] = s.value.get("enabled", default) if s else default
return PlatformSettingsOut(**settings)
@router.put("/platform-settings", response_model=PlatformSettingsOut)
async def update_platform_settings(
data: PlatformSettingsUpdate,
current_user: User = Depends(require_role("platform_admin")),
db: AsyncSession = Depends(get_db),
):
"""Update platform-level settings."""
updates = data.model_dump(exclude_unset=True)
for key, value in updates.items():
r = await db.execute(select(SystemSetting).where(SystemSetting.key == key))
s = r.scalar_one_or_none()
if s:
s.value = {"enabled": value}
else:
db.add(SystemSetting(key=key, value={"enabled": value}))
await db.flush()
return await get_platform_settings(current_user=current_user, db=db)