Clawith/backend/app/api/tenants.py

489 lines
19 KiB
Python

"""Tenant (Company) management API.
Public endpoints for self-service company creation and joining.
Admin endpoints for platform-level company management.
"""
import re
import secrets
import uuid
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, Field
from sqlalchemy import func as sqla_func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.security import get_current_user, require_role, get_authenticated_user
from app.database import get_db
from app.models.tenant import Tenant
from app.models.user import User
router = APIRouter(prefix="/tenants", tags=["tenants"])
# ─── Schemas ────────────────────────────────────────────
class TenantCreate(BaseModel):
name: str = Field(min_length=1, max_length=200)
target_tenant_id: uuid.UUID | None = None
class TenantOut(BaseModel):
id: uuid.UUID
name: str
slug: str
im_provider: str
timezone: str = "UTC"
is_active: bool
sso_enabled: bool = False
sso_domain: str | None = None
a2a_async_enabled: bool = False
created_at: datetime | None = None
model_config = {"from_attributes": True}
class TenantUpdate(BaseModel):
name: str | None = None
im_provider: str | None = None
timezone: str | None = None
is_active: bool | None = None
sso_enabled: bool | None = None
sso_domain: str | None = None
a2a_async_enabled: bool | None = None
# ─── Helpers ────────────────────────────────────────────
def _slugify(name: str) -> str:
"""Generate a URL-friendly slug from a company name.
Uses a layered transliteration strategy so non-Latin company names produce
meaningful, readable slugs instead of collapsing to the generic 'company'
placeholder:
1. pypinyin — CJK/Chinese characters → pinyin (e.g. '公司''gongsi')
2. anyascii — remaining non-ASCII scripts → closest ASCII approximation
(Korean '안녕''annyeong', Japanese 'ひらがな''hiragana',
Arabic 'مرحبا''mrhb', Cyrillic 'Привет''Privet', …)
3. NFKD norm — accented Latin chars stripped of diacritics (é → e)
A short random hex suffix is always appended to guarantee global uniqueness
even when two tenants choose the same company name.
"""
import unicodedata
from pypinyin import lazy_pinyin
from anyascii import anyascii
# Step 1: Convert CJK characters to pinyin; non-CJK chars pass through unchanged.
# lazy_pinyin with errors='default' keeps non-CJK chars as-is so they are
# handled by the subsequent anyascii pass rather than being silently dropped.
parts = lazy_pinyin(name, errors="default")
text = "".join(parts)
# Step 2: Convert remaining non-ASCII characters using anyascii.
# anyascii is a no-op on ASCII input, so it is safe to apply to the whole
# string after pypinyin has already processed the CJK portion.
text = anyascii(text)
# Step 3: Normalize any remaining accented Latin chars (é → e, ü → u, etc.)
# and drop anything that still cannot be represented in ASCII.
text = unicodedata.normalize("NFKD", text)
text = text.encode("ascii", "ignore").decode("ascii")
# Step 4: Lowercase, collapse non-alphanumeric runs to hyphens, trim to 40 chars.
slug = re.sub(r"[^a-z0-9]+", "-", text.lower().strip())
slug = slug.strip("-")[:40]
if not slug:
# Extremely unlikely after anyascii, but keep as a safety net
# for inputs that are entirely punctuation or whitespace.
slug = "company"
# Add a short random hex suffix to ensure global uniqueness.
slug = f"{slug}-{secrets.token_hex(3)}"
return slug
class SelfCreateResponse(BaseModel):
"""Response for self-create company, includes token for context switching."""
tenant: TenantOut
access_token: str | None = None # Non-null when a new User record was created (multi-tenant switch)
@router.post("/self-create", response_model=SelfCreateResponse, status_code=status.HTTP_201_CREATED)
async def self_create_company(
data: TenantCreate,
current_user: User = Depends(get_authenticated_user),
db: AsyncSession = Depends(get_db),
):
"""Create a new company (self-service). The creator becomes org_admin.
Supports both:
- Registration flow (user has no tenant yet): assigns tenant directly
- Switch-org flow (user already has a tenant): creates a new User record for the new tenant
"""
# Block self-creation if locked to a specific tenant (Dedicated Link flow)
if data.target_tenant_id is not None:
raise HTTPException(status_code=403, detail="Company creation is not allowed via this link. Please join your assigned organization.")
# Check if self-creation is allowed
from app.models.system_settings import SystemSetting
setting = await db.execute(
select(SystemSetting).where(SystemSetting.key == "allow_self_create_company")
)
s = setting.scalar_one_or_none()
allowed = s.value.get("enabled", True) if s else True
if not allowed and current_user.role != "platform_admin":
raise HTTPException(status_code=403, detail="Company self-creation is currently disabled")
slug = _slugify(data.name)
tenant = Tenant(name=data.name, slug=slug, im_provider="web_only")
db.add(tenant)
await db.flush()
access_token = None
if current_user.tenant_id is not None:
# Multi-tenant: user already belongs to a company.
# Create a NEW User record for the new tenant instead of overwriting.
from app.core.security import create_access_token
from app.models.participant import Participant
new_user = User(
identity_id=current_user.identity_id,
tenant_id=tenant.id,
display_name=current_user.display_name,
role="org_admin",
registration_source="web",
is_active=current_user.is_active,
quota_message_limit=tenant.default_message_limit,
quota_message_period=tenant.default_message_period,
quota_max_agents=tenant.default_max_agents,
quota_agent_ttl_hours=tenant.default_agent_ttl_hours,
)
db.add(new_user)
await db.flush()
# Create Participant for the new user record
db.add(Participant(
type="user",
ref_id=new_user.id,
display_name=new_user.display_name,
avatar_url=new_user.avatar_url,
))
await db.flush()
# Generate token scoped to the new user so frontend can switch context
access_token = create_access_token(str(new_user.id), new_user.role)
else:
# Registration flow: user has no tenant yet, assign directly
current_user.tenant_id = tenant.id
current_user.role = "org_admin" if current_user.role == "member" else current_user.role
# Inherit quota defaults from new tenant
current_user.quota_message_limit = tenant.default_message_limit
current_user.quota_message_period = tenant.default_message_period
current_user.quota_max_agents = tenant.default_max_agents
current_user.quota_agent_ttl_hours = tenant.default_agent_ttl_hours
await db.flush()
await db.commit()
return SelfCreateResponse(
tenant=TenantOut.model_validate(tenant),
access_token=access_token,
)
# ─── Self-Service: Join Company via Invite Code ─────────
class JoinRequest(BaseModel):
invitation_code: str = Field(min_length=1, max_length=32)
target_tenant_id: uuid.UUID | None = None
class JoinResponse(BaseModel):
tenant: TenantOut
role: str
access_token: str | None = None # Non-null when a new User record was created (multi-tenant switch)
@router.post("/join", response_model=JoinResponse)
async def join_company(
data: JoinRequest,
current_user: User = Depends(get_authenticated_user),
db: AsyncSession = Depends(get_db),
):
"""Join an existing company using an invitation code.
Supports both:
- Registration flow (user has no tenant yet): assigns tenant directly
- Switch-org flow (user already has a tenant): creates a new User record"""
from app.models.invitation_code import InvitationCode
ic_result = await db.execute(
select(InvitationCode).where(
InvitationCode.code == data.invitation_code,
InvitationCode.is_active == True,
InvitationCode.tenant_id.is_not(None),
)
)
code_obj = ic_result.scalar_one_or_none()
if not code_obj:
raise HTTPException(status_code=400, detail="Invalid invitation code")
# Verify matching tenant if locked (Dedicated Link flow)
if data.target_tenant_id and str(code_obj.tenant_id) != str(data.target_tenant_id):
raise HTTPException(status_code=403, detail="This invitation code does not belong to the required organization.")
if code_obj.used_count >= code_obj.max_uses:
raise HTTPException(status_code=400, detail="Invitation code has reached its usage limit")
# Find the company
t_result = await db.execute(select(Tenant).where(Tenant.id == code_obj.tenant_id))
tenant = t_result.scalar_one_or_none()
if not tenant or not tenant.is_active:
raise HTTPException(status_code=400, detail="Company not found or is disabled")
# Check if user already belongs to this specific tenant
existing_membership = await db.execute(
select(User).where(
User.identity_id == current_user.identity_id,
User.tenant_id == tenant.id,
)
)
if existing_membership.scalar_one_or_none():
raise HTTPException(status_code=400, detail="You already belong to this company")
# Check if this company has an org_admin already
admin_check = await db.execute(
select(sqla_func.count()).select_from(User).where(
User.tenant_id == tenant.id,
User.role.in_(["org_admin", "platform_admin"]),
)
)
has_admin = admin_check.scalar() > 0
# First joiner of an empty company becomes org_admin
assigned_role = "member" if has_admin else "org_admin"
access_token = None
if current_user.tenant_id is not None:
# Multi-tenant: user already belongs to a company.
# Create a NEW User record for the new tenant.
from app.core.security import create_access_token
from app.models.participant import Participant
new_user = User(
identity_id=current_user.identity_id,
tenant_id=tenant.id,
display_name=current_user.display_name,
role=assigned_role,
registration_source="web",
is_active=current_user.is_active,
quota_message_limit=tenant.default_message_limit,
quota_message_period=tenant.default_message_period,
quota_max_agents=tenant.default_max_agents,
quota_agent_ttl_hours=tenant.default_agent_ttl_hours,
)
db.add(new_user)
await db.flush()
# Create Participant for the new user record
db.add(Participant(
type="user",
ref_id=new_user.id,
display_name=new_user.display_name,
avatar_url=new_user.avatar_url,
))
await db.flush()
# Generate token scoped to the new user so frontend can switch context
access_token = create_access_token(str(new_user.id), new_user.role)
final_role = new_user.role
else:
# Registration flow: user has no tenant yet, assign directly
current_user.tenant_id = tenant.id
if current_user.role == "member":
current_user.role = assigned_role
# Inherit quota defaults from tenant
current_user.quota_message_limit = tenant.default_message_limit
current_user.quota_message_period = tenant.default_message_period
current_user.quota_max_agents = tenant.default_max_agents
current_user.quota_agent_ttl_hours = tenant.default_agent_ttl_hours
final_role = current_user.role
# Increment invitation code usage
code_obj.used_count += 1
await db.flush()
await db.commit()
return JoinResponse(
tenant=TenantOut.model_validate(tenant),
role=final_role,
access_token=access_token,
)
# ─── Registration Config ───────────────────────────────
@router.get("/registration-config")
async def get_registration_config(db: AsyncSession = Depends(get_db)):
"""Public — returns whether self-creation of companies is allowed."""
from app.models.system_settings import SystemSetting
result = await db.execute(
select(SystemSetting).where(SystemSetting.key == "allow_self_create_company")
)
s = result.scalar_one_or_none()
allowed = s.value.get("enabled", True) if s else True
return {"allow_self_create_company": allowed}
# ─── Public: Resolve Tenant by Domain ───────────────────
@router.get("/resolve-by-domain")
async def resolve_tenant_by_domain(
domain: str,
db: AsyncSession = Depends(get_db),
):
"""Resolve a tenant by its sso_domain or subdomain slug.
sso_domain is stored as a full URL (e.g. "https://acme.clawith.ai" or "http://1.2.3.4:3009").
The incoming `domain` parameter is the host (without protocol).
Lookup precedence:
1. Exact match on tenant.sso_domain ending with the host (strips protocol)
2. Extract slug from "{slug}.clawith.ai" and match tenant.slug
"""
tenant = None
# 1. Match by stripping protocol from stored sso_domain
# sso_domain = "https://acme.clawith.ai" → compare against "acme.clawith.ai"
for proto in ("https://", "http://"):
result = await db.execute(
select(Tenant).where(Tenant.sso_domain == f"{proto}{domain}")
)
tenant = result.scalar_one_or_none()
if tenant:
break
# 2. Try without port (e.g. domain = "1.2.3.4:3009" → try "1.2.3.4")
if not tenant and ":" in domain:
domain_no_port = domain.split(":")[0]
for proto in ("https://", "http://"):
result = await db.execute(
select(Tenant).where(Tenant.sso_domain.like(f"{proto}{domain_no_port}%"))
)
tenant = result.scalar_one_or_none()
if tenant:
break
# 3. Fallback: extract slug from subdomain pattern
if not tenant:
import re
m = re.match(r"^([a-z0-9][a-z0-9\-]*[a-z0-9])\.clawith\.ai$", domain.lower())
if m:
slug = m.group(1)
result = await db.execute(select(Tenant).where(Tenant.slug == slug))
tenant = result.scalar_one_or_none()
if not tenant or not tenant.is_active or not tenant.sso_enabled:
raise HTTPException(status_code=404, detail="Tenant not found or not active or SSO not enabled")
return {
"id": tenant.id,
"name": tenant.name,
"slug": tenant.slug,
"sso_enabled": tenant.sso_enabled,
"sso_domain": tenant.sso_domain,
"is_active": tenant.is_active,
}
# ─── Authenticated: List / Get ──────────────────────────
@router.get("/", response_model=list[TenantOut])
async def list_tenants(
current_user: User = Depends(require_role("platform_admin")),
db: AsyncSession = Depends(get_db),
):
"""List all tenants (platform_admin only)."""
result = await db.execute(select(Tenant).order_by(Tenant.created_at.desc()))
return [TenantOut.model_validate(t) for t in result.scalars().all()]
@router.get("/{tenant_id}", response_model=TenantOut)
async def get_tenant(
tenant_id: uuid.UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Get tenant details. Platform admins can view any; org_admins only their own."""
if current_user.role not in ("platform_admin", "org_admin"):
raise HTTPException(status_code=403, detail="Admin access required")
if current_user.role == "org_admin" and str(current_user.tenant_id) != str(tenant_id):
raise HTTPException(status_code=403, detail="Access denied")
result = await db.execute(select(Tenant).where(Tenant.id == tenant_id))
tenant = result.scalar_one_or_none()
if not tenant:
raise HTTPException(status_code=404, detail="Tenant not found")
return TenantOut.model_validate(tenant)
@router.put("/{tenant_id}", response_model=TenantOut)
async def update_tenant(
tenant_id: uuid.UUID,
data: TenantUpdate,
current_user: User = Depends(require_role("org_admin", "platform_admin")),
db: AsyncSession = Depends(get_db),
):
"""Update tenant settings. Platform admins can update any; org_admins only their own."""
if current_user.role == "org_admin" and str(current_user.tenant_id) != str(tenant_id):
raise HTTPException(status_code=403, detail="Can only update your own company")
result = await db.execute(select(Tenant).where(Tenant.id == tenant_id))
tenant = result.scalar_one_or_none()
if not tenant:
raise HTTPException(status_code=404, detail="Tenant not found")
update_data = data.model_dump(exclude_unset=True)
# SSO configuration is managed exclusively by the company's own org_admin
# via the Enterprise Settings page. Platform admins should not override it here.
if current_user.role == "platform_admin":
update_data.pop("sso_enabled", None)
update_data.pop("sso_domain", None)
for field, value in update_data.items():
setattr(tenant, field, value)
await db.flush()
return TenantOut.model_validate(tenant)
@router.put("/{tenant_id}/assign-user/{user_id}")
async def assign_user_to_tenant(
tenant_id: uuid.UUID,
user_id: uuid.UUID,
role: str = "member",
current_user: User = Depends(require_role("platform_admin")),
db: AsyncSession = Depends(get_db),
):
"""Assign a user to a tenant with a specific role."""
# Verify tenant
t_result = await db.execute(select(Tenant).where(Tenant.id == tenant_id))
if not t_result.scalar_one_or_none():
raise HTTPException(status_code=404, detail="Tenant not found")
# Verify user
u_result = await db.execute(select(User).where(User.id == user_id))
user = u_result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
if role not in ("org_admin", "agent_admin", "member"):
raise HTTPException(status_code=400, detail="Invalid role")
user.tenant_id = tenant_id
user.role = role
await db.flush()
return {"status": "ok", "user_id": str(user_id), "tenant_id": str(tenant_id), "role": role}