"""Migration script: Backfill feishu_user_id and clean up duplicate users. This script: 1. Uses the org sync App credentials to resolve user_id for all users that only have open_id 2. Merges duplicate users (same display_name + feishu identity but different records) 3. Updates chat session conv_ids from feishu_p2p_{open_id} to feishu_p2p_{user_id} Usage: Docker: docker exec clawith-backend-1 python3 -m app.scripts.cleanup_duplicate_feishu_users Source: cd backend && python3 -m app.scripts.cleanup_duplicate_feishu_users """ import asyncio from loguru import logger async def main(): # Import ALL models so SQLAlchemy can resolve all FK relationships from app.models import ( # noqa: F401 activity_log, agent, audit, channel_config, chat_session, gateway_message, invitation_code, llm, notification, org, participant, plaza, schedule, skill, system_settings, task, tenant, tenant_setting, tool, trigger, user, ) from app.database import async_session from app.models.user import User from app.models.org import OrgMember from app.services.auth_registry import auth_provider_registry from app.models.chat_session import ChatSession from app.models.audit import ChatMessage from sqlalchemy import select, update, func import httpx async with async_session() as db: # ── Step 0: Load org sync app credentials ── provider = await auth_provider_registry.get_provider(db, "feishu") if not provider: logger.warning("No feishu identity provider configured. Cannot resolve user_ids. Skipping backfill.") logger.info("You can still run Sync Now from the UI after configuring feishu identity provider.") return conf = provider.config or {} app_id = conf.get("app_id") or conf.get("client_id") app_secret = conf.get("app_secret") or conf.get("client_secret") if not app_id or not app_secret: logger.warning("Feishu identity provider missing app_id/app_secret. Skipping backfill.") return # Get app token async with httpx.AsyncClient() as client: tok_resp = await client.post( "https://open.feishu.cn/open-apis/auth/v3/app_access_token/internal", json={"app_id": app_id, "app_secret": app_secret}, ) app_token = tok_resp.json().get("app_access_token", "") if not app_token: logger.error("Failed to get app token. Check org sync App credentials.") return # ── Step 1: Backfill user_id for Users ── logger.info("=== Step 1: Backfill feishu_user_id for Users ===") logger.info("Skipped: User.open_id/union_id removed; use OrgMember backfill instead.") # ── Step 2: Backfill user_id for OrgMembers ── logger.info("=== Step 2: Backfill feishu_user_id for OrgMembers ===") r = await db.execute( select(OrgMember).where( OrgMember.open_id.isnot(None), (OrgMember.external_id.is_(None)) | (OrgMember.external_id == ""), ) ) members_to_fill = r.scalars().all() logger.info(f"Found {len(members_to_fill)} org members needing user_id backfill") member_filled = 0 for member in members_to_fill: try: async with httpx.AsyncClient() as client: resp = await client.get( f"https://open.feishu.cn/open-apis/contact/v3/users/{member.open_id}", params={"user_id_type": "open_id"}, headers={"Authorization": f"Bearer {app_token}"}, ) data = resp.json() if data.get("code") == 0: user_id = data.get("data", {}).get("user", {}).get("user_id", "") if user_id: member.external_id = user_id member_filled += 1 else: logger.warning(f" Cannot resolve OrgMember {member.name} (code={data.get('code')})") except Exception as e: logger.error(f" Error resolving OrgMember {member.name}: {e}") await db.commit() logger.info(f"Backfilled user_id for {member_filled}/{len(members_to_fill)} org members") # ── Step 2.5: Merge duplicate OrgMembers ── logger.info("=== Step 2.5: Merge duplicate OrgMembers ===") from app.models.org import AgentRelationship r = await db.execute( select(OrgMember.name, OrgMember.tenant_id, func.count(OrgMember.id).label("cnt")) .where(OrgMember.name.isnot(None), OrgMember.name != "") .group_by(OrgMember.name, OrgMember.tenant_id) .having(func.count(OrgMember.id) > 1) ) om_dup_groups = r.all() om_merge_count = 0 logger.info(f"Found {len(om_dup_groups)} groups of duplicate OrgMembers") for name, tid, cnt in om_dup_groups: q = select(OrgMember).where(OrgMember.name == name) if tid: q = q.where(OrgMember.tenant_id == tid) else: q = q.where(OrgMember.tenant_id.is_(None)) q = q.order_by(OrgMember.synced_at.desc()) # Keep the most recently synced r2 = await db.execute(q) dups = r2.scalars().all() if len(dups) <= 1: continue # Pick best: prefer has user_id > has open_id > most recent def om_score(m): s = 0 if m.external_id: s += 10 if m.open_id: s += 1 return s dups_sorted = sorted(dups, key=lambda m: (-om_score(m), m.synced_at)) primary = dups_sorted[0] to_merge = dups_sorted[1:] logger.info(f" Merging {cnt} OrgMembers named '{name}', keeping id={primary.id}") for dup in to_merge: # Migrate agent_relationships FK await db.execute( update(AgentRelationship) .where(AgentRelationship.member_id == dup.id) .values(member_id=primary.id) ) # Transfer missing identity fields if dup.external_id and not primary.external_id: primary.external_id = dup.external_id if dup.email and primary.email != dup.email and dup.email: if not primary.email: primary.email = dup.email # Clear unique field before delete dup.open_id = None await db.flush() await db.delete(dup) om_merge_count += 1 try: await db.commit() except Exception as e: logger.error(f" Failed to commit OrgMember merge for '{name}': {e}") await db.rollback() logger.info(f"Merged {om_merge_count} duplicate OrgMembers") # ── Step 3: Merge duplicate users ── logger.info("=== Step 3: Merge duplicate users ===") # Find duplicate display_names within the same tenant # These are likely the same person created multiple times from different apps from sqlalchemy import or_, and_, cast, String as SAString r = await db.execute( select(User.display_name, User.tenant_id, func.count(User.id).label("cnt")) .where(User.display_name.isnot(None), User.display_name != "") .group_by(User.display_name, User.tenant_id) .having(func.count(User.id) > 1) ) dup_groups = r.all() merge_count = 0 logger.info(f"Found {len(dup_groups)} groups of duplicate display_names") for name, tid, cnt in dup_groups: q = select(User).where(User.display_name == name) if tid: q = q.where(User.tenant_id == tid) else: q = q.where(User.tenant_id.is_(None)) q = q.order_by(User.created_at.asc()) r2 = await db.execute(q) dups = r2.scalars().all() if len(dups) <= 1: continue # Pick the best record as primary: # Priority: has real email > has feishu_user_id > oldest def score(u): s = 0 if u.email and "@" in u.email and not u.email.endswith("@feishu.local"): s += 100 # Real email = likely registered user if u.feishu_user_id: s += 10 return s dups_sorted = sorted(dups, key=lambda u: (-score(u), u.created_at)) primary = dups_sorted[0] to_merge = dups_sorted[1:] logger.info(f" Merging {cnt} users named '{name}', keeping {primary.username} (email={primary.email})") for dup in to_merge: # Migrate chat messages await db.execute( update(ChatMessage) .where(ChatMessage.user_id == dup.id) .values(user_id=primary.id) ) # Migrate chat sessions await db.execute( update(ChatSession) .where(ChatSession.user_id == dup.id) .values(user_id=primary.id) ) # Transfer missing identity fields to primary if dup.email and "@" in dup.email and not dup.email.endswith("@feishu.local"): if not primary.email or primary.email.endswith("@feishu.local"): primary.email = dup.email if dup.feishu_user_id and not primary.feishu_user_id: primary.feishu_user_id = dup.feishu_user_id # Clear identity fields on duplicate before delete to avoid constraint violations dup.email = f"deleted_{dup.id}@deleted.local" dup.username = f"deleted_{dup.id}" await db.flush() # Now safe to delete await db.delete(dup) merge_count += 1 logger.info(f" Merged {dup.display_name} ({dup.id}) into {primary.username}") # Commit after each group to isolate errors try: await db.commit() except Exception as e: logger.error(f" Failed to commit merge for '{name}': {e}") await db.rollback() logger.info(f"Merged {merge_count} duplicate users") # ── Step 4: Update conv_ids ── logger.info("=== Step 4: Update session conv_ids ===") # Find sessions with old-style feishu_p2p_{open_id} conv_ids r = await db.execute( select(ChatSession).where(ChatSession.external_conv_id.like("feishu_p2p_%")) ) sessions = r.scalars().all() updated_sessions = 0 for sess in sessions: old_conv = sess.external_conv_id # Extract the ID part old_id = old_conv.replace("feishu_p2p_", "") # Check if the old_id looks like an open_id (starts with "ou_") if old_id.startswith("ou_"): # Look up the user to find their user_id om_r = await db.execute( select(OrgMember).where(OrgMember.open_id == old_id) ) om = om_r.scalar_one_or_none() if om and om.external_id: new_conv = f"feishu_p2p_{om.external_id}" sess.external_conv_id = new_conv updated_sessions += 1 logger.info(f" Updated session conv_id: {old_conv} -> {new_conv}") await db.commit() logger.info(f"Updated {updated_sessions}/{len(sessions)} session conv_ids") logger.info("=== Migration complete ===") if __name__ == "__main__": asyncio.run(main())