Clawith/backend/app/services/org_sync_adapter.py

1167 lines
48 KiB
Python

"""Generic organization sync adapter framework.
This module provides a base class for syncing org structure (departments/members)
from various identity providers (Feishu, DingTalk, WeCom, etc.).
"""
import asyncio
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any
from sqlalchemy import DateTime, ForeignKey, Integer, String, Text, delete, func, or_, select, update
import httpx
from loguru import logger
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.identity import IdentityProvider
from app.models.org import OrgDepartment, OrgMember
from app.models.user import User, Identity
from pypinyin import pinyin, lazy_pinyin, Style
from anyascii import anyascii as _anyascii
from app.core.security import hash_password
def _normalize_contact(value: str | None) -> str | None:
if value is None:
return None
value = value.strip()
return value or None
@dataclass
class ExternalDepartment:
"""Standardized department info from external providers."""
external_id: str
name: str
parent_external_id: str | None = None
member_count: int = 0
raw_data: dict = field(default_factory=dict)
@dataclass
class ExternalUser:
    """Provider-neutral description of one directory user.

    Only ``external_id`` and ``name`` are required; every other field defaults
    to an empty value so adapters can fill in whatever their API exposes.
    """

    # The unique, platform-stable ID (e.g., userid).
    external_id: str
    # Display name.
    name: str
    # OAuth open_id.
    open_id: str = ""
    # Union ID for cross-app identification.
    unionid: str = ""
    email: str = ""
    avatar_url: str = ""
    title: str = ""
    # Single department this user is attributed to by the adapter.
    department_external_id: str = ""
    # Human-readable department path, when the adapter can build one.
    department_path: str = ""
    # All department IDs the provider lists for this user.
    department_ids: list[str] = field(default_factory=list)
    mobile: str = ""
    # Membership status string; adapters in this module emit "active"/"inactive".
    status: str = "active"
    # The untouched provider payload this record was built from.
    raw_data: dict = field(default_factory=dict)
class BaseOrgSyncAdapter(ABC):
    """Abstract base class for organization sync adapters.

    Subclasses supply the provider-specific pieces (``api_base_url``,
    ``get_access_token``, ``fetch_departments``, ``fetch_users``). This class
    owns the provider-agnostic persistence: upserting departments and members,
    soft-deleting records that were not seen in the current run, and
    recomputing department member counts.
    """

    provider_type: str = ""

    def __init__(
        self,
        provider: IdentityProvider | None = None,
        config: dict | None = None,
        tenant_id: uuid.UUID | None = None,
    ):
        """Initialize adapter with provider config.

        Args:
            provider: IdentityProvider model from database
            config: Configuration dict (fallback if no provider record)
            tenant_id: Tenant ID for org sync
        """
        self.provider = provider
        self.config = config or {}
        self.tenant_id = tenant_id
        self._client: httpx.AsyncClient | None = None
        # A non-empty config stored on the provider record wins over ``config``.
        if provider and provider.config:
            self.config = provider.config

    @property
    @abstractmethod
    def api_base_url(self) -> str:
        """Base URL for provider API."""
        pass

    @abstractmethod
    async def get_access_token(self) -> str:
        """Get valid access token for API calls."""
        pass

    @abstractmethod
    async def fetch_departments(self) -> list[ExternalDepartment]:
        """Fetch all departments from provider.

        Returns:
            List of ExternalDepartment
        """
        pass

    @abstractmethod
    async def fetch_users(self, department_external_id: str) -> list[ExternalUser]:
        """Fetch users in a department.

        Args:
            department_external_id: External department ID

        Returns:
            List of ExternalUser
        """
        pass

    async def sync_org_structure(self, db: AsyncSession) -> dict[str, Any]:
        """Main sync function - syncs departments and members.

        Per-department and per-member failures are recorded in ``errors`` and
        do not abort the run. Any other exception is recorded as a
        ``Critical`` error and ends the run early; the summary is still
        returned.

        Args:
            db: Database session

        Returns:
            Dict with sync results: ``departments``, ``members``,
            ``users_created``, ``profiles_synced``, ``errors``, ``provider``
            and ``synced_at``.
        """
        errors: list[str] = []
        dept_count = 0
        member_count = 0
        user_count = 0
        profile_count = 0
        # External IDs of departments whose user list could not be fetched.
        # Their members were never refreshed, so they must not be treated as
        # "missing upstream" during reconciliation.
        unfetched_departments: set[str] = set()
        sync_start = datetime.now()

        # Ensure provider exists
        provider = await self._ensure_provider(db)

        try:
            # Fetch and sync departments
            departments = await self.fetch_departments()
            for dept in departments:
                try:
                    # Savepoint: a failing department rolls back only itself.
                    async with db.begin_nested():
                        await self._upsert_department(db, provider, dept)
                        dept_count += 1
                except Exception as e:
                    errors.append(f"Department {dept.external_id}: {str(e)}")
                    logger.error(f"[OrgSync] Failed to sync department {dept.external_id}: {e}")

            # Fetch and sync users (from all departments)
            for dept in departments:
                try:
                    users = await self.fetch_users(dept.external_id)
                except Exception as e:
                    unfetched_departments.add(dept.external_id)
                    logger.error(f"[OrgSync] Failed to fetch users in department {dept.external_id}: {e}")
                    errors.append(f"Fetch users in dept {dept.external_id}: {str(e)}")
                    continue

                for user in users:
                    try:
                        async with db.begin_nested():
                            stats = await self._upsert_member(db, provider, user, dept.external_id)
                            if stats.get("user_created"):
                                user_count += 1
                            if stats.get("profile_synced"):
                                profile_count += 1
                            member_count += 1
                    except Exception as e:
                        logger.error(f"[OrgSync] Failed to sync member {user.external_id} ({user.name}): {e}")
                        errors.append(f"Member {user.external_id}: {str(e)}")

            # Update provider metadata if possible
            if self.provider:
                # Copy before mutating so a new dict object is assigned.
                config = (self.provider.config or {}).copy()
                config["last_synced_at"] = datetime.now().isoformat()
                self.provider.config = config
            await db.flush()

            # Reconciliation: mark records not updated in this sync as deleted
            await self._reconcile(
                db,
                provider.id,
                sync_start,
                skip_department_external_ids=unfetched_departments,
            )
            await db.flush()

            # Recalculate member counts for all departments (crucial for DingTalk/WeCom)
            await self._update_member_counts(db, provider.id)
            await db.flush()
        except Exception as e:
            import traceback
            logger.error(f"[OrgSync] Critical error during sync: {e}\n{traceback.format_exc()}")
            errors.append(f"Critical: {str(e)}")

        return {
            "departments": dept_count,
            "members": member_count,
            "users_created": user_count,
            "profiles_synced": profile_count,
            "errors": errors,
            "provider": self.provider_type,
            "synced_at": datetime.now().isoformat()
        }

    async def _reconcile(
        self,
        db: AsyncSession,
        provider_id: uuid.UUID,
        sync_start: datetime,
        skip_department_external_ids: set[str] | None = None,
    ):
        """Mark records that were not updated in this sync as deleted.

        Args:
            db: Database session.
            provider_id: Provider whose records are reconciled.
            sync_start: Records with ``synced_at`` older than this are stale.
            skip_department_external_ids: External IDs of departments whose
                member list could not be fetched in this run. Members
                currently attached to those departments are left untouched,
                so a transient API failure does not soft-delete them.
        """
        # 1. Members reconciled
        member_stmt = (
            update(OrgMember)
            .where(OrgMember.provider_id == provider_id)
            .where(OrgMember.synced_at < sync_start)
            .where(OrgMember.status != "deleted")
        )
        if skip_department_external_ids:
            unfetched_dept_ids = select(OrgDepartment.id).where(
                OrgDepartment.provider_id == provider_id,
                OrgDepartment.external_id.in_(list(skip_department_external_ids)),
            )
            member_stmt = member_stmt.where(
                or_(
                    # NULL never satisfies NOT IN, so keep unassigned members
                    # eligible for reconciliation explicitly.
                    OrgMember.department_id.is_(None),
                    OrgMember.department_id.not_in(unfetched_dept_ids),
                )
            # The subquery criterion cannot be evaluated in Python, so ask the
            # session to synchronize by fetching the affected rows instead.
            ).execution_options(synchronize_session="fetch")
        await db.execute(member_stmt.values(status="deleted", synced_at=datetime.now()))

        # 2. Departments reconciled
        await db.execute(
            update(OrgDepartment)
            .where(OrgDepartment.provider_id == provider_id)
            .where(OrgDepartment.synced_at < sync_start)
            .where(OrgDepartment.status != "deleted")
            .values(status="deleted", synced_at=datetime.now())
        )

    async def _update_member_counts(self, db: AsyncSession, provider_id: uuid.UUID):
        """Update member_count for all departments to include all their recursive sub-department members.

        Step 1 sets every active department to its direct active-member count
        in SQL. Step 2 aggregates those counts up the tree in Python. Step 3
        writes back only the departments whose aggregated total differs from
        the direct count already stored by step 1.
        """
        from sqlalchemy import update, select, func

        # 1. Update all departments to show their DIRECT member counts
        direct_subquery = (
            select(func.count(OrgMember.id))
            .where(OrgMember.department_id == OrgDepartment.id)
            .where(OrgMember.status == "active")
            .scalar_subquery()
        )
        await db.execute(
            update(OrgDepartment)
            .where(OrgDepartment.provider_id == provider_id)
            .where(OrgDepartment.status == "active")
            .values(member_count=direct_subquery)
        )

        # 2. Fetch all active departments to compute recursive aggregated counts
        result = await db.execute(
            select(OrgDepartment.id, OrgDepartment.parent_id, OrgDepartment.member_count)
            .where(OrgDepartment.provider_id == provider_id)
            .where(OrgDepartment.status == "active")
        )
        rows = result.all()

        direct = {row.id: row.member_count or 0 for row in rows}
        children: dict[Any, list] = {dept_id: [] for dept_id in direct}
        root_ids = []
        for row in rows:
            if row.parent_id and row.parent_id in children:
                children[row.parent_id].append(row.id)
            else:
                # No parent, or parent is not an active department of this provider.
                root_ids.append(row.id)

        # Departments unreachable from any root (parent cycles) keep their
        # direct count instead of being zeroed.
        totals = dict(direct)
        visited: set = set()
        for root_id in root_ids:
            # Iterative pre-order walk; processing it in reverse guarantees
            # every child is totalled before its parent, without recursion.
            order = []
            stack = [root_id]
            while stack:
                node_id = stack.pop()
                if node_id in visited:
                    continue
                visited.add(node_id)
                order.append(node_id)
                stack.extend(children[node_id])
            for node_id in reversed(order):
                totals[node_id] = direct[node_id] + sum(totals[c] for c in children[node_id])

        # 3. Write aggregated totals. Individual UPDATE statements are used to
        # avoid SQLAlchemy 2.x "Bulk UPDATE by Primary Key" ambiguity when
        # passing a list to execute(). Rows whose total equals the direct count
        # are already correct after step 1 and are skipped.
        for dept_id, total in totals.items():
            if total == direct[dept_id]:
                continue
            await db.execute(
                update(OrgDepartment)
                .where(OrgDepartment.id == dept_id)
                .values(member_count=total)
            )

    async def _ensure_provider(self, db: AsyncSession) -> IdentityProvider:
        """Ensure IdentityProvider record exists.

        Resolution order: the provider passed to the constructor, then a
        lookup by ``self.provider_id`` when a subclass defines it, then the
        first provider of this type for the tenant. If none is found a new
        active record is created from ``self.config``.
        """
        if self.provider:
            return self.provider

        # If we have an ID, look it up
        if hasattr(self, 'provider_id') and self.provider_id:
            result = await db.execute(select(IdentityProvider).where(IdentityProvider.id == self.provider_id))
            self.provider = result.scalar_one_or_none()
            if self.provider:
                return self.provider

        # Fallback by type (scoped by tenant)
        query = select(IdentityProvider).where(IdentityProvider.provider_type == self.provider_type)
        if self.tenant_id:
            query = query.where(IdentityProvider.tenant_id == self.tenant_id)
        else:
            query = query.where(IdentityProvider.tenant_id.is_(None))
        result = await db.execute(query)
        provider = result.scalars().first()

        if not provider:
            provider = IdentityProvider(
                provider_type=self.provider_type,
                name=self.provider_type.capitalize(),
                is_active=True,
                config=self.config,
                tenant_id=self.tenant_id
            )
            db.add(provider)
            # Flush so the new record gets its primary key before it is used.
            await db.flush()

        self.provider = provider
        return provider

    async def _find_department(
        self, db: AsyncSession, provider_id: uuid.UUID, external_id: str
    ) -> OrgDepartment | None:
        """Return the local department with this external ID for the provider, if any."""
        result = await db.execute(
            select(OrgDepartment).where(
                OrgDepartment.external_id == external_id,
                OrgDepartment.provider_id == provider_id,
            )
        )
        return result.scalars().first()

    async def _upsert_department(
        self, db: AsyncSession, provider: IdentityProvider, dept: ExternalDepartment
    ):
        """Insert or update a department.

        The parent link is resolved against departments already stored for
        this provider; when the parent row does not exist yet, ``parent_id``
        is left as ``None``.
        """
        existing = await self._find_department(db, provider.id, dept.external_id)
        now = datetime.now()
        # NOTE(review): the path prefix is the parent's *external ID*, not the
        # parent's name or path - kept as-is; confirm this is intended.
        path = f"{dept.parent_external_id}/{dept.name}" if dept.parent_external_id else dept.name

        # Resolve parent_id from parent_external_id
        parent_id = None
        if dept.parent_external_id:
            parent_dept = await self._find_department(db, provider.id, dept.parent_external_id)
            if parent_dept:
                parent_id = parent_dept.id

        if existing:
            existing.name = dept.name
            existing.member_count = dept.member_count
            existing.path = path
            existing.parent_id = parent_id
            # Re-activate in case a previous run soft-deleted it.
            existing.status = "active"
            existing.synced_at = now
        else:
            new_dept = OrgDepartment(
                external_id=dept.external_id,
                provider_id=provider.id,
                name=dept.name,
                parent_id=parent_id,
                path=path,
                member_count=dept.member_count,
                tenant_id=self.tenant_id,
                synced_at=now,
            )
            db.add(new_dept)
        await db.flush()

    async def _resolve_member_department(
        self, db: AsyncSession, provider: IdentityProvider, user: ExternalUser
    ) -> OrgDepartment | None:
        """Pick the local department a member should be attached to."""
        # DingTalk's dept_id_list last item is the most specific (leaf)
        # department, so try the list in reverse and take the first one that
        # exists locally.
        for dept_ext_id in reversed(user.department_ids or []):
            department = await self._find_department(db, provider.id, dept_ext_id)
            if department:
                return department
        # Fallback: use the department_external_id that was set during fetch_users
        if user.department_external_id:
            return await self._find_department(db, provider.id, user.department_external_id)
        return None

    @staticmethod
    def _transliterate_name(name: str) -> tuple[str, str]:
        """Return ``(full, initials)`` transliterations of a display name.

        Layered strategy:
        1. pypinyin converts CJK characters to pinyin
        2. anyascii handles remaining non-ASCII scripts (Korean, Japanese kana, Arabic, etc.)
        """
        full = _anyascii("".join(lazy_pinyin(name, errors="default")))
        initial = "".join([i[0] for i in pinyin(name, style=Style.FIRST_LETTER)])
        return full, initial

    async def _upsert_member(
        self,
        db: AsyncSession,
        provider: IdentityProvider,
        user: ExternalUser,
        department_external_id: str,
    ) -> dict[str, Any]:
        """Insert or update a member and link it to a matching platform user.

        Platform users are never created here: an existing user is linked
        when its identity email or phone matches, and the member's email and
        mobile are then copied onto that user.

        Args:
            db: Database session.
            provider: Provider the member belongs to.
            user: Normalized external user.
            department_external_id: Department being iterated by the caller.
                Not used for the lookup (``user.department_ids`` and
                ``user.department_external_id`` are); kept for interface
                compatibility.

        Returns:
            ``{"user_created": bool, "profile_synced": bool}``;
            ``user_created`` is always ``False`` in this implementation.

        Raises:
            ValueError: If the external user carries no unionid, external_id
                or open_id, so it cannot be matched against existing members.
        """
        stats = {"user_created": False, "profile_synced": False}

        # Check if exists by unionid or external_id or open_id (any matches), and provider
        conditions = []
        if user.unionid:
            conditions.append(OrgMember.unionid == user.unionid)
        if user.external_id:
            conditions.append(OrgMember.external_id == user.external_id)
        if user.open_id:
            conditions.append(OrgMember.open_id == user.open_id)
        if not conditions:
            # Without any identifier the member could never be matched again
            # and would be re-inserted on every sync.
            raise ValueError(
                f"External user {user.name!r} has no unionid, external_id or open_id"
            )
        result = await db.execute(
            select(OrgMember).where(
                OrgMember.provider_id == provider.id,
                or_(*conditions)
            )
        )
        existing_member = result.scalars().first()

        department = await self._resolve_member_department(db, provider, user)
        now = datetime.now()

        # Note: Platform user creation is disabled - just sync OrgMember.
        # Users will be linked to platform users manually or via SSO login.
        email = _normalize_contact(user.email)
        mobile = _normalize_contact(user.mobile)
        platform_user = await self._resolve_platform_user(db, user)
        user_id = platform_user.id if platform_user else None

        translit_full, translit_initial = self._transliterate_name(user.name)
        department_id = department.id if department else None
        department_path = department.path if department else user.department_path

        # Update/Create OrgMember
        if existing_member:
            existing_member.name = user.name
            existing_member.name_translit_full = translit_full
            existing_member.name_translit_initial = translit_initial
            # Empty contact values from the provider never wipe stored ones.
            if email is not None:
                existing_member.email = email
            existing_member.avatar_url = user.avatar_url
            existing_member.title = user.title
            existing_member.department_id = department_id
            existing_member.department_path = department_path
            if mobile is not None:
                existing_member.phone = mobile
            existing_member.status = user.status
            # Universal ID fields
            existing_member.external_id = user.external_id
            existing_member.open_id = user.open_id
            existing_member.unionid = user.unionid
            existing_member.provider_id = provider.id
            existing_member.synced_at = now
            # Never overwrite an existing link to a platform user.
            if user_id and not existing_member.user_id:
                existing_member.user_id = user_id
            stats["profile_synced"] = True
        else:
            new_member = OrgMember(
                external_id=user.external_id,
                open_id=user.open_id,
                unionid=user.unionid,
                provider_id=provider.id,
                user_id=user_id,
                name=user.name,
                name_translit_full=translit_full,
                name_translit_initial=translit_initial,
                email=email,
                avatar_url=user.avatar_url,
                title=user.title,
                department_id=department_id,
                department_path=department_path,
                phone=mobile,
                status=user.status,
                tenant_id=self.tenant_id,
                synced_at=now,
            )
            db.add(new_member)
            stats["profile_synced"] = True

        # Sync email/phone from OrgMember to User (if linked)
        target_user = platform_user
        if target_user is None and existing_member and existing_member.user_id:
            user_res = await db.execute(select(User).where(User.id == existing_member.user_id))
            target_user = user_res.scalars().first()
        if target_user:
            if email and target_user.email != email:
                target_user.email = email
            if mobile and target_user.primary_mobile != mobile:
                target_user.primary_mobile = mobile

        await db.flush()
        return stats

    async def _resolve_platform_user(self, db: AsyncSession, user: ExternalUser) -> User | None:
        """Resolve platform user from external user info.

        Matches on identity email first, then identity phone. When the adapter
        has a ``tenant_id`` the search is restricted to that tenant, so a
        member is never linked to a user of another tenant.

        Returns:
            The first matching ``User``, or ``None``.
        """
        lookups = (
            (Identity.email, _normalize_contact(user.email)),
            (Identity.phone, _normalize_contact(user.mobile)),
        )
        for column, value in lookups:
            if not value:
                continue
            query = select(User).join(User.identity).where(column == value)
            if self.tenant_id:
                query = query.where(User.tenant_id == self.tenant_id)
            result = await db.execute(query)
            match = result.scalars().first()
            if match:
                return match
        return None
class FeishuOrgSyncAdapter(BaseOrgSyncAdapter):
    """Feishu organization sync adapter."""

    provider_type = "feishu"
    FEISHU_APP_TOKEN_URL = "https://open.feishu.cn/open-apis/auth/v3/app_access_token/internal"
    FEISHU_DEPT_URL = "https://open.feishu.cn/open-apis/contact/v3/departments"
    FEISHU_USERS_URL = "https://open.feishu.cn/open-apis/contact/v3/users/find_by_department"

    def __init__(self, provider: IdentityProvider | None = None, config: dict | None = None, tenant_id: uuid.UUID | None = None):
        """Read ``app_id`` / ``app_secret`` from the resolved config."""
        super().__init__(provider, config, tenant_id)
        self.app_id = self.config.get("app_id")
        self.app_secret = self.config.get("app_secret")
        # Token cache, mirroring the DingTalk and WeCom adapters, so a sync
        # does not request a new token for every department.
        self._access_token: str | None = None
        self._token_expires_at: datetime | None = None

    @property
    def api_base_url(self) -> str:
        """Base URL of the Feishu Open API."""
        return "https://open.feishu.cn/open-apis"

    async def get_access_token(self) -> str:
        """Return a valid access token, requesting a new one when needed.

        Prefers ``tenant_access_token`` and falls back to
        ``app_access_token`` from the same response.

        Raises:
            ValueError: If app_id/app_secret are not configured.
            RuntimeError: If Feishu does not return a token. Continuing with
                an empty token would make every later call fail and the sync
                would see an (almost) empty organization.
        """
        if self._access_token and self._token_expires_at and datetime.now() < self._token_expires_at:
            return self._access_token
        if not self.app_id or not self.app_secret:
            raise ValueError("Feishu app_id/app_secret missing in provider config")

        async with httpx.AsyncClient() as client:
            resp = await client.post(
                self.FEISHU_APP_TOKEN_URL,
                json={"app_id": self.app_id, "app_secret": self.app_secret},
            )
            data = resp.json()

        token = data.get("tenant_access_token") or data.get("app_access_token") or ""
        if not token:
            raise RuntimeError(f"Feishu token error: {data.get('msg') or data}")

        # NOTE(review): assumes the response carries the lifetime in seconds
        # under "expire"; falls back to 7200 when absent - confirm against the
        # Feishu token API reference.
        expires_in = int(data.get("expire") or 7200)
        self._access_token = token
        # refresh a bit earlier
        self._token_expires_at = datetime.now() + timedelta(seconds=max(expires_in - 60, 0))
        return token

    async def fetch_departments(self) -> list[ExternalDepartment]:
        """Fetch all departments from Feishu using concurrent recursive calls to get parent-child relationships.

        The tree is walked from department ``"0"``; each department's direct
        children are listed page by page and every child is then expanded
        concurrently. An API error for one parent is logged and ends that
        branch only (best effort).
        """
        token = await self.get_access_token()
        all_depts: list[ExternalDepartment] = []

        # Add a virtual root for the tenant, consistent with DingTalk root behavior
        all_depts.append(
            ExternalDepartment(
                external_id="0",
                name="Root",
                parent_external_id=None,
                member_count=0,
                raw_data={"department_id": "0", "name": "Root"}
            )
        )

        async with httpx.AsyncClient() as client:
            sem = asyncio.Semaphore(15)  # Limit concurrent requests to avoid rate limits

            async def fetch_children(parent_id: str):
                page_token = ""
                tasks = []
                while True:
                    params = {
                        "department_id_type": "open_department_id",
                        "fetch_child": "false",
                        "page_size": "50",
                    }
                    if page_token:
                        params["page_token"] = page_token

                    async with sem:
                        resp = await client.get(
                            f"{self.FEISHU_DEPT_URL}/{parent_id}/children",
                            params=params,
                            headers={"Authorization": f"Bearer {token}"}
                        )
                    data = resp.json()
                    if data.get("code") != 0:
                        logger.error(f"Feishu fetch departments list error for parent {parent_id}: {data}")
                        break

                    res_data = data.get("data", {})
                    items = res_data.get("items", []) or []
                    for item in items:
                        dept_id = item.get("open_department_id")
                        if not dept_id:
                            continue
                        # Since we fetched using parent_id, we intrinsically know the parent!
                        parent_external = parent_id or "0"
                        dept = ExternalDepartment(
                            external_id=dept_id,
                            name=item.get("name", ""),
                            parent_external_id=parent_external,
                            member_count=item.get("member_count", 0),
                            raw_data=item,
                        )
                        # Appended before its own children are fetched, so a
                        # parent always precedes its descendants in the list.
                        all_depts.append(dept)
                        # Recursively fetch children for this department
                        tasks.append(fetch_children(dept_id))

                    page_token = res_data.get("page_token", "")
                    if not page_token:
                        break

                if tasks:
                    await asyncio.gather(*tasks)

            await fetch_children("0")

        logger.info(f"Feishu fetched {len(all_depts)} departments total.")
        return all_depts

    async def fetch_users(self, department_external_id: str) -> list[ExternalUser]:
        """Fetch users in a department.

        IMPORTANT: Uses user_id_type=user_id (employee_id), which requires the
        'contact:user.employee_id:readonly' permission in the Feishu app.

        WHY user_id (not open_id or union_id):
        - open_id is app-specific: the same user has a different open_id in each Feishu app.
          Using open_id would break matching between org-sync users and Feishu bot channel users,
          since they use different apps.
        - union_id is ISV-scoped (same across apps from the same ISV), but not universal.
        - user_id (employee_id) is the only enterprise-wide stable identifier that works
          consistently across org sync, SSO, and bot channel user resolution.

        This permission requires app re-publishing in Feishu console (not instant like DingTalk).

        Raises:
            RuntimeError: If Feishu answers any page with a non-zero ``code``.
        """
        token = await self.get_access_token()
        users: list[ExternalUser] = []
        page_token = ""

        async with httpx.AsyncClient() as client:
            while True:
                params = {
                    "department_id": department_external_id,
                    "department_id_type": "open_department_id",
                    # user_id (employee_id) is the enterprise-wide stable identifier.
                    # Requires 'contact:user.employee_id:readonly' permission + app re-publish.
                    "user_id_type": "user_id",
                    "page_size": "50",
                }
                if page_token:
                    params["page_token"] = page_token

                resp = await client.get(
                    self.FEISHU_USERS_URL,
                    params=params,
                    headers={"Authorization": f"Bearer {token}"},
                )
                data = resp.json()
                if data.get("code") != 0:
                    error_code = data.get("code")
                    error_msg = data.get("msg", "")
                    logger.error(
                        f"Feishu fetch users error for dept {department_external_id}: "
                        f"code={error_code}, msg={error_msg}"
                    )
                    raise RuntimeError(
                        f"Feishu API error (code {error_code}): {error_msg}. "
                        f"Access denied. One of the following scopes is required: "
                        f"[contact:user.employee_id:readonly]. "
                        f"Please enable this permission in Feishu Open Platform -> App -> "
                        f"Permissions -> search 'employee_id' -> enable and publish a new version. "
                        f"Note: unlike DingTalk, Feishu permissions require app re-publishing to take effect."
                    )

                res_data = data.get("data", {})
                items = res_data.get("items", []) or []
                for item in items:
                    # Collect all departments the user belongs to
                    raw_dept_ids = item.get("department_ids", [])
                    department_ids = [str(did) for did in raw_dept_ids] if raw_dept_ids else [department_external_id]

                    # The request asks for user_id_type=user_id, so "user_id"
                    # is the preferred external_id. Fall back to open_id when
                    # the response carries no user_id for this entry.
                    external_id = item.get("user_id", "") or item.get("open_id", "")

                    # For Feishu, a user is considered inactive if they are explicitly frozen or resigned.
                    # Merely not being activated (is_activated=False) shouldn't hide them from the org chart.
                    # ``or {}`` also covers an explicit null status in the payload.
                    feishu_status = item.get("status") or {}
                    is_frozen = feishu_status.get("is_frozen", False)
                    is_resigned = feishu_status.get("is_resigned", False)
                    member_status = "inactive" if (is_frozen or is_resigned) else "active"

                    user = ExternalUser(
                        external_id=external_id,
                        open_id=item.get("open_id", ""),
                        unionid=item.get("union_id", ""),
                        name=item.get("name", ""),
                        email=item.get("email", ""),
                        avatar_url=item.get("avatar_url", ""),
                        title=item.get("title", ""),
                        department_external_id=department_external_id,
                        department_ids=department_ids,
                        mobile=item.get("mobile", ""),
                        status=member_status,
                        raw_data=item,
                    )
                    users.append(user)

                page_token = res_data.get("page_token", "")
                if not page_token:
                    break

        return users
class DingTalkOrgSyncAdapter(BaseOrgSyncAdapter):
    """DingTalk organization sync adapter."""

    provider_type = "dingtalk"
    DINGTALK_API_URL = "https://oapi.dingtalk.com"
    DINGTALK_TOKEN_URL = "https://oapi.dingtalk.com/gettoken"
    DINGTALK_DEPT_LIST_URL = "https://oapi.dingtalk.com/topapi/v2/department/listsub"
    DINGTALK_USER_LIST_URL = "https://oapi.dingtalk.com/topapi/v2/user/list"

    def __init__(self, provider: IdentityProvider | None = None, config: dict | None = None, tenant_id: uuid.UUID | None = None):
        """Read credentials from the resolved config, accepting several key spellings."""
        super().__init__(provider, config, tenant_id)
        self.app_key = self.config.get("app_key") or self.config.get("appkey") or self.config.get("app_id")
        self.app_secret = self.config.get("app_secret") or self.config.get("appsecret") or self.config.get("app_secret_key")
        self._access_token: str | None = None
        self._token_expires_at: datetime | None = None
        # external department id -> "Parent/Child" name path; filled by fetch_departments().
        self._dept_path_map: dict[str, str] = {}

    @property
    def api_base_url(self) -> str:
        """Base URL of the DingTalk API."""
        return self.DINGTALK_API_URL

    async def get_access_token(self) -> str:
        """Return a cached access token, requesting a new one when it has expired.

        Raises:
            ValueError: If app_key/app_secret are not configured.
            RuntimeError: If DingTalk answers with a non-zero ``errcode``.
        """
        if self._access_token and self._token_expires_at and datetime.now() < self._token_expires_at:
            return self._access_token
        if not self.app_key or not self.app_secret:
            raise ValueError("DingTalk app_key/app_secret missing in provider config")

        async with httpx.AsyncClient() as client:
            resp = await client.get(
                self.DINGTALK_TOKEN_URL,
                params={"appkey": self.app_key, "appsecret": self.app_secret},
            )
            data = resp.json()
        if data.get("errcode") != 0:
            raise RuntimeError(f"DingTalk token error: {data.get('errmsg') or data}")

        token = data.get("access_token") or ""
        expires_in = int(data.get("expires_in") or 7200)
        self._access_token = token
        # refresh a bit earlier
        self._token_expires_at = datetime.now() + timedelta(seconds=max(expires_in - 60, 60))
        return token

    async def fetch_departments(self) -> list[ExternalDepartment]:
        """Fetch the whole department tree breadth-first, starting at dept 1.

        Each request lists the direct sub-departments of one department. A
        synthetic ``Root`` entry (external_id ``"1"``) is appended when the
        API did not return the root itself. As a side effect
        ``self._dept_path_map`` is rebuilt for use by ``fetch_users``.

        Raises:
            RuntimeError: If any list request returns a non-zero ``errcode``.
        """
        from collections import deque

        token = await self.get_access_token()
        all_depts: list[ExternalDepartment] = []
        # dept_index: external_id -> (name, parent_external_id_str | None)
        dept_index: dict[str, tuple[str, str | None]] = {}
        seen: set[int] = set()
        # deque gives O(1) pops from the left, unlike list.pop(0).
        queue: deque[int] = deque([1])  # DingTalk root dept id
        _request_count = 0

        async with httpx.AsyncClient() as client:
            while queue:
                parent_id = queue.popleft()
                if parent_id in seen:
                    continue
                seen.add(parent_id)

                # DingTalk rate limit: ~20 QPS per app per interface.
                # Sleep 60ms between requests to stay under the limit.
                if _request_count > 0:
                    await asyncio.sleep(0.06)
                _request_count += 1

                resp = await client.post(
                    self.DINGTALK_DEPT_LIST_URL,
                    params={"access_token": token},
                    json={"dept_id": parent_id},
                )
                data = resp.json()
                if data.get("errcode") != 0:
                    raise RuntimeError(f"DingTalk department list error: {data.get('errmsg') or data}")

                # The result is accepted either as a bare list or as a dict
                # wrapping the list under "department".
                result = data.get("result")
                if isinstance(result, list):
                    items = result
                elif isinstance(result, dict):
                    items = result.get("department", []) or []
                else:
                    items = []

                for item in items:
                    dept_id = int(item.get("dept_id"))
                    dept_name = item.get("name", "")
                    # Use actual parent_id from API response to preserve real hierarchy
                    raw_parent_id = item.get("parent_id")
                    if dept_id == 1 or not raw_parent_id or int(raw_parent_id) == dept_id:
                        parent_external = None  # Root has no parent
                    else:
                        parent_external = str(int(raw_parent_id))

                    external_id = str(dept_id)
                    dept_index[external_id] = (dept_name, parent_external)
                    all_depts.append(
                        ExternalDepartment(
                            external_id=external_id,
                            name=dept_name,
                            parent_external_id=parent_external,
                            member_count=item.get("member_count", 0) or 0,
                            raw_data=item,
                        )
                    )
                    if dept_id not in seen:
                        queue.append(dept_id)

        # Ensure root exists in index (for path building and possible member sync)
        if "1" not in dept_index:
            dept_index["1"] = ("Root", None)
            all_depts.append(ExternalDepartment(external_id="1", name="Root", parent_external_id=None, member_count=0, raw_data={"dept_id": 1, "name": "Root"}))

        self._dept_path_map = self._build_dept_paths(dept_index)
        return all_depts

    async def fetch_users(self, department_external_id: str) -> list[ExternalUser]:
        """Fetch all users of one department, following cursor pagination.

        Each user is attributed to the last entry of their ``dept_id_list``
        (falling back to the requested department), and the department path
        is looked up in the map built by ``fetch_departments``.

        Raises:
            ValueError: If ``department_external_id`` is not an integer string.
            RuntimeError: If any page returns a non-zero ``errcode``.
        """
        token = await self.get_access_token()
        users: list[ExternalUser] = []
        cursor = 0
        dept_id = int(department_external_id)

        async with httpx.AsyncClient() as client:
            while True:
                # DingTalk rate limit: ~20 QPS per app per interface.
                # Sleep 60ms between requests to stay under the limit.
                await asyncio.sleep(0.06)
                resp = await client.post(
                    self.DINGTALK_USER_LIST_URL,
                    params={"access_token": token},
                    json={"dept_id": dept_id, "cursor": cursor, "size": 100},
                )
                data = resp.json()
                if data.get("errcode") != 0:
                    raise RuntimeError(f"DingTalk user list error: {data.get('errmsg') or data}")

                result = data.get("result", {}) or {}
                items = result.get("list", []) or []
                for item in items:
                    external_id = item.get("userid") or item.get("user_id") or ""
                    # Get user's actual department list from DingTalk data
                    dept_id_list = item.get("dept_id_list", [])
                    department_ids = [str(did) for did in dept_id_list] if dept_id_list else [department_external_id]
                    # Use last level department (last item in list is most specific)
                    last_dept_id = department_ids[-1] if department_ids else department_external_id
                    last_dept_path = self._dept_path_map.get(last_dept_id, "")

                    user = ExternalUser(
                        external_id=external_id,
                        unionid=item.get("unionid", "") or "",
                        open_id=item.get("openid", "") or "",
                        name=item.get("name", ""),
                        email=item.get("email", "") or "",
                        avatar_url=item.get("avatar", "") or "",
                        title=item.get("title", "") or "",
                        department_external_id=last_dept_id,
                        department_path=last_dept_path,
                        department_ids=department_ids,
                        mobile=item.get("mobile", "") or "",
                        status="active" if item.get("active", True) else "inactive",
                        raw_data=item,
                    )
                    users.append(user)

                if not result.get("has_more"):
                    break
                next_cursor = result.get("next_cursor")
                if next_cursor is None or int(next_cursor) == cursor:
                    # Re-requesting the same cursor would return the same page
                    # forever; stop instead of looping.
                    logger.warning(
                        f"[OrgSync] DingTalk user list for dept {dept_id} reported has_more "
                        f"without an advancing next_cursor; stopping pagination"
                    )
                    break
                cursor = int(next_cursor)

        return users

    def _build_dept_paths(self, dept_index: dict[str, tuple[str, str | None]]) -> dict[str, str]:
        """Build ``external_id -> "Root/Parent/Child"`` name paths.

        Args:
            dept_index: ``external_id -> (name, parent_external_id or None)``.

        Returns:
            One path per key of ``dept_index``. A department whose parent is
            missing from the index is treated as a top-level entry.
        """
        paths: dict[str, str] = {}

        def compute_path(dept_id: str, visited: set[str] | None = None) -> str:
            if dept_id in paths:
                return paths[dept_id]
            if visited is None:
                visited = set()
            if dept_id in visited:
                # Cycle guard
                paths[dept_id] = dept_id
                return dept_id
            visited.add(dept_id)

            name, parent_id = dept_index.get(dept_id, ("", None))
            if not parent_id or parent_id not in dept_index:
                paths[dept_id] = name
                return name

            parent_path = compute_path(parent_id, visited)
            full = f"{parent_path}/{name}" if parent_path else name
            paths[dept_id] = full
            return full

        for did in list(dept_index.keys()):
            compute_path(did)
        return paths
class WeComOrgSyncAdapter(BaseOrgSyncAdapter):
"""WeCom organization sync adapter."""
provider_type = "wecom"
WECOM_API_URL = "https://qyapi.weixin.qq.com"
WECOM_TOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
# Use department/simplelist (newer API) instead of the deprecated department/list.
# simplelist can be called with the contact-assistant token, i.e. the token
# obtained from the "Contacts Sync" secret, without an app-level IP whitelist.
WECOM_DEPT_LIST_URL = "https://qyapi.weixin.qq.com/cgi-bin/department/simplelist"
WECOM_USER_LIST_URL = "https://qyapi.weixin.qq.com/cgi-bin/user/list"
# Fallback APIs for contact assistant token (cannot call user/list):
# list_id returns {userid, open_userid} for all dept members
# user/get returns full details for a single user by userid
WECOM_USER_LIST_ID_URL = "https://qyapi.weixin.qq.com/cgi-bin/user/list_id"
WECOM_USER_GET_URL = "https://qyapi.weixin.qq.com/cgi-bin/user/get"
def __init__(self, provider: IdentityProvider | None = None, config: dict | None = None, tenant_id: uuid.UUID | None = None):
    """Resolve WeCom credentials from the adapter config.

    Several key spellings are accepted for each credential; the first
    truthy one wins, in the order written below.
    """
    super().__init__(provider, config, tenant_id)
    cfg = self.config
    # Enterprise corp ID.
    self.corp_id = cfg.get("corp_id") or cfg.get("app_id") or cfg.get("corpid")
    # Contact-sync secret, used for department/simplelist and user/list_id.
    self.secret = cfg.get("secret") or cfg.get("app_secret") or cfg.get("corpsecret")
    # Token cache; populated by get_access_token().
    self._access_token: str | None = None
    self._token_expires_at: datetime | None = None
async def _fetch_token(self, corp_id: str, secret: str) -> str:
    """Request a new WeCom access_token for a corp_id/secret pair.

    Returns:
        The ``access_token`` from the response, or ``""`` if the field is
        missing or empty.

    Raises:
        RuntimeError: If the response ``errcode`` is not 0.
    """
    query = {"corpid": corp_id, "corpsecret": secret}
    async with httpx.AsyncClient(timeout=10) as http:
        response = await http.get(self.WECOM_TOKEN_URL, params=query)
        payload = response.json()
    if payload.get("errcode") != 0:
        raise RuntimeError(f"[WeCom] gettoken failed for corpid={corp_id}: {payload}")
    return payload.get("access_token") or ""
@property
def api_base_url(self) -> str:
    """Base URL of the WeCom API (the class-level ``WECOM_API_URL``)."""
    base_url: str = self.WECOM_API_URL
    return base_url
async def get_access_token(self) -> str:
    """Return a valid access token obtained with the contact-sync secret.

    This token can call department/simplelist and user/list_id.
    It cannot call user/list or user/get (those raise errcode 48009).
    Full user profiles are obtained passively via SSO login instead.

    Raises:
        ValueError: If corp_id or secret is not configured.
    """
    cached = self._access_token
    deadline = self._token_expires_at
    if cached and deadline and datetime.now() < deadline:
        return cached

    if not (self.corp_id and self.secret):
        raise ValueError("WeCom corp_id or secret missing in provider config")

    fresh = await self._fetch_token(self.corp_id, self.secret)
    self._access_token = fresh
    # Assume a 7200 s lifetime and refresh 300 s early to avoid clock-skew issues.
    self._token_expires_at = datetime.now() + timedelta(seconds=7200 - 300)
    return fresh
async def fetch_departments(self) -> list[ExternalDepartment]:
    """Retrieve every department via the WeCom simplelist endpoint.

    Per the original author's notes, department/simplelist is reachable
    with the contact-assistant token derived from the contact-sync secret,
    whereas the deprecated department/list requires a strict app-level IP
    whitelist.

    Returns:
        One ExternalDepartment per returned entry. ``member_count`` is
        always 0 because this endpoint does not report it.

    Raises:
        RuntimeError: If the response's ``errcode`` is not 0.
    """
    access_token = await self.get_access_token()

    async with httpx.AsyncClient(timeout=15) as http:
        # No "id" parameter is sent, which asks for all departments.
        response = await http.get(
            self.WECOM_DEPT_LIST_URL,
            params={"access_token": access_token},
        )
        payload = response.json()
        if payload.get("errcode") != 0:
            raise RuntimeError(f"WeCom department list error: {payload.get('errmsg') or payload}")

        # Entries are read from "department_id", falling back to "department".
        records = payload.get("department_id", []) or payload.get("department", [])

        departments: list[ExternalDepartment] = []
        for record in records:
            raw_parent = record.get("parentid", 0)
            # A missing or zero parentid marks a root department.
            parent_ref = str(raw_parent) if raw_parent else None
            # NOTE(review): if simplelist entries carry no "name", every
            # department ends up with an empty name — confirm against a
            # real response.
            department = ExternalDepartment(
                external_id=str(record.get("id")),
                name=record.get("name", ""),
                parent_external_id=parent_ref,
                member_count=0,
                raw_data=record,
            )
            departments.append(department)
    return departments
async def fetch_users(self, department_external_id: str) -> list[ExternalUser]:
    """Return user stubs for one department, sourced from user/list_id.

    Strategy notes carried over from the original author:
      - user/list (bulk detail) fails with errcode 48009 for the
        contact-sync token, so it is not used.
      - user/get (per-user detail) is IP-whitelisted only, so it is not used.
      - user/list_id (IDs only) works with the contact-sync token.

    Only userid and open_userid are collected during org sync. Name,
    avatar, email and mobile are enriched later, when the user first signs
    in through WeCom SSO (auth/getuserdetail).

    Args:
        department_external_id: External ID of the department to list.

    Returns:
        Whatever ``_fetch_user_stubs`` yields for this department.
    """
    return await self._fetch_user_stubs(
        await self.get_access_token(),
        department_external_id,
    )
async def _fetch_user_stubs(self, sync_token: str, department_external_id: str) -> list[ExternalUser]:
    """Fetch minimal user stubs for one department via user/list_id.

    Each stub has only ``external_id`` (userid), ``open_id`` (open_userid)
    and the department fields populated. The name is deliberately set to
    the userid so the passive SSO enrichment (sso_service.link_identity(),
    per the original author's notes) can recognise the placeholder and
    replace it with the real name.

    Args:
        sync_token: Access token used for the request.
        department_external_id: Department the stubs are attributed to.

    Returns:
        The stubs collected across all result pages.

    Raises:
        RuntimeError: If any page's ``errcode`` is not 0.
    """
    wanted_dept = str(department_external_id)
    user_stubs: list[ExternalUser] = []
    cursor = ""
    async with httpx.AsyncClient(timeout=15) as client:
        while True:
            params: dict = {
                "access_token": sync_token,
                "department_id": department_external_id,
                "limit": 1000,
            }
            if cursor:
                params["cursor"] = cursor
            resp = await client.get(self.WECOM_USER_LIST_ID_URL, params=params)
            data = resp.json()
            if data.get("errcode") != 0:
                raise RuntimeError(f"WeCom user/list_id error: {data.get('errmsg') or data}")

            # `or []` tolerates an explicit null as well as a missing key.
            for entry in data.get("dept_user") or []:
                uid = entry.get("userid", "")
                if not uid:
                    continue
                # NOTE(review): the public user/list_id reference describes
                # each entry as a (userid, department) pair and does not
                # list a department_id request parameter. If the server
                # ignores that parameter, every call returns the whole
                # company. When an entry states its own department, only
                # keep it if it is the one we were asked for; entries
                # without the field are kept as before.
                entry_dept = entry.get("department")
                if entry_dept is not None:
                    if isinstance(entry_dept, (list, tuple, set)):
                        belongs = wanted_dept in {str(d) for d in entry_dept}
                    else:
                        belongs = str(entry_dept) == wanted_dept
                    if not belongs:
                        continue
                # Use userid as the name placeholder so link_identity() knows
                # to overwrite it once the user logs in via SSO.
                user_stubs.append(ExternalUser(
                    external_id=uid,
                    name=uid,  # placeholder — enriched on first SSO login
                    open_id=entry.get("open_userid") or "",
                    department_external_id=department_external_id,
                    department_ids=[department_external_id],
                ))

            next_cursor = data.get("next_cursor") or ""
            # Stop when there are no more pages, or when the server hands
            # back the cursor we just used (which would loop forever).
            if not next_cursor or next_cursor == cursor:
                break
            cursor = next_cursor
    return user_stubs
# Adapter class mapping: provider_type string -> adapter class.
# get_org_sync_adapter() looks the class up here and returns None for any
# provider_type that has no entry.
SYNC_ADAPTER_CLASSES: dict[str, type[BaseOrgSyncAdapter]] = {
    "feishu": FeishuOrgSyncAdapter,
    "dingtalk": DingTalkOrgSyncAdapter,
    "wecom": WeComOrgSyncAdapter,
}
async def get_org_sync_adapter(
    db: AsyncSession,
    provider_type: str,
    tenant_id: uuid.UUID | None = None,
    provider_id: uuid.UUID | None = None,
) -> BaseOrgSyncAdapter | None:
    """Factory function to create an org sync adapter.

    Args:
        db: Database session.
        provider_type: Type of provider (feishu, dingtalk, etc.).
        tenant_id: Optional tenant ID. When no ``provider_id`` is given, the
            lookup is restricted to this tenant, or to providers with no
            tenant when this is not set.
        provider_id: Optional specific provider ID (if not provided, uses
            the first provider found by type).

    Returns:
        Adapter instance, or None if ``provider_type`` is not supported.
        The adapter is still created (with an empty config) when no
        matching provider row exists.
    """
    # Resolve the adapter class first so an unsupported type costs no DB
    # round-trip.
    adapter_class = SYNC_ADAPTER_CLASSES.get(provider_type)
    if adapter_class is None:
        return None

    # Get provider config from database - prefer specific provider_id if provided
    if provider_id:
        query = select(IdentityProvider).where(IdentityProvider.id == provider_id)
    else:
        query = select(IdentityProvider).where(IdentityProvider.provider_type == provider_type)
        if tenant_id:
            query = query.where(IdentityProvider.tenant_id == tenant_id)
        else:
            query = query.where(IdentityProvider.tenant_id.is_(None))

    result = await db.execute(query)
    # Take the first row instead of scalar_one_or_none(): the latter raises
    # MultipleResultsFound when several providers of the same type exist,
    # which contradicts the documented "first found by type" contract.
    # NOTE(review): no ORDER BY is applied, so which row is "first" is up to
    # the database when several match.
    provider = result.scalars().first()

    config = provider.config if provider else {}
    return adapter_class(provider=provider, config=config, tenant_id=tenant_id)