Clawith/backend/app/services/resource_discovery.py

775 lines
32 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Resource discovery — search Smithery & ModelScope registries and import MCP servers."""
import uuid
import httpx
from loguru import logger
from sqlalchemy import select
from app.database import async_session
from app.models.tool import Tool, AgentTool
# ── Smithery Registry Search ────────────────────────────────────
# Base URL of the Smithery registry; the "/servers" search and detail
# endpoints used in this module are built on top of it.
SMITHERY_API_BASE = "https://registry.smithery.ai"
# Base URL of ModelScope; the MCP Hub OpenAPI endpoint is built on top of it.
MODELSCOPE_API_BASE = "https://modelscope.cn"
async def _get_smithery_api_key(agent_id: uuid.UUID | None = None) -> str:
    """Return the stored Smithery API key, or "" when none is configured.

    Lookup order:
      1. Per-agent: the first AgentTool row belonging to ``agent_id`` whose
         config carries a non-empty ``smithery_api_key``.
      2. System-level: the config of the ``discover_resources`` tool, then of
         the ``import_mcp_server`` tool.

    The lookup is best-effort: a database failure is logged and "" is
    returned, so callers only ever have to handle the "no key" case.
    """
    try:
        async with async_session() as db:
            # 1) Per-agent: any AgentTool config with a smithery_api_key
            if agent_id:
                at_r = await db.execute(
                    select(AgentTool).where(AgentTool.agent_id == agent_id)
                )
                for at in at_r.scalars().all():
                    key = (at.config or {}).get("smithery_api_key")
                    if key:
                        return key
            # 2) System-level fallback
            for tool_name in ("discover_resources", "import_mcp_server"):
                r = await db.execute(select(Tool).where(Tool.name == tool_name))
                tool = r.scalar_one_or_none()
                key = (tool.config or {}).get("smithery_api_key") if tool else None
                if key:
                    return key
    except Exception as e:
        # Still best-effort, but no longer invisible when the DB is unhappy.
        logger.warning(f"[ResourceDiscovery] Could not read Smithery API key: {e}")
    return ""
async def _search_smithery_api(query: str, max_results: int, api_key: str) -> list[dict]:
    """Search the Smithery registry and return normalized result dicts.

    Args:
        query: Free-text search string, sent as the ``q`` parameter.
        max_results: Requested page size; also the cap on returned entries.
        api_key: Smithery API key. When non-empty it is sent as a Bearer token.

    Returns:
        A list of dicts with the keys ``name``, ``display_name``,
        ``description`` (truncated to 200 characters), ``remote``,
        ``verified``, ``use_count``, ``homepage`` and ``source``. An empty
        list is returned on any non-200 response or error (which is logged).
    """
    headers = {"Accept": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    try:
        async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
            resp = await client.get(
                f"{SMITHERY_API_BASE}/servers",
                params={"q": query, "pageSize": max_results},
                headers=headers,
            )
            if resp.status_code != 200:
                logger.warning(f"[ResourceDiscovery] Smithery search returned HTTP {resp.status_code}")
                return []
            servers = resp.json().get("servers") or []
            # `or` fallbacks: a JSON null must not raise and discard the whole page.
            return [
                {
                    "name": srv.get("qualifiedName") or "",
                    "display_name": srv.get("displayName") or "",
                    "description": (srv.get("description") or "")[:200],
                    "remote": srv.get("remote", False),
                    "verified": srv.get("verified", False),
                    "use_count": srv.get("useCount") or 0,
                    "homepage": srv.get("homepage") or "",
                    "source": "Smithery",
                }
                for srv in servers[:max_results]
            ]
    except Exception as e:
        logger.error(f"[ResourceDiscovery] Smithery search failed: {e}")
        return []
async def _get_modelscope_api_token() -> str:
    """Return the stored ModelScope API token, or "" when none is configured.

    The token is read from the config of the ``discover_resources`` tool,
    falling back to the ``import_mcp_server`` tool. The lookup is
    best-effort: a database failure is logged and "" is returned.
    """
    try:
        async with async_session() as db:
            for tool_name in ("discover_resources", "import_mcp_server"):
                r = await db.execute(select(Tool).where(Tool.name == tool_name))
                tool = r.scalar_one_or_none()
                token = (tool.config or {}).get("modelscope_api_token") if tool else None
                if token:
                    return token
    except Exception as e:
        logger.warning(f"[ResourceDiscovery] Could not read ModelScope API token: {e}")
    return ""
async def _search_modelscope_api(query: str, max_results: int) -> list[dict]:
    """Search the ModelScope MCP Hub via its OpenAPI endpoint.

    Args:
        query: Free-text search string, sent in the ``search`` field.
        max_results: Requested page size; also the cap on returned entries.

    Returns:
        A list of dicts shaped like the Smithery search results (``name``,
        ``display_name``, ``description``, ``remote``, ``verified``,
        ``use_count``, ``homepage``, ``source``). An empty list is returned
        when no API token is configured, when the response is not a
        successful one, or on any error (which is logged).
    """
    api_token = await _get_modelscope_api_token()
    if not api_token:
        return []  # Silently skip if no token configured
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_token}",
        "Cookie": f"m_session_id={api_token}",
        "User-Agent": "modelscope-mcp-server/1.0",
    }
    try:
        async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
            # The search endpoint is called with PUT and a JSON body.
            resp = await client.put(
                f"{MODELSCOPE_API_BASE}/openapi/v1/mcp/servers",
                json={"page_size": max_results, "page_number": 1, "search": query, "filter": {}},
                headers=headers,
            )
            if resp.status_code != 200:
                logger.warning(f"[ResourceDiscovery] ModelScope search returned HTTP {resp.status_code}")
                return []
            data = resp.json()
            if not data.get("success"):
                return []
            # `or` fallbacks: JSON nulls must not raise and discard the whole page.
            servers_data = (data.get("data") or {}).get("mcp_server_list") or []
            results = []
            for srv in servers_data[:max_results]:
                server_id = srv.get("id", "")
                results.append({
                    "name": server_id,
                    "display_name": srv.get("name") or server_id,
                    "description": (srv.get("description") or "")[:200],
                    "remote": srv.get("is_hosted", False),
                    "verified": True,
                    "use_count": 0,
                    "homepage": f"https://modelscope.cn/mcp/servers/{server_id}",
                    "source": "ModelScope",
                })
            return results
    except Exception as e:
        logger.error(f"[ResourceDiscovery] ModelScope search failed: {e}")
        return []
async def search_registries(query: str, max_results: int = 5) -> str:
    """Search both Smithery and ModelScope for MCP servers.

    Both registries are queried concurrently. Results are merged with
    Smithery entries first, de-duplicated by server ID, capped at
    ``max_results`` and rendered as a markdown list.

    Args:
        query: Free-text search string.
        max_results: Maximum number of servers to list.

    Returns:
        A user-facing markdown message: either the result list with an
        import hint, or a "no servers found" notice.
    """
    import asyncio

    api_key = await _get_smithery_api_key()
    # Search both registries in parallel
    smithery_results, modelscope_results = await asyncio.gather(
        _search_smithery_api(query, max_results, api_key),
        _search_modelscope_api(query, max_results),
    )
    # Merge: Smithery first, then ModelScope (deduplicate by name)
    seen_names = set()
    all_results = []
    for r in smithery_results + modelscope_results:
        if r["name"] not in seen_names:
            seen_names.add(r["name"])
            all_results.append(r)
    if not all_results:
        return f'🔍 No MCP servers found for "{query}" on Smithery or ModelScope. Try different keywords.'
    results = []
    for i, srv in enumerate(all_results[:max_results], 1):
        # Both branches of this conditional used to be "", so the verified
        # flag never reached the output; show a marker for verified servers.
        verified = " ✅" if srv["verified"] else ""
        source_tag = f"[{srv['source']}]"
        if srv["remote"]:
            deploy_info = "🌐 Remote (no local install needed)"
        else:
            deploy_info = "💻 Local install required"
        use_info = f" · 👥 {srv['use_count']:,} users" if srv["use_count"] else ""
        hp = srv['homepage']
        results.append(
            f"**{i}. {srv['display_name']}**{verified} {source_tag}\n"
            f" ID: `{srv['name']}`\n"
            f" {srv['description']}\n"
            f" {deploy_info}{use_info}\n"
            f" {'🔗 ' + hp if hp else ''}"
        )
    header = f'🔍 Found {len(results)} MCP server(s) for "{query}":\n\n'
    footer = (
        "\n\n---\n"
        "💡 To import a remote server, use `import_mcp_server` with the server ID.\n"
        ' Example: import_mcp_server(server_id="gmail")'
    )
    return header + "\n\n".join(results) + footer
# Keep backward-compatible alias
async def search_smithery(query: str, max_results: int = 5) -> str:
    """Backward-compatible alias that delegates to ``search_registries``."""
    report = await search_registries(query, max_results)
    return report
# ── Import MCP Server ───────────────────────────────────────────
async def _ensure_smithery_connection(api_key: str, mcp_url: str, display_name: str) -> dict:
    """Create or reuse a Smithery Connect namespace and create a connection in it.

    The first namespace returned for the account is reused; when none is
    returned, a namespace named "clawith" is created. The connection ID is
    derived from ``display_name`` (lower-cased, spaces replaced by "-",
    colons removed).

    Args:
        api_key: Smithery API key, sent as a Bearer token.
        mcp_url: URL of the MCP server the connection should point at.
        display_name: Human-readable server name.

    Returns:
        On success a dict with ``namespace`` and ``connection_id``, plus
        ``auth_url`` when the connection reports the ``auth_required`` state.
        On failure a dict with a single ``error`` key (message truncated to
        200 characters). This function never raises.
    """
    api_base = "https://api.smithery.ai"
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    try:
        async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client:
            # Get or create namespace
            ns_resp = await client.get(f"{api_base}/namespaces", headers=headers)
            namespaces = (ns_resp.json().get("namespaces") or []) if ns_resp.status_code == 200 else []
            if namespaces:
                namespace = namespaces[0]["name"]
            else:
                create_ns = await client.post(
                    f"{api_base}/namespaces",
                    json={"name": "clawith"},
                    headers=headers,
                )
                if create_ns.status_code not in (200, 201):
                    return {"error": f"Failed to create namespace: HTTP {create_ns.status_code}"}
                namespace = create_ns.json()["name"]
            # Create connection
            conn_id = display_name.lower().replace(" ", "-").replace(":", "")
            conn_resp = await client.post(
                f"{api_base}/connect/{namespace}",
                json={"connectionId": conn_id, "mcpUrl": mcp_url, "name": display_name},
                headers=headers,
            )
            if conn_resp.status_code not in (200, 201):
                # Separator added: the status code used to run straight into the body text.
                return {
                    "error": f"Failed to create connection: HTTP {conn_resp.status_code}: {conn_resp.text[:200]}"
                }
            conn_data = conn_resp.json()
            result = {
                "namespace": namespace,
                "connection_id": conn_data.get("connectionId", conn_id),
            }
            status = conn_data.get("status", {})
            if isinstance(status, dict) and status.get("state") == "auth_required":
                result["auth_url"] = status.get("authorizationUrl", "")
            return result
    except Exception as e:
        return {"error": str(e)[:200]}
async def import_mcp_from_smithery(
    server_id: str,
    agent_id: uuid.UUID,
    config: dict | None = None,
    reauthorize: bool = False,
) -> str:
    """Import an MCP server from Smithery into the platform.

    Flow:
      1. Resolve the Smithery API key (from ``config`` or stored settings) and
         write it back to the agent's discover/import tool configs.
      2. Return early when the agent already has every tool of this server,
         unless new ``config`` was supplied or ``reauthorize`` is set.
      3. Look the server up in the Smithery registry and fetch its tool
         definitions and ``deploymentUrl``.
      4. Create a Smithery Connect namespace/connection for runtime execution.
      5. Create or update one ``Tool`` row per MCP tool (or a single generic
         row when the registry lists no tools) and link each to the agent.

    Args:
        server_id: Smithery server ID, e.g. ``"github"`` or ``"@owner/name"``.
        agent_id: Agent the tools are installed for.
        config: Optional per-agent tool config. A ``smithery_api_key`` entry
            is extracted and stored separately; the caller's dict is not
            modified.
        reauthorize: Re-create the Smithery connection and refresh the
            stored config even when the tools are already installed.

    Returns:
        A user-facing markdown message describing the outcome.
    """
    config = dict(config) if config else {}  # mutable copy
    # Extract smithery_api_key from config (user-provided) or fallback to stored
    api_key = config.pop("smithery_api_key", None) or await _get_smithery_api_key(agent_id)
    if not api_key:
        return (
            "❌ Smithery API key is required to import MCP servers.\n\n"
            "请提供你的 Smithery API Key你可以通过以下步骤获取\n"
            "1. 注册/登录 https://smithery.ai\n"
            "2. 前往 https://smithery.ai/account/api-keys 创建 API Key\n"
            "3. 将 Key 提供给我,例如:\n"
            ' `import_mcp_server(server_id="github", config={"smithery_api_key": "your-key"})`'
        )

    # Write key back to discover_resources / import_mcp_server AgentTool configs
    # so it shows up in the Config dialog
    try:
        async with async_session() as db:
            for tool_name in ("discover_resources", "import_mcp_server"):
                r = await db.execute(select(Tool).where(Tool.name == tool_name))
                tool = r.scalar_one_or_none()
                if not tool:
                    continue
                at_r = await db.execute(
                    select(AgentTool).where(
                        AgentTool.agent_id == agent_id,
                        AgentTool.tool_id == tool.id,
                    )
                )
                at = at_r.scalar_one_or_none()
                if at:
                    at.config = {**(at.config or {}), "smithery_api_key": api_key}
                else:
                    db.add(AgentTool(
                        agent_id=agent_id, tool_id=tool.id, enabled=True,
                        source="system", config={"smithery_api_key": api_key},
                    ))
            await db.commit()
    except Exception as e:
        # non-critical — key is still usable from MCP tool configs
        logger.warning(f"[ResourceDiscovery] Could not persist Smithery API key: {e}")

    # Component shared by the names of all Tool rows created for this server.
    clean_id_check = server_id.replace("/", "_").replace("@", "")

    # ---- Early exit: check if this server's tools are already installed for this agent ----
    # Match both the full ID and its last "_"-separated segment to catch
    # different server_id variants (e.g. "github" vs "@anthropic/github").
    try:
        async with async_session() as db:
            from sqlalchemy import or_
            existing_server_r = await db.execute(
                select(Tool).where(
                    Tool.type == "mcp",
                    or_(
                        Tool.name.like(f"mcp_{clean_id_check}%"),
                        Tool.name.like(f"mcp_{clean_id_check.split('_')[-1]}%"),
                    ),
                )
            )
            existing_server_tools = existing_server_r.scalars().all()
            if existing_server_tools and not config and not reauthorize:
                # Check if this agent has assignments for these tools
                tool_ids = [t.id for t in existing_server_tools]
                agent_assignments_r = await db.execute(
                    select(AgentTool).where(
                        AgentTool.agent_id == agent_id,
                        AgentTool.tool_id.in_(tool_ids),
                    )
                )
                agent_assignments = agent_assignments_r.scalars().all()
                if len(agent_assignments) >= len(existing_server_tools):
                    tool_names = [t.display_name for t in existing_server_tools[:5]]
                    more = f" ... and {len(existing_server_tools) - 5} more" if len(existing_server_tools) > 5 else ""
                    return (
                        f"⏭️ You already have **{len(existing_server_tools)}** tools from this MCP server installed:\n"
                        + "\n".join(f"{n}" for n in tool_names) + more
                        + "\n\nNo action needed. These tools are ready to use."
                        + "\n\n💡 If tools stopped working (e.g. OAuth expired), use `import_mcp_server(server_id=\"....\", reauthorize=true)` to re-authorize."
                    )
    except Exception as e:
        # non-critical — proceed to normal import flow
        logger.warning(f"[ResourceDiscovery] Already-installed check failed: {e}")

    # Step 1: Search for server by ID
    headers = {"Accept": "application/json"}
    try:
        async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
            resp = await client.get(
                f"{SMITHERY_API_BASE}/servers",
                params={"q": server_id.lstrip("@"), "pageSize": 5},
                headers=headers,
            )
            if resp.status_code != 200:
                return f"❌ Server '{server_id}' not found on Smithery (HTTP {resp.status_code})"
            data = resp.json()
            servers = data.get("servers") or []
            server_info = None
            clean_id = server_id.lstrip("@")
            for s in servers:
                if s.get("qualifiedName") == clean_id or s.get("qualifiedName") == server_id:
                    server_info = s
                    break
            # No exact match: fall back to the top search hit.
            if not server_info and servers:
                server_info = servers[0]
            if not server_info:
                return f"❌ Server '{server_id}' not found on Smithery."
    except Exception as e:
        return f"❌ Failed to fetch server info: {str(e)[:200]}"

    # `or` fallbacks so a JSON null behaves like a missing key.
    display_name = server_info.get("displayName") or server_id.split("/")[-1]
    description = server_info.get("description") or ""
    qualified_name = server_info.get("qualifiedName") or server_id.lstrip("@")

    # Check if server supports remote hosting
    if not server_info.get("remote"):
        return (
            f"⚠️ **{display_name}** (`{qualified_name}`) does not support remote hosting via Smithery Connect.\n"
            f"This server requires local installation and cannot be imported automatically.\n"
            f"🔗 {server_info.get('homepage', '')}"
        )

    # Step 2: Get full server details including tools from registry API
    tools_discovered = []
    deployment_url = None
    try:
        async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
            detail_resp = await client.get(
                f"{SMITHERY_API_BASE}/servers/{qualified_name}",
                headers=headers,
            )
            if detail_resp.status_code == 200:
                detail = detail_resp.json()
                deployment_url = detail.get("deploymentUrl")
                raw_tools = detail.get("tools") or []
                tools_discovered = [
                    {
                        "name": t.get("name", ""),
                        "description": t.get("description", ""),
                        "inputSchema": t.get("inputSchema", {}),
                    }
                    for t in raw_tools if t.get("name")
                ]
                logger.info(f"[ResourceDiscovery] Got {len(tools_discovered)} tools from registry for {qualified_name}")
            else:
                logger.warning(f"[ResourceDiscovery] Could not fetch detail for {qualified_name}: HTTP {detail_resp.status_code}")
    except Exception as e:
        logger.error(f"[ResourceDiscovery] Could not fetch server detail: {e}")

    # Step 3: Determine the MCP server URL for runtime execution
    base_mcp_url = deployment_url or f"https://{qualified_name}.run.tools"

    # Step 3.5: Auto-create Smithery Connect namespace + connection
    smithery_config = {}  # will be merged into every AgentTool.config
    auth_message = ""
    conn_result = await _ensure_smithery_connection(api_key, base_mcp_url, display_name)
    if "error" in conn_result:
        auth_message = f"\n\n⚠️ Could not auto-create Smithery connection: {conn_result['error']}"
    else:
        smithery_config = {
            "smithery_namespace": conn_result["namespace"],
            "smithery_connection_id": conn_result["connection_id"],
        }
        if conn_result.get("auth_url"):
            auth_message = (
                f"\n\n🔐 **OAuth 授权需要**: 请在浏览器中访问以下链接完成授权:\n"
                f"{conn_result['auth_url']}\n"
                f"授权完成后,工具即可使用。"
            )

    # Merge smithery_config + user config for AgentTool
    agent_tool_config = {**smithery_config, **config}
    default_schema = {"type": "object", "properties": {}}

    async with async_session() as db:
        imported_tools = []

        # Helper: ensure AgentTool link exists and save config
        async def _ensure_agent_tool(tool_id: uuid.UUID):
            agent_check = await db.execute(
                select(AgentTool).where(
                    AgentTool.agent_id == agent_id,
                    AgentTool.tool_id == tool_id,
                )
            )
            at = agent_check.scalar_one_or_none()
            if at:
                at.config = {**(at.config or {}), **agent_tool_config}
            else:
                db.add(AgentTool(
                    agent_id=agent_id, tool_id=tool_id, enabled=True,
                    source="user_installed", installed_by_agent_id=agent_id,
                    config=agent_tool_config,
                ))

        # On re-import/reauthorize: update ALL existing tools for this server
        if config or reauthorize:
            existing_server_tools_r = await db.execute(
                select(Tool).where(Tool.mcp_server_name == display_name, Tool.type == "mcp")
            )
            for et in existing_server_tools_r.scalars().all():
                et.mcp_server_url = base_mcp_url
                await _ensure_agent_tool(et.id)

        if tools_discovered:
            # Clean up old generic entry if individual tools are now discovered
            generic_name = f"mcp_{clean_id_check}"
            old_generic_r = await db.execute(select(Tool).where(Tool.name == generic_name))
            old_generic = old_generic_r.scalar_one_or_none()
            if old_generic:
                await db.execute(
                    AgentTool.__table__.delete().where(AgentTool.tool_id == old_generic.id)
                )
                await db.delete(old_generic)
                await db.flush()

            # Create one Tool record per MCP tool
            for mcp_tool in tools_discovered:
                tool_name = f"mcp_{clean_id_check}_{mcp_tool['name']}"
                tool_display = f"{display_name}: {mcp_tool['name']}"
                existing_r = await db.execute(select(Tool).where(Tool.name == tool_name))
                existing_tool = existing_r.scalar_one_or_none()
                if existing_tool:
                    existing_tool.mcp_server_url = base_mcp_url
                    await _ensure_agent_tool(existing_tool.id)
                    if reauthorize:
                        imported_tools.append(f"🔄 {tool_display} (reauthorized)")
                    elif config:
                        imported_tools.append(f"🔄 {tool_display} (config updated)")
                    else:
                        imported_tools.append(f"⏭️ {tool_display} (already imported)")
                    continue
                tool = Tool(
                    name=tool_name,
                    display_name=tool_display,
                    # Null/empty tool description falls back to the server's.
                    description=(mcp_tool.get("description") or description)[:500],
                    type="mcp",
                    category="mcp",
                    icon="🔌",
                    parameters_schema=mcp_tool.get("inputSchema") or default_schema,
                    mcp_server_url=base_mcp_url,
                    mcp_server_name=display_name,
                    mcp_tool_name=mcp_tool["name"],
                    enabled=True,
                    is_default=False,
                    source="agent",
                )
                db.add(tool)
                await db.flush()  # assigns tool.id for the AgentTool link
                await _ensure_agent_tool(tool.id)
                imported_tools.append(tool_display)
        else:
            # Fallback: create a single generic tool entry
            tool_name = f"mcp_{clean_id_check}"
            tool_display = display_name
            existing_r = await db.execute(select(Tool).where(Tool.name == tool_name))
            existing_tool = existing_r.scalar_one_or_none()
            if existing_tool:
                existing_tool.mcp_server_url = base_mcp_url
                await _ensure_agent_tool(existing_tool.id)
                # Always persist. Without a commit the agent link, the
                # Smithery connection config and the URL update are rolled
                # back when the session closes, while the message below
                # still claims the tool is imported.
                await db.commit()
                # auth_message is "" unless authorization or a warning is
                # pending, so the plain messages are unchanged in that case.
                if config:
                    return f"🔄 {tool_display} config updated. The tool is now ready to use." + auth_message
                return f"⏭️ {tool_display} is already imported." + auth_message
            tool = Tool(
                name=tool_name,
                display_name=tool_display,
                description=description[:500] or f"MCP Server: {server_id}",
                type="mcp",
                category="mcp",
                icon="🔌",
                parameters_schema=default_schema,
                mcp_server_url=base_mcp_url,
                mcp_server_name=display_name,
                enabled=True,
                is_default=False,
                source="agent",
            )
            db.add(tool)
            await db.flush()
            await _ensure_agent_tool(tool.id)
            imported_tools.append(f"{tool_display} (tool list not available from registry — may need configuration)")

        await db.commit()

    result = f"🔌 Imported MCP server: **{display_name}** (`{server_id}`)\n\n"
    result += "\n".join(imported_tools)
    result += f"\n\n📡 MCP Server URL: `{base_mcp_url}`"
    if auth_message:
        result += auth_message
    else:
        result += "\n\n💡 The imported tools are now available for use."
    return result
# ── Direct URL Import ───────────────────────────────────────────
async def import_mcp_direct(
    mcp_url: str,
    agent_id: uuid.UUID,
    server_name: str | None = None,
    api_key: str | None = None,
) -> str:
    """Import an MCP server by directly connecting to its HTTP/SSE endpoint.

    This bypasses Smithery entirely — useful for self-hosted or third-party
    MCP servers that provide their own public endpoint.

    Args:
        mcp_url: Endpoint URL of the MCP server. Stored on the Tool rows
            without the API key.
        agent_id: Agent the tools are installed for.
        server_name: Display name for the server. Defaults to the host part
            of ``mcp_url``.
        api_key: Optional key. It is appended to the URL as the ``apiKey``
            query parameter when listing tools, and stored in the agent's
            tool config.

    Returns:
        A user-facing markdown message describing the outcome.
    """
    from urllib.parse import quote

    from app.services.mcp_client import MCPClient

    # Build URL with apiKey if provided. The key is percent-encoded so that
    # characters such as "+", "&" or "=" cannot corrupt the query string.
    full_url = mcp_url
    if api_key:
        separator = "&" if "?" in mcp_url else "?"
        full_url = f"{mcp_url}{separator}apiKey={quote(api_key, safe='')}"

    display_name = server_name or mcp_url.split("//")[-1].split("/")[0].split(":")[0]
    safe_name = display_name.replace(".", "_").replace("/", "_").replace(":", "_").replace("-", "_")

    # Try to list tools from the endpoint
    tools_discovered = []
    try:
        client = MCPClient(full_url)
        # Entries without a name cannot be turned into Tool rows; drop them
        # here so an unusable listing falls through to the generic entry.
        tools_discovered = [t for t in (await client.list_tools() or []) if t.get("name")]
        logger.info(f"[DirectImport] Got {len(tools_discovered)} tools from {mcp_url}")
    except Exception as e:
        logger.error(f"[DirectImport] Could not list tools from {mcp_url}: {e}")

    # Config to store in AgentTool
    agent_tool_config = {}
    if api_key:
        agent_tool_config["api_key"] = api_key
    default_schema = {"type": "object", "properties": {}}

    async with async_session() as db:
        imported_tools = []

        # Helper: ensure AgentTool link exists and save config
        async def _ensure_agent_tool(tool_id: uuid.UUID):
            agent_check = await db.execute(
                select(AgentTool).where(
                    AgentTool.agent_id == agent_id,
                    AgentTool.tool_id == tool_id,
                )
            )
            at = agent_check.scalar_one_or_none()
            if at:
                at.config = {**(at.config or {}), **agent_tool_config}
            else:
                db.add(AgentTool(
                    agent_id=agent_id, tool_id=tool_id, enabled=True,
                    source="user_installed", installed_by_agent_id=agent_id,
                    config=agent_tool_config,
                ))

        if tools_discovered:
            for mcp_tool in tools_discovered:
                tool_name = f"mcp_{safe_name}_{mcp_tool['name']}"
                tool_display = f"{display_name}: {mcp_tool['name']}"
                existing_r = await db.execute(select(Tool).where(Tool.name == tool_name))
                existing_tool = existing_r.scalar_one_or_none()
                if existing_tool:
                    existing_tool.mcp_server_url = mcp_url
                    await _ensure_agent_tool(existing_tool.id)
                    imported_tools.append(f"⏭️ {tool_display} (already imported)")
                    continue
                tool = Tool(
                    name=tool_name,
                    display_name=tool_display,
                    description=(mcp_tool.get("description") or "")[:500],
                    type="mcp",
                    category="mcp",
                    icon="🔌",
                    parameters_schema=mcp_tool.get("inputSchema") or default_schema,
                    mcp_server_url=mcp_url,
                    mcp_server_name=display_name,
                    mcp_tool_name=mcp_tool["name"],
                    enabled=True,
                    is_default=False,
                    source="agent",
                )
                db.add(tool)
                await db.flush()  # assigns tool.id for the AgentTool link
                await _ensure_agent_tool(tool.id)
                imported_tools.append(tool_display)
        else:
            tool_name = f"mcp_{safe_name}"
            existing_r = await db.execute(select(Tool).where(Tool.name == tool_name))
            existing_tool = existing_r.scalar_one_or_none()
            if existing_tool:
                existing_tool.mcp_server_url = mcp_url
                await _ensure_agent_tool(existing_tool.id)
                # Persist before returning. Without a commit the agent link
                # and URL update are rolled back when the session closes.
                await db.commit()
                return f"⏭️ {display_name} is already imported."
            tool = Tool(
                name=tool_name,
                display_name=display_name,
                description=f"MCP Server: {mcp_url}",
                type="mcp",
                category="mcp",
                icon="🔌",
                parameters_schema=default_schema,
                mcp_server_url=mcp_url,
                mcp_server_name=display_name,
                enabled=True,
                is_default=False,
                source="agent",
            )
            db.add(tool)
            await db.flush()
            await _ensure_agent_tool(tool.id)
            imported_tools.append(f"{display_name} (tools couldn't be listed — server may need configuration)")

        await db.commit()

    result = f"🔌 Imported MCP server: **{display_name}**\n\n"
    result += "\n".join(imported_tools)
    result += f"\n\n📡 MCP Server URL: `{mcp_url}`"
    result += "\n\n💡 The imported tools are now available for use."
    return result
# ── Atlassian Rovo MCP Auto-Seeding ─────────────────────────────────────────
# Endpoint the seeding code connects to when listing Atlassian Rovo tools.
ATLASSIAN_ROVO_MCP_URL = "https://mcp.atlassian.com/v1/mcp"
# Value stored in Tool.mcp_server_name; also used to select all Rovo tools
# when the API key is refreshed.
ATLASSIAN_ROVO_SERVER_NAME = "Atlassian Rovo"
# Prefix prepended to each MCP tool name to form the platform Tool.name.
ATLASSIAN_ROVO_TOOL_PREFIX = "atlassian_rovo_"
async def seed_atlassian_rovo_tools(api_key: str) -> None:
    """Connect to Atlassian Rovo MCP and seed its tools as platform-level MCP tools.

    Existing tools (matched by name) get their description, parameter schema
    and stored API key refreshed; tools not yet present are created. The
    ``api_key`` is stored in each tool's config.

    When the tool list cannot be fetched or is empty, the problem is logged
    and nothing is written.

    Args:
        api_key: Atlassian API key, used for listing the tools and stored on
            every seeded tool.
    """
    from app.services.mcp_client import MCPClient

    logger.info(f"[AtlassianRovo] Connecting to {ATLASSIAN_ROVO_MCP_URL} ...")
    try:
        client = MCPClient(ATLASSIAN_ROVO_MCP_URL, api_key=api_key)
        tools_discovered = await client.list_tools()
    except Exception as e:
        logger.error(f"[AtlassianRovo] Could not list tools: {e}")
        return
    if not tools_discovered:
        logger.warning("[AtlassianRovo] No tools returned from server")
        return
    logger.info(f"[AtlassianRovo] Discovered {len(tools_discovered)} tools")

    async with async_session() as db:
        created = 0
        for mcp_tool in tools_discovered:
            raw_name = mcp_tool.get("name", "")
            if not raw_name:
                continue
            tool_name = f"{ATLASSIAN_ROVO_TOOL_PREFIX}{raw_name}"
            # `or` fallbacks: a JSON null must not crash seeding.
            tool_desc = (mcp_tool.get("description") or "")[:500]
            tool_schema = mcp_tool.get("inputSchema") or {"type": "object", "properties": {}}

            existing_r = await db.execute(select(Tool).where(Tool.name == tool_name))
            existing_tool = existing_r.scalar_one_or_none()
            if existing_tool:
                # Update description and schema in case they changed
                existing_tool.description = tool_desc
                existing_tool.parameters_schema = tool_schema
                existing_tool.config = {"api_key": api_key}
                continue

            db.add(Tool(
                name=tool_name,
                display_name=f"Atlassian: {raw_name}",
                description=tool_desc,
                type="mcp",
                category="atlassian",
                icon=_atlassian_rovo_icon(raw_name),
                parameters_schema=tool_schema,
                mcp_server_url=ATLASSIAN_ROVO_MCP_URL,
                mcp_server_name=ATLASSIAN_ROVO_SERVER_NAME,
                mcp_tool_name=raw_name,
                enabled=True,
                is_default=False,
                config={"api_key": api_key},
                source="admin",
            ))
            created += 1
        await db.commit()
    logger.info(f"[AtlassianRovo] Seeded {created} new Atlassian Rovo tools")


def _atlassian_rovo_icon(raw_name: str) -> str:
    """Pick a display icon from keyword hints in an Atlassian tool name."""
    lowered = raw_name.lower()
    # First matching keyword group wins.
    for keywords, icon in (
        (("jira", "issue"), "🔵"),
        (("confluence", "page"), "📘"),
        (("compass", "component"), "🧭"),
    ):
        if any(keyword in lowered for keyword in keywords):
            return icon
    return "🔷"
async def refresh_atlassian_rovo_api_key(api_key: str) -> None:
    """Replace the stored config of every Atlassian Rovo MCP tool with the new key.

    Intended to run after the API key has been changed, so that all tool
    rows named after the Rovo server carry ``{"api_key": api_key}``.
    """
    from sqlalchemy import update as _update

    new_config = {"api_key": api_key}
    # One bulk UPDATE over all MCP tools belonging to the Rovo server.
    stmt = (
        _update(Tool)
        .where(Tool.mcp_server_name == ATLASSIAN_ROVO_SERVER_NAME, Tool.type == "mcp")
        .values(config=new_config)
    )
    async with async_session() as db:
        await db.execute(stmt)
        await db.commit()
    logger.info("[AtlassianRovo] API key refreshed for all Rovo tools")