deerflow2/backend/src/community/aio_sandbox/local_backend.py

336 lines
12 KiB
Python

"""Local container backend for sandbox provisioning.
Manages sandbox containers using Docker or Apple Container on the local machine.
Handles container lifecycle, port allocation, and cross-process container discovery.
"""
from __future__ import annotations
import logging
import subprocess
import time
from src.utils.network import get_free_port, release_port
from .backend import SandboxBackend, wait_for_sandbox_ready
from .sandbox_info import SandboxInfo
logger = logging.getLogger(__name__)
class LocalContainerBackend(SandboxBackend):
"""Backend that manages sandbox containers locally using Docker or Apple Container.
On macOS, automatically prefers Apple Container if available, otherwise falls back to Docker.
On other platforms, uses Docker.
Features:
- Deterministic container naming for cross-process discovery
- Port allocation with thread-safe utilities
- Container lifecycle management (start/stop with --rm)
- Support for volume mounts and environment variables
"""
def __init__(
self,
*,
image: str,
base_port: int,
container_prefix: str,
config_mounts: list,
environment: dict[str, str],
):
"""Initialize the local container backend.
Args:
image: Container image to use.
base_port: Base port number to start searching for free ports.
container_prefix: Prefix for container names (e.g., "deer-flow-sandbox").
config_mounts: Volume mount configurations from config (list of VolumeMountConfig).
environment: Environment variables to inject into containers.
"""
self._image = image
self._base_port = base_port
self._container_prefix = container_prefix
self._config_mounts = config_mounts
self._environment = environment
self._runtime = self._detect_runtime()
@property
def runtime(self) -> str:
"""The detected container runtime ("docker" or "container")."""
return self._runtime
def _detect_runtime(self) -> str:
"""Detect which container runtime to use.
On macOS, prefer Apple Container if available, otherwise fall back to Docker.
On other platforms, use Docker.
Returns:
"container" for Apple Container, "docker" for Docker.
"""
import platform
if platform.system() == "Darwin":
try:
result = subprocess.run(
["container", "--version"],
capture_output=True,
text=True,
check=True,
timeout=5,
)
logger.info(f"Detected Apple Container: {result.stdout.strip()}")
return "container"
except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired):
logger.info("Apple Container not available, falling back to Docker")
return "docker"
# ── SandboxBackend interface ──────────────────────────────────────────
def create(self, thread_id: str, sandbox_id: str, extra_mounts: list[tuple[str, str, bool]] | None = None) -> SandboxInfo:
"""Start a new container and return its connection info.
Args:
thread_id: Thread ID for which the sandbox is being created. Useful for backends that want to organize sandboxes by thread.
sandbox_id: Deterministic sandbox identifier (used in container name).
extra_mounts: Additional volume mounts as (host_path, container_path, read_only) tuples.
Returns:
SandboxInfo with container details.
Raises:
RuntimeError: If the container fails to start.
"""
container_name = f"{self._container_prefix}-{sandbox_id}"
port = get_free_port(start_port=self._base_port)
try:
container_id = self._start_container(container_name, port, extra_mounts)
self._ensure_user_data_permissions(container_name)
except Exception:
release_port(port)
raise
# Use host.docker.internal when running inside Docker container
# This allows containers to access the sandbox on the host
sandbox_host = "host.docker.internal"
return SandboxInfo(
sandbox_id=sandbox_id,
sandbox_url=f"http://{sandbox_host}:{port}",
container_name=container_name,
container_id=container_id,
)
def _ensure_user_data_permissions(self, container_name: str) -> None:
"""Ensure /mnt/user-data subdirectories are writable in sandbox container.
Some sandbox services run as non-root users (e.g. ``gem``). If mounted
host directories are created as ``755 root:root``, uploads may fail with
permission denied. This best-effort fix normalizes permissions.
"""
fix_cmd = (
"mkdir -p /mnt/user-data/uploads /mnt/user-data/workspace /mnt/user-data/outputs "
"&& chmod 777 /mnt/user-data/uploads /mnt/user-data/workspace /mnt/user-data/outputs"
)
# Retry briefly because the init process may still be setting up paths
# right after container startup.
for _ in range(5):
try:
subprocess.run(
[self._runtime, "exec", container_name, "sh", "-lc", fix_cmd],
capture_output=True,
text=True,
check=True,
timeout=5,
)
return
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
logger.debug(f"Retrying user-data permission fix for {container_name}: {e}")
time.sleep(0.3)
logger.warning(
"Failed to ensure user-data permissions for %s; uploads may fail until permissions are fixed",
container_name,
)
def destroy(self, info: SandboxInfo) -> None:
"""Stop the container and release its port."""
if info.container_id:
self._stop_container(info.container_id)
# Extract port from sandbox_url for release
try:
from urllib.parse import urlparse
port = urlparse(info.sandbox_url).port
if port:
release_port(port)
except Exception:
pass
def is_alive(self, info: SandboxInfo) -> bool:
"""Check if the container is still running (lightweight, no HTTP)."""
if info.container_name:
return self._is_container_running(info.container_name)
return False
def discover(self, sandbox_id: str) -> SandboxInfo | None:
"""Discover an existing container by its deterministic name.
Checks if a container with the expected name is running, retrieves its
port, and verifies it responds to health checks.
Args:
sandbox_id: The deterministic sandbox ID (determines container name).
Returns:
SandboxInfo if container found and healthy, None otherwise.
"""
container_name = f"{self._container_prefix}-{sandbox_id}"
if not self._is_container_running(container_name):
return None
port = self._get_container_port(container_name)
if port is None:
return None
# Use host.docker.internal when running inside Docker container
sandbox_host = "host.docker.internal"
sandbox_url = f"http://{sandbox_host}:{port}"
if not wait_for_sandbox_ready(sandbox_url, timeout=5):
return None
return SandboxInfo(
sandbox_id=sandbox_id,
sandbox_url=sandbox_url,
container_name=container_name,
)
# ── Container operations ─────────────────────────────────────────────
def _start_container(
self,
container_name: str,
port: int,
extra_mounts: list[tuple[str, str, bool]] | None = None,
) -> str:
"""Start a new container.
Args:
container_name: Name for the container.
port: Host port to map to container port 8080.
extra_mounts: Additional volume mounts.
Returns:
The container ID.
Raises:
RuntimeError: If container fails to start.
"""
cmd = [self._runtime, "run"]
# Docker-specific security options
if self._runtime == "docker":
cmd.extend(["--security-opt", "seccomp=unconfined"])
cmd.extend(
[
"--rm",
"-d",
"-p",
f"{port}:8080",
"--name",
container_name,
]
)
# Environment variables
for key, value in self._environment.items():
cmd.extend(["-e", f"{key}={value}"])
# Config-level volume mounts
for mount in self._config_mounts:
mount_spec = f"{mount.host_path}:{mount.container_path}"
if mount.read_only:
mount_spec += ":ro"
cmd.extend(["-v", mount_spec])
# Extra mounts (thread-specific, skills, etc.)
if extra_mounts:
for host_path, container_path, read_only in extra_mounts:
mount_spec = f"{host_path}:{container_path}"
if read_only:
mount_spec += ":ro"
cmd.extend(["-v", mount_spec])
cmd.append(self._image)
logger.info(f"Starting container using {self._runtime}: {' '.join(cmd)}")
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
container_id = result.stdout.strip()
logger.info(f"Started container {container_name} (ID: {container_id}) using {self._runtime}")
return container_id
except subprocess.CalledProcessError as e:
logger.error(f"Failed to start container using {self._runtime}: {e.stderr}")
raise RuntimeError(f"Failed to start sandbox container: {e.stderr}")
def _stop_container(self, container_id: str) -> None:
"""Stop a container (--rm ensures automatic removal)."""
try:
subprocess.run(
[self._runtime, "stop", container_id],
capture_output=True,
text=True,
check=True,
)
logger.info(f"Stopped container {container_id} using {self._runtime}")
except subprocess.CalledProcessError as e:
logger.warning(f"Failed to stop container {container_id}: {e.stderr}")
def _is_container_running(self, container_name: str) -> bool:
"""Check if a named container is currently running.
This enables cross-process container discovery — any process can detect
containers started by another process via the deterministic container name.
"""
try:
result = subprocess.run(
[self._runtime, "inspect", "-f", "{{.State.Running}}", container_name],
capture_output=True,
text=True,
timeout=5,
)
return result.returncode == 0 and result.stdout.strip().lower() == "true"
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
return False
def _get_container_port(self, container_name: str) -> int | None:
"""Get the host port of a running container.
Args:
container_name: The container name to inspect.
Returns:
The host port mapped to container port 8080, or None if not found.
"""
try:
result = subprocess.run(
[self._runtime, "port", container_name, "8080"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0 and result.stdout.strip():
# Output format: "0.0.0.0:PORT" or ":::PORT"
port_str = result.stdout.strip().split(":")[-1]
return int(port_str)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, ValueError):
pass
return None