Clawith/backend/app/services/registration_service.py

528 lines
18 KiB
Python

"""Registration service for user account creation with SSO support.
This module handles user registration including:
- Email domain-based tenant detection
- SSO-based registration flow
- Duplicate identity detection
"""
import re
import uuid
from datetime import datetime
from typing import Any
from sqlalchemy import select, or_, and_
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.core.security import hash_password
from app.models.identity import IdentityProvider
from app.models.tenant import Tenant
from app.models.user import User, Identity
from app.services.sso_service import sso_service
from loguru import logger
class RegistrationService:
"""Service for handling user registration flows."""
async def detect_tenant_by_email(self, db: AsyncSession, email: str) -> Tenant | None:
"""Detect tenant based on email domain.
Args:
db: Database session
email: User email address
Returns:
Tenant if found by domain match, None otherwise
"""
if not email or "@" not in email:
return None
domain = email.split("@")[1].lower()
# Try to find tenant by custom domain
result = await db.execute(
select(Tenant).where(
Tenant.sso_domain.ilike(f"%{domain}%"),
Tenant.is_active == True,
)
)
return result.scalar_one_or_none()
async def check_duplicate_identity(
self,
db: AsyncSession,
email: str | None = None,
mobile: str | None = None,
) -> dict[str, Any]:
"""Check for existing identities or tenant-users that might conflict.
Args:
db: Database session
email: Email address
mobile: Mobile phone
username: Username
tenant_id: Optional tenant to scope the search (for tenant-user conflicts)
Returns:
Dict with conflict information
"""
conflicts = []
# 1. Check Global Identity Conflicts
if email:
ident_result = await db.execute(select(Identity).where(Identity.email == email))
if ident_result.scalar_one_or_none():
conflicts.append({
"type": "email",
"scope": "global",
"message": "Email already registered",
})
if mobile:
normalized_mobile = re.sub(r"[\s\-\+]", "", mobile)
ident_result = await db.execute(select(Identity).where(Identity.phone == normalized_mobile))
if ident_result.scalar_one_or_none():
conflicts.append({
"type": "mobile",
"scope": "global",
"message": "Mobile already registered",
})
return {
"has_conflict": len(conflicts) > 0,
"conflicts": conflicts,
}
async def find_or_create_identity(
self,
db: AsyncSession,
email: str | None = None,
phone: str | None = None,
username: str | None = None,
password: str | None = None,
is_platform_admin: bool = False,
) -> Identity:
"""Find an existing identity or create a new one.
Security note: only email and phone are authoritative identity claims.
Username is NOT used as a lookup key — it is just a display name and
cannot prove ownership. Using it as a fallback would allow account
takeover when two users share the same email prefix (e.g. alice@gmail.com
and alice@yahoo.com both produce username 'alice').
"""
identity = None
# Match by email (primary ownership claim)
if email:
res = await db.execute(select(Identity).where(Identity.email == email))
identity = res.scalar_one_or_none()
# Match by phone (secondary ownership claim)
if not identity and phone:
normalized_phone = re.sub(r"[\s\-\+]", "", phone)
res = await db.execute(select(Identity).where(Identity.phone == normalized_phone))
identity = res.scalar_one_or_none()
# Username is intentionally NOT used as a lookup key.
# If we cannot establish ownership via email or phone, treat this as a
# new identity to avoid returning another user's record.
if identity:
# Auto-verify if SMTP is not configured anywhere (env or DB)
from app.services.system_email_service import resolve_email_config_async
email_config = await resolve_email_config_async(db)
if not email_config:
if not identity.email_verified:
identity.email_verified = True
db.add(identity)
return identity
# Check if SMTP is configured anywhere (env or DB) for auto-verification
from app.services.system_email_service import resolve_email_config_async
email_config = await resolve_email_config_async(db)
is_verified = not email_config # Auto-verify only if no SMTP configured
# Resolve a safe username: if the desired username is already taken by
# another identity, append a short random hex suffix to avoid collisions
# without blocking the registration.
final_username = username
if username:
existing_res = await db.execute(
select(Identity).where(Identity.username == username)
)
if existing_res.scalar_one_or_none():
final_username = f"{username}_{uuid.uuid4().hex[:6]}"
logger.info(
"Username '%s' already taken; assigned '%s' to new identity",
username,
final_username,
)
# Create new identity
normalized_phone = re.sub(r"[\s\-\+]", "", phone) if phone else None
identity = Identity(
email=email,
phone=normalized_phone,
username=final_username,
password_hash=hash_password(password) if password else None,
is_platform_admin=is_platform_admin,
email_verified=is_verified,
)
db.add(identity)
await db.flush()
return identity
async def create_user_with_identity(
self,
db: AsyncSession,
identity: Identity,
display_name: str | None = None,
role: str = "member",
tenant_id: uuid.UUID | None = None,
registration_source: str = "web",
) -> User:
"""Create a new tenant-specific user linked to an identity.
Args:
db: Database session
identity: The global identity
display_name: Tenant-specific display name
role: Role within the tenant
tenant_id: Tenant ID
registration_source: Source of registration
Returns:
Created User (tenant-user)
"""
# Ensure unique display name / username within tenant if needed
# (Using display_name or identity info)
name = display_name or identity.username or "User"
# Check if SMTP is configured anywhere (env or DB) for auto-activation
from app.services.system_email_service import resolve_email_config_async
email_config = await resolve_email_config_async(db)
is_active = identity.email_verified
if not email_config:
is_active = True # Auto-activate if no SMTP configured
# Create tenant-user record
user = User(
identity_id=identity.id,
tenant_id=tenant_id,
display_name=name,
role=role,
registration_source=registration_source,
is_active=is_active or identity.is_platform_admin,
)
db.add(user)
await db.flush()
# Link to OrgMember if exists
await self.bind_org_member(db, user)
# Create Participant record
from app.models.participant import Participant
db.add(Participant(
type="user",
ref_id=user.id,
display_name=user.display_name,
avatar_url=user.avatar_url,
))
await db.flush()
return user
async def handle_sso_registration(
self,
db: AsyncSession,
provider_type: str,
provider_user_id: str,
user_info: dict,
existing_user: User | None = None,
) -> tuple[User, bool]:
"""Handle SSO-based registration flow.
If existing_user is provided, links the identity to that user.
Otherwise, creates a new user or returns existing one.
Args:
db: Database session
provider_type: Provider type (feishu, dingtalk, etc.)
provider_user_id: User ID in external system
user_info: User info from provider
existing_user: Optional existing user to link to
Returns:
Tuple of (user, is_new)
"""
# Try to detect tenant from email
email = user_info.get("email", "")
tenant = None
tenant_id = None
if email:
tenant = await self.detect_tenant_by_email(db, email)
tenant_id = tenant.id if tenant else None
# Check if identity already exists
existing = await sso_service.resolve_user_identity(db, provider_user_id, provider_type, tenant_id=tenant_id)
if existing:
# Identity already linked
return existing, False
if existing_user:
# Link to existing user
await sso_service.link_identity(
db,
str(existing_user.id),
provider_type,
provider_user_id,
user_info,
tenant_id=str(existing_user.tenant_id) if existing_user.tenant_id else tenant_id,
)
return existing_user, False
# (moved up)
pass
# Step 2: Ensure Identity exists
# Generate username from email or provider ID (fallback to open_id)
effective_id = provider_user_id or user_info.get("open_id") or user_info.get("union_id") or uuid.uuid4().hex[:8]
username = email.split("@")[0] if email else f"{provider_type}_{effective_id[:8]}"
identity = await self.find_or_create_identity(
db,
email=email,
phone=user_info.get("mobile") or user_info.get("phone"),
username=username,
password=effective_id, # Placeholder for SSO users
)
# Step 3: Create User linked to Identity
user = await self.create_user_with_identity(
db,
identity=identity,
display_name=user_info.get("name", username),
registration_source=provider_type,
tenant_id=tenant_id,
)
return user, True
async def register_with_sso(
self,
db: AsyncSession,
provider_type: str,
code: str,
auth_provider,
) -> tuple[User, bool, str | None]:
"""Register or login user via SSO.
Args:
db: Database session
provider_type: Provider type
code: OAuth authorization code
auth_provider: Auth provider instance
Returns:
Tuple of (user, is_new, error_message)
"""
try:
# Exchange code for token
token_data = await auth_provider.exchange_code_for_token(code)
access_token = token_data.get("access_token")
if not access_token:
return None, False, "Failed to get access token from provider"
# Get user info
from app.services.auth_provider import ExternalUserInfo
user_info_obj = await auth_provider.get_user_info(access_token)
# Convert to dict
user_info = {
"name": user_info_obj.name,
"email": user_info_obj.email,
"avatar_url": user_info_obj.avatar_url,
"mobile": user_info_obj.mobile,
"raw_data": user_info_obj.raw_data,
}
# Try to detect tenant from email
email_addr = user_info_obj.email
tenant_id = None
if email_addr:
tenant = await self.detect_tenant_by_email(db, email_addr)
tenant_id = tenant.id if tenant else None
# Try to find existing user by identity
existing_user = await sso_service.resolve_user_identity(
db, user_info_obj.provider_user_id, provider_type, tenant_id=tenant_id
)
if existing_user:
# Update last login
return existing_user, False, None
# Also try matching by email
if user_info_obj.email:
existing_by_email = await sso_service.match_user_by_email(db, user_info_obj.email)
if existing_by_email:
# Link identity to existing user
await sso_service.link_identity(
db,
str(existing_by_email.id),
provider_type,
user_info_obj.provider_user_id,
user_info,
tenant_id=str(existing_by_email.tenant_id) if existing_by_email.tenant_id else tenant_id,
)
return existing_by_email, False, None
# Create new user
user, is_new = await self.handle_sso_registration(
db,
provider_type,
user_info_obj.provider_user_id,
user_info,
)
# Bind to OrgMember via email/phone if possible
await self.bind_org_member(db, user)
return user, is_new, None
except Exception as e:
logger.exception("SSO registration failed for %s provider", provider_type)
return None, False, f"SSO registration failed: {str(e)}"
async def get_tenant_for_registration(
self, db: AsyncSession, email: str | None = None, invitation_code: str | None = None
) -> tuple[Tenant | None, str]:
"""Determine tenant for new user registration.
Args:
db: Database session
email: User email (for domain matching)
invitation_code: Invitation code (for tenant association)
Returns:
Tuple of (tenant, error_message)
"""
# First check invitation code
if invitation_code:
from app.models.invitation_code import InvitationCode
result = await db.execute(
select(InvitationCode).where(
InvitationCode.code == invitation_code,
InvitationCode.is_active == True,
InvitationCode.tenant_id.is_not(None),
)
)
inv = result.scalar_one_or_none()
if inv and inv.used_count < inv.max_uses:
# Get tenant from invitation
tenant_result = await db.execute(select(Tenant).where(Tenant.id == inv.tenant_id))
tenant = tenant_result.scalar_one_or_none()
if tenant and tenant.is_active:
return tenant, None
return None, "Invitation code tenant is inactive"
# Try email domain matching
if email:
tenant = await self.detect_tenant_by_email(db, email)
if tenant:
return tenant, None
# No tenant association - user will need to create/join
return None, None
async def bind_org_member(self, db: AsyncSession, user: User) -> None:
"""Find and bind OrgMember to User based on email/phone and tenant_id.
This establishes the link between a platform user and their entry in the
synchronized organizational structure.
"""
if not user.tenant_id:
return
from app.models.org import OrgMember
member = None
# Prefer email match
if user.email:
result = await db.execute(
select(OrgMember).where(
OrgMember.email == user.email,
OrgMember.tenant_id == user.tenant_id,
OrgMember.user_id == None
)
)
member = result.scalar_one_or_none()
# Fallback to phone match
if not member and user.primary_mobile:
result = await db.execute(
select(OrgMember).where(
OrgMember.phone == user.primary_mobile,
OrgMember.tenant_id == user.tenant_id,
OrgMember.user_id == None
)
)
member = result.scalar_one_or_none()
if member:
member.user_id = user.id
# Sync email/phone both ways (prefer user if provided)
if user.email and member.email != user.email:
member.email = user.email
elif not user.email and member.email:
user.email = member.email
if user.primary_mobile and member.phone != user.primary_mobile:
member.phone = user.primary_mobile
elif not user.primary_mobile and member.phone:
user.primary_mobile = member.phone
await db.flush()
async def sync_org_member_contact_from_user(
self,
db: AsyncSession,
user: User,
*,
sync_email: bool = False,
sync_phone: bool = False,
) -> None:
"""Sync email/phone from User to linked OrgMember (user is source of truth)."""
if not user.tenant_id or not (sync_email or sync_phone):
return
from app.models.org import OrgMember
result = await db.execute(
select(OrgMember).where(
OrgMember.user_id == user.id,
OrgMember.tenant_id == user.tenant_id,
)
)
member = result.scalar_one_or_none()
if not member:
return
if sync_email and member.email != user.email:
member.email = user.email
if sync_phone and member.phone != user.primary_mobile:
member.phone = user.primary_mobile
await db.flush()
# Global registration service
registration_service = RegistrationService()