"""Microsoft Teams Bot Channel API routes.""" import hashlib import hmac import json import os import time import uuid from datetime import datetime, timezone import httpx from fastapi import APIRouter, Depends, HTTPException, Request, Response, status from loguru import logger from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.config import get_settings from app.core.permissions import check_agent_access, is_agent_creator from app.core.security import get_current_user from app.database import get_db from app.models.agent import Agent as AgentModel from app.models.audit import ChatMessage from app.models.channel_config import ChannelConfig from app.models.user import User from app.schemas.schemas import ChannelConfigOut from app.services.channel_session import find_or_create_channel_session from app.api.feishu import _call_agent_llm from app.services.agent_tools import channel_file_sender as _cfs_s from app.core.security import hash_password as _hp from pathlib import Path as _Path import asyncio as _asyncio import random as _random settings = get_settings() router = APIRouter(tags=["microsoft_teams"]) TEAMS_MSG_LIMIT = 28000 # Teams message char limit (approx 28KB) # In-memory cache for OAuth tokens _teams_tokens: dict[str, dict] = {} # agent_id -> {access_token, expires_at} async def _get_teams_access_token(config: ChannelConfig) -> str | None: """Get or refresh Microsoft Teams access token. Supports: - Client credentials (app_id + app_secret) - default - Managed Identity (when use_managed_identity is True in extra_config) """ agent_id = str(config.agent_id) cached = _teams_tokens.get(agent_id) if cached and cached["expires_at"] > time.time() + 60: # Refresh 60s before expiry logger.debug(f"Teams: Using cached access token for agent {agent_id}") return cached["access_token"] # Check if managed identity should be used use_managed_identity = config.extra_config.get("use_managed_identity", False) if use_managed_identity: # Use Azure Managed Identity try: from azure.identity.aio import DefaultAzureCredential from azure.core.credentials import AccessToken credential = DefaultAzureCredential() # For Bot Framework, we need the token for the Bot Framework API # Managed identity needs to be granted permissions to the Bot Framework API scope = "https://api.botframework.com/.default" token: AccessToken = await credential.get_token(scope) _teams_tokens[agent_id] = { "access_token": token.token, "expires_at": token.expires_on, } logger.info(f"Teams: Successfully obtained access token via managed identity for agent {agent_id}, expires at {token.expires_on}") await credential.close() return token.token except ImportError: logger.error(f"Teams: azure-identity package not installed. Install it with: pip install azure-identity") return None except Exception as e: logger.exception(f"Teams: Failed to get access token via managed identity for agent {agent_id}: {e}") return None # Use client credentials (app_id + app_secret) app_id = config.app_id app_secret = config.app_secret if not app_id or not app_secret: logger.error(f"Teams: Missing app_id or app_secret for agent {agent_id}") return None # Get tenant_id from config (per-agent), environment variable, or default to "common" (multi-tenant) tenant_id = config.extra_config.get("tenant_id") or os.environ.get("TEAMS_TENANT_ID") or "common" token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token" data = { "client_id": app_id, "client_secret": app_secret, "grant_type": "client_credentials", "scope": "https://api.botframework.com/.default", } try: async with httpx.AsyncClient(timeout=10) as client: resp = await client.post(token_url, data=data) if resp.status_code != 200: error_body = resp.text try: error_json = resp.json() error_description = error_json.get("error_description", "No description") error_code = error_json.get("error", "unknown") logger.error(f"Teams: OAuth token request failed for agent {agent_id}: status={resp.status_code}, error={error_code}, description={error_description}") except: logger.error(f"Teams: OAuth token request failed for agent {agent_id}: status={resp.status_code}, response={error_body[:500]}") logger.error(f"Teams: Token URL={token_url}, tenant_id={tenant_id}, client_id={app_id[:20]}...") return None token_data = resp.json() access_token = token_data["access_token"] expires_in = token_data["expires_in"] _teams_tokens[agent_id] = { "access_token": access_token, "expires_at": time.time() + expires_in, } logger.info(f"Teams: Successfully obtained access token for agent {agent_id}, expires in {expires_in}s") return access_token except httpx.HTTPStatusError as e: error_body = e.response.text if hasattr(e, 'response') and e.response else "No response body" try: if hasattr(e, 'response') and e.response: error_json = e.response.json() error_description = error_json.get("error_description", "No description") error_code = error_json.get("error", "unknown") logger.error(f"Teams: OAuth token HTTP error for agent {agent_id}: status={e.response.status_code}, error={error_code}, description={error_description}") except: logger.error(f"Teams: OAuth token HTTP error for agent {agent_id}: status={e.response.status_code if hasattr(e, 'response') and e.response else 'unknown'}, response={error_body[:500]}") logger.error(f"Teams: Token URL={token_url}, tenant_id={tenant_id}, client_id={app_id[:20]}...") return None except Exception as e: logger.exception(f"Teams: Failed to get access token for agent {agent_id}: {e}") return None async def _send_teams_message(config: ChannelConfig, conversation_id: str, activity: dict) -> None: """Send an activity (message) to Microsoft Teams.""" access_token = await _get_teams_access_token(config) if not access_token: logger.error(f"Teams: No access token for agent {config.agent_id}, cannot send message") raise ValueError("No access token available") service_url = config.extra_config.get("service_url") if not service_url: logger.error(f"Teams: No service_url in config for agent {config.agent_id}, cannot send message") raise ValueError(f"No service_url in config for agent {config.agent_id}") # Ensure activity has required fields if "type" not in activity: activity["type"] = "message" if "timestamp" not in activity: activity["timestamp"] = datetime.now(timezone.utc).isoformat() + "Z" # Teams API expects 'replyToId' for replies, not 'conversation.id' # If it's a reply, ensure the 'id' field is set to the message being replied to if activity.get("replyToId") and "id" not in activity: activity["id"] = str(uuid.uuid4()) # Generate a new ID for the reply activity # Teams has a 28KB limit for message activities. Chunk if needed. text_content = activity.get("text", "") if len(text_content.encode("utf-8")) > TEAMS_MSG_LIMIT: chunks = [text_content[i:i + TEAMS_MSG_LIMIT] for i in range(0, len(text_content), TEAMS_MSG_LIMIT)] for i, chunk in enumerate(chunks): chunk_activity = {**activity, "text": chunk} if i > 0: # Only the first chunk is a direct reply, subsequent are new messages chunk_activity.pop("replyToId", None) await _send_teams_message_single_chunk(access_token, service_url, conversation_id, chunk_activity) else: await _send_teams_message_single_chunk(access_token, service_url, conversation_id, activity) async def _send_teams_message_single_chunk(access_token: str, service_url: str, conversation_id: str, activity: dict) -> None: """Send a single chunked message to Microsoft Teams.""" # Ensure service_url doesn't have trailing slash to avoid double slashes service_url_clean = service_url.rstrip("/") post_url = f"{service_url_clean}/v3/conversations/{conversation_id}/activities" headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json", } try: async with httpx.AsyncClient(timeout=10) as client: resp = await client.post(post_url, headers=headers, json=activity) if resp.status_code != 200: error_body = resp.text try: error_json = resp.json() error_description = error_json.get("error", {}).get("message", error_json.get("message", "No description")) error_code = error_json.get("error", {}).get("code", "unknown") logger.error(f"Teams: Failed to send message: status={resp.status_code}, error={error_code}, description={error_description}") except: logger.error(f"Teams: Failed to send message: status={resp.status_code}, response={error_body[:500]}") logger.error(f"Teams: POST URL={post_url}, conversation_id={conversation_id}, service_url={service_url}") resp.raise_for_status() logger.info(f"Teams: Sent message to conversation {conversation_id}") except httpx.HTTPStatusError as e: error_body = e.response.text if hasattr(e, 'response') and e.response else "No response body" try: if hasattr(e, 'response') and e.response: error_json = e.response.json() error_description = error_json.get("error", {}).get("message", error_json.get("message", "No description")) error_code = error_json.get("error", {}).get("code", "unknown") logger.error(f"Teams: HTTP error sending message: status={e.response.status_code}, error={error_code}, description={error_description}") except: logger.error(f"Teams: HTTP error sending message: status={e.response.status_code if hasattr(e, 'response') and e.response else 'unknown'}, response={error_body[:500]}") logger.error(f"Teams: POST URL={post_url}, conversation_id={conversation_id}, service_url={service_url}") raise # ─── Config CRUD ──────────────────────────────────────── @router.post("/agents/{agent_id}/teams-channel", response_model=ChannelConfigOut, status_code=201) async def configure_teams_channel( agent_id: uuid.UUID, data: dict, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Configure Microsoft Teams bot for an agent. Fields: app_id, app_secret.""" agent, _ = await check_agent_access(db, current_user, agent_id) if not is_agent_creator(current_user, agent): raise HTTPException(status_code=403, detail="Only creator can configure channel") app_id = data.get("app_id", "").strip() app_secret = data.get("app_secret", "").strip() tenant_id = data.get("tenant_id", "").strip() # Optional: for single-tenant apps use_managed_identity = data.get("use_managed_identity", False) # Optional: use Azure Managed Identity # Validate: either managed identity OR app_id + app_secret required if not use_managed_identity and (not app_id or not app_secret): raise HTTPException(status_code=422, detail="Either use_managed_identity must be enabled, or app_id and app_secret are required") result = await db.execute( select(ChannelConfig).where( ChannelConfig.agent_id == agent_id, ChannelConfig.channel_type == "microsoft_teams", ) ) existing = result.scalar_one_or_none() if existing: existing.app_id = app_id if not use_managed_identity else existing.app_id existing.app_secret = app_secret if not use_managed_identity else existing.app_secret existing.is_configured = True # Store tenant_id and use_managed_identity in extra_config if not existing.extra_config: existing.extra_config = {} if tenant_id: existing.extra_config["tenant_id"] = tenant_id elif "tenant_id" in existing.extra_config and not tenant_id: # Remove tenant_id if not provided (use default) existing.extra_config.pop("tenant_id", None) existing.extra_config["use_managed_identity"] = use_managed_identity await db.flush() return ChannelConfigOut.model_validate(existing) extra_config = {} if tenant_id: extra_config["tenant_id"] = tenant_id if use_managed_identity: extra_config["use_managed_identity"] = True config = ChannelConfig( agent_id=agent_id, channel_type="microsoft_teams", app_id=app_id if not use_managed_identity else None, app_secret=app_secret if not use_managed_identity else None, is_configured=True, extra_config=extra_config, ) db.add(config) await db.flush() return ChannelConfigOut.model_validate(config) @router.get("/agents/{agent_id}/teams-channel", response_model=ChannelConfigOut) async def get_teams_channel( agent_id: uuid.UUID, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Get Microsoft Teams channel configuration for an agent.""" await check_agent_access(db, current_user, agent_id) result = await db.execute( select(ChannelConfig).where( ChannelConfig.agent_id == agent_id, ChannelConfig.channel_type == "microsoft_teams", ) ) config = result.scalar_one_or_none() if not config: raise HTTPException(status_code=404, detail="Microsoft Teams not configured") return ChannelConfigOut.model_validate(config) @router.get("/agents/{agent_id}/teams-channel/webhook-url") async def get_teams_webhook_url( agent_id: uuid.UUID, request: Request, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Get the Microsoft Teams webhook URL for an agent.""" await check_agent_access(db, current_user, agent_id) from app.services.platform_service import platform_service public_base = await platform_service.get_public_base_url(db, request) return {"webhook_url": f"{public_base}/api/channel/teams/{agent_id}/webhook"} @router.delete("/agents/{agent_id}/teams-channel", status_code=204) async def delete_teams_channel( agent_id: uuid.UUID, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Delete Microsoft Teams channel configuration for an agent.""" agent, _ = await check_agent_access(db, current_user, agent_id) if not is_agent_creator(current_user, agent): raise HTTPException(status_code=403, detail="Only creator can remove channel") result = await db.execute( select(ChannelConfig).where( ChannelConfig.agent_id == agent_id, ChannelConfig.channel_type == "microsoft_teams", ) ) config = result.scalar_one_or_none() if not config: raise HTTPException(status_code=404, detail="Microsoft Teams not configured") await db.delete(config) await db.commit() # ─── Event Webhook ────────────────────────────────────── _processed_teams_events: set[str] = set() @router.post("/channel/teams/{agent_id}/webhook") async def teams_event_webhook( agent_id: uuid.UUID, request: Request, db: AsyncSession = Depends(get_db), ): """Handle Microsoft Teams Bot Framework callbacks.""" try: body_bytes = await request.body() try: body = json.loads(body_bytes) except json.JSONDecodeError as e: logger.error(f"Teams: Failed to parse JSON body: {e}, body={body_bytes[:200]}") return Response(status_code=400, content="Invalid JSON") # Microsoft Teams Bot Framework sends the activity directly in the body (not wrapped in "activity" key) # Check if body itself is the activity (has "type" field) or if it's wrapped if isinstance(body, dict) and "type" in body: activity = body elif isinstance(body, dict) and "activity" in body: activity = body["activity"] else: logger.warning(f"Teams: Unexpected body structure for agent {agent_id}: {list(body.keys()) if isinstance(body, dict) else type(body)}") activity = body if isinstance(body, dict) else {} logger.info(f"Teams: Webhook received for agent {agent_id}, activity type={activity.get('type')}, from={activity.get('from', {}).get('id', 'unknown')}, text={activity.get('text', '')[:50] if activity.get('text') else 'no text'}") # Teams Bot Framework uses a simple token for authentication, not HMAC for incoming webhooks # For now, we rely on the unguessable URL token. # In a full production setup, you'd validate the JWT token in the Authorization header. # Get channel config result = await db.execute( select(ChannelConfig).where( ChannelConfig.agent_id == agent_id, ChannelConfig.channel_type == "microsoft_teams", ) ) config = result.scalar_one_or_none() if not config: logger.warning(f"Teams: Webhook received for unconfigured agent {agent_id}") return Response(status_code=404) # Extract serviceUrl from the activity for sending replies service_url = activity.get("serviceUrl") if service_url: if config.extra_config.get("service_url") != service_url: config.extra_config["service_url"] = service_url config.is_connected = True await db.flush() await db.commit() logger.info(f"Teams: Updated service_url for agent {agent_id} to {service_url}") # Dedup activity_id = activity.get("id") if activity_id in _processed_teams_events: return {"ok": True} if activity_id: _processed_teams_events.add(activity_id) if len(_processed_teams_events) > 1000: _processed_teams_events.clear() # Only process message activities if activity.get("type") != "message": return {"ok": True} # Ignore bot's own messages # Check if the message is from the bot itself (either by app_id or by comparing with recipient) bot_id = config.app_id if not bot_id: # If no app_id, use the recipient ID from the activity (the bot is the recipient) bot_id = activity.get("recipient", {}).get("id") if bot_id and activity.get("from", {}).get("id") == bot_id: return {"ok": True} user_text = activity.get("text", "").strip() if not user_text: return {"ok": True} # Extract conversation and sender info conversation_id = activity.get("conversation", {}).get("id") sender_id = activity.get("from", {}).get("id") sender_name = activity.get("from", {}).get("name", f"Teams User {sender_id[:8]}") reply_to_id = activity.get("id") # The ID of the incoming message to reply to if not conversation_id or not sender_id: logger.warning(f"Teams: Missing conversation_id or sender_id in activity for agent {agent_id}") return {"ok": True} logger.info(f"Teams: Message from={sender_id}, conversation={conversation_id}: {user_text[:80]}") # Load agent (must happen before user resolution for tenant_id) agent_r = await db.execute(select(AgentModel).where(AgentModel.id == agent_id)) agent_obj = agent_r.scalar_one_or_none() from app.models.agent import DEFAULT_CONTEXT_WINDOW_SIZE ctx_size = (agent_obj.context_window_size or DEFAULT_CONTEXT_WINDOW_SIZE) if agent_obj else DEFAULT_CONTEXT_WINDOW_SIZE # Find-or-create platform user for this Teams sender via unified service from app.services.channel_user_service import channel_user_service _extra_info = {"name": sender_name} platform_user = await channel_user_service.resolve_channel_user( db=db, agent=agent_obj, channel_type="teams", external_user_id=sender_id, extra_info=_extra_info, ) # Update display_name if we now have a better name if sender_name and platform_user.display_name and platform_user.display_name.startswith("Teams User ") and sender_name != platform_user.display_name: platform_user.display_name = sender_name await db.flush() platform_user_id = platform_user.id # Detect group vs P2P chat _conv_type = activity.get("conversation", {}).get("conversationType", "") _is_group_teams = (_conv_type in ("groupChat", "channel")) # Find-or-create session for this Teams conversation sess = await find_or_create_channel_session( db=db, agent_id=agent_id, user_id=platform_user_id if not _is_group_teams else (agent_obj.creator_id if agent_obj else platform_user_id), external_conv_id=conversation_id, source_channel="microsoft_teams", first_message_title=user_text, is_group=_is_group_teams, group_name=activity.get("conversation", {}).get("name") or (f"Teams Group {conversation_id[:8]}" if _is_group_teams else None), ) session_conv_id = str(sess.id) history_r = await db.execute( select(ChatMessage) .where(ChatMessage.agent_id == agent_id, ChatMessage.conversation_id == session_conv_id) .order_by(ChatMessage.created_at.desc()) .limit(ctx_size) ) history = [{"role": m.role, "content": m.content} for m in reversed(history_r.scalars().all())] # Save user message db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="user", content=user_text, conversation_id=session_conv_id)) sess.last_message_at = datetime.now(timezone.utc) await db.commit() # Set channel_file_sender contextvar for agent → user file delivery async def _teams_file_sender(file_path, msg: str = ""): _fp = _Path(file_path) use_mi = config.extra_config.get("use_managed_identity", False) has_creds = (config.app_id and config.app_secret) or use_mi if not has_creds or not conversation_id: return # For simplicity, just send file info as text for now file_msg_activity = { "type": "message", "conversation": {"id": conversation_id}, "replyToId": reply_to_id, "text": f"Agent sent file: {_fp.name} (Note: file content not directly supported yet, but I can tell you about it: {msg})", } await _send_teams_message(config, conversation_id, file_msg_activity) _cfs_s_token = _cfs_s.set(_teams_file_sender) # Call LLM try: reply_text = await _call_agent_llm(db, agent_id, user_text, history=history) _cfs_s.reset(_cfs_s_token) logger.info(f"Teams: LLM reply generated: {reply_text[:80]}") except Exception as e: logger.exception(f"Teams: Failed to call LLM for agent {agent_id}: {e}") reply_text = "Sorry, I encountered an error processing your message." _cfs_s.reset(_cfs_s_token) # Save reply try: db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="assistant", content=reply_text, conversation_id=session_conv_id)) sess.last_message_at = datetime.now(timezone.utc) await db.commit() logger.info(f"Teams: Saved reply to database for conversation {conversation_id}") except Exception as e: logger.exception(f"Teams: Failed to save reply to database: {e}") await db.rollback() # Send to Teams use_managed_identity = config.extra_config.get("use_managed_identity", False) has_credentials = (config.app_id and config.app_secret) or use_managed_identity if has_credentials and conversation_id: try: # Get bot's channel account ID from the incoming activity's recipient field # The recipient in the incoming message is the bot itself bot_channel_account = activity.get("recipient", {}) if not bot_channel_account.get("id"): # Fallback: use app_id if recipient not available if config.app_id: bot_channel_account = {"id": config.app_id} else: logger.error(f"Teams: Cannot determine bot channel account ID - no recipient in activity and no app_id configured") raise ValueError("Cannot determine bot channel account ID") # Get the user (sender) from the incoming activity's from field user_account = activity.get("from", {}) if not user_account.get("id"): user_account = {"id": sender_id, "name": sender_name} reply_activity = { "type": "message", "from": bot_channel_account, # Required: Bot's channel account ID (from incoming activity's recipient) "conversation": {"id": conversation_id}, "recipient": user_account, # The user who sent the message (from incoming activity's from) "replyToId": reply_to_id, # Reply to the specific incoming message "text": reply_text, } logger.info(f"Teams: Attempting to send reply to conversation {conversation_id}, from={bot_channel_account.get('id')}, recipient={user_account.get('id')}") await _send_teams_message(config, conversation_id, reply_activity) logger.info(f"Teams: Successfully sent reply to Teams") except Exception as e: logger.exception(f"Teams: Failed to send message to Teams: {e}") else: use_mi = config.extra_config.get("use_managed_identity", False) logger.warning(f"Teams: Cannot send reply - missing credentials (managed_identity={use_mi}, app_id={bool(config.app_id)}, app_secret={bool(config.app_secret)}), conversation_id={bool(conversation_id)}") return {"ok": True} except Exception as e: logger.exception(f"Teams: Unhandled exception in webhook handler for agent {agent_id}: {e}") return Response(status_code=500, content="Internal server error")