# Clawith/backend/app/scripts/cleanup_duplicate_feishu_users.py
# (file-viewer metadata from the original paste: 285 lines, 12 KiB, Python)

"""Migration script: Backfill feishu_user_id and clean up duplicate users.
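
This script:
1. Uses the Feishu identity provider's App credentials to resolve the Feishu
   user_id (stored in OrgMember.external_id) for org members that only have an
   open_id. (The equivalent User-level backfill is skipped: User.open_id/union_id
   no longer exist.)
2. Merges duplicate OrgMembers and duplicate Users (records sharing the same
   name / display_name within a tenant), re-pointing agent relationships
   (OrgMembers) and chat messages/sessions (Users) to the record that is kept.
3. Updates chat session conv_ids from feishu_p2p_{open_id} to feishu_p2p_{user_id}.

Usage:
    Docker: docker exec clawith-backend-1 python3 -m app.scripts.cleanup_duplicate_feishu_users
    Source: cd backend && python3 -m app.scripts.cleanup_duplicate_feishu_users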
"""
import asyncio
from loguru import logger
_FEISHU_P2P_PREFIX = "feishu_p2p_"
_PLACEHOLDER_EMAIL_SUFFIX = "@feishu.local"


def _is_real_email(email):
    """Return True if *email* contains "@" and is not a placeholder ``@feishu.local`` address."""
    return bool(email) and "@" in email and not email.endswith(_PLACEHOLDER_EMAIL_SUFFIX)


def _org_member_score(member):
    """Rank an OrgMember as merge primary: a Feishu user_id outweighs an open_id."""
    score = 0
    if member.external_id:
        score += 10
    if member.open_id:
        score += 1
    return score


def _user_score(user):
    """Rank a User as merge primary: a real email outweighs a feishu_user_id."""
    score = 0
    if _is_real_email(user.email):
        score += 100  # Real email = likely registered user
    if user.feishu_user_id:
        score += 10
    return score


async def _fetch_app_token(client, app_id, app_secret):
    """Request a Feishu app_access_token; return "" (after logging) on any failure."""
    try:
        tok_resp = await client.post(
            "https://open.feishu.cn/open-apis/auth/v3/app_access_token/internal",
            json={"app_id": app_id, "app_secret": app_secret},
        )
        data = tok_resp.json()
        app_token = data.get("app_access_token", "")
    except Exception as e:  # network error, non-JSON body, unexpected payload shape
        logger.error(f"App token request failed: {e}")
        return ""
    if not app_token:
        logger.error(f"App token response had no token (code={data.get('code')}, msg={data.get('msg')})")
    return app_token


async def _backfill_org_member_user_ids(db, client, app_token):
    """Step 2: fill OrgMember.external_id with the Feishu user_id for members that only have an open_id.

    Each member is looked up through the Feishu contact API using the shared *client*.
    Lookup failures are logged and skipped. All resolved ids are committed at the end.
    """
    from sqlalchemy import select
    from app.models.org import OrgMember

    logger.info("=== Step 2: Backfill feishu_user_id for OrgMembers ===")
    r = await db.execute(
        select(OrgMember).where(
            OrgMember.open_id.isnot(None),
            (OrgMember.external_id.is_(None)) | (OrgMember.external_id == ""),
        )
    )
    members_to_fill = r.scalars().all()
    logger.info(f"Found {len(members_to_fill)} org members needing user_id backfill")
    headers = {"Authorization": f"Bearer {app_token}"}
    member_filled = 0
    for member in members_to_fill:
        try:
            resp = await client.get(
                f"https://open.feishu.cn/open-apis/contact/v3/users/{member.open_id}",
                params={"user_id_type": "open_id"},
                headers=headers,
            )
            data = resp.json()
            if data.get("code") == 0:
                user_id = data.get("data", {}).get("user", {}).get("user_id", "")
                if user_id:
                    member.external_id = user_id
                    member_filled += 1
                else:
                    logger.warning(f" OrgMember {member.name}: Feishu response contained no user_id")
            else:
                logger.warning(f" Cannot resolve OrgMember {member.name} (code={data.get('code')})")
        except Exception as e:
            logger.error(f" Error resolving OrgMember {member.name}: {e}")
    await db.commit()
    logger.info(f"Backfilled user_id for {member_filled}/{len(members_to_fill)} org members")


async def _merge_duplicate_org_members(db):
    """Step 2.5: merge OrgMembers that share a name within a tenant into one primary record.

    The primary is the member with the best identity (Feishu user_id > open_id). Ties go to
    the most recently synced member. Agent relationships are re-pointed to the primary, and
    identity fields it lacks are copied over before each duplicate is deleted. Members whose
    Feishu user_ids differ are never merged. Each group commits or rolls back on its own.
    """
    from sqlalchemy import func, select, update
    from app.models.org import AgentRelationship, OrgMember

    logger.info("=== Step 2.5: Merge duplicate OrgMembers ===")
    r = await db.execute(
        select(OrgMember.name, OrgMember.tenant_id, func.count(OrgMember.id).label("cnt"))
        .where(OrgMember.name.isnot(None), OrgMember.name != "")
        .group_by(OrgMember.name, OrgMember.tenant_id)
        .having(func.count(OrgMember.id) > 1)
    )
    om_dup_groups = r.all()
    om_merge_count = 0
    logger.info(f"Found {len(om_dup_groups)} groups of duplicate OrgMembers")
    for name, tid, cnt in om_dup_groups:
        q = select(OrgMember).where(OrgMember.name == name)
        if tid is not None:
            q = q.where(OrgMember.tenant_id == tid)
        else:
            q = q.where(OrgMember.tenant_id.is_(None))
        q = q.order_by(OrgMember.synced_at.desc())  # Most recently synced first
        dups = (await db.execute(q)).scalars().all()
        if len(dups) <= 1:
            continue
        # Best identity first. Among equals, members that have a synced_at come first.
        # The stable sort (reverse=True keeps stability) preserves the query's newest-first
        # order, so the most recently synced member wins ties.
        dups_sorted = sorted(
            dups,
            key=lambda m: (_org_member_score(m), m.synced_at is not None),
            reverse=True,
        )
        primary, to_merge = dups_sorted[0], dups_sorted[1:]
        logger.info(f" Merging {cnt} OrgMembers named '{name}', keeping id={primary.id}")
        group_merged = 0
        # FK updates, flushes, deletes and the commit share one try. A failure anywhere
        # then rolls back just this group instead of aborting the whole script.
        try:
            for dup in to_merge:
                if dup.external_id and primary.external_id and dup.external_id != primary.external_id:
                    logger.warning(
                        f" Not merging OrgMember id={dup.id} into id={primary.id}: "
                        f"different Feishu user_ids ({dup.external_id} vs {primary.external_id})"
                    )
                    continue
                # Migrate agent_relationships FK
                await db.execute(
                    update(AgentRelationship)
                    .where(AgentRelationship.member_id == dup.id)
                    .values(member_id=primary.id)
                )
                dup_external_id, dup_open_id, dup_email = dup.external_id, dup.open_id, dup.email
                # Clear identity fields on the duplicate and flush *before* copying them to
                # the primary, so the two rows never hold the same value within one flush.
                dup.open_id = None
                dup.external_id = None
                await db.flush()
                if dup_external_id and not primary.external_id:
                    primary.external_id = dup_external_id
                if dup_open_id and not primary.open_id:
                    # Keep the open_id so Step 4 can still map old conv_ids to this member.
                    primary.open_id = dup_open_id
                if dup_email and not primary.email:
                    primary.email = dup_email
                await db.delete(dup)
                group_merged += 1
            await db.commit()
        except Exception as e:
            logger.error(f" Failed to commit OrgMember merge for '{name}': {e}")
            await db.rollback()
        else:
            om_merge_count += group_merged
    logger.info(f"Merged {om_merge_count} duplicate OrgMembers")


async def _merge_duplicate_users(db):
    """Step 3: merge Users that share a display_name within a tenant into one primary record.

    Primary priority: real email > has feishu_user_id > oldest (created_at). Chat messages and
    chat sessions are re-pointed to the primary, and identity fields it lacks are copied over
    before each duplicate is deleted. Users whose feishu_user_ids differ are never merged.
    Each group commits or rolls back on its own.
    """
    from sqlalchemy import func, select, update
    from app.models.audit import ChatMessage
    from app.models.chat_session import ChatSession
    from app.models.user import User

    logger.info("=== Step 3: Merge duplicate users ===")
    # Find duplicate display_names within the same tenant.
    # These are likely the same person created multiple times from different apps.
    r = await db.execute(
        select(User.display_name, User.tenant_id, func.count(User.id).label("cnt"))
        .where(User.display_name.isnot(None), User.display_name != "")
        .group_by(User.display_name, User.tenant_id)
        .having(func.count(User.id) > 1)
    )
    dup_groups = r.all()
    merge_count = 0
    logger.info(f"Found {len(dup_groups)} groups of duplicate display_names")
    for name, tid, cnt in dup_groups:
        q = select(User).where(User.display_name == name)
        if tid is not None:
            q = q.where(User.tenant_id == tid)
        else:
            q = q.where(User.tenant_id.is_(None))
        q = q.order_by(User.created_at.asc())
        dups = (await db.execute(q)).scalars().all()
        if len(dups) <= 1:
            continue
        # Highest score first. The stable sort keeps the query's oldest-first order among
        # equal scores.
        dups_sorted = sorted(dups, key=_user_score, reverse=True)
        primary, to_merge = dups_sorted[0], dups_sorted[1:]
        logger.info(f" Merging {cnt} users named '{name}', keeping {primary.username} (email={primary.email})")
        group_merged = 0
        # One try per group, covering flushes and deletes as well as the commit.
        try:
            for dup in to_merge:
                if dup.feishu_user_id and primary.feishu_user_id and dup.feishu_user_id != primary.feishu_user_id:
                    logger.warning(
                        f" Not merging user {dup.id} into {primary.username}: "
                        f"different feishu_user_ids ({dup.feishu_user_id} vs {primary.feishu_user_id})"
                    )
                    continue
                # NOTE(review): only chat messages and chat sessions are re-pointed. If other
                # tables reference users, the delete may fail and this group is rolled back.
                await db.execute(
                    update(ChatMessage)
                    .where(ChatMessage.user_id == dup.id)
                    .values(user_id=primary.id)
                )
                await db.execute(
                    update(ChatSession)
                    .where(ChatSession.user_id == dup.id)
                    .values(user_id=primary.id)
                )
                dup_email, dup_feishu_user_id = dup.email, dup.feishu_user_id
                # Clear identity fields on the duplicate and flush *before* copying them to
                # the primary, to avoid constraint violations between the two rows.
                dup.email = f"deleted_{dup.id}@deleted.local"
                dup.username = f"deleted_{dup.id}"
                dup.feishu_user_id = None
                await db.flush()
                if _is_real_email(dup_email) and (
                    not primary.email or primary.email.endswith(_PLACEHOLDER_EMAIL_SUFFIX)
                ):
                    primary.email = dup_email
                if dup_feishu_user_id and not primary.feishu_user_id:
                    primary.feishu_user_id = dup_feishu_user_id
                # Now safe to delete
                await db.delete(dup)
                group_merged += 1
                logger.info(f" Merged {dup.display_name} ({dup.id}) into {primary.username}")
            # Commit after each group to isolate errors
            await db.commit()
        except Exception as e:
            logger.error(f" Failed to commit merge for '{name}': {e}")
            await db.rollback()
        else:
            merge_count += group_merged
    logger.info(f"Merged {merge_count} duplicate users")


async def _update_session_conv_ids(db):
    """Step 4: rewrite ``feishu_p2p_{open_id}`` session conv_ids to ``feishu_p2p_{user_id}``.

    The open_id -> user_id mapping comes from OrgMember.open_id / OrgMember.external_id.
    Sessions whose open_id maps to no user_id, or to several different ones, are left unchanged.
    """
    from sqlalchemy import select
    from app.models.chat_session import ChatSession
    from app.models.org import OrgMember

    logger.info("=== Step 4: Update session conv_ids ===")
    # Find sessions with old-style feishu_p2p_{open_id} conv_ids
    r = await db.execute(
        select(ChatSession).where(ChatSession.external_conv_id.like("feishu_p2p_%"))
    )
    sessions = r.scalars().all()
    user_id_by_open_id = {}  # one OrgMember lookup per distinct open_id
    updated_sessions = 0
    for sess in sessions:
        old_conv = sess.external_conv_id
        # LIKE treats "_" as a single-character wildcard, so re-check the literal prefix.
        if not old_conv.startswith(_FEISHU_P2P_PREFIX):
            continue
        old_id = old_conv[len(_FEISHU_P2P_PREFIX):]
        # Only ids that look like open_ids (start with "ou_") are rewritten.
        if not old_id.startswith("ou_"):
            continue
        if old_id not in user_id_by_open_id:
            om_r = await db.execute(select(OrgMember).where(OrgMember.open_id == old_id))
            user_ids = {m.external_id for m in om_r.scalars().all() if m.external_id}
            if len(user_ids) > 1:
                logger.warning(f" open_id {old_id} maps to {len(user_ids)} different user_ids; skipping")
            user_id_by_open_id[old_id] = user_ids.pop() if len(user_ids) == 1 else None
        user_id = user_id_by_open_id[old_id]
        if user_id:
            new_conv = f"{_FEISHU_P2P_PREFIX}{user_id}"
            sess.external_conv_id = new_conv
            updated_sessions += 1
            logger.info(f" Updated session conv_id: {old_conv} -> {new_conv}")
    await db.commit()
    logger.info(f"Updated {updated_sessions}/{len(sessions)} session conv_ids")


async def main():
    """Run the Feishu user backfill / de-duplication migration end to end.

    Steps:
      0.  Load the Feishu identity provider's App credentials and fetch an app access token.
      1.  (Skipped) User-level backfill: User.open_id/union_id no longer exist.
      2.  Backfill OrgMember.external_id (Feishu user_id) for members that only have an open_id.
      2.5 Merge OrgMembers that share a name within a tenant.
      3.  Merge Users that share a display_name within a tenant.
      4.  Rewrite feishu_p2p_{open_id} session conv_ids to feishu_p2p_{user_id}.

    Returns early, before any database writes, when the provider or its credentials are
    missing or the app token cannot be obtained.
    """
    # Import ALL models so SQLAlchemy can resolve all FK relationships
    from app.models import (  # noqa: F401
        activity_log, agent, audit, channel_config, chat_session,
        gateway_message, invitation_code, llm, notification, org,
        participant, plaza, schedule, skill, system_settings, task,
        tenant, tenant_setting, tool, trigger, user,
    )
    from app.database import async_session
    from app.services.auth_registry import auth_provider_registry
    import httpx

    async with async_session() as db:
        # ── Step 0: Load org sync app credentials ──
        provider = await auth_provider_registry.get_provider(db, "feishu")
        if not provider:
            logger.warning("No feishu identity provider configured. Cannot resolve user_ids. Skipping backfill.")
            logger.info("You can still run Sync Now from the UI after configuring feishu identity provider.")
            return
        conf = provider.config or {}
        app_id = conf.get("app_id") or conf.get("client_id")
        app_secret = conf.get("app_secret") or conf.get("client_secret")
        if not app_id or not app_secret:
            logger.warning("Feishu identity provider missing app_id/app_secret. Skipping backfill.")
            return

        # One HTTP client (and connection pool) serves the token request and every lookup.
        async with httpx.AsyncClient() as client:
            app_token = await _fetch_app_token(client, app_id, app_secret)
            if not app_token:
                logger.error("Failed to get app token. Check org sync App credentials.")
                return

            # ── Step 1: Backfill user_id for Users ──
            logger.info("=== Step 1: Backfill feishu_user_id for Users ===")
            logger.info("Skipped: User.open_id/union_id removed; use OrgMember backfill instead.")

            # ── Step 2: Backfill user_id for OrgMembers ──
            await _backfill_org_member_user_ids(db, client, app_token)

        # ── Steps 2.5-4 only touch the database ──
        await _merge_duplicate_org_members(db)
        await _merge_duplicate_users(db)
        await _update_session_conv_ids(db)
        logger.info("=== Migration complete ===")
if __name__ == "__main__":
    # Script / `python -m` entry point: run the async migration on a new event loop.
    asyncio.run(main())