Clawith/backend/app/api/agentbay_control.py

1155 lines
48 KiB
Python

"""AgentBay Take Control API — human-agent collaborative login.
Provides REST endpoints for forwarding mouse/keyboard events to an
AgentBay session and managing the Take Control lock. When locked,
the agent's automatic browser/computer tool execution is paused to
prevent human-agent input collisions.
Cookie export occurs automatically when the Take Control session ends.
"""
import asyncio
import json
import logging
import time
import uuid
from datetime import datetime, timezone
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.core.permissions import check_agent_access
from app.core.security import encrypt_data, get_current_user
from app.database import get_db
from app.models.agent_credential import AgentCredential
from app.models.user import User
# Module-level logger for this Take Control API.
logger = logging.getLogger(__name__)
# Every endpoint below is mounted under /agents/{agent_id}/control.
router = APIRouter(prefix="/agents/{agent_id}/control", tags=["agentbay-control"])
# ── In-memory Take Control lock registry ──
# Key: (agent_id_str, session_id_str) → (user_id, lock_timestamp, env_type)
# env_type: 'browser' | 'computer' | 'code' — which AgentBay environment the
# user is controlling. Stored at lock time so all subsequent TC endpoints can
# look up the correct session type without re-deriving it from the frontend.
# lock_timestamp is compared against time.time() in is_session_locked().
# NOTE(review): this and the registries below are plain module-level
# dicts/sets, i.e. per-process state. If the API runs several worker
# processes, a lock taken in one worker is invisible to the others — confirm
# the deployment model.
_take_control_locks: dict[tuple[str, str], tuple[str, float, str]] = {}
_LOCK_TIMEOUT_SECONDS = 600 # Auto-expire stale locks after 10 minutes
# Cache of sessions that have already had browser initialization called.
# Avoids redundant _ensure_browser_initialized() on every screenshot poll.
# Keys are (agent_id, session_id, image_type) tuples, the same shape as the
# _agentbay_sessions cache keys that _get_client() looks up.
_browser_initialized: set[tuple] = set()
# Per-session interaction locks to serialize concurrent TC interactions.
# Without this, two rapid clicks both write tc_action.js simultaneously,
# corrupting one script's execution. Each TC session gets its own Lock.
# Keyed by f"{agent_id}:{session_id}" (see _get_interaction_lock).
# NOTE(review): no code in view removes entries — confirm they are cleaned up
# when a TC session ends, otherwise this dict grows for the process lifetime.
_tc_interaction_locks: dict[str, asyncio.Lock] = {}
def _get_interaction_lock(agent_id: uuid.UUID, session_id: str) -> asyncio.Lock:
    """Return the asyncio.Lock that serializes TC interactions for one session.

    The lock is created lazily the first time an (agent, session) pair is
    seen and the same object is handed out on every later call, so concurrent
    requests for that session queue up behind each other.
    """
    lock_key = f"{agent_id}:{session_id}"
    session_lock = _tc_interaction_locks.get(lock_key)
    if session_lock is None:
        # Only construct a Lock when none exists yet (setdefault would build
        # a throwaway Lock on every call).
        session_lock = asyncio.Lock()
        _tc_interaction_locks[lock_key] = session_lock
    return session_lock
def is_session_locked(agent_id: str, session_id: str) -> bool:
    """Report whether a session is currently held under human Take Control.

    execute_tool consults this to block automatic agentbay_* tool calls.
    A lock older than _LOCK_TIMEOUT_SECONDS is considered stale: it is
    dropped from the registry and the session is reported as unlocked.
    """
    lock_key = (agent_id, session_id)
    entry = _take_control_locks.get(lock_key)
    if entry is None:
        return False

    _owner, locked_at, _env = entry
    lock_age = time.time() - locked_at
    if lock_age <= _LOCK_TIMEOUT_SECONDS:
        return True

    # Stale lock: forget it so the agent can resume automatic tool use.
    logger.info(f"[TakeControl] Auto-expired stale lock for session={session_id[:8]}")
    del _take_control_locks[lock_key]
    return False
def _get_session_env_type(agent_id: str, session_id: str) -> str:
    """Look up which AgentBay environment a Take Control session targets.

    The env_type is recorded in the lock registry when the session is
    locked. Without a registry entry, 'browser' is returned for backward
    compatibility.
    """
    entry = _take_control_locks.get((agent_id, session_id))
    if not entry:
        return "browser"
    _owner, _when, recorded_env = entry
    return recorded_env
# ── Request schemas ──
class ClickRequest(BaseModel):
    """Mouse click event forwarding.

    Body of POST /click. ``x``/``y`` are forwarded unchanged to the click
    helper (Playwright ``page.mouse.click`` for browser sessions,
    ``computer.click_mouse`` for desktop sessions).
    """
    session_id: str
    x: int
    y: int
    button: str = "left" # left | right | middle — NOTE(review): not validated here; any string is accepted
class TypeRequest(BaseModel):
    """Text input event forwarding.

    Body of POST /type. Take Control is used for collaborative logins, so
    ``text`` may well contain credentials — avoid logging it.
    """
    session_id: str
    text: str
class PressKeysRequest(BaseModel):
    """Keyboard key press event forwarding.

    Body of POST /press_keys. For browser sessions the keys are joined with
    "+" and pressed as a single combination.
    """
    session_id: str
    keys: list[str] # e.g. ["ctrl", "v"] or ["Tab"]
class DragRequest(BaseModel):
    """Mouse drag event forwarding — used for slider CAPTCHAs and drag-and-drop.

    Body of POST /drag. Start/end points use the same coordinate space as
    ClickRequest.
    """
    session_id: str
    from_x: int
    from_y: int
    to_x: int
    to_y: int
    duration_ms: int = 600 # Total drag duration in milliseconds
class ScreenshotRequest(BaseModel):
    """Request an immediate screenshot.

    Body of POST /screenshot; only the session id is required.
    """
    session_id: str
class LockRequest(BaseModel):
    """Enter Take Control mode."""
    session_id: str
    platform_hint: Optional[str] = None # current page domain (for cookie export)
    # which env the user is controlling: browser | computer | code
    # NOTE(review): Optional[str] also accepts an explicit JSON null and any
    # other string; confirm the lock endpoint normalizes/validates it.
    env_type: Optional[str] = "browser"
class UnlockRequest(BaseModel):
    """Exit Take Control mode.

    ``export_cookies`` / ``platform_hint`` control the cookie export that
    happens when the Take Control session ends.
    """
    session_id: str
    export_cookies: bool = True # whether to export cookies on exit
    platform_hint: Optional[str] = None # domain to associate cookies with
# ── Helpers ──
async def _init_tc_browser_once(
    client, cache_key: tuple, context: str, *, force: bool = False
) -> bool:
    """Call ``client._ensure_browser_initialized()`` once per cache key.

    Args:
        client: AgentBay client whose browser should be initialized.
        cache_key: ``(agent_id, session_id, image_type)``; recorded in
            ``_browser_initialized`` after a successful initialization.
        context: Word used in the failure log ("cached", "fallback", "new").
        force: Initialize even when ``cache_key`` was already recorded.

    Returns:
        True if initialization ran and succeeded; False if it was skipped
        (already recorded) or raised. Failures are logged, never raised.
    """
    if not force and cache_key in _browser_initialized:
        return False
    try:
        await client._ensure_browser_initialized()
    except Exception as e:
        logger.warning(f"[TakeControl] Browser init on {context} session failed: {e}")
        return False
    _browser_initialized.add(cache_key)
    return True


async def _get_client(agent_id: uuid.UUID, session_id: str, env_type: str = "browser"):
    """Return the AgentBay client Take Control should drive for this agent/session.

    Search order (most to least specific):

    1. Exact match on (agent_id, session_id, image_type): the requested
       env_type first, then the other known types.
    2. Any live cached session of this agent regardless of session_id, by
       env_type preference and, within one type, the most recently used.
       This handles the TC frontend's session_id not matching the session_id
       the agent used when it created the AgentBay session (e.g. a new chat
       thread opened mid-task).
    3. Create a new session — last resort; it shows a blank desktop/browser.

    Reused sessions get their last-used timestamp refreshed. For browser
    images ("browser" / "browser_latest") the browser is initialized via
    ``_ensure_browser_initialized()``: the browser SDK requires it before the
    screenshot/interaction APIs work (without it get_browser_snapshot_base64()
    returns None, "Browser not initialized", and CDP interactions fail).

    Raises:
        HTTPException: 404 when no cached session exists and creating one fails.
    """
    from app.services.agentbay_client import _agentbay_sessions, _AGENTBAY_SESSION_TIMEOUT

    # NOTE(review): naive local time, assuming _agentbay_sessions stores
    # naive datetime.now() timestamps as well.
    now = datetime.now()
    browser_types = ("browser", "browser_latest")
    # Requested env_type first, then the remaining known types as fallbacks.
    search_order = [env_type] + [t for t in ("browser", "computer", "code") if t != env_type]

    # ── Phase 1: exact (agent_id, session_id, image_type) match ──
    for image_type in search_order:
        cache_key = (agent_id, session_id, image_type)
        if cache_key not in _agentbay_sessions:
            continue
        client, last_used = _agentbay_sessions[cache_key]
        if now - last_used >= _AGENTBAY_SESSION_TIMEOUT:
            continue
        _agentbay_sessions[cache_key] = (client, now)
        logger.info(
            f"[TakeControl] Found existing {image_type} session (exact match) for "
            f"agent={agent_id}, session={session_id[:8]} "
            f"(requested env_type={env_type})"
        )
        if image_type in browser_types:
            await _init_tc_browser_once(client, cache_key, "cached")
        return client

    # ── Phase 2: any live session of this agent, by env_type preference ──
    # Connect to the agent's ACTIVE session rather than spinning up a blank one.
    best = None  # (cache_key, client, last_used) of the freshest candidate so far
    for img_type in search_order:
        for (ag_id, sess_id, it), (client, last_used) in list(_agentbay_sessions.items()):
            if ag_id != agent_id or it != img_type:
                continue
            if now - last_used >= _AGENTBAY_SESSION_TIMEOUT:
                continue
            if best is None or last_used > best[2]:
                best = ((ag_id, sess_id, it), client, last_used)
        if best is not None:
            break  # Found a live session of the most-preferred available type

    if best is not None:
        best_cache_key, best_client, _ = best
        best_image_type = best_cache_key[2]
        # Refresh the timestamp so this session stays warm
        _agentbay_sessions[best_cache_key] = (best_client, now)
        logger.info(
            f"[TakeControl] Found existing {best_image_type} session (agent-id fallback) for "
            f"agent={agent_id} (requested session={session_id[:8]}, "
            f"actual session={best_cache_key[1][:8]}, env_type={env_type})"
        )
        if best_image_type in browser_types:
            await _init_tc_browser_once(best_client, best_cache_key, "fallback")
        return best_client

    # ── Phase 3: no cached session at all — create a new one ──
    # The new session shows a blank browser/desktop until the agent starts
    # using it via its tools.
    from app.services.agentbay_client import get_agentbay_client_for_agent

    logger.warning(
        f"[TakeControl] No cached AgentBay session found for agent={agent_id} "
        f"(env_type={env_type}). Creating new session — will show blank screen."
    )
    try:
        client = await get_agentbay_client_for_agent(
            agent_id, image_type=env_type, session_id=session_id
        )
        # Same browser-type test as phases 1/2 (previously "browser" only).
        if env_type in browser_types:
            if await _init_tc_browser_once(client, (agent_id, session_id, env_type), "new", force=True):
                logger.info(f"[TakeControl] Browser initialized for new session, agent={agent_id}")
        return client
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"No active {env_type} session found: {e}",
        ) from e
# ── Session-aware input helpers ──
# Browser sessions use CDP (Chrome DevTools Protocol) via Playwright to
# interact directly with Chrome. Desktop sessions use the SDK's computer API.
import asyncio
def _is_browser_session(client) -> bool:
    """Return True when the client's AgentBay image is a browser image.

    Clients without an ``_image_type`` attribute count as non-browser.
    """
    image_type = getattr(client, "_image_type", "")
    return image_type == "browser" or image_type == "browser_latest"
async def _cdp_exec(client, script: str, timeout_ms: int = 15000) -> dict:
    """Run a Playwright/CDP Node.js script inside the AgentBay container.

    Two ``client.command_exec`` calls are made. The first writes ``script`` to
    /tmp/_tc_action.js through a quoted heredoc (so the shell performs no
    expansion on the script body). The second runs the file with ``node``.

    Returns ``{"success", "output", "stderr"}``. ``success`` is True only when
    the script printed the ``TC_OK`` marker on stdout; the command's own
    success flag is logged but does not decide the verdict. Output is
    truncated to 500 characters and stderr to 200.

    NOTE(review): a script line consisting solely of ``TCEOF`` would end the
    heredoc early.
    """
    heredoc_cmd = f"cat > /tmp/_tc_action.js << 'TCEOF'\n{script}\nTCEOF"
    write_result = await client.command_exec(heredoc_cmd, timeout_ms=5000)
    if not write_result.get("success"):
        logger.error(f"[TakeControl] Failed to write CDP script: {write_result}")
        return {"success": False, "output": "Failed to write script", "stderr": str(write_result)[:200]}

    run_result = await client.command_exec("node /tmp/_tc_action.js", timeout_ms=timeout_ms)
    # Different SDK versions report streams under different keys.
    out_text = run_result.get("stdout", "") or run_result.get("output", "") or ""
    err_text = run_result.get("stderr", "") or run_result.get("error_message", "") or ""
    command_ok = run_result.get("success", False)
    marker_seen = "TC_OK" in out_text
    logger.info(
        f"[TakeControl] CDP exec: cmd_success={command_ok}, tc_ok={marker_seen}, "
        f"stdout={out_text[:200]}, stderr={err_text[:200]}, exit_code={run_result.get('exit_code', 'N/A')}"
    )
    return {"success": marker_seen, "output": out_text[:500], "stderr": err_text[:200]}
async def _eval_cdp_script(client, script_body: str) -> dict:
    """Evaluate a Node.js Playwright/CDP script inside the browser container.

    The script is base64-encoded here and decoded into a uniquely named
    ``tc_action_<hex>.js`` in the container's working directory (base64
    avoids shell-quoting problems with the script body; the working directory
    is used because /tmp might be restricted). The file is run with ``node``
    and deleted afterwards.

    Why the write is checked and the file name is unique: several code paths
    (interaction endpoints, current-url, post-TC cleanup) call this helper and
    not all of them hold the per-session interaction lock. With one shared
    ``tc_action.js`` — or an unchecked write — ``node`` could execute a stale
    or concurrently overwritten script, i.e. replay the wrong action.

    Returns:
        ``{"success": bool, "output": str}``. On a failed write, a non-zero
        ``node`` exit, or an SDK exception, ``success`` is False and
        ``output`` describes the error.
    """
    import base64

    script_name = f"tc_action_{uuid.uuid4().hex}.js"
    try:
        script_b64 = base64.b64encode(script_body.encode('utf-8')).decode('ascii')
        cmd_write = f"echo '{script_b64}' | /usr/bin/base64 -d > {script_name}"
        write_result = await asyncio.to_thread(client._session.command.exec, cmd_write)
        if not getattr(write_result, 'success', False):
            write_err = getattr(write_result, 'stderr', '') or getattr(write_result, 'output', '') or ''
            logger.error(f"[TakeControl] Failed to write CDP script: {write_err}")
            return {"success": False, "output": f"Failed to write script: {str(write_err)[:200]}"}

        # Run, always delete the temp file, and keep node's success/failure as
        # the command status ("[ $rc -eq 0 ]" avoids `exit`, which could
        # terminate a persistent shell if the SDK uses one).
        cmd_run = f"node {script_name}; rc=$?; rm -f {script_name}; [ $rc -eq 0 ]"
        result = await asyncio.to_thread(client._session.command.exec, cmd_run)
        success = getattr(result, 'success', False)
        output = getattr(result, 'output', '') or getattr(result, 'stdout', '') or ''
        stderr = getattr(result, 'stderr', '') or ''
        if not success:
            logger.error(f"[TakeControl] CDP execution failed. Output: {output}, Stderr: {stderr}")
            # Some SDK versions merge stderr into output; fall back to it.
            return {"success": False, "output": f"Node error: {(stderr or output)[:200]}"}
        return {"success": True, "output": output}
    except Exception as e:
        logger.error(f"[TakeControl] CDP exception: {e}")
        return {"success": False, "output": str(e)}
async def _tc_browser_cleanup(agent_id: uuid.UUID, session_id: str) -> None:
    """Best-effort browser cleanup immediately after Take Control exits.

    Looks up the cached browser client for exactly (agent_id, session_id),
    trying image types "browser" then "browser_latest"; if none is cached,
    nothing happens. Otherwise a short Node.js script connects over CDP and:

    1. sends ``Page.stopLoading`` to every open page — a TC click may have
       opened a new tab (target=_blank) that is still loading a heavy page;
    2. navigates the last page with a non-blank URL to about:blank via raw
       ``Page.navigate`` (the AgentBay SDK's navigate rejects URLs that do
       not start with http/https);
    3. waits for ``Page.loadEventFired`` (``Page.enable`` is sent first,
       because CDP only delivers Page-domain events to a session that has
       enabled the domain) or an 800 ms fallback timer;
    4. exits without ``browser.close()``.

    ``browser.close()`` is avoided deliberately: cleanup variants that closed
    the connection sent Target.detachFromTarget while navigation was in
    progress, which put the AgentBay service's Playwright into a ~60-second
    recovery loop before it accepted the next page.goto().

    NOTE(review): unlike _get_client(), there is no agent-id fallback here, so
    a TC session that attached to a different session_id is not cleaned up.

    All errors are logged and swallowed.
    """
    from app.services.agentbay_client import _agentbay_sessions

    cleanup_client = None
    for img_type in ("browser", "browser_latest"):
        ck = (agent_id, session_id, img_type)
        if ck in _agentbay_sessions:
            cleanup_client = _agentbay_sessions[ck][0]
            break
    if not cleanup_client:
        return
    try:
        cleanup_script = """
const { chromium } = require('/usr/local/lib/node_modules/playwright');
(async () => {
try {
const browser = await chromium.connectOverCDP('http://localhost:9222');
const context = browser.contexts()[0];
const allPages = context.pages();
// Stop all loading pages so Chrome is not draining heavy responses.
// tc clicks frequently open new tabs (target=_blank) that stay loading
// for 20-40s; stopping them is critical for fast post-TC recovery.
for (const p of allPages) {
try {
const cdp = await context.newCDPSession(p);
await cdp.send('Page.stopLoading');
await cdp.detach();
} catch(_) {}
}
// Navigate the active content page (last non-blank) to about:blank.
// Use raw CDP Page.navigate — the AgentBay SDK rejects about:blank
// ("must start with http or https") but Chrome's CDP has no such rule.
const contentPage = allPages.slice().reverse().find(p => p.url() !== 'about:blank')
|| allPages[allPages.length - 1];
const cdp = await context.newCDPSession(contentPage);
// Page-domain events (loadEventFired) are only delivered to a CDP session
// after Page.enable; without it the wait below always hit the fallback.
await cdp.send('Page.enable').catch(() => {});
// Navigate and wait for loadEventFired so about:blank is fully settled.
await new Promise((resolve) => {
cdp.on('Page.loadEventFired', () => resolve());
cdp.send('Page.navigate', { url: 'about:blank' }).catch(() => resolve());
setTimeout(resolve, 800); // Fallback: about:blank always loads in <100ms
});
console.log('CLEANUP_OK');
} catch(e) {
console.error('CLEANUP_FAIL: ' + e.message);
}
// No browser.close() — let Chrome handle WebSocket close gracefully after
// the page is in a stable loaded state (about:blank).
process.exit(0);
})();
"""
        res = await _eval_cdp_script(cleanup_client, cleanup_script)
        logger.info(
            f"[TakeControl] Cleanup: {res.get('output', 'no output')[:100]} "
            f"for session={session_id[:8]}"
        )
    except Exception as e:
        logger.warning(f"[TakeControl] Cleanup failed (non-fatal): {e}")
async def _perform_click(client, x: int, y: int, button: str = "left"):
    """Click at (x, y) on the remote session.

    Browser sessions: a short-lived Node.js script connects over CDP
    (connectOverCDP to localhost:9222) and calls Playwright's
    ``page.mouse.click`` on the last page with a non-blank URL. CDP is used
    because the Computer API's click_mouse is only available on the computer
    image, not browser_latest. The script deliberately does NOT call
    ``browser.close()``; it lets the Node process exit so Chrome sees a plain
    WebSocket close instead of explicit Target.detachFromTarget commands.

    Desktop sessions: ``client._session.computer.click_mouse(x, y, button)``.

    ``button`` comes from the request body, so it is embedded in the
    generated JavaScript as a JSON string literal — quotes or other special
    characters cannot break out of the script.

    Returns:
        dict with ``success`` and ``output`` (plus ``method`` except on a
        desktop-click exception).
    """
    image_type = getattr(client, '_image_type', 'unknown')
    logger.info(f"[TakeControl] Click at ({x}, {y}), button={button}, image_type={image_type}")
    if _is_browser_session(client):
        button_js = json.dumps(button)
        script = f"""
const {{ chromium }} = require('/usr/local/lib/node_modules/playwright');
(async () => {{
let ok = false;
try {{
const browser = await chromium.connectOverCDP('http://localhost:9222');
const context = browser.contexts()[0];
const pages = context.pages();
// Page selection: prefer the last page with a committed non-blank URL.
// When a tc click opens a new tab (target=_blank), the new tab briefly
// has url() === 'about:blank' before its navigation commits. During that
// window, we correctly target the ORIGINAL content page (the one the user
// sees in the TC screenshot). The NEXT click, after the new tab has settled,
// will naturally pick the new tab because its URL will be non-blank by then.
const page = pages.slice().reverse().find(p => p.url() !== 'about:blank')
|| pages[pages.length - 1];
console.log('TARGET_PAGE:' + page.url());
await page.mouse.click({x}, {y}, {{ button: {button_js} }});
console.log('CLICK_OK');
ok = true;
// Wait 2 seconds for any triggered navigation to commit before releasing
// the interaction lock. This covers both cases:
// A) Same-tab navigation: URL commits in ~0.5-1s
// B) New-tab navigation (target=_blank): new tab URL transitions from
// about:blank to the target URL in ~1-2s
//
// WHY a fixed sleep instead of polling context.pages() every 200ms:
// Polling makes ~20 CDP calls while Chrome is loading a heavy new tab.
// Under that combined load, Chrome's DevTools HTTP server stops responding,
// causing the NEXT connectOverCDP to time out with a 30-second error.
// A passive sleep has zero CDP overhead and achieves the same goal.
await new Promise(r => setTimeout(r, 2000));
}} catch (e) {{
console.error('CLICK_FAIL:' + e.message);
}}
// No browser.close() — avoid explicit Target.detachFromTarget.
// Chrome handles the WebSocket close gracefully.
process.exit(ok ? 0 : 1);
}})();
"""
        res = await _eval_cdp_script(client, script)
        clicked = "CLICK_OK" in res.get("output", "")
        return {
            "success": res.get("success", False) and clicked,
            "method": "cdp_click",
            "output": "Clicked" if clicked else res.get("output", "Unknown error"),
        }

    # Desktop session — use Computer API
    try:
        result = await asyncio.to_thread(
            client._session.computer.click_mouse, x, y, button
        )
        success = getattr(result, 'success', False)
        logger.info(f"[TakeControl] Computer click at ({x}, {y}): success={success}")
        return {"success": success, "method": "computer_click", "output": f"Clicked at ({x}, {y})"}
    except Exception as e:
        logger.warning(f"[TakeControl] Computer click failed: {e}")
        return {"success": False, "output": f"Click failed: {str(e)[:200]}"}
async def _perform_type(client, text: str):
    """Type text into the remote session.

    Browser sessions: a Node.js script connects over CDP and calls
    Playwright's ``page.keyboard.type`` on the last page with a non-blank
    URL. The text is percent-encoded with ``urllib.parse.quote`` and decoded
    in JS with ``decodeURIComponent``, so quotes, newlines and non-ASCII
    characters cannot break the generated script.

    Desktop sessions: ``client._session.computer.input_text(text)``.

    The typed text itself is never logged: Take Control is used for logins,
    so it is frequently a password. Only its length is recorded.
    """
    image_type = getattr(client, '_image_type', 'unknown')
    logger.info(f"[TakeControl] Type text: {len(text)} chars, image_type={image_type}")
    if _is_browser_session(client):
        import urllib.parse

        encoded_text = urllib.parse.quote(text)
        script = f"""
const {{ chromium }} = require('/usr/local/lib/node_modules/playwright');
(async () => {{
let ok = false;
try {{
const browser = await chromium.connectOverCDP('http://localhost:9222');
const context = browser.contexts()[0];
const pages = context.pages();
const page = pages.slice().reverse().find(p => p.url() !== 'about:blank') || pages[pages.length - 1];
const textToType = decodeURIComponent('{encoded_text}');
await page.keyboard.type(textToType);
console.log('TYPE_OK');
ok = true;
}} catch (e) {{
console.error('TYPE_FAIL:' + e.message);
}}
// No browser.close() — avoid Target.detachFromTarget mid-navigation.
process.exit(ok ? 0 : 1);
}})();
"""
        res = await _eval_cdp_script(client, script)
        typed = "TYPE_OK" in res.get("output", "")
        return {
            "success": res.get("success", False) and typed,
            "method": "cdp_type",
            "output": "Text typed" if typed else res.get("output", "Unknown error"),
        }

    try:
        result = await asyncio.to_thread(
            client._session.computer.input_text, text
        )
        success = getattr(result, 'success', False)
        logger.info(f"[TakeControl] Computer input_text: success={success}")
        return {"success": success, "method": "computer_input", "output": "Text typed"}
    except Exception as e:
        logger.warning(f"[TakeControl] Computer input_text failed: {e}")
        return {"success": False, "output": f"Type failed: {str(e)[:200]}"}
async def _perform_press_keys(client, keys: list[str]):
    """Press a key or key combination on the remote session.

    Browser sessions: key names are mapped to Playwright names (``ctrl`` →
    ``Control``, ``esc`` → ``Escape``, ...). Other single characters are
    upper-cased and anything else is passed through. The keys are joined
    with "+" and sent with ``page.keyboard.press`` from a CDP Node.js script.
    The combination is embedded as a JSON string literal, so keys such as a
    single quote or backslash cannot break the script or inject code.

    Desktop sessions: ``client._session.computer.press_keys(keys)``.

    An empty ``keys`` list is rejected up front.
    """
    if not keys:
        return {"success": False, "output": "No keys to press"}
    key_desc = "+".join(keys)
    logger.info(f"[TakeControl] Press keys: {key_desc}")
    if _is_browser_session(client):
        # Lower-cased aliases → Playwright key names. Names already in
        # Playwright form (e.g. "Tab", "ArrowUp") pass through unchanged.
        key_map = {
            'ctrl': 'Control', 'control': 'Control', 'alt': 'Alt', 'shift': 'Shift',
            'meta': 'Meta', 'cmd': 'Meta',
            'enter': 'Enter', 'return': 'Enter', 'backspace': 'Backspace',
            'esc': 'Escape', 'escape': 'Escape', 'tab': 'Tab', 'delete': 'Delete',
        }
        # NOTE(review): Playwright treats single characters case-sensitively,
        # so a lone "a" is pressed as "A" here — confirm that is intended.
        playwright_keys = [key_map.get(k.lower(), k.upper() if len(k) == 1 else k) for k in keys]
        combo_js = json.dumps("+".join(playwright_keys))
        script = f"""
const {{ chromium }} = require('/usr/local/lib/node_modules/playwright');
(async () => {{
let ok = false;
try {{
const browser = await chromium.connectOverCDP('http://localhost:9222');
const context = browser.contexts()[0];
const pages = context.pages();
const page = pages.slice().reverse().find(p => p.url() !== 'about:blank') || pages[pages.length - 1];
await page.keyboard.press({combo_js});
console.log('PRESS_OK');
ok = true;
}} catch (e) {{
console.error('PRESS_FAIL:' + e.message);
}}
// No browser.close() — avoid Target.detachFromTarget mid-navigation.
process.exit(ok ? 0 : 1);
}})();
"""
        res = await _eval_cdp_script(client, script)
        pressed = "PRESS_OK" in res.get("output", "")
        return {
            "success": res.get("success", False) and pressed,
            "method": "cdp_press",
            "output": f"Pressed {key_desc}" if pressed else res.get("output", "Unknown error"),
        }

    try:
        result = await asyncio.to_thread(
            client._session.computer.press_keys, keys
        )
        success = getattr(result, 'success', False)
        logger.info(f"[TakeControl] Computer press_keys: success={success}")
        return {"success": success, "method": "computer_keys", "output": f"Pressed {key_desc}"}
    except Exception as e:
        logger.warning(f"[TakeControl] Computer press_keys failed: {e}")
        return {"success": False, "output": f"Key press failed: {str(e)[:200]}"}
async def _perform_drag(
    client, from_x: int, from_y: int, to_x: int, to_y: int, duration_ms: int = 600
) -> dict:
    """Drag the mouse from (from_x, from_y) to (to_x, to_y) on the remote session.

    Browser sessions: a Node.js script connects over CDP and replays the drag
    with Playwright mouse events. The path is a cubic Bezier curve whose
    control points are offset perpendicular to the straight line by 15% of
    its length, with up to ±1 px of random jitter per step and 30 steps
    spread over ``duration_ms``. Like the other interaction scripts it does
    not call ``browser.close()``.

    Desktop sessions: ``client._session.computer.drag_mouse`` is used when
    the SDK exposes it. That call takes no duration, so ``duration_ms`` only
    affects browser sessions. If it is unavailable, an error result is
    returned.

    Returns:
        dict with ``success`` and ``output`` (plus ``method`` except on a
        desktop-drag exception). Never None.
    """
    logger.info(
        f"[TakeControl] Drag: ({from_x},{from_y}) -> ({to_x},{to_y}), "
        f"duration={duration_ms}ms"
    )
    if _is_browser_session(client):
        script = f"""
const {{ chromium }} = require('/usr/local/lib/node_modules/playwright');
let browser;
(async () => {{
let ok = false;
try {{
browser = await chromium.connectOverCDP('http://localhost:9222');
const context = browser.contexts()[0];
const pages = context.pages();
const page = pages.slice().reverse().find(p => p.url() !== 'about:blank') || pages[pages.length - 1];
const steps = 30;
const duration = {duration_ms};
const x0 = {from_x}, y0 = {from_y};
const x3 = {to_x}, y3 = {to_y};
const dx = x3 - x0, dy = y3 - y0;
const perpX = -dy * 0.15, perpY = dx * 0.15;
const x1 = x0 + dx * 0.3 + perpX, y1 = y0 + dy * 0.3 + perpY;
const x2 = x0 + dx * 0.7 - perpX, y2 = y0 + dy * 0.7 - perpY;
const bezier = (t) => {{
const u = 1 - t;
return {{ x: u*u*u*x0+3*u*u*t*x1+3*u*t*t*x2+t*t*t*x3, y: u*u*u*y0+3*u*u*t*y1+3*u*t*t*y2+t*t*t*y3 }};
}};
await page.mouse.move(x0, y0);
await page.mouse.down();
for (let i = 1; i <= steps; i++) {{
const pt = bezier(i / steps);
const jx = (Math.random() - 0.5) * 2;
const jy = (Math.random() - 0.5) * 2;
await page.mouse.move(Math.round(pt.x + jx), Math.round(pt.y + jy));
await new Promise(r => setTimeout(r, duration / steps));
}}
await page.mouse.move(x3, y3);
await page.mouse.up();
console.log('TC_OK: drag complete');
ok = true;
}} catch (e) {{
console.error('TC_FAIL: ' + e.message);
}}
// No browser.close() — avoid Target.detachFromTarget mid-navigation.
process.exit(ok ? 0 : 1);
}})();
"""
        res = await _eval_cdp_script(client, script)
        return {
            "success": res.get("success", False) and "TC_OK" in res.get("output", ""),
            "method": "cdp_drag",
            "output": f"Dragged ({from_x},{from_y}) -> ({to_x},{to_y})" if "TC_OK" in res.get("output", "") else res.get("output", "Unknown error"),
        }

    # Desktop session — use the Computer API drag when the SDK provides it.
    computer = getattr(getattr(client, "_session", None), "computer", None)
    drag_mouse = getattr(computer, "drag_mouse", None)
    if drag_mouse is None:
        logger.warning("[TakeControl] Drag requested on a session without computer.drag_mouse")
        return {
            "success": False,
            "method": "computer_drag",
            "output": "Drag is not supported for this session type",
        }
    try:
        result = await asyncio.to_thread(drag_mouse, from_x, from_y, to_x, to_y)
        success = getattr(result, 'success', False)
        logger.info(f"[TakeControl] Computer drag: success={success}")
        return {
            "success": success,
            "method": "computer_drag",
            "output": f"Dragged ({from_x},{from_y}) -> ({to_x},{to_y})",
        }
    except Exception as e:
        logger.warning(f"[TakeControl] Computer drag failed: {e}")
        return {"success": False, "output": f"Drag failed: {str(e)[:200]}"}
# ── Endpoints ──
class CurrentUrlRequest(BaseModel):
    """Request to get the current page URL from the browser session.

    Body of POST /current-url; only the session id is required.
    """
    session_id: str
@router.post("/current-url")
async def control_current_url(
    agent_id: uuid.UUID,
    data: CurrentUrlRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Get the current page URL from the active browser session via CDP.

    Called by the Take Control panel on mount to auto-populate the cookie
    domain field, so the user doesn't have to type the domain manually.

    The page is selected exactly as the interaction scripts select it (the
    last page with a non-blank URL, i.e. the one shown in the TC screenshot).
    The script does not call ``browser.close()``, for the same reason as the
    other TC scripts. It runs under the per-session interaction lock so it
    cannot interleave with a click/type script.

    Always returns ``{"status": "ok", "url": str}``; ``url`` is empty when
    it could not be determined (non-fatal).
    """
    _agent, _access = await check_agent_access(db, current_user, agent_id)
    env_type = _get_session_env_type(str(agent_id), data.session_id)
    client = await _get_client(agent_id, data.session_id, env_type=env_type)
    script = """
const { chromium } = require('/usr/local/lib/node_modules/playwright');
(async () => {
let ok = false;
try {
const browser = await chromium.connectOverCDP('http://localhost:9222');
const context = browser.contexts()[0];
const pages = context.pages();
// Same page selection as the interaction scripts: last non-blank page.
const page = pages.slice().reverse().find(p => p.url() !== 'about:blank')
|| pages[pages.length - 1];
console.log('URL_OK:' + page.url());
ok = true;
} catch (e) {
console.error('URL_FAIL:' + e.message);
}
// No browser.close() — avoid Target.detachFromTarget mid-navigation.
process.exit(ok ? 0 : 1);
})();
"""
    try:
        async with _get_interaction_lock(agent_id, data.session_id):
            res = await _eval_cdp_script(client, script)
        output = res.get("output", "")
        if "URL_OK:" in output:
            tail = output.split("URL_OK:", 1)[1].strip()
            # Only the marker's own line is the URL.
            url = tail.splitlines()[0].strip() if tail else ""
            return {"status": "ok", "url": url}
        return {"status": "ok", "url": ""}
    except Exception as e:
        logger.warning(f"[TakeControl] current-url failed: {e}")
        return {"status": "ok", "url": ""}  # Non-fatal — return empty URL
@router.post("/click")
async def control_click(
    agent_id: uuid.UUID,
    data: ClickRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Forward a single mouse click to a session under Take Control.

    The session must already be locked (HTTP 400 otherwise). Interaction
    outcomes are reported in the body as ``{"status": "ok" | "error",
    "detail": str}`` so the frontend can tell whether the click landed.
    """
    _agent, _access = await check_agent_access(db, current_user, agent_id)
    agent_key = str(agent_id)
    if not is_session_locked(agent_key, data.session_id):
        raise HTTPException(status_code=400, detail="Session is not in Take Control mode")
    session_env = _get_session_env_type(agent_key, data.session_id)
    client = await _get_client(agent_id, data.session_id, env_type=session_env)

    # One interaction at a time per session: rapid clicks would otherwise
    # overwrite the remote action script concurrently.
    async with _get_interaction_lock(agent_id, data.session_id):
        try:
            outcome = await _perform_click(client, data.x, data.y, data.button)
            if not outcome.get("success"):
                reason = outcome.get("stderr") or outcome.get("output") or "Click operation failed"
                return {"status": "error", "detail": reason[:500]}
            return {"status": "ok", "detail": f"Clicked at ({data.x}, {data.y})"}
        except Exception as e:
            logger.error(f"[TakeControl] Click exception: {e}")
            return {"status": "error", "detail": str(e)[:500]}
@router.post("/type")
async def control_type(
    agent_id: uuid.UUID,
    data: TypeRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Forward a block of text input to a session under Take Control.

    Raises HTTP 400 when the session is not locked; otherwise replies with
    ``{"status": "ok" | "error", "detail": str}``.
    """
    _agent, _access = await check_agent_access(db, current_user, agent_id)
    agent_key = str(agent_id)
    if not is_session_locked(agent_key, data.session_id):
        raise HTTPException(status_code=400, detail="Session is not in Take Control mode")
    session_env = _get_session_env_type(agent_key, data.session_id)
    client = await _get_client(agent_id, data.session_id, env_type=session_env)

    async with _get_interaction_lock(agent_id, data.session_id):
        try:
            outcome = await _perform_type(client, data.text)
            if not outcome.get("success"):
                reason = outcome.get("stderr") or outcome.get("output") or "Type operation failed"
                return {"status": "error", "detail": reason[:500]}
            return {"status": "ok", "detail": "Text sent"}
        except Exception as e:
            logger.error(f"[TakeControl] Type exception: {e}")
            return {"status": "error", "detail": str(e)[:500]}
@router.post("/press_keys")
async def control_press_keys(
    agent_id: uuid.UUID,
    data: PressKeysRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Forward a keyboard key press / combination to a session under Take Control.

    Raises HTTP 400 when the session is not locked; otherwise replies with
    ``{"status": "ok" | "error", "detail": str}``.
    """
    _agent, _access = await check_agent_access(db, current_user, agent_id)
    agent_key = str(agent_id)
    if not is_session_locked(agent_key, data.session_id):
        raise HTTPException(status_code=400, detail="Session is not in Take Control mode")
    session_env = _get_session_env_type(agent_key, data.session_id)
    client = await _get_client(agent_id, data.session_id, env_type=session_env)

    async with _get_interaction_lock(agent_id, data.session_id):
        try:
            outcome = await _perform_press_keys(client, data.keys)
            if not outcome.get("success"):
                reason = outcome.get("stderr") or outcome.get("output") or "Key press failed"
                return {"status": "error", "detail": reason[:500]}
            return {"status": "ok", "detail": f"Pressed: {'+'.join(data.keys)}"}
        except Exception as e:
            logger.error(f"[TakeControl] Press keys exception: {e}")
            return {"status": "error", "detail": str(e)[:500]}
@router.post("/drag")
async def control_drag(
    agent_id: uuid.UUID,
    data: DragRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Replay a human-like mouse drag in a session under Take Control.

    Intended for slider CAPTCHAs and drag-and-drop. The drag follows a Bezier
    curve with random jitter to mimic natural mouse movement, which bot
    detection on such widgets expects. Raises HTTP 400 when the session is
    not locked; otherwise replies with ``{"status": "ok" | "error",
    "detail": str}``.
    """
    _agent, _access = await check_agent_access(db, current_user, agent_id)
    agent_key = str(agent_id)
    if not is_session_locked(agent_key, data.session_id):
        raise HTTPException(status_code=400, detail="Session is not in Take Control mode")
    session_env = _get_session_env_type(agent_key, data.session_id)
    client = await _get_client(agent_id, data.session_id, env_type=session_env)

    async with _get_interaction_lock(agent_id, data.session_id):
        try:
            outcome = await _perform_drag(
                client,
                data.from_x, data.from_y,
                data.to_x, data.to_y,
                data.duration_ms,
            )
            if not outcome.get("success"):
                return {"status": "error", "detail": outcome.get("output", "Drag failed")[:500]}
            return {"status": "ok", "detail": outcome.get("output", "Drag complete")}
        except Exception as e:
            logger.error(f"[TakeControl] Drag exception: {e}")
            return {"status": "error", "detail": str(e)[:500]}
@router.post("/screenshot")
async def control_screenshot(
    agent_id: uuid.UUID,
    data: ScreenshotRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Grab an immediate screenshot from the AgentBay session.

    A browser snapshot is tried first and a desktop snapshot is used when it
    comes back empty. The reply carries the base64 image plus, when the SDK
    can report it, the screen size used to map screenshot pixels onto
    click coordinates. Errors are reported in the body, not as HTTP errors.
    """
    _agent, _access = await check_agent_access(db, current_user, agent_id)
    session_env = _get_session_env_type(str(agent_id), data.session_id)
    client = await _get_client(agent_id, data.session_id, env_type=session_env)
    try:
        # Browser snapshot first; desktop snapshot only if that was empty.
        image_b64 = (
            await client.get_browser_snapshot_base64()
            or await client.get_desktop_snapshot_base64()
        )
        if not image_b64:
            logger.warning(f"[TakeControl] Screenshot returned None for agent={agent_id}")

        # Screen size maps screenshot dimensions onto computer.click_mouse()
        # coordinates; it is optional.
        dimensions = None
        try:
            size_result = await asyncio.to_thread(
                client._session.computer.get_screen_size
            )
            if size_result.success and getattr(size_result, 'data', None):
                dimensions = size_result.data
        except Exception:
            pass  # Non-critical — TC still works without it

        return {"status": "ok", "screenshot": image_b64, "screen_size": dimensions}
    except Exception as e:
        logger.warning(f"[TakeControl] Screenshot failed: {e}")
        return {"status": "error", "detail": str(e)[:500]}
@router.post("/lock")
async def control_lock(
    agent_id: uuid.UUID,
    data: LockRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Enter Take Control mode for a session.

    Records ``(user_id, timestamp, env_type)`` in the in-memory lock registry,
    which pauses the agent's automatic browser/computer tool execution for this
    session. Any user with access to the agent may lock; Take Control is part
    of the normal interaction flow, not an admin-only operation.

    Behavior when a lock already exists:
      * held by the same user — the lock is refreshed (re-entry);
      * held by another user and older than ``_LOCK_TIMEOUT_SECONDS`` — the
        stale lock is taken over;
      * held by another user and still fresh — nothing changes and
        ``{"status": "already_locked", "locked_by": <holder id>}`` is returned.

    ``env_type`` is normalized to ``browser`` / ``computer`` / ``code``;
    empty or unknown values fall back to ``browser``.

    Returns:
        ``{"status": "locked", "locked_by": <current user id>}`` on success.
    """
    _agent, _access = await check_agent_access(db, current_user, agent_id)
    key = (str(agent_id), data.session_id)
    user_id = str(current_user.id)
    existing = _take_control_locks.get(key)
    if existing is not None:
        holder_id, locked_at, _existing_env_type = existing
        if holder_id != user_id:
            if time.time() - locked_at > _LOCK_TIMEOUT_SECONDS:
                logger.info(f"[TakeControl] Cleared expired lock held by {holder_id}")
            else:
                return {"status": "already_locked", "locked_by": holder_id}

    # Sanitize env_type — default to 'browser' if empty or unknown
    env_type = (data.env_type or "browser").strip().lower()
    if env_type not in ("browser", "computer", "code"):
        env_type = "browser"

    # Acquire or refresh lock with current timestamp and env_type
    _take_control_locks[key] = (user_id, time.time(), env_type)
    # Only a re-lock by the *same* user is a re-entry; taking over another
    # user's expired lock is a fresh acquisition.
    is_reentry = existing is not None and existing[0] == user_id
    logger.info(
        f"[TakeControl] Lock acquired: agent={agent_id}, session={data.session_id}, "
        f"user={current_user.id}, env_type={env_type}, re_entry={is_reentry}"
    )
    return {"status": "locked", "locked_by": user_id}
@router.post("/unlock")
async def control_unlock(
    agent_id: uuid.UUID,
    data: UnlockRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Leave Take Control mode, optionally exporting the session's cookies first.

    When ``export_cookies`` is set and a ``platform_hint`` is supplied, the
    browser's cookies are exported and stored encrypted in the
    ``agent_credentials`` table before the lock is dropped. An export failure
    is logged and never blocks the unlock.

    Once the lock is gone, the per-session browser-initialization flags are
    cleared and a CDP cleanup pass runs, so the agent's next browser tool call
    re-initializes instead of reusing stale page state.

    Returns:
        ``{"status": "not_locked"}`` when no lock exists, otherwise
        ``{"status": "unlocked", "cookies_exported": bool, "cookie_count": int}``.
    """
    _agent, _access = await check_agent_access(db, current_user, agent_id)
    lock_key = (str(agent_id), data.session_id)
    if lock_key not in _take_control_locks:
        logger.info(f"[TakeControl] Unlock called but no lock found: agent={agent_id}, session={data.session_id}")
        return {"status": "not_locked"}

    did_export = False
    cookie_count = 0
    wants_export = data.export_cookies and data.platform_hint
    try:
        if wants_export:
            # Best-effort: the env type is read while the lock still exists,
            # because the lock entry is where it is stored.
            try:
                session_env = _get_session_env_type(str(agent_id), data.session_id)
                session_client = await _get_client(agent_id, data.session_id, env_type=session_env)
                cookie_count = await _export_cookies_from_session(
                    session_client, agent_id, data.platform_hint, db
                )
                did_export = True
                logger.info(
                    f"[TakeControl] Cookies exported: agent={agent_id}, "
                    f"platform={data.platform_hint}, count={cookie_count}"
                )
            except Exception as exc:
                logger.warning(f"[TakeControl] Cookie export failed (non-fatal): {exc}")
    finally:
        # The lock is released no matter how the export went.
        _take_control_locks.pop(lock_key, None)
        logger.info(
            f"[TakeControl] Lock released: agent={agent_id}, session={data.session_id}"
        )

    # Force the SDK's browser.operator to re-initialize on the agent's next
    # browser tool call; TC's CDP interactions can leave stale page references
    # that make browser.operator.navigate hang.
    from app.services.agentbay_client import _agentbay_sessions

    image_types = ("browser", "browser_latest")
    for image_type in image_types:
        cache_key = (agent_id, data.session_id, image_type)
        if cache_key not in _agentbay_sessions:
            continue
        cached_client, _cached_at = _agentbay_sessions[cache_key]
        cached_client._browser_initialized = False
        logger.info(
            f"[TakeControl] Reset _browser_initialized after TC unlock "
            f"for session={data.session_id[:8]}"
        )

    # Drop this session from the control layer's own "already initialized" set.
    for image_type in image_types:
        _browser_initialized.discard((agent_id, data.session_id, image_type))

    # Cancel in-progress navigations / release held mouse buttons before the
    # agent resumes its browser tools.
    await _tc_browser_cleanup(agent_id, data.session_id)

    return {
        "status": "unlocked",
        "cookies_exported": did_export,
        "cookie_count": cookie_count,
    }
async def _export_cookies_from_session(
    client, agent_id: uuid.UUID, platform_hint: str, db: AsyncSession
) -> int:
    """Export the session browser's cookies over CDP and store them encrypted.

    A small Node.js/Playwright script is written into the session's working
    directory and run against the browser's CDP endpoint
    (``http://localhost:9222``). The ``COOKIES_EXPORT:<json>`` line it prints
    is parsed, and the JSON is encrypted with ``SECRET_KEY``. The result is
    upserted into ``agent_credentials`` for ``(agent_id, platform_hint)``,
    and the DB session is committed.

    Args:
        client: AgentBay client for the session. Its async ``command_exec``
            returns a dict with ``success`` / ``stdout`` / ``stderr`` keys.
        agent_id: Agent that owns the credential row.
        platform_hint: Platform key used to find or create the credential row.
        db: Database session; committed after a successful upsert.

    Returns:
        The number of cookies stored. Returns 0 without writing anything in
        these cases: the script failed, its output could not be parsed, or
        the browser had no cookies.
    """
    import base64

    # Node.js script that exports ALL cookies via CDP.
    #
    # Key design decisions:
    # 1. context.cookies() is called WITHOUT a URL filter, so every cookie in
    #    the browser profile is returned regardless of which page is open.
    # 2. Each cookie is sanitized before export so it can be re-injected:
    #    - 'sameSite' is normalized to 'Strict' | 'Lax' | 'None'; unknown
    #      values fall back to 'Lax'.
    #    - Non-positive 'expires' values (session cookies) are dropped.
    #    - 'domain' gets a leading dot so the cookie also matches subdomains
    #      when re-injected. '__Host-' cookies are left as-is: they must stay
    #      host-only, and browsers reject them when a Domain is set.
    export_script = r"""
const { chromium } = require('/usr/local/lib/node_modules/playwright');
let browser;
(async () => {
  let ok = false;
  try {
    browser = await chromium.connectOverCDP('http://localhost:9222');
    const context = browser.contexts()[0];
    // Fetch ALL cookies from the browser profile (no URL filter = full export)
    const rawCookies = await context.cookies();
    // Sanitize cookies so they can be re-injected by Playwright's addCookies()
    const sameSiteMap = { none: 'None', lax: 'Lax', strict: 'Strict' };
    const cookies = rawCookies.map(c => {
      const out = { ...c };
      // Normalize sameSite casing
      if (out.sameSite != null) {
        out.sameSite = sameSiteMap[String(out.sameSite).toLowerCase()] || 'Lax';
      }
      // Remove negative or zero expires (session cookies)
      if (out.expires != null && out.expires <= 0) {
        delete out.expires;
      }
      // Ensure domain has leading dot so it matches subdomains.
      // Playwright's context.cookies() strips the leading dot from
      // domain cookies, turning them into host-only. Chrome's CDP
      // Network.setCookie needs the dot to match subdomains (e.g.,
      // ".xiaohongshu.com" matches www.xiaohongshu.com).
      // '__Host-' cookies must remain host-only: browsers reject them
      // when a Domain attribute is set, so never add a dot to them.
      const isHostPrefixed = String(out.name || '').startsWith('__Host-');
      if (out.domain && !out.domain.startsWith('.') && !isHostPrefixed) {
        out.domain = '.' + out.domain;
      }
      return out;
    });
    console.log('COOKIES_EXPORT:' + JSON.stringify(cookies));
    ok = true;
  } catch (e) {
    console.error('EXPORT_FAIL:' + e.message);
  } finally {
    if (browser) await browser.close().catch(() => {});
  }
  process.exit(ok ? 0 : 1);
})();
"""
    # Base64 avoids shell-quoting problems. The script goes to the current
    # directory rather than /tmp, which may lack write permission.
    script_b64 = base64.b64encode(export_script.encode("utf-8")).decode("ascii")
    write_result = await client.command_exec(
        f"echo '{script_b64}' | /usr/bin/base64 -d > tc_export_cookies.js"
    )
    write_stderr = write_result.get("stderr") or ""
    logger.info(
        f"[TakeControl] Cookie export script write: success={write_result.get('success')}, "
        f"stderr={write_stderr[:100]}"
    )

    exec_result = await client.command_exec("node tc_export_cookies.js", timeout_ms=15000)
    # `or ""` guards against the keys being present with a None value.
    stdout = exec_result.get("stdout") or ""
    stderr = exec_result.get("stderr") or ""
    logger.info(
        f"[TakeControl] Cookie export script exec: success={exec_result.get('success')}, "
        f"stdout_len={len(stdout)}, stderr={stderr[:200]}"
    )

    # On success the script prints a 'COOKIES_EXPORT:<json>' line; the first
    # such line is used. On failure it prints 'EXPORT_FAIL:<message>' to
    # stderr, so both streams are logged. Split on "\n" only: splitlines()
    # would also break on characters such as U+2028 that JSON.stringify
    # leaves unescaped inside cookie values.
    marker = "COOKIES_EXPORT:"
    export_line = next((line for line in stdout.split("\n") if marker in line), None)
    if export_line is None:
        logger.warning(
            f"[TakeControl] Cookie export script failed: stdout={stdout[:500]!r}, "
            f"stderr={stderr[:500]!r}"
        )
        return 0

    cookies_json_str = export_line.split(marker, 1)[1].strip()
    try:
        cookies = json.loads(cookies_json_str)
    except json.JSONDecodeError:
        logger.warning("[TakeControl] Failed to parse exported cookies JSON")
        return 0
    if not isinstance(cookies, list) or not cookies:
        return 0

    # Encrypt the exported JSON verbatim and store it.
    settings = get_settings()
    encrypted_cookies = encrypt_data(cookies_json_str, settings.SECRET_KEY)

    # Upsert: update the existing credential for this platform, or create one.
    query_result = await db.execute(
        select(AgentCredential).where(
            AgentCredential.agent_id == agent_id,
            AgentCredential.platform == platform_hint,
        )
    )
    existing = query_result.scalar_one_or_none()
    now = datetime.now(timezone.utc)
    if existing:
        existing.cookies_json = encrypted_cookies
        existing.cookies_updated_at = now
        existing.last_login_at = now
        existing.status = "active"
    else:
        db.add(
            AgentCredential(
                agent_id=agent_id,
                credential_type="website",
                platform=platform_hint,
                display_name=platform_hint,
                cookies_json=encrypted_cookies,
                cookies_updated_at=now,
                last_login_at=now,
                status="active",
            )
        )
    await db.commit()
    return len(cookies)