1167 lines
48 KiB
Python
1167 lines
48 KiB
Python
"""Generic organization sync adapter framework.

This module provides a base class for syncing org structure (departments/members)
from various identity providers (Feishu, DingTalk, WeCom, etc.).
"""
|
|
|
|
import asyncio
|
|
import uuid
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timedelta
|
|
from typing import Any
|
|
from sqlalchemy import DateTime, ForeignKey, Integer, String, Text, delete, func, or_, select, update
|
|
|
|
import httpx
|
|
from loguru import logger
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.models.identity import IdentityProvider
|
|
from app.models.org import OrgDepartment, OrgMember
|
|
from app.models.user import User, Identity
|
|
from pypinyin import pinyin, lazy_pinyin, Style
|
|
from anyascii import anyascii as _anyascii
|
|
|
|
from app.core.security import hash_password
|
|
|
|
|
|
def _normalize_contact(value: str | None) -> str | None:
|
|
if value is None:
|
|
return None
|
|
value = value.strip()
|
|
return value or None
|
|
|
|
|
|
@dataclass
|
|
class ExternalDepartment:
|
|
"""Standardized department info from external providers."""
|
|
|
|
external_id: str
|
|
name: str
|
|
parent_external_id: str | None = None
|
|
member_count: int = 0
|
|
raw_data: dict = field(default_factory=dict)
|
|
|
|
|
|
@dataclass
class ExternalUser:
    """Provider-neutral description of one user returned by an adapter."""

    # The unique, platform-stable ID (e.g., userid).
    external_id: str
    # Display name.
    name: str
    # OAuth open_id.
    open_id: str = ""
    # Union ID for cross-app identification.
    unionid: str = ""
    email: str = ""
    avatar_url: str = ""
    title: str = ""
    # External ID of the department this user is attached to.
    department_external_id: str = ""
    # Human-readable department path, when the adapter can build one.
    department_path: str = ""
    # List of dept IDs from provider.
    department_ids: list[str] = field(default_factory=list)
    mobile: str = ""
    # Membership status; defaults to "active".
    status: str = "active"
    # Provider payload this record was built from.
    raw_data: dict = field(default_factory=dict)
|
|
|
|
|
|
class BaseOrgSyncAdapter(ABC):
    """Abstract base class for organization sync adapters.

    Subclasses supply the provider-specific pieces (``api_base_url``,
    ``get_access_token``, ``fetch_departments``, ``fetch_users``). This class
    persists their results as ``OrgDepartment`` / ``OrgMember`` rows, marks
    rows that were not touched by a run as deleted, and recomputes the
    per-department member counts.
    """

    provider_type: str = ""

    def __init__(
        self,
        provider: IdentityProvider | None = None,
        config: dict | None = None,
        tenant_id: uuid.UUID | None = None,
    ):
        """Initialize adapter with provider config.

        Args:
            provider: IdentityProvider model from database
            config: Configuration dict (fallback if no provider record)
            tenant_id: Tenant ID for org sync
        """
        self.provider = provider
        self.config = config or {}
        self.tenant_id = tenant_id
        self._client: httpx.AsyncClient | None = None

        # A non-empty config stored on the provider record wins over the
        # ``config`` argument.
        if provider and provider.config:
            self.config = provider.config

    @property
    @abstractmethod
    def api_base_url(self) -> str:
        """Base URL for provider API."""

    @abstractmethod
    async def get_access_token(self) -> str:
        """Get valid access token for API calls."""

    @abstractmethod
    async def fetch_departments(self) -> list[ExternalDepartment]:
        """Fetch all departments from provider.

        Returns:
            List of ExternalDepartment
        """

    @abstractmethod
    async def fetch_users(self, department_external_id: str) -> list[ExternalUser]:
        """Fetch users in a department.

        Args:
            department_external_id: External department ID

        Returns:
            List of ExternalUser
        """

    async def sync_org_structure(self, db: AsyncSession) -> dict[str, Any]:
        """Main sync function - syncs departments and members.

        Each department and each member is written inside its own savepoint,
        so one failing record is reported in ``errors`` without aborting the
        rest of the run.

        Args:
            db: Database session

        Returns:
            Dict with sync results: {"departments": count, "members": count,
            "users_created": count, "profiles_synced": count, "errors": [],
            "provider": provider_type, "synced_at": ISO timestamp}
        """
        errors: list[str] = []
        dept_count = 0
        member_count = 0
        user_count = 0
        profile_count = 0
        # Set once a provider fetch fails: from then on the members seen in
        # this run are an incomplete picture of the directory.
        fetch_failed = False
        sync_start = datetime.now()

        # Ensure provider exists
        provider = await self._ensure_provider(db)

        try:
            # Fetch and sync departments
            departments = await self.fetch_departments()
            for dept in departments:
                try:
                    async with db.begin_nested():
                        await self._upsert_department(db, provider, dept)
                    dept_count += 1
                except Exception as e:
                    errors.append(f"Department {dept.external_id}: {str(e)}")
                    logger.error(f"[OrgSync] Failed to sync department {dept.external_id}: {e}")

            # Fetch and sync users (from all departments)
            for dept in departments:
                try:
                    users = await self.fetch_users(dept.external_id)
                except Exception as e:
                    fetch_failed = True
                    logger.error(f"[OrgSync] Failed to fetch users in department {dept.external_id}: {e}")
                    errors.append(f"Fetch users in dept {dept.external_id}: {str(e)}")
                    continue

                for user in users:
                    try:
                        async with db.begin_nested():
                            stats = await self._upsert_member(db, provider, user, dept.external_id)
                        if stats.get("user_created"):
                            user_count += 1
                        if stats.get("profile_synced"):
                            profile_count += 1
                        member_count += 1
                    except Exception as e:
                        logger.error(f"[OrgSync] Failed to sync member {user.external_id} ({user.name}): {e}")
                        errors.append(f"Member {user.external_id}: {str(e)}")

            # Update provider metadata if possible
            if self.provider:
                config = (self.provider.config or {}).copy()
                config["last_synced_at"] = datetime.now().isoformat()
                self.provider.config = config
                await db.flush()

            # Reconciliation: mark records not updated in this sync as deleted.
            # Skipped when a user fetch failed: members of that department
            # were never touched, so reconciling would wrongly delete them.
            if fetch_failed:
                logger.warning(
                    "[OrgSync] Skipping reconciliation: at least one user fetch failed, "
                    "so records not seen in this run cannot be treated as deleted"
                )
            else:
                await self._reconcile(db, provider.id, sync_start)
                await db.flush()

            # Recalculate member counts for all departments (crucial for DingTalk/WeCom)
            await self._update_member_counts(db, provider.id)
            await db.flush()

        except Exception as e:
            import traceback

            logger.error(f"[OrgSync] Critical error during sync: {e}\n{traceback.format_exc()}")
            errors.append(f"Critical: {str(e)}")

        return {
            "departments": dept_count,
            "members": member_count,
            "users_created": user_count,
            "profiles_synced": profile_count,
            "errors": errors,
            "provider": self.provider_type,
            "synced_at": datetime.now().isoformat(),
        }

    async def _reconcile(self, db: AsyncSession, provider_id: uuid.UUID, sync_start: datetime):
        """Mark records that were not updated in this sync as deleted.

        A record counts as "not updated" when its ``synced_at`` is older than
        ``sync_start``. Rows already in status "deleted" are left alone.

        Args:
            db: Database session.
            provider_id: Only rows of this provider are considered.
            sync_start: Timestamp taken at the beginning of the sync run.
        """
        # 1. Members reconciled
        await db.execute(
            update(OrgMember)
            .where(OrgMember.provider_id == provider_id)
            .where(OrgMember.synced_at < sync_start)
            .where(OrgMember.status != "deleted")
            .values(status="deleted", synced_at=datetime.now())
        )

        # 2. Departments reconciled
        await db.execute(
            update(OrgDepartment)
            .where(OrgDepartment.provider_id == provider_id)
            .where(OrgDepartment.synced_at < sync_start)
            .where(OrgDepartment.status != "deleted")
            .values(status="deleted", synced_at=datetime.now())
        )

    async def _update_member_counts(self, db: AsyncSession, provider_id: uuid.UUID):
        """Update member_count for all departments to include all their recursive sub-department members.

        Args:
            db: Database session.
            provider_id: Provider whose active departments are recalculated.
        """
        # 1. Update all departments to show their DIRECT member counts
        direct_subquery = (
            select(func.count(OrgMember.id))
            .where(OrgMember.department_id == OrgDepartment.id)
            .where(OrgMember.status == "active")
            .scalar_subquery()
        )

        await db.execute(
            update(OrgDepartment)
            .where(OrgDepartment.provider_id == provider_id)
            .where(OrgDepartment.status == "active")
            .values(member_count=direct_subquery)
        )

        # 2. Fetch all active departments to compute recursive aggregated counts
        result = await db.execute(
            select(OrgDepartment.id, OrgDepartment.parent_id, OrgDepartment.member_count)
            .where(OrgDepartment.provider_id == provider_id)
            .where(OrgDepartment.status == "active")
        )
        rows = result.all()

        # Build tree structure and lookup. "total" starts at the direct count
        # so a department never reached from a root (e.g. a parent cycle)
        # keeps its direct count instead of being reset to 0.
        dept_map = {
            row.id: {
                "parent_id": row.parent_id,
                "direct": row.member_count or 0,
                "total": row.member_count or 0,
                "children": [],
            }
            for row in rows
        }
        root_ids = []
        for d_id, d_data in dept_map.items():
            parent_id = d_data["parent_id"]
            if parent_id and parent_id in dept_map:
                dept_map[parent_id]["children"].append(d_id)
            else:
                # No parent, or the parent is not an active department of
                # this provider: treat as a root of the tree.
                root_ids.append(d_id)

        def compute_total(node_id):
            """Return direct + descendant member count and store it on the node."""
            node = dept_map[node_id]
            total = node["direct"]
            for child_id in node["children"]:
                total += compute_total(child_id)
            node["total"] = total
            return total

        for root_id in root_ids:
            compute_total(root_id)

        # 3. Write back the aggregated totals. Step 1 already stored the
        # direct count, so only departments whose total differs need a write.
        # Individual UPDATE statements avoid SQLAlchemy 2.x
        # "Bulk UPDATE by Primary Key" ambiguity when passing a list to execute().
        for d_id, d_data in dept_map.items():
            if d_data["total"] == d_data["direct"]:
                continue
            await db.execute(
                update(OrgDepartment)
                .where(OrgDepartment.id == d_id)
                .values(member_count=d_data["total"])
            )

    async def _ensure_provider(self, db: AsyncSession) -> IdentityProvider:
        """Ensure IdentityProvider record exists.

        Resolution order: the provider passed to the constructor, a lookup by
        ``self.provider_id`` when a subclass defines it, a lookup by provider
        type scoped to the tenant, and finally creation of a new record.

        Args:
            db: Database session.

        Returns:
            The resolved or newly created provider (also cached on ``self.provider``).
        """
        if self.provider:
            return self.provider

        # If we have an ID, look it up
        provider_id = getattr(self, "provider_id", None)
        if provider_id:
            result = await db.execute(select(IdentityProvider).where(IdentityProvider.id == provider_id))
            self.provider = result.scalar_one_or_none()
            if self.provider:
                return self.provider

        # Fallback by type (scoped by tenant)
        query = select(IdentityProvider).where(IdentityProvider.provider_type == self.provider_type)
        if self.tenant_id:
            query = query.where(IdentityProvider.tenant_id == self.tenant_id)
        else:
            query = query.where(IdentityProvider.tenant_id.is_(None))

        result = await db.execute(query)
        provider = result.scalars().first()

        if not provider:
            provider = IdentityProvider(
                provider_type=self.provider_type,
                name=self.provider_type.capitalize(),
                is_active=True,
                config=self.config,
                tenant_id=self.tenant_id,
            )
            db.add(provider)
            # Flush so the new row gets its primary key before it is used.
            await db.flush()

        self.provider = provider
        return provider

    async def _find_department_by_external_id(
        self, db: AsyncSession, provider_id: uuid.UUID, external_id: str
    ) -> OrgDepartment | None:
        """Return the provider's department with the given external ID, or None."""
        result = await db.execute(
            select(OrgDepartment).where(
                OrgDepartment.external_id == external_id,
                OrgDepartment.provider_id == provider_id,
            )
        )
        return result.scalars().first()

    async def _upsert_department(
        self, db: AsyncSession, provider: IdentityProvider, dept: ExternalDepartment
    ):
        """Insert or update a department.

        The parent is resolved by looking up ``dept.parent_external_id``
        among the provider's stored departments; if it is not stored yet the
        department is written with ``parent_id=None``.

        Args:
            db: Database session.
            provider: Provider the department belongs to.
            dept: Department data from the external provider.
        """
        # Check if exists by external_id and provider
        existing = await self._find_department_by_external_id(db, provider.id, dept.external_id)

        now = datetime.now()
        path = f"{dept.parent_external_id}/{dept.name}" if dept.parent_external_id else dept.name

        # Resolve parent_id from parent_external_id
        parent_id = None
        if dept.parent_external_id:
            parent_dept = await self._find_department_by_external_id(
                db, provider.id, dept.parent_external_id
            )
            if parent_dept:
                parent_id = parent_dept.id

        if existing:
            existing.name = dept.name
            existing.member_count = dept.member_count
            existing.path = path
            existing.parent_id = parent_id
            # Re-activate a department that an earlier run marked as deleted.
            existing.status = "active"
            existing.synced_at = now
        else:
            db.add(
                OrgDepartment(
                    external_id=dept.external_id,
                    provider_id=provider.id,
                    name=dept.name,
                    parent_id=parent_id,
                    path=path,
                    member_count=dept.member_count,
                    tenant_id=self.tenant_id,
                    synced_at=now,
                )
            )

        await db.flush()

    async def _resolve_member_department(
        self, db: AsyncSession, provider: IdentityProvider, user: ExternalUser
    ) -> OrgDepartment | None:
        """Pick the stored department a member should be attached to.

        DingTalk's dept_id_list last item is the most specific (leaf)
        department, so ``user.department_ids`` is scanned from the end and
        the first entry that exists locally wins. If none matches,
        ``user.department_external_id`` is tried.
        """
        for dept_ext_id in reversed(user.department_ids or []):
            department = await self._find_department_by_external_id(db, provider.id, dept_ext_id)
            if department:
                return department

        # Fallback: use the department_external_id that was set during fetch_users
        if user.department_external_id:
            return await self._find_department_by_external_id(
                db, provider.id, user.department_external_id
            )
        return None

    async def _find_existing_member(
        self, db: AsyncSession, provider: IdentityProvider, user: ExternalUser
    ) -> OrgMember | None:
        """Find the provider's member matching any of unionid / external_id / open_id.

        Returns None when the user carries none of the three identifiers or
        when no stored member matches.
        """
        conditions = []
        if user.unionid:
            conditions.append(OrgMember.unionid == user.unionid)
        if user.external_id:
            conditions.append(OrgMember.external_id == user.external_id)
        if user.open_id:
            conditions.append(OrgMember.open_id == user.open_id)

        if not conditions:
            return None

        result = await db.execute(
            select(OrgMember).where(
                OrgMember.provider_id == provider.id,
                or_(*conditions),
            )
        )
        return result.scalars().first()

    async def _find_platform_user_by_contact(
        self, db: AsyncSession, email: str | None, mobile: str | None
    ) -> User | None:
        """Find a platform user by identity email, then by identity phone.

        Both lookups are restricted to ``self.tenant_id`` when it is set.
        """
        for column, value in ((Identity.email, email), (Identity.phone, mobile)):
            if not value:
                continue
            user_query = select(User).join(User.identity).where(column == value)
            if self.tenant_id:
                user_query = user_query.where(User.tenant_id == self.tenant_id)
            user_res = await db.execute(user_query)
            platform_user = user_res.scalars().first()
            if platform_user:
                return platform_user
        return None

    @staticmethod
    def _transliterate_name(name: str) -> tuple[str, str]:
        """Return ``(full, initials)`` transliterations of a display name.

        Layered strategy:
        1. pypinyin converts CJK characters to pinyin
        2. anyascii handles remaining non-ASCII scripts (Korean, Japanese kana, Arabic, etc.)
        """
        translit_full = _anyascii("".join(lazy_pinyin(name, errors="default")))
        translit_initial = "".join([i[0] for i in pinyin(name, style=Style.FIRST_LETTER)])
        return translit_full, translit_initial

    async def _upsert_member(
        self,
        db: AsyncSession,
        provider: IdentityProvider,
        user: ExternalUser,
        department_external_id: str,
    ) -> dict[str, Any]:
        """Insert or update a member, platform user, and identity.

        Platform user creation is disabled: only the OrgMember row is
        written. An existing platform user found by email/phone is linked to
        the member, and the member's email/phone are copied onto that user.

        Args:
            db: Database session.
            provider: Provider the member belongs to.
            user: Member data from the external provider.
            department_external_id: Department the user was fetched from.
                Not consulted here; the department is resolved from ``user``.

        Returns:
            ``{"user_created": bool, "profile_synced": bool}``.
        """
        stats = {"user_created": False, "profile_synced": False}

        department = await self._resolve_member_department(db, provider, user)

        # Check if exists by unionid or external_id or open_id (any matches), and provider.
        # The helper returns None when the user has no identifier at all, so
        # the branch below never sees an unbound name.
        existing_member = await self._find_existing_member(db, provider, user)

        now = datetime.now()

        # Note: Platform user creation is disabled - just sync OrgMember
        # Users will be linked to platform users manually or via SSO login

        # Search for existing platform user by email/phone to associate with this member
        email = _normalize_contact(user.email)
        mobile = _normalize_contact(user.mobile)
        platform_user = await self._find_platform_user_by_contact(db, email, mobile)
        user_id = platform_user.id if platform_user else None

        translit_full, translit_initial = self._transliterate_name(user.name)
        department_id = department.id if department else None
        department_path = department.path if department else user.department_path

        # Update/Create OrgMember
        if existing_member:
            existing_member.name = user.name
            existing_member.name_translit_full = translit_full
            existing_member.name_translit_initial = translit_initial

            # An empty contact from the provider must not wipe a stored one.
            if email is not None:
                existing_member.email = email
            existing_member.avatar_url = user.avatar_url
            existing_member.title = user.title
            existing_member.department_id = department_id
            existing_member.department_path = department_path
            if mobile is not None:
                existing_member.phone = mobile
            existing_member.status = user.status

            # Universal ID fields
            existing_member.external_id = user.external_id
            existing_member.open_id = user.open_id
            existing_member.unionid = user.unionid

            existing_member.provider_id = provider.id
            existing_member.synced_at = now
            # Never overwrite an existing link to a platform user.
            if user_id and not existing_member.user_id:
                existing_member.user_id = user_id
            stats["profile_synced"] = True
        else:
            db.add(
                OrgMember(
                    external_id=user.external_id,
                    open_id=user.open_id,
                    unionid=user.unionid,
                    provider_id=provider.id,
                    user_id=user_id,
                    name=user.name,
                    name_translit_full=translit_full,
                    name_translit_initial=translit_initial,
                    email=email,
                    avatar_url=user.avatar_url,
                    title=user.title,
                    department_id=department_id,
                    department_path=department_path,
                    phone=mobile,
                    status=user.status,
                    tenant_id=self.tenant_id,
                    synced_at=now,
                )
            )
            stats["profile_synced"] = True

        # Sync email/phone from OrgMember to User (if linked)
        target_user = platform_user
        if not target_user and existing_member and existing_member.user_id:
            user_res = await db.execute(select(User).where(User.id == existing_member.user_id))
            target_user = user_res.scalars().first()

        if target_user:
            if email and target_user.email != email:
                target_user.email = email
            if mobile and target_user.primary_mobile != mobile:
                target_user.primary_mobile = mobile

        await db.flush()
        return stats

    async def _resolve_platform_user(self, db: AsyncSession, user: ExternalUser) -> User | None:
        """Resolve platform user from external user info.

        Tries the identity email first, then the identity phone. Unlike the
        lookup used by ``_upsert_member``, this one is not tenant-scoped.

        Args:
            db: Database session.
            user: Member data from the external provider.

        Returns:
            The matching platform user, or None.
        """
        # 1. Try by Email matching (primary way now)
        email = _normalize_contact(user.email)
        if email:
            result = await db.execute(
                select(User).join(User.identity).where(Identity.email == email)
            )
            u = result.scalars().first()
            if u:
                return u

        # 2. Try by mobile matching
        mobile = _normalize_contact(user.mobile)
        if mobile:
            result = await db.execute(
                select(User).join(User.identity).where(Identity.phone == mobile)
            )
            u = result.scalars().first()
            if u:
                return u

        return None
|
|
|
|
|
|
class FeishuOrgSyncAdapter(BaseOrgSyncAdapter):
    """Feishu organization sync adapter."""

    provider_type = "feishu"

    FEISHU_APP_TOKEN_URL = "https://open.feishu.cn/open-apis/auth/v3/app_access_token/internal"
    FEISHU_DEPT_URL = "https://open.feishu.cn/open-apis/contact/v3/departments"
    FEISHU_USERS_URL = "https://open.feishu.cn/open-apis/contact/v3/users/find_by_department"

    def __init__(self, provider: IdentityProvider | None = None, config: dict | None = None, tenant_id: uuid.UUID | None = None):
        """Read ``app_id`` / ``app_secret`` from the resolved provider config."""
        super().__init__(provider, config, tenant_id)
        self.app_id = self.config.get("app_id")
        self.app_secret = self.config.get("app_secret")

    @property
    def api_base_url(self) -> str:
        """Base URL of the Feishu Open API."""
        return "https://open.feishu.cn/open-apis"

    async def get_access_token(self) -> str:
        """Request an access token for the configured app.

        Returns:
            The ``tenant_access_token`` from the response, falling back to
            ``app_access_token``.

        Raises:
            ValueError: ``app_id`` or ``app_secret`` is missing from the config.
            RuntimeError: Feishu reported an error code or returned no token.
        """
        if not self.app_id or not self.app_secret:
            raise ValueError("Feishu app_id/app_secret missing in provider config")

        async with httpx.AsyncClient() as client:
            resp = await client.post(
                self.FEISHU_APP_TOKEN_URL,
                json={"app_id": self.app_id, "app_secret": self.app_secret},
            )
            data = resp.json()

        token = data.get("tenant_access_token") or data.get("app_access_token") or ""
        # Fail here instead of handing an empty token to every later request,
        # where it would only surface as unrelated-looking API errors.
        if data.get("code", 0) != 0 or not token:
            raise RuntimeError(f"Feishu token error: {data.get('msg') or data}")
        return token

    async def fetch_departments(self) -> list[ExternalDepartment]:
        """Fetch all departments from Feishu using concurrent recursive calls to get parent-child relationships.

        The result always starts with a virtual root department with external
        ID "0". A failing children request is logged and that subtree is
        skipped; it does not abort the whole fetch.

        Returns:
            The virtual root followed by every department discovered below it.
        """
        token = await self.get_access_token()
        # Add a virtual root for the tenant, consistent with DingTalk root behavior
        all_depts: list[ExternalDepartment] = [
            ExternalDepartment(
                external_id="0",
                name="Root",
                parent_external_id=None,
                member_count=0,
                raw_data={"department_id": "0", "name": "Root"},
            )
        ]

        async with httpx.AsyncClient() as client:
            sem = asyncio.Semaphore(15)  # Limit concurrent requests to avoid rate limits

            async def fetch_children(parent_id: str):
                """Append the direct children of ``parent_id``, then recurse into each."""
                page_token = ""
                tasks = []
                while True:
                    params = {
                        "department_id_type": "open_department_id",
                        "fetch_child": "false",
                        "page_size": "50",
                    }
                    if page_token:
                        params["page_token"] = page_token

                    async with sem:
                        resp = await client.get(
                            f"{self.FEISHU_DEPT_URL}/{parent_id}/children",
                            params=params,
                            headers={"Authorization": f"Bearer {token}"},
                        )
                    data = resp.json()

                    if data.get("code") != 0:
                        logger.error(f"Feishu fetch departments list error for parent {parent_id}: {data}")
                        break

                    res_data = data.get("data", {})
                    items = res_data.get("items", []) or []
                    for item in items:
                        dept_id = item.get("open_department_id")
                        if not dept_id:
                            continue

                        # Since we fetched using parent_id, we intrinsically know the parent!
                        all_depts.append(
                            ExternalDepartment(
                                external_id=dept_id,
                                name=item.get("name", ""),
                                parent_external_id=parent_id or "0",
                                member_count=item.get("member_count", 0),
                                raw_data=item,
                            )
                        )

                        # Recursively fetch children for this department
                        tasks.append(fetch_children(dept_id))

                    page_token = res_data.get("page_token", "")
                    if not page_token:
                        break

                if tasks:
                    await asyncio.gather(*tasks)

            await fetch_children("0")

        logger.info(f"Feishu fetched {len(all_depts)} departments total.")
        return all_depts

    async def fetch_users(self, department_external_id: str) -> list[ExternalUser]:
        """Fetch users in a department.

        IMPORTANT: Uses user_id_type=user_id (employee_id), which requires the
        'contact:user.employee_id:readonly' permission in the Feishu app.

        WHY user_id (not open_id or union_id):
        - open_id is app-specific: the same user has a different open_id in each Feishu app.
          Using open_id would break matching between org-sync users and Feishu bot channel users,
          since they use different apps.
        - union_id is ISV-scoped (same across apps from the same ISV), but not universal.
        - user_id (employee_id) is the only enterprise-wide stable identifier that works
          consistently across org sync, SSO, and bot channel user resolution.

        This permission requires app re-publishing in Feishu console (not instant like DingTalk).

        Args:
            department_external_id: ``open_department_id`` of the department.

        Returns:
            Users listed under the department, across all result pages.

        Raises:
            RuntimeError: Feishu answered with a non-zero ``code``.
        """
        token = await self.get_access_token()
        users: list[ExternalUser] = []
        page_token = ""

        async with httpx.AsyncClient() as client:
            while True:
                params = {
                    "department_id": department_external_id,
                    "department_id_type": "open_department_id",
                    # user_id (employee_id) is the enterprise-wide stable identifier.
                    # Requires 'contact:user.employee_id:readonly' permission + app re-publish.
                    "user_id_type": "user_id",
                    "page_size": "50",
                }
                if page_token:
                    params["page_token"] = page_token

                resp = await client.get(
                    self.FEISHU_USERS_URL,
                    params=params,
                    headers={"Authorization": f"Bearer {token}"},
                )
                data = resp.json()

                if data.get("code") != 0:
                    error_code = data.get("code")
                    error_msg = data.get("msg", "")
                    logger.error(
                        f"Feishu fetch users error for dept {department_external_id}: "
                        f"code={error_code}, msg={error_msg}"
                    )
                    raise RuntimeError(
                        f"Feishu API error (code {error_code}): {error_msg}. "
                        f"Access denied. One of the following scopes is required: "
                        f"[contact:user.employee_id:readonly]. "
                        f"Please enable this permission in Feishu Open Platform -> App -> "
                        f"Permissions -> search 'employee_id' -> enable and publish a new version. "
                        f"Note: unlike DingTalk, Feishu permissions require app re-publishing to take effect."
                    )

                res_data = data.get("data", {})
                items = res_data.get("items", []) or []
                for item in items:
                    # Collect all departments the user belongs to
                    raw_dept_ids = item.get("department_ids", [])
                    department_ids = [str(did) for did in raw_dept_ids] if raw_dept_ids else [department_external_id]

                    # The request asks for user_id_type=user_id, so "user_id"
                    # is used as the external ID. If the response carries no
                    # user_id, fall back to open_id rather than storing an
                    # empty identifier.
                    external_id = item.get("user_id", "") or item.get("open_id", "")

                    # For Feishu, a user is considered inactive if they are explicitly frozen or resigned.
                    # Merely not being activated (is_activated=False) shouldn't hide them from the org chart.
                    # "status" may be present but null, hence the ``or {}``.
                    feishu_status = item.get("status") or {}
                    is_frozen = feishu_status.get("is_frozen", False)
                    is_resigned = feishu_status.get("is_resigned", False)
                    member_status = "inactive" if (is_frozen or is_resigned) else "active"

                    users.append(
                        ExternalUser(
                            external_id=external_id,
                            open_id=item.get("open_id", ""),
                            unionid=item.get("union_id", ""),
                            name=item.get("name", ""),
                            email=item.get("email", ""),
                            avatar_url=item.get("avatar_url", ""),
                            title=item.get("title", ""),
                            department_external_id=department_external_id,
                            department_ids=department_ids,
                            mobile=item.get("mobile", ""),
                            status=member_status,
                            raw_data=item,
                        )
                    )

                page_token = res_data.get("page_token", "")
                if not page_token:
                    break

        return users
|
|
|
|
|
|
class DingTalkOrgSyncAdapter(BaseOrgSyncAdapter):
    """DingTalk organization sync adapter."""

    provider_type = "dingtalk"

    DINGTALK_API_URL = "https://oapi.dingtalk.com"
    DINGTALK_TOKEN_URL = "https://oapi.dingtalk.com/gettoken"
    DINGTALK_DEPT_LIST_URL = "https://oapi.dingtalk.com/topapi/v2/department/listsub"
    DINGTALK_USER_LIST_URL = "https://oapi.dingtalk.com/topapi/v2/user/list"

    def __init__(self, provider: IdentityProvider | None = None, config: dict | None = None, tenant_id: uuid.UUID | None = None):
        """Read the app credentials from the config, accepting several key spellings."""
        super().__init__(provider, config, tenant_id)
        self.app_key = self.config.get("app_key") or self.config.get("appkey") or self.config.get("app_id")
        self.app_secret = self.config.get("app_secret") or self.config.get("appsecret") or self.config.get("app_secret_key")
        # Cached token and the time after which it must be refreshed.
        self._access_token: str | None = None
        self._token_expires_at: datetime | None = None
        # external department ID -> "Root/.../Name" path, filled by fetch_departments().
        self._dept_path_map: dict[str, str] = {}

    @property
    def api_base_url(self) -> str:
        """Base URL of the DingTalk API."""
        return self.DINGTALK_API_URL

    async def get_access_token(self) -> str:
        """Return a cached access token, requesting a new one when it has expired.

        Raises:
            ValueError: app key or secret is missing from the config.
            RuntimeError: DingTalk answered with a non-zero ``errcode``.
        """
        if self._access_token and self._token_expires_at and datetime.now() < self._token_expires_at:
            return self._access_token

        if not self.app_key or not self.app_secret:
            raise ValueError("DingTalk app_key/app_secret missing in provider config")

        async with httpx.AsyncClient() as client:
            resp = await client.get(
                self.DINGTALK_TOKEN_URL,
                params={"appkey": self.app_key, "appsecret": self.app_secret},
            )
            data = resp.json()
            if data.get("errcode") != 0:
                raise RuntimeError(f"DingTalk token error: {data.get('errmsg') or data}")
            token = data.get("access_token") or ""
            expires_in = int(data.get("expires_in") or 7200)
            self._access_token = token
            # refresh a bit earlier
            self._token_expires_at = datetime.now() + timedelta(seconds=max(expires_in - 60, 60))
            return token

    async def fetch_departments(self) -> list[ExternalDepartment]:
        """Walk the department tree breadth-first starting at root dept id 1.

        Also rebuilds ``self._dept_path_map`` for later ``fetch_users`` calls.

        Returns:
            All discovered departments. When the API never returned the root
            itself, a synthetic "Root" department (external ID "1") is placed
            FIRST, so it is stored before the departments that name it as parent.

        Raises:
            RuntimeError: DingTalk answered with a non-zero ``errcode``.
        """
        token = await self.get_access_token()
        all_depts: list[ExternalDepartment] = []
        # dept_index: external_id -> (name, parent_external_id_str | None)
        dept_index: dict[str, tuple[str, str | None]] = {}

        seen: set[int] = set()
        queue: list[int] = [1]  # DingTalk root dept id
        _request_count = 0

        async with httpx.AsyncClient() as client:
            while queue:
                parent_id = queue.pop(0)
                if parent_id in seen:
                    continue
                seen.add(parent_id)

                # DingTalk rate limit: ~20 QPS per app per interface.
                # Sleep 60ms between requests to stay under the limit.
                if _request_count > 0:
                    await asyncio.sleep(0.06)
                _request_count += 1

                resp = await client.post(
                    self.DINGTALK_DEPT_LIST_URL,
                    params={"access_token": token},
                    json={"dept_id": parent_id},
                )
                data = resp.json()
                if data.get("errcode") != 0:
                    raise RuntimeError(f"DingTalk department list error: {data.get('errmsg') or data}")

                # The result is accepted both as a bare list and as a dict
                # wrapping the list under "department".
                result = data.get("result")
                if isinstance(result, list):
                    items = result
                elif isinstance(result, dict):
                    items = result.get("department", []) or []
                else:
                    items = []

                for item in items:
                    dept_id = int(item.get("dept_id"))
                    dept_name = item.get("name", "")
                    # Use actual parent_id from API response to preserve real hierarchy
                    raw_parent_id = item.get("parent_id")
                    if dept_id == 1 or not raw_parent_id or int(raw_parent_id) == dept_id:
                        parent_external = None  # Root has no parent
                    else:
                        parent_external = str(int(raw_parent_id))
                    external_id = str(dept_id)
                    dept_index[external_id] = (dept_name, parent_external)
                    all_depts.append(
                        ExternalDepartment(
                            external_id=external_id,
                            name=dept_name,
                            parent_external_id=parent_external,
                            member_count=item.get("member_count", 0) or 0,
                            raw_data=item,
                        )
                    )
                    if dept_id not in seen:
                        queue.append(dept_id)

        # Ensure root exists in index (for path building and possible member sync).
        # It is inserted at the front: callers store departments in list order
        # and resolve each parent from what is already stored, so the root has
        # to precede the top-level departments that reference it.
        if "1" not in dept_index:
            dept_index["1"] = ("Root", None)
            all_depts.insert(
                0,
                ExternalDepartment(
                    external_id="1",
                    name="Root",
                    parent_external_id=None,
                    member_count=0,
                    raw_data={"dept_id": 1, "name": "Root"},
                ),
            )

        self._dept_path_map = self._build_dept_paths(dept_index)
        return all_depts

    async def fetch_users(self, department_external_id: str) -> list[ExternalUser]:
        """Fetch every user listed under one department, following the cursor.

        Args:
            department_external_id: DingTalk department ID as a string.

        Returns:
            Users of the department. Each user's ``department_external_id`` /
            ``department_path`` refer to the LAST entry of its ``dept_id_list``.

        Raises:
            RuntimeError: DingTalk answered with a non-zero ``errcode``.
        """
        token = await self.get_access_token()
        users: list[ExternalUser] = []
        cursor = 0
        dept_id = int(department_external_id)

        async with httpx.AsyncClient() as client:
            while True:
                # DingTalk rate limit: ~20 QPS per app per interface.
                # Sleep 60ms between requests to stay under the limit.
                await asyncio.sleep(0.06)

                resp = await client.post(
                    self.DINGTALK_USER_LIST_URL,
                    params={"access_token": token},
                    json={"dept_id": dept_id, "cursor": cursor, "size": 100},
                )
                data = resp.json()
                if data.get("errcode") != 0:
                    raise RuntimeError(f"DingTalk user list error: {data.get('errmsg') or data}")

                result = data.get("result", {}) or {}
                items = result.get("list", []) or []
                for item in items:
                    external_id = item.get("userid") or item.get("user_id") or ""
                    # Get user's actual department list from DingTalk data
                    dept_id_list = item.get("dept_id_list", [])
                    department_ids = [str(did) for did in dept_id_list] if dept_id_list else [department_external_id]
                    # Use last level department (last item in list is most specific)
                    last_dept_id = department_ids[-1] if department_ids else department_external_id
                    last_dept_path = self._dept_path_map.get(last_dept_id, "")
                    users.append(
                        ExternalUser(
                            external_id=external_id,
                            unionid=item.get("unionid", "") or "",
                            open_id=item.get("openid", "") or "",
                            name=item.get("name", ""),
                            email=item.get("email", "") or "",
                            avatar_url=item.get("avatar", "") or "",
                            title=item.get("title", "") or "",
                            department_external_id=last_dept_id,
                            department_path=last_dept_path,
                            department_ids=department_ids,
                            mobile=item.get("mobile", "") or "",
                            status="active" if item.get("active", True) else "inactive",
                            raw_data=item,
                        )
                    )

                if not result.get("has_more"):
                    break
                next_cursor = result.get("next_cursor")
                # has_more without a cursor that moves forward would request
                # the same page again forever; stop instead.
                if not next_cursor or int(next_cursor) == cursor:
                    break
                cursor = int(next_cursor)

        return users

    def _build_dept_paths(self, dept_index: dict[str, tuple[str, str | None]]) -> dict[str, str]:
        """Build a "Parent/Child" name path for every department in the index.

        Args:
            dept_index: external_id -> (name, parent external_id or None).

        Returns:
            external_id -> slash-joined names from the topmost known ancestor
            down to the department. A department whose parent is missing from
            the index is treated as a top-level entry.
        """
        paths: dict[str, str] = {}

        def compute_path(dept_id: str, visited: set[str] | None = None) -> str:
            """Return the path for ``dept_id``, caching results in ``paths``."""
            if dept_id in paths:
                return paths[dept_id]
            if visited is None:
                visited = set()
            if dept_id in visited:
                # Cycle guard
                paths[dept_id] = dept_id
                return dept_id
            visited.add(dept_id)
            name, parent_id = dept_index.get(dept_id, ("", None))
            if not parent_id or parent_id not in dept_index:
                paths[dept_id] = name
                return name
            parent_path = compute_path(parent_id, visited)
            full = f"{parent_path}/{name}" if parent_path else name
            paths[dept_id] = full
            return full

        for did in list(dept_index.keys()):
            compute_path(did)
        return paths
|
|
|
|
|
|
class WeComOrgSyncAdapter(BaseOrgSyncAdapter):
    """WeCom organization sync adapter.

    Authenticates with the corp ID and the "Contacts Sync" secret, reads the
    department tree from ``department/simplelist`` and member IDs from
    ``user/list_id``. Only ID-level user stubs are produced here.
    """

    provider_type = "wecom"

    WECOM_API_URL = "https://qyapi.weixin.qq.com"
    WECOM_TOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
    # Use simplelist (newer API) instead of the deprecated department/list.
    # The simplelist endpoint is accessible to the contact assistant token
    # (obtained via the "Contacts Sync" secret) without requiring an
    # app-level IP whitelist.
    WECOM_DEPT_LIST_URL = "https://qyapi.weixin.qq.com/cgi-bin/department/simplelist"
    WECOM_USER_LIST_URL = "https://qyapi.weixin.qq.com/cgi-bin/user/list"
    # Fallback APIs for the contact assistant token (cannot call user/list):
    #   list_id  returns member IDs together with the department they belong to
    #   user/get returns full details for a single user by userid
    WECOM_USER_LIST_ID_URL = "https://qyapi.weixin.qq.com/cgi-bin/user/list_id"
    WECOM_USER_GET_URL = "https://qyapi.weixin.qq.com/cgi-bin/user/get"

    def __init__(self, provider: IdentityProvider | None = None, config: dict | None = None, tenant_id: uuid.UUID | None = None):
        """Read WeCom credentials from the provider config.

        Args:
            provider: Identity provider record, forwarded to the base class.
            config: Provider configuration, forwarded to the base class.
            tenant_id: Owning tenant, forwarded to the base class.
        """
        super().__init__(provider, config, tenant_id)
        # corp_id: the enterprise's WeCom corp ID.
        # secret: the "Contacts Sync" secret, used for department/simplelist
        # and user/list_id. Several key spellings are accepted for each.
        self.corp_id = self.config.get("corp_id") or self.config.get("app_id") or self.config.get("corpid")
        self.secret = self.config.get("secret") or self.config.get("app_secret") or self.config.get("corpsecret")
        self._access_token: str | None = None
        self._token_expires_at: datetime | None = None

    async def _fetch_token(self, corp_id: str, secret: str) -> str:
        """Fetch a fresh WeCom access_token for the given corp_id/secret pair.

        Returns:
            The ``access_token`` from the response, or ``""`` if it is absent.

        Raises:
            RuntimeError: If the response ``errcode`` is not 0.
        """
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.get(
                self.WECOM_TOKEN_URL,
                params={"corpid": corp_id, "corpsecret": secret},
            )
            data = resp.json()
            if data.get("errcode") == 0:
                return data.get("access_token") or ""
            raise RuntimeError(f"[WeCom] gettoken failed for corpid={corp_id}: {data}")

    @property
    def api_base_url(self) -> str:
        """Base URL of the WeCom API."""
        return self.WECOM_API_URL

    async def get_access_token(self) -> str:
        """Get a valid access token using the "Contacts Sync" secret.

        This token can call department/simplelist and user/list_id.
        It cannot call user/list or user/get (those raise errcode 48009).
        Full user profiles are obtained passively via SSO login instead.

        Returns:
            The cached token while it is still fresh, otherwise a new one.

        Raises:
            ValueError: If corp_id or secret is missing from the config.
            RuntimeError: If WeCom rejects the token request.
        """
        if self._access_token and self._token_expires_at and datetime.now() < self._token_expires_at:
            return self._access_token

        if not self.corp_id or not self.secret:
            raise ValueError("WeCom corp_id or secret missing in provider config")

        token = await self._fetch_token(self.corp_id, self.secret)
        self._access_token = token
        # Refresh slightly before true expiry to avoid clock-skew issues.
        # A 7200 s token lifetime is assumed rather than read from the response.
        self._token_expires_at = datetime.now() + timedelta(seconds=7200 - 300)
        return token

    async def fetch_departments(self) -> list[ExternalDepartment]:
        """Fetch all departments from WeCom using the simplelist endpoint.

        department/simplelist is accessible to the contact assistant token
        obtained from the "Contacts Sync" secret, unlike the deprecated
        department/list which requires a strict app-level IP whitelist.

        Returns:
            One ExternalDepartment per returned entry that carries an ``id``.
            ``member_count`` is always 0 because simplelist does not report it.

        Raises:
            RuntimeError: If the response ``errcode`` is not 0.
        """
        token = await self.get_access_token()
        all_depts: list[ExternalDepartment] = []

        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.get(
                self.WECOM_DEPT_LIST_URL,
                # id omitted -> returns all departments
                params={"access_token": token},
            )
            data = resp.json()
            if data.get("errcode") != 0:
                raise RuntimeError(f"WeCom department list error: {data.get('errmsg') or data}")

            # simplelist response: {"department_id": [{"id": x, "parentid": x, "order": ...}]}
            # NOTE(review): the documented simplelist payload appears to carry
            # only id/parentid/order, so "name" may always be empty - confirm.
            # The trailing `or []` keeps the loop safe when both keys are
            # missing or null.
            items = data.get("department_id") or data.get("department") or []
            for item in items:
                raw_id = item.get("id")
                if raw_id is None:
                    # Without an ID the entry would be stored as "None".
                    continue
                parentid = item.get("parentid", 0)
                # parentid 0 (or missing) marks a root department.
                parent_id = str(parentid) if parentid else None

                all_depts.append(
                    ExternalDepartment(
                        external_id=str(raw_id),
                        name=item.get("name", ""),
                        parent_external_id=parent_id,
                        member_count=0,  # simplelist does not return member count
                        raw_data=item,
                    )
                )
        return all_depts

    async def fetch_users(self, department_external_id: str) -> list[ExternalUser]:
        """Fetch user stubs for a department using user/list_id.

        WeCom API strategy for org sync:
        - user/list (bulk detail)  -> errcode 48009 for contact-sync token; removed.
        - user/get (per-user detail) -> IP-whitelisted only; removed.
        - user/list_id (ID only)   -> works with contact-sync token; used here.

        Only ID-level data is obtained in org sync. Full profile data (name,
        avatar, email, mobile) is enriched passively when each user completes
        their first WeCom SSO login (via auth/getuserdetail).
        """
        token = await self.get_access_token()
        return await self._fetch_user_stubs(token, department_external_id)

    async def _fetch_user_stubs(self, sync_token: str, department_external_id: str) -> list[ExternalUser]:
        """Fetch minimal user stubs for one department via user/list_id.

        Returns placeholder ExternalUser objects with only the ID fields
        populated. The name is intentionally set to the userid so the passive
        SSO enrichment in sso_service.link_identity() can detect the
        placeholder and overwrite it with the real name from auth/getuserdetail.

        Each ``dept_user`` entry may carry the ``department`` it belongs to.
        Entries that name a different department are dropped, so members are
        not attributed to whichever department happened to be queried.
        Entries without that field are kept.

        Raises:
            RuntimeError: If the response ``errcode`` is not 0.
        """
        user_stubs: list[ExternalUser] = []
        # Response department IDs are numeric; compare as strings.
        wanted_dept = str(department_external_id)
        cursor = ""

        async with httpx.AsyncClient(timeout=15) as client:
            while True:
                params: dict = {
                    "access_token": sync_token,
                    "department_id": department_external_id,
                    "limit": 1000,
                }
                if cursor:
                    params["cursor"] = cursor

                resp = await client.get(self.WECOM_USER_LIST_ID_URL, params=params)
                data = resp.json()
                if data.get("errcode") != 0:
                    raise RuntimeError(f"WeCom user/list_id error: {data.get('errmsg') or data}")

                for entry in data.get("dept_user") or []:
                    uid = entry.get("userid", "")
                    if not uid:
                        continue
                    entry_dept = entry.get("department")
                    if entry_dept is not None and str(entry_dept) != wanted_dept:
                        # Membership row for some other department.
                        continue
                    # Use userid as the name placeholder so link_identity() knows
                    # to overwrite it once the user logs in via SSO.
                    user_stubs.append(ExternalUser(
                        external_id=uid,
                        name=uid,  # placeholder - enriched on first SSO login
                        open_id=entry.get("open_userid", "") or "",
                        department_external_id=department_external_id,
                        department_ids=[department_external_id],
                    ))

                cursor = data.get("next_cursor", "")
                if not cursor:
                    break

        return user_stubs
|
|
|
|
|
|
# Registry of org sync adapter classes, keyed by the provider type string.
# get_org_sync_adapter() resolves the adapter class through this mapping.
SYNC_ADAPTER_CLASSES = dict(
    feishu=FeishuOrgSyncAdapter,
    dingtalk=DingTalkOrgSyncAdapter,
    wecom=WeComOrgSyncAdapter,
)
|
|
|
|
|
|
async def get_org_sync_adapter(
    db: AsyncSession,
    provider_type: str,
    tenant_id: uuid.UUID | None = None,
    provider_id: uuid.UUID | None = None,
) -> BaseOrgSyncAdapter | None:
    """Factory function to create an org sync adapter.

    Args:
        db: Database session
        provider_type: Type of provider (feishu, dingtalk, etc.)
        tenant_id: Optional tenant ID. When omitted, only providers with no
            tenant are matched.
        provider_id: Optional specific provider ID (if not provided, uses
            first found by type)

    Returns:
        Adapter instance, or None if the provider type is not supported.
        If no provider row is found the adapter is still created, with
        ``provider=None`` and an empty config.
    """
    adapter_class = SYNC_ADAPTER_CLASSES.get(provider_type)
    if not adapter_class:
        # Unsupported type: nothing to build, so skip the database lookup.
        return None

    # Get provider config from database - prefer specific provider_id if provided
    if provider_id:
        query = select(IdentityProvider).where(IdentityProvider.id == provider_id)
    else:
        query = select(IdentityProvider).where(IdentityProvider.provider_type == provider_type)
        if tenant_id:
            query = query.where(IdentityProvider.tenant_id == tenant_id)
        else:
            query = query.where(IdentityProvider.tenant_id.is_(None))

    result = await db.execute(query)
    # Take the first match. scalar_one_or_none() would raise
    # MultipleResultsFound when several providers of the same type exist,
    # contradicting the documented "first found by type" behaviour.
    provider = result.scalars().first()

    config = provider.config if provider else {}
    return adapter_class(provider=provider, config=config, tenant_id=tenant_id)
|