Clawith/backend/alembic/versions/20260330_refactor_user_syst...

145 lines
6.9 KiB
Python

"""Refactor user system to global Identities - Phase 2 (Consolidated & Idempotent)
Revision ID: 440261f5594f
Revises: add_agent_credentials
Create Date: 2026-03-30
"""
import uuid
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from sqlalchemy import inspect
# revision identifiers, used by Alembic.
revision: str = '440261f5594f'
down_revision: Union[str, None] = 'add_agent_credentials'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
conn = op.get_bind()
inspector = inspect(conn)
tables = inspector.get_table_names()
# 1. Baseline: Add missing/intermediate columns to users (idempotent)
op.execute("ALTER TABLE users ADD COLUMN IF NOT EXISTS email_verified BOOLEAN NOT NULL DEFAULT True")
op.execute("ALTER TABLE users ADD COLUMN IF NOT EXISTS created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()")
# 2. Cleanup: Drop obsolete SSO columns
op.execute("ALTER TABLE users DROP COLUMN IF EXISTS feishu_user_id")
op.execute("ALTER TABLE users DROP COLUMN IF EXISTS wecom_user_id")
op.execute("ALTER TABLE users DROP COLUMN IF EXISTS dingtalk_user_id")
# 3. Create identities table if not exists
if 'identities' not in tables:
op.create_table(
'identities',
sa.Column('id', sa.UUID(), nullable=False),
sa.Column('email', sa.String(length=255), nullable=True),
sa.Column('phone', sa.String(length=50), nullable=True),
sa.Column('username', sa.String(length=100), nullable=True),
sa.Column('password_hash', sa.String(length=255), nullable=True),
sa.Column('is_active', sa.Boolean(), server_default='true', nullable=False),
sa.Column('is_platform_admin', sa.Boolean(), server_default='false', nullable=False),
sa.Column('email_verified', sa.Boolean(), server_default='false', nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_identities_email'), 'identities', ['email'], unique=True)
op.create_index(op.f('ix_identities_phone'), 'identities', ['phone'], unique=True)
op.create_index(op.f('ix_identities_username'), 'identities', ['username'], unique=True)
# 4. Add identity_id to users if not exists
user_columns = [c['name'] for c in inspector.get_columns('users')]
if 'identity_id' not in user_columns:
op.add_column('users', sa.Column('identity_id', sa.UUID(), nullable=True))
op.create_index(op.f('ix_users_identity_id'), 'users', ['identity_id'], unique=False)
op.create_foreign_key('fk_users_identity_id', 'users', 'identities', ['identity_id'], ['id'])
# 5. Data migration (idempotent)
# Only migrate users that don't have an identity_id yet
result = conn.execute(sa.text("""
SELECT id, email, primary_mobile, username, password_hash, email_verified, is_active, role
FROM users
WHERE identity_id IS NULL
"""))
users_data = result.fetchall()
if users_data:
# Load existing identities to match against
ident_res = conn.execute(sa.text("SELECT id, email, phone, username FROM identities"))
existing_idents = ident_res.fetchall()
# Build map: (type, val) -> identity_id
identity_map = {}
for r in existing_idents:
if r[1]: identity_map[f"e:{r[1]}"] = r[0]
if r[2]: identity_map[f"p:{r[2]}"] = r[0]
if r[3]: identity_map[f"u:{r[3]}"] = r[0]
for row in users_data:
u_id, u_email, u_phone, u_username, u_pwd, u_email_verified, u_active, u_role = row
# Check if this person already has an identity
found_id = None
if u_email and f"e:{u_email}" in identity_map: found_id = identity_map[f"e:{u_email}"]
elif u_phone and f"p:{u_phone}" in identity_map: found_id = identity_map[f"p:{u_phone}"]
elif u_username and f"u:{u_username}" in identity_map: found_id = identity_map[f"u:{u_username}"]
if not found_id:
# Create new identity
found_id = str(uuid.uuid4())
is_platform_admin = (u_role == 'platform_admin')
conn.execute(sa.text("""
INSERT INTO identities (id, email, phone, username, password_hash, email_verified, is_active, is_platform_admin)
VALUES (:id, :email, :phone, :username, :password_hash, :email_verified, :is_active, :admin)
"""), {
"id": found_id,
"email": u_email,
"phone": u_phone,
"username": u_username,
"password_hash": u_pwd,
"email_verified": u_email_verified if u_email_verified is not None else False,
"is_active": u_active if u_active is not None else True,
"admin": is_platform_admin
})
# Update map to prevent duplicates in this loop
if u_email: identity_map[f"e:{u_email}"] = found_id
if u_phone: identity_map[f"p:{u_phone}"] = found_id
if u_username: identity_map[f"u:{u_username}"] = found_id
# Update user
conn.execute(sa.text("UPDATE users SET identity_id = :identity_id WHERE id = :user_id"), {
"identity_id": found_id,
"user_id": u_id
})
# 6. Cleanup: Make username/email nullable and DROP redundant columns
op.alter_column('users', 'username', existing_type=sa.String(length=100), nullable=True)
op.alter_column('users', 'email', existing_type=sa.String(length=255), nullable=True)
# Physically drop redundant columns
op.execute("ALTER TABLE users DROP COLUMN IF EXISTS username")
op.execute("ALTER TABLE users DROP COLUMN IF EXISTS email")
op.execute("ALTER TABLE users DROP COLUMN IF EXISTS password_hash")
op.execute("ALTER TABLE users DROP COLUMN IF EXISTS email_verified")
op.execute("ALTER TABLE users DROP COLUMN IF EXISTS primary_mobile")
op.execute("ALTER TABLE users DROP COLUMN IF EXISTS primary_email")
def downgrade() -> None:
# Cleanup identity linking
op.drop_constraint('fk_users_identity_id', 'users', type_='foreignkey')
op.drop_index(op.f('ix_users_identity_id'), table_name='users')
op.drop_column('users', 'identity_id')
op.drop_index(op.f('ix_identities_username'), table_name='identities')
op.drop_index(op.f('ix_identities_phone'), table_name='identities')
op.drop_index(op.f('ix_identities_email'), table_name='identities')
op.drop_table('identities')