145 lines
6.9 KiB
Python
145 lines
6.9 KiB
Python
"""Refactor user system to global Identities - Phase 2 (Consolidated & Idempotent)
|
|
|
|
Revision ID: 440261f5594f
|
|
Revises: add_agent_credentials
|
|
Create Date: 2026-03-30
|
|
"""
|
|
import uuid
|
|
from typing import Sequence, Union
|
|
|
|
from alembic import op
|
|
import sqlalchemy as sa
|
|
from sqlalchemy.dialects import postgresql
|
|
from sqlalchemy import inspect
|
|
|
|
# revision identifiers, used by Alembic.
|
|
revision: str = '440261f5594f'
|
|
down_revision: Union[str, None] = 'add_agent_credentials'
|
|
branch_labels: Union[str, Sequence[str], None] = None
|
|
depends_on: Union[str, Sequence[str], None] = None
|
|
|
|
|
|
def upgrade() -> None:
|
|
conn = op.get_bind()
|
|
inspector = inspect(conn)
|
|
tables = inspector.get_table_names()
|
|
|
|
# 1. Baseline: Add missing/intermediate columns to users (idempotent)
|
|
op.execute("ALTER TABLE users ADD COLUMN IF NOT EXISTS email_verified BOOLEAN NOT NULL DEFAULT True")
|
|
op.execute("ALTER TABLE users ADD COLUMN IF NOT EXISTS created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()")
|
|
|
|
# 2. Cleanup: Drop obsolete SSO columns
|
|
op.execute("ALTER TABLE users DROP COLUMN IF EXISTS feishu_user_id")
|
|
op.execute("ALTER TABLE users DROP COLUMN IF EXISTS wecom_user_id")
|
|
op.execute("ALTER TABLE users DROP COLUMN IF EXISTS dingtalk_user_id")
|
|
|
|
# 3. Create identities table if not exists
|
|
if 'identities' not in tables:
|
|
op.create_table(
|
|
'identities',
|
|
sa.Column('id', sa.UUID(), nullable=False),
|
|
sa.Column('email', sa.String(length=255), nullable=True),
|
|
sa.Column('phone', sa.String(length=50), nullable=True),
|
|
sa.Column('username', sa.String(length=100), nullable=True),
|
|
sa.Column('password_hash', sa.String(length=255), nullable=True),
|
|
sa.Column('is_active', sa.Boolean(), server_default='true', nullable=False),
|
|
sa.Column('is_platform_admin', sa.Boolean(), server_default='false', nullable=False),
|
|
sa.Column('email_verified', sa.Boolean(), server_default='false', nullable=False),
|
|
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
|
|
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
|
|
sa.PrimaryKeyConstraint('id')
|
|
)
|
|
op.create_index(op.f('ix_identities_email'), 'identities', ['email'], unique=True)
|
|
op.create_index(op.f('ix_identities_phone'), 'identities', ['phone'], unique=True)
|
|
op.create_index(op.f('ix_identities_username'), 'identities', ['username'], unique=True)
|
|
|
|
# 4. Add identity_id to users if not exists
|
|
user_columns = [c['name'] for c in inspector.get_columns('users')]
|
|
if 'identity_id' not in user_columns:
|
|
op.add_column('users', sa.Column('identity_id', sa.UUID(), nullable=True))
|
|
op.create_index(op.f('ix_users_identity_id'), 'users', ['identity_id'], unique=False)
|
|
op.create_foreign_key('fk_users_identity_id', 'users', 'identities', ['identity_id'], ['id'])
|
|
|
|
# 5. Data migration (idempotent)
|
|
# Only migrate users that don't have an identity_id yet
|
|
result = conn.execute(sa.text("""
|
|
SELECT id, email, primary_mobile, username, password_hash, email_verified, is_active, role
|
|
FROM users
|
|
WHERE identity_id IS NULL
|
|
"""))
|
|
users_data = result.fetchall()
|
|
|
|
if users_data:
|
|
# Load existing identities to match against
|
|
ident_res = conn.execute(sa.text("SELECT id, email, phone, username FROM identities"))
|
|
existing_idents = ident_res.fetchall()
|
|
|
|
# Build map: (type, val) -> identity_id
|
|
identity_map = {}
|
|
for r in existing_idents:
|
|
if r[1]: identity_map[f"e:{r[1]}"] = r[0]
|
|
if r[2]: identity_map[f"p:{r[2]}"] = r[0]
|
|
if r[3]: identity_map[f"u:{r[3]}"] = r[0]
|
|
|
|
for row in users_data:
|
|
u_id, u_email, u_phone, u_username, u_pwd, u_email_verified, u_active, u_role = row
|
|
|
|
# Check if this person already has an identity
|
|
found_id = None
|
|
if u_email and f"e:{u_email}" in identity_map: found_id = identity_map[f"e:{u_email}"]
|
|
elif u_phone and f"p:{u_phone}" in identity_map: found_id = identity_map[f"p:{u_phone}"]
|
|
elif u_username and f"u:{u_username}" in identity_map: found_id = identity_map[f"u:{u_username}"]
|
|
|
|
if not found_id:
|
|
# Create new identity
|
|
found_id = str(uuid.uuid4())
|
|
is_platform_admin = (u_role == 'platform_admin')
|
|
|
|
conn.execute(sa.text("""
|
|
INSERT INTO identities (id, email, phone, username, password_hash, email_verified, is_active, is_platform_admin)
|
|
VALUES (:id, :email, :phone, :username, :password_hash, :email_verified, :is_active, :admin)
|
|
"""), {
|
|
"id": found_id,
|
|
"email": u_email,
|
|
"phone": u_phone,
|
|
"username": u_username,
|
|
"password_hash": u_pwd,
|
|
"email_verified": u_email_verified if u_email_verified is not None else False,
|
|
"is_active": u_active if u_active is not None else True,
|
|
"admin": is_platform_admin
|
|
})
|
|
# Update map to prevent duplicates in this loop
|
|
if u_email: identity_map[f"e:{u_email}"] = found_id
|
|
if u_phone: identity_map[f"p:{u_phone}"] = found_id
|
|
if u_username: identity_map[f"u:{u_username}"] = found_id
|
|
|
|
# Update user
|
|
conn.execute(sa.text("UPDATE users SET identity_id = :identity_id WHERE id = :user_id"), {
|
|
"identity_id": found_id,
|
|
"user_id": u_id
|
|
})
|
|
|
|
# 6. Cleanup: Make username/email nullable and DROP redundant columns
|
|
op.alter_column('users', 'username', existing_type=sa.String(length=100), nullable=True)
|
|
op.alter_column('users', 'email', existing_type=sa.String(length=255), nullable=True)
|
|
|
|
# Physically drop redundant columns
|
|
op.execute("ALTER TABLE users DROP COLUMN IF EXISTS username")
|
|
op.execute("ALTER TABLE users DROP COLUMN IF EXISTS email")
|
|
op.execute("ALTER TABLE users DROP COLUMN IF EXISTS password_hash")
|
|
op.execute("ALTER TABLE users DROP COLUMN IF EXISTS email_verified")
|
|
op.execute("ALTER TABLE users DROP COLUMN IF EXISTS primary_mobile")
|
|
op.execute("ALTER TABLE users DROP COLUMN IF EXISTS primary_email")
|
|
|
|
|
|
def downgrade() -> None:
|
|
# Cleanup identity linking
|
|
op.drop_constraint('fk_users_identity_id', 'users', type_='foreignkey')
|
|
op.drop_index(op.f('ix_users_identity_id'), table_name='users')
|
|
op.drop_column('users', 'identity_id')
|
|
|
|
op.drop_index(op.f('ix_identities_username'), table_name='identities')
|
|
op.drop_index(op.f('ix_identities_phone'), table_name='identities')
|
|
op.drop_index(op.f('ix_identities_email'), table_name='identities')
|
|
op.drop_table('identities')
|