Clawith/backend/app/services/agentbay_client.py

941 lines
40 KiB
Python

"""AgentBay API client using official SDK.
This module provides a client wrapper around the official AgentBay SDK
for browser and code execution operations.
"""
import asyncio
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
from loguru import logger
from agentbay import AgentBay, BrowserOption, CreateSessionParams
@dataclass
class AgentBaySession:
"""AgentBay session info."""
session_id: str
image: str
created_at: datetime
expires_at: Optional[datetime] = None
class AgentBayClient:
"""Client for AgentBay SDK interactions."""
def __init__(self, api_key: str):
self.api_key = api_key
self._sdk = AgentBay(api_key=api_key)
self._session = None
self._image_type = None
async def create_session(self, image: str = "linux_latest") -> AgentBaySession:
"""Create a new session using SDK.
Closes any existing session first to prevent leaked sessions
on the AgentBay API side.
"""
# Close existing session to prevent leaking concurrent sessions
if self._session:
logger.info("[AgentBay] Closing existing session before creating new one")
await self.close_session()
image_id_map = {
"browser_latest": "browser_latest",
"code_latest": "linux_latest",
"linux_latest": "linux_latest",
"windows_latest": "windows_latest",
}
image_id = image_id_map.get(image, image)
self._image_type = image
result = await asyncio.to_thread(self._sdk.create, CreateSessionParams(image_id=image_id))
if not result.success:
raise RuntimeError(f"Failed to create session: {result.error_message}")
self._session = result.session
self._browser_initialized = False
logger.info(f"[AgentBay] Created session with image {image_id}")
return AgentBaySession(
session_id=self._session.session_id,
image=image,
created_at=datetime.now(),
expires_at=datetime.now() + timedelta(hours=1),
)
async def close_session(self):
"""Release the current session."""
if not self._session:
return
try:
await asyncio.to_thread(self._session.delete)
logger.info(f"[AgentBay] Closed session")
except Exception as e:
logger.warning(f"[AgentBay] Failed to close session: {e}")
finally:
self._session = None
self._browser_initialized = False
# ─── Browser Operations ──────────────────────────
async def _ensure_browser_initialized(self):
"""Ensure the browser is initialized for the current session."""
if not self._session:
raise RuntimeError("No active browser session")
if not getattr(self, "_browser_initialized", False):
from agentbay import BrowserOption
from agentbay._common.models.browser import BrowserViewport, BrowserScreen
# Use high-res viewport for clearer screenshots and better layout
options = BrowserOption(
viewport=BrowserViewport(width=1920, height=1080),
screen=BrowserScreen(width=1920, height=1080)
)
success = await asyncio.to_thread(self._session.browser.initialize, options)
if success is False:
raise RuntimeError("SDK failed to initialize browser (returned False).")
self._browser_initialized = True
async def browser_navigate(self, url: str, wait_for: str = "", screenshot: bool = False) -> dict:
"""Navigate browser to URL using SDK.
The AgentBay SDK default navigation timeout is ~60 s. We wrap the call
with a 40-second asyncio soft-timeout so callers receive an actionable
error quickly rather than hanging the whole agent loop. The underlying
SDK thread may continue briefly in the background but its result is
discarded — the browser will eventually settle on its own.
"""
if not self._session or self._image_type not in ("browser", "browser_latest"):
await self.create_session("browser_latest")
await self._ensure_browser_initialized()
# Navigate to URL with a 40-second soft timeout.
# asyncio.wait_for cancels the coroutine wrapper; the blocking thread
# inside asyncio.to_thread keeps running until SDK returns, but we
# no longer block the agent loop waiting for it.
try:
await asyncio.wait_for(
asyncio.to_thread(self._session.browser.operator.navigate, url),
timeout=40.0,
)
except asyncio.TimeoutError:
logger.warning(f"[AgentBay] navigate to {url!r} timed out after 40 s")
raise RuntimeError(
f"Navigation to '{url}' timed out (>40 s). "
"The browser may be busy or the page is unreachable. "
"Try calling agentbay_browser_screenshot to check the current "
"state, or retry the navigation."
)
result = {"url": url, "success": True, "title": url}
if screenshot:
# Wait for dynamic content and SPA rendering (React/Vue) before screenshotting
await asyncio.sleep(3)
screenshot_data = await asyncio.to_thread(
self._session.browser.operator.screenshot, full_page=False
)
result["screenshot"] = screenshot_data
return result
async def browser_screenshot(self) -> dict:
"""Take a screenshot of the current browser page without navigating.
Use this after actions (click, type, form submit) to verify results
without refreshing the page. Never call browser_navigate just to screenshot.
"""
await self._ensure_browser_initialized()
# Wait for dynamic content and SPA rendering before screenshotting
await asyncio.sleep(3)
screenshot_data = await asyncio.to_thread(
self._session.browser.operator.screenshot, full_page=False
)
return {"success": True, "screenshot": screenshot_data}
async def browser_click(self, selector: str) -> dict:
"""Click element by CSS selector using SDK."""
await self._ensure_browser_initialized()
from agentbay import ActOptions
await asyncio.to_thread(self._session.browser.operator.act, ActOptions(action=f"click on {selector}"))
return {"success": True, "selector": selector}
async def browser_type(self, selector: str, text: str) -> dict:
"""Type text into element using SDK."""
await self._ensure_browser_initialized()
from agentbay import ActOptions
# Detect OTP/PIN-style inputs: short digit-only strings (4-8 chars)
# These use segmented input boxes that auto-advance focus per digit,
# so character-by-character typing often fails. Use paste strategy instead.
is_otp = text.isdigit() and 4 <= len(text) <= 8
if is_otp:
action_msg = (
f"The text '{text}' appears to be a verification/OTP code. "
f"Find the verification code input area near '{selector}'. "
f"Click on the first input box, then paste or type the full code '{text}'. "
f"If the input is split into individual digit boxes, click the first box "
f"and type each digit one at a time: {', '.join(text)}. "
f"Each box should auto-advance to the next after entering a digit."
)
else:
# Standard input: click to focus, then type character by character
# to correctly trigger React/Vue input events.
action_msg = (
f"Click on the element matching '{selector}' to focus it, "
f"then use the keyboard to type the text '{text}' character by character. "
f"This ensures modern web frameworks like React register the input."
)
await asyncio.to_thread(self._session.browser.operator.act, ActOptions(action=action_msg))
return {"success": True, "selector": selector, "text": text}
async def browser_login(self, url: str, login_config: str) -> dict:
"""Perform an automated login using AgentBay's built-in login skill.
This leverages AgentBay's AI-driven login capability to handle complex
login flows including CAPTCHAs, OTP inputs, and multi-step authentication.
Args:
url: The login page URL to navigate to first.
login_config: JSON string with login configuration, e.g.
'{"api_key": "xxx", "skill_id": "yyy"}'
"""
if not self._session or self._image_type != "browser":
await self.create_session("browser_latest")
await self._ensure_browser_initialized()
# Navigate to the login page first
await asyncio.to_thread(self._session.browser.operator.navigate, url)
# Execute the login skill
result = await asyncio.to_thread(
self._session.browser.operator.login,
login_config,
use_vision=True,
)
return {
"success": result.success,
"message": result.message or "",
}
# ─── Code Operations ──────────────────────────
async def code_execute(self, language: str, code: str, timeout: int = 30) -> dict:
"""Execute code in code space using SDK."""
lang_map = {
"python": "python",
"bash": "bash",
"shell": "bash",
"node": "node",
"javascript": "node",
}
sdk_lang = lang_map.get(language.lower(), "python")
if not self._session or self._image_type not in ("code", "code_latest"):
await self.create_session("code_latest")
result = await asyncio.to_thread(self._session.code.run_code, code, sdk_lang)
return {
"stdout": result.result if result.success else "",
"stderr": result.error_message if not result.success else "",
"exit_code": 0 if result.success else 1,
"success": result.success,
}
# ─── Browser: Extract & Observe ───────────────────
async def browser_extract(self, instruction: str, selector: str = "") -> dict:
"""Extract structured data from current page using natural language instruction."""
await self._ensure_browser_initialized()
# Wait for dynamic content and SPA rendering before extracting
await asyncio.sleep(3)
from agentbay._common.models.browser_operator import ExtractOptions
# Use a generic dict schema since we cannot define a Pydantic model at runtime
options = ExtractOptions(
instruction=instruction,
schema=dict,
selector=selector or None,
)
success, data = await asyncio.to_thread(
self._session.browser.operator.extract, options
)
return {"success": success, "data": data}
async def browser_observe(self, instruction: str, selector: str = "") -> dict:
"""Observe the current page state and return interactive elements."""
await self._ensure_browser_initialized()
# Wait for dynamic content and SPA rendering before observing
await asyncio.sleep(3)
from agentbay._common.models.browser_operator import ObserveOptions
options = ObserveOptions(
instruction=instruction,
selector=selector or None,
)
success, results = await asyncio.to_thread(
self._session.browser.operator.observe, options
)
# Convert ObserveResult objects to dicts for serialization
result_dicts = []
for r in (results or []):
result_dicts.append(vars(r) if hasattr(r, "__dict__") else str(r))
return {"success": success, "elements": result_dicts}
# ─── Command (Shell) Operations ──────────────────
async def command_exec(self, command: str, timeout_ms: int = 50000, cwd: str = "") -> dict:
"""Execute a shell command in the AgentBay environment."""
if not self._session:
await self.create_session("linux_latest")
result = await asyncio.to_thread(
self._session.command.exec,
command,
timeout_ms=timeout_ms,
cwd=cwd or None,
)
return {
"success": result.success,
"stdout": getattr(result, "stdout", "") or getattr(result, "output", "") or "",
"stderr": getattr(result, "stderr", "") or "",
"exit_code": getattr(result, "exit_code", -1),
"error_message": result.error_message or "",
}
# ─── Computer Operations ──────────────────────────
async def _ensure_computer_session(self):
"""Ensure a computer (linux or windows desktop) session is active."""
if not self._session or self._image_type not in ("computer", "linux_latest", "windows_latest"):
await self.create_session("linux_latest")
async def computer_screenshot(self) -> dict:
"""Take a screenshot of the desktop.
Tries the standard screenshot() API first, then falls back to
beta_take_screenshot() for cloud environments that don't support
the standard API yet.
"""
await self._ensure_computer_session()
# Wait briefly for UI animations/rendering to settle
await asyncio.sleep(2)
try:
result = await asyncio.to_thread(self._session.computer.screenshot)
# Some cloud environments return success=False with a message
# telling us to use beta_take_screenshot() instead of throwing.
if not result.success and "beta_take_screenshot" in (result.error_message or ""):
logger.info("[AgentBay] screenshot() unsupported, falling back to beta_take_screenshot()")
result = await asyncio.to_thread(self._session.computer.beta_take_screenshot)
except Exception as e:
# Also handle the case where it raises an exception
if "beta_take_screenshot" in str(e):
logger.info("[AgentBay] Falling back to beta_take_screenshot() after exception")
result = await asyncio.to_thread(self._session.computer.beta_take_screenshot)
else:
raise
return {
"success": result.success,
"data": getattr(result, "data", None),
"error_message": result.error_message or "",
}
async def computer_click(self, x: int, y: int, button: str = "left") -> dict:
"""Click the mouse at coordinates (x, y)."""
await self._ensure_computer_session()
result = await asyncio.to_thread(self._session.computer.click_mouse, x, y, button)
return {"success": result.success, "x": x, "y": y, "button": button}
async def computer_input_text(self, text: str) -> dict:
"""Input text at the current cursor position."""
await self._ensure_computer_session()
result = await asyncio.to_thread(self._session.computer.input_text, text)
return {"success": result.success, "text": text}
async def computer_press_keys(self, keys: list, hold: bool = False) -> dict:
"""Press keyboard keys (e.g. ['ctrl', 'c'] for Ctrl+C)."""
await self._ensure_computer_session()
result = await asyncio.to_thread(self._session.computer.press_keys, keys, hold=hold)
return {"success": result.success, "keys": keys, "hold": hold}
async def computer_scroll(self, x: int, y: int, direction: str = "down", amount: int = 1) -> dict:
"""Scroll the screen at position (x, y)."""
await self._ensure_computer_session()
result = await asyncio.to_thread(
self._session.computer.scroll, x, y, direction=direction, amount=amount
)
return {"success": result.success, "direction": direction, "amount": amount}
async def computer_move_mouse(self, x: int, y: int) -> dict:
"""Move mouse to coordinates (x, y) without clicking."""
await self._ensure_computer_session()
result = await asyncio.to_thread(self._session.computer.move_mouse, x, y)
return {"success": result.success, "x": x, "y": y}
async def computer_drag_mouse(
self, from_x: int, from_y: int, to_x: int, to_y: int, button: str = "left"
) -> dict:
"""Drag mouse from (from_x, from_y) to (to_x, to_y)."""
await self._ensure_computer_session()
result = await asyncio.to_thread(
self._session.computer.drag_mouse, from_x, from_y, to_x, to_y, button=button
)
return {"success": result.success, "from": [from_x, from_y], "to": [to_x, to_y]}
async def computer_get_screen_size(self) -> dict:
"""Get the screen resolution."""
await self._ensure_computer_session()
result = await asyncio.to_thread(self._session.computer.get_screen_size)
return {
"success": result.success,
"data": getattr(result, "data", None),
"error_message": result.error_message or "",
}
async def computer_start_app(self, cmd: str, work_dir: str = "") -> dict:
"""Start an application by its command."""
await self._ensure_computer_session()
result = await asyncio.to_thread(
self._session.computer.start_app, cmd, work_directory=work_dir
)
return {
"success": result.success,
"data": getattr(result, "data", None),
"error_message": result.error_message or "",
}
async def computer_get_cursor_position(self) -> dict:
"""Get current cursor position."""
await self._ensure_computer_session()
result = await asyncio.to_thread(self._session.computer.get_cursor_position)
return {
"success": result.success,
"data": getattr(result, "data", None),
"error_message": result.error_message or "",
}
async def computer_get_active_window(self) -> dict:
"""Get info about the currently active window."""
await self._ensure_computer_session()
result = await asyncio.to_thread(self._session.computer.get_active_window)
window = getattr(result, "window", None)
return {
"success": result.success,
"window": vars(window) if window and hasattr(window, "__dict__") else str(window),
"error_message": result.error_message or "",
}
async def computer_activate_window(self, window_id: int) -> dict:
"""Activate (bring to front) a window by its ID."""
await self._ensure_computer_session()
result = await asyncio.to_thread(self._session.computer.activate_window, window_id)
return {"success": result.success, "window_id": window_id}
async def computer_list_visible_apps(self) -> dict:
"""List currently visible/running applications."""
await self._ensure_computer_session()
result = await asyncio.to_thread(self._session.computer.list_visible_apps)
data = getattr(result, "data", [])
# Convert process objects to dicts
apps = []
for p in (data or []):
apps.append(vars(p) if hasattr(p, "__dict__") else str(p))
return {
"success": result.success,
"apps": apps,
"error_message": result.error_message or "",
}
# ─── Live Preview Support ──────────────────────────
async def get_live_url(self) -> str | None:
"""Get the VNC/viewer URL for the current computer session.
Calls session.get_link() which returns a shareable viewer URL
for the cloud desktop. Returns None if no session is active
or the API call fails.
"""
if not self._session:
return None
try:
result = await asyncio.to_thread(self._session.get_link)
if result.success and result.data:
logger.info(f"[AgentBay] Got live URL: {str(result.data)[:80]}...")
return result.data
logger.warning(f"[AgentBay] get_link() failed: {result.error_message}")
return None
except Exception as e:
logger.warning(f"[AgentBay] Failed to get live URL: {e}")
return None
async def get_desktop_snapshot_base64(self) -> str | None:
"""Take a quick desktop screenshot and return compressed base64 JPEG.
Used for live preview panel. Calls the same screenshot API as
computer_screenshot() but without the sleep delay, and compresses
the result for efficient WebSocket transfer.
Returns data:image/jpeg;base64,... or None on failure.
"""
if not self._session:
return None
try:
# Use the same screenshot logic as computer_screenshot()
try:
result = await asyncio.to_thread(self._session.computer.screenshot)
if not result.success and "beta_take_screenshot" in (result.error_message or ""):
result = await asyncio.to_thread(self._session.computer.beta_take_screenshot)
except Exception as e:
if "beta_take_screenshot" in str(e):
result = await asyncio.to_thread(self._session.computer.beta_take_screenshot)
else:
raise
screenshot_data = getattr(result, "data", None)
if not screenshot_data:
return None
# Compress to JPEG base64 for live preview
import base64
from io import BytesIO
from PIL import Image
img = Image.open(BytesIO(screenshot_data))
# Resize to max 1920px wide for live preview (up from 1280px to preserve details)
if img.width > 1920:
ratio = 1920 / img.width
img = img.resize((int(img.width * ratio), int(img.height * ratio)), Image.LANCZOS)
if img.mode in ("RGBA", "P"):
img = img.convert("RGB")
buffer = BytesIO()
img.save(buffer, format="JPEG", quality=80, optimize=True)
b64 = base64.b64encode(buffer.getvalue()).decode("ascii")
return f"data:image/jpeg;base64,{b64}"
except Exception as e:
logger.warning(f"[AgentBay] Desktop snapshot failed: {e}")
return None
async def get_browser_snapshot_base64(self) -> str | None:
"""Take a quick browser screenshot and return compressed base64 JPEG.
Used for live preview panel — no wait/sleep since we want
the snapshot to reflect the current state immediately.
Returns data:image/jpeg;base64,... or None on failure.
"""
if not self._session:
logger.info("[AgentBay] Browser snapshot skipped: No active session")
return None
if not getattr(self, "_browser_initialized", False):
logger.info("[AgentBay] Browser snapshot skipped: Browser not initialized")
return None
try:
screenshot_data = await asyncio.to_thread(
self._session.browser.operator.screenshot, full_page=False
)
if not screenshot_data:
logger.info("[AgentBay] Browser snapshot returned empty data")
return None
# Compress screenshot to JPEG base64 for efficient transfer
import base64
from io import BytesIO
from PIL import Image
if isinstance(screenshot_data, str):
# The AgentBay SDK may return a raw base64 string without proper
# padding. Normalize by stripping whitespace and adding padding chars.
screenshot_data = screenshot_data.strip()
# Remove data URI prefix if present (e.g., "data:image/png;base64,")
if "," in screenshot_data:
screenshot_data = screenshot_data.split(",", 1)[1]
# Add base64 padding if missing
missing_padding = len(screenshot_data) % 4
if missing_padding:
screenshot_data += "=" * (4 - missing_padding)
screenshot_data = base64.b64decode(screenshot_data)
img = Image.open(BytesIO(screenshot_data))
# Resize to max 1920px wide for live preview (up from 1280px to preserve details)
if img.width > 1920:
ratio = 1920 / img.width
img = img.resize((int(img.width * ratio), int(img.height * ratio)), Image.LANCZOS)
if img.mode in ("RGBA", "P"):
img = img.convert("RGB")
buffer = BytesIO()
img.save(buffer, format="JPEG", quality=80, optimize=True)
b64 = base64.b64encode(buffer.getvalue()).decode("ascii")
return f"data:image/jpeg;base64,{b64}"
except Exception as e:
logger.warning(f"[AgentBay] Browser snapshot failed: {e}")
return None
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close_session()
# ─── Session Cache for Tool Executions ──────────────────────────
# Key: (agent_id, session_id, image_type) so each ChatSession gets
# its own independent AgentBay instance for browser/computer/code.
# Previously keyed by (agent_id, image_type) which meant all users
# of the same Agent shared one browser/desktop — causing conflicts.
_agentbay_sessions: dict[tuple[uuid.UUID, str, str], tuple[AgentBayClient, datetime]] = {}
_AGENTBAY_SESSION_TIMEOUT = timedelta(minutes=5)
AGENTBAY_API_URL = "https://api.agentbay.ai/v1"
async def get_agentbay_api_key_for_agent(agent_id: uuid.UUID, db=None) -> Optional[str]:
"""Return the configured AgentBay API key for the given agent.
Resolution order:
1. Per-agent ChannelConfig (channel_type='agentbay') — set via Agent detail page
2. Global Tool.config.api_key (category='agentbay') — set via Company Settings
"""
from app.models.channel_config import ChannelConfig
from app.models.tool import Tool
from sqlalchemy import select
from app.database import async_session
from app.core.security import decrypt_data
from app.config import get_settings
async def _fetch(session):
# 1) Check per-agent ChannelConfig first (highest priority)
result = await session.execute(
select(ChannelConfig).where(
ChannelConfig.agent_id == agent_id,
ChannelConfig.channel_type == "agentbay",
ChannelConfig.is_configured == True,
)
)
config = result.scalar_one_or_none()
if config and config.app_secret:
# Try to decrypt, fallback to plaintext if it fails
try:
return decrypt_data(config.app_secret, get_settings().SECRET_KEY)
except Exception:
return config.app_secret
# 2) Fallback: check global Tool.config.api_key for agentbay tools.
#
# Only agentbay_browser_navigate (the "primary" AgentBay tool) has a
# config_schema with an api_key field, so it is the only tool whose
# config is ever populated with a key via the Company Settings UI.
# We therefore query it first, then fall back to scanning all agentbay
# tools — this prevents a non-deterministic .limit(1) from returning a
# tool with an empty config (e.g. agentbay_computer_screenshot), which
# would silently return None even when a key IS configured.
tool_result = await session.execute(
select(Tool).where(
Tool.name == "agentbay_browser_navigate",
Tool.enabled == True,
).limit(1)
)
tool = tool_result.scalar_one_or_none()
# Also scan all agentbay tools in case the key was stored differently
if not (tool and tool.config and tool.config.get("api_key")):
all_result = await session.execute(
select(Tool).where(
Tool.category == "agentbay",
Tool.enabled == True,
)
)
for candidate in all_result.scalars().all():
if candidate.config and candidate.config.get("api_key"):
tool = candidate
break
if tool and tool.config and tool.config.get("api_key"):
api_key = tool.config["api_key"]
# Try to decrypt (global config is encrypted via _encrypt_sensitive_fields)
try:
return decrypt_data(api_key, get_settings().SECRET_KEY)
except Exception:
return api_key
return None
if db:
return await _fetch(db)
async with async_session() as session:
return await _fetch(session)
async def test_agentbay_channel(agent_id: uuid.UUID, current_user, db) -> dict:
"""Test AgentBay connectivity."""
key = await get_agentbay_api_key_for_agent(agent_id, db)
if not key:
return {"ok": False, "error": "AgentBay not configured"}
try:
from agentbay import AgentBay, CreateSessionParams
sdk = AgentBay(api_key=key)
# Using linux_latest instead of browser_latest. AgentBay tokens may be
# scoped/bound to specific instance types, and requesting browser_latest
# might trigger an 'InvalidParameter.Authorization' error for this key.
result = await asyncio.to_thread(sdk.create, CreateSessionParams(image_id="linux_latest"))
if result.success:
if result.session:
await asyncio.to_thread(result.session.delete)
return {"ok": True, "message": "✅ Successfully connected to AgentBay API"}
return {"ok": False, "error": result.error_message}
except Exception as e:
return {"ok": False, "error": str(e)}
async def get_agentbay_client_for_agent(agent_id: uuid.UUID, image_type: str, session_id: str = "") -> AgentBayClient:
"""Get or create AgentBay client for agent.
Sessions are cached per (agent_id, session_id, image_type) so that each
ChatSession gets its own independent AgentBay instance. Multiple users
chatting with the same Agent will each have isolated browser/desktop/code
environments.
Args:
agent_id: The agent UUID.
image_type: One of 'browser', 'computer', 'code'.
session_id: The ChatSession ID. Defaults to '' for backward compat
(e.g. test_agentbay_channel, single-session callers).
"""
now = datetime.now()
cache_key = (agent_id, session_id, image_type)
if cache_key in _agentbay_sessions:
client, last_used = _agentbay_sessions[cache_key]
if now - last_used < _AGENTBAY_SESSION_TIMEOUT:
# Session still valid, refresh timestamp and reuse
_agentbay_sessions[cache_key] = (client, now)
return client
else:
# Session expired, close and remove
logger.info(f"[AgentBay] Session expired for {image_type} (session={session_id[:8]}), closing")
await client.close_session()
del _agentbay_sessions[cache_key]
from app.services.agent_tools import _get_tool_config
tool_config = await _get_tool_config(agent_id, "agentbay_browser_navigate")
api_key = None
if tool_config and tool_config.get("api_key"):
api_key = tool_config.get("api_key")
from app.core.security import decrypt_data
from app.config import get_settings
try:
api_key = decrypt_data(api_key, get_settings().SECRET_KEY)
except Exception:
pass # Fallback if it's somehow plaintext
else:
api_key = await get_agentbay_api_key_for_agent(agent_id)
if not api_key:
raise RuntimeError("AgentBay not configured for this agent. Please configure in Tools > AgentBay.")
client = AgentBayClient(api_key)
if image_type == "browser":
await client.create_session("browser_latest")
# Inject stored cookies after browser initialization
await _inject_credentials(client, agent_id)
elif image_type == "computer":
# Read OS preference from tool config (default: windows)
os_type = (tool_config or {}).get("os_type", "windows")
computer_image = "windows_latest" if os_type == "windows" else "linux_latest"
logger.info(f"[AgentBay] Creating computer session with OS: {os_type} (image: {computer_image}) for session={session_id[:8]}")
await client.create_session(computer_image)
else:
await client.create_session("code_latest")
_agentbay_sessions[cache_key] = (client, now)
return client
async def cleanup_agentbay_sessions():
"""Clean up expired AgentBay sessions."""
now = datetime.now()
expired = [
cache_key for cache_key, (client, last_used) in _agentbay_sessions.items()
if now - last_used > _AGENTBAY_SESSION_TIMEOUT
]
for cache_key in expired:
client, _ = _agentbay_sessions.pop(cache_key)
agent_id, session_id, image_type = cache_key
logger.info(f"[AgentBay] Cleaning up expired {image_type} session for agent {agent_id} (session={session_id[:8]})")
await client.close_session()
async def _inject_credentials(client: AgentBayClient, agent_id: uuid.UUID):
"""Inject stored cookies into the browser via CDP after initialization.
Reads all 'active' credentials with cookies from the agent_credentials table,
decrypts cookies_json, and injects them via a Playwright Node.js script that
connects to Chrome's CDP port (localhost:9222).
This runs automatically after every browser session creation. If no credentials
exist or injection fails, it logs a warning but does not block the session.
"""
import json
from app.database import async_session as async_session_factory
from app.models.agent_credential import AgentCredential
from sqlalchemy import select
from app.core.security import decrypt_data
from app.config import get_settings
settings = get_settings()
# Fetch active credentials with stored cookies
try:
async with async_session_factory() as db:
result = await db.execute(
select(AgentCredential).where(
AgentCredential.agent_id == agent_id,
AgentCredential.status == "active",
AgentCredential.cookies_json.isnot(None),
)
)
credentials = result.scalars().all()
except Exception as e:
logger.warning(f"[AgentBay] Failed to query credentials for injection: {e}")
return
if not credentials:
return # No cookies to inject
# Collect and decrypt all cookies
all_cookies = []
for cred in credentials:
try:
raw = decrypt_data(cred.cookies_json, settings.SECRET_KEY)
cookies = json.loads(raw)
if isinstance(cookies, list):
all_cookies.extend(cookies)
except Exception as e:
logger.warning(f"[AgentBay] Failed to decrypt cookies for {cred.platform}: {e}")
if not all_cookies:
return
# Ensure browser is initialized before injection (Chrome must be running)
try:
await client._ensure_browser_initialized()
except Exception as e:
logger.warning(f"[AgentBay] Cannot inject cookies — browser not initialized: {e}")
return
# Build Node.js injection script.
# Use base64 encoding to write the script to the current working dir (not /tmp,
# which may lack write permissions in the Wuying browser sandbox).
#
# Cookies stored in DB were already sanitized at export time (sameSite title-cased,
# expires:-1 removed, domain without leading dot), so we only do a defensive
# re-sanitize here in case older records were stored before the fix.
import base64 as _base64
cookies_json_str = json.dumps(all_cookies)
inject_script = r"""
const { chromium } = require('/usr/local/lib/node_modules/playwright');
(async () => {
try {
const browser = await chromium.connectOverCDP('http://localhost:9222');
const context = browser.contexts()[0];
const rawCookies = """ + cookies_json_str + r""";
// Defensive sanitize: normalize sameSite casing and strip invalid expires
const sameSiteMap = { none: 'None', lax: 'Lax', strict: 'Strict' };
const cookies = rawCookies.map(c => {
const out = { ...c };
if (out.sameSite != null) {
out.sameSite = sameSiteMap[String(out.sameSite).toLowerCase()] || 'Lax';
}
if (out.expires != null && out.expires <= 0) {
delete out.expires;
}
// Ensure domain has leading dot for subdomain matching
if (out.domain && !out.domain.startsWith('.')) {
out.domain = '.' + out.domain;
}
return out;
});
let injected = 0;
let failed = 0;
// Inject one at a time so a single bad cookie doesn't break the rest
for (const cookie of cookies) {
try {
await context.addCookies([cookie]);
injected++;
} catch (e) {
failed++;
if (failed <= 3) {
// Log first few failures to aid debugging
console.error('INJECT_SKIP:' + e.message + ' cookie=' + JSON.stringify(cookie).slice(0, 200));
}
}
}
console.log('INJECT_OK:' + injected + ' injected, ' + failed + ' skipped');
process.exit(0);
} catch (e) {
console.error('INJECT_FAIL:' + e.message);
process.exit(1);
}
})();
"""
try:
# Write script via base64 decode to avoid shell quoting issues and /tmp permission errors
script_b64 = _base64.b64encode(inject_script.encode('utf-8')).decode('ascii')
write_result = await asyncio.to_thread(
client._session.command.exec,
f"echo '{script_b64}' | /usr/bin/base64 -d > tc_inject_cookies.js",
)
write_ok = getattr(write_result, 'success', False)
logger.info(f"[AgentBay] Cookie inject script write: success={write_ok}")
# Execute the injection script
exec_result = await asyncio.to_thread(
client._session.command.exec,
"node tc_inject_cookies.js",
timeout_ms=15000,
)
stdout = getattr(exec_result, 'stdout', '') or getattr(exec_result, 'output', '') or ''
stderr = getattr(exec_result, 'stderr', '') or ''
if "INJECT_OK" in stdout:
logger.info(f"[AgentBay] Cookie injection successful for agent {agent_id}: {stdout.strip()[:100]}")
# Update last_injected_at for all injected credentials
try:
from datetime import timezone as tz
now = datetime.now(tz.utc)
async with async_session_factory() as db:
for cred in credentials:
cred.last_injected_at = now
db.add(cred)
await db.commit()
except Exception as e:
logger.warning(f"[AgentBay] Failed to update last_injected_at: {e}")
else:
logger.warning(f"[AgentBay] Cookie injection may have failed: stdout={stdout[:200]}, stderr={stderr[:200]}")
except Exception as e:
logger.warning(f"[AgentBay] Cookie injection error: {e}")