"""Local container backend for sandbox provisioning. Manages sandbox containers using Docker or Apple Container on the local machine. Handles container lifecycle, port allocation, and cross-process container discovery. """ from __future__ import annotations import json import logging import os import subprocess from datetime import datetime from deerflow.utils.network import get_free_port, release_port from .backend import SandboxBackend, wait_for_sandbox_ready from .sandbox_info import SandboxInfo logger = logging.getLogger(__name__) def _parse_docker_timestamp(raw: str) -> float: """Parse Docker's ISO 8601 timestamp into a Unix epoch float. Docker returns timestamps with nanosecond precision and a trailing ``Z`` (e.g. ``2026-04-08T01:22:50.123456789Z``). Python's ``fromisoformat`` accepts at most microseconds and (pre-3.11) does not accept ``Z``, so the string is normalized before parsing. Returns ``0.0`` on empty input or parse failure so callers can use ``0.0`` as a sentinel for "unknown age". """ if not raw: return 0.0 try: s = raw.strip() if "." in s: dot_pos = s.index(".") tz_start = dot_pos + 1 while tz_start < len(s) and s[tz_start].isdigit(): tz_start += 1 frac = s[dot_pos + 1 : tz_start][:6] # truncate to microseconds tz_suffix = s[tz_start:] s = s[: dot_pos + 1] + frac + tz_suffix if s.endswith("Z"): s = s[:-1] + "+00:00" return datetime.fromisoformat(s).timestamp() except (ValueError, TypeError) as e: logger.debug(f"Could not parse docker timestamp {raw!r}: {e}") return 0.0 def _extract_host_port(inspect_entry: dict, container_port: int) -> int | None: """Extract the host port mapped to ``container_port/tcp`` from a docker inspect entry. Returns None if the container has no port mapping for that port. """ try: ports = (inspect_entry.get("NetworkSettings") or {}).get("Ports") or {} bindings = ports.get(f"{container_port}/tcp") or [] if bindings: host_port = bindings[0].get("HostPort") if host_port: return int(host_port) except (ValueError, TypeError, AttributeError): pass return None def _format_container_mount(runtime: str, host_path: str, container_path: str, read_only: bool) -> list[str]: """Format a bind-mount argument for the selected runtime. Docker's ``-v host:container`` syntax is ambiguous for Windows drive-letter paths like ``D:/...`` because ``:`` is both the drive separator and the volume separator. Use ``--mount type=bind,...`` for Docker to avoid that parsing ambiguity. Apple Container keeps using ``-v``. """ if runtime == "docker": mount_spec = f"type=bind,src={host_path},dst={container_path}" if read_only: mount_spec += ",readonly" return ["--mount", mount_spec] mount_spec = f"{host_path}:{container_path}" if read_only: mount_spec += ":ro" return ["-v", mount_spec] class LocalContainerBackend(SandboxBackend): """Backend that manages sandbox containers locally using Docker or Apple Container. On macOS, automatically prefers Apple Container if available, otherwise falls back to Docker. On other platforms, uses Docker. Features: - Deterministic container naming for cross-process discovery - Port allocation with thread-safe utilities - Container lifecycle management (start/stop with --rm) - Support for volume mounts and environment variables """ def __init__( self, *, image: str, base_port: int, container_prefix: str, config_mounts: list, environment: dict[str, str], ): """Initialize the local container backend. Args: image: Container image to use. base_port: Base port number to start searching for free ports. container_prefix: Prefix for container names (e.g., "deer-flow-sandbox"). config_mounts: Volume mount configurations from config (list of VolumeMountConfig). environment: Environment variables to inject into containers. """ self._image = image self._base_port = base_port self._container_prefix = container_prefix self._config_mounts = config_mounts self._environment = environment self._runtime = self._detect_runtime() @property def runtime(self) -> str: """The detected container runtime ("docker" or "container").""" return self._runtime def _detect_runtime(self) -> str: """Detect which container runtime to use. On macOS, prefer Apple Container if available, otherwise fall back to Docker. On other platforms, use Docker. Returns: "container" for Apple Container, "docker" for Docker. """ import platform if platform.system() == "Darwin": try: result = subprocess.run( ["container", "--version"], capture_output=True, text=True, check=True, timeout=5, ) logger.info(f"Detected Apple Container: {result.stdout.strip()}") return "container" except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired): logger.info("Apple Container not available, falling back to Docker") return "docker" # ── SandboxBackend interface ────────────────────────────────────────── def create(self, thread_id: str, sandbox_id: str, extra_mounts: list[tuple[str, str, bool]] | None = None) -> SandboxInfo: """Start a new container and return its connection info. Args: thread_id: Thread ID for which the sandbox is being created. Useful for backends that want to organize sandboxes by thread. sandbox_id: Deterministic sandbox identifier (used in container name). extra_mounts: Additional volume mounts as (host_path, container_path, read_only) tuples. Returns: SandboxInfo with container details. Raises: RuntimeError: If the container fails to start. """ container_name = f"{self._container_prefix}-{sandbox_id}" # Retry loop: if Docker rejects the port (e.g. a stale container still # holds the binding after a process restart), skip that port and try the # next one. The socket-bind check in get_free_port mirrors Docker's # 0.0.0.0 bind, but Docker's port-release can be slightly asynchronous, # so a reactive fallback here ensures we always make progress. _next_start = self._base_port container_id: str | None = None port: int = 0 for _attempt in range(10): port = get_free_port(start_port=_next_start) try: container_id = self._start_container(container_name, port, extra_mounts) break except RuntimeError as exc: release_port(port) err = str(exc) err_lower = err.lower() # Port already bound: skip this port and retry with the next one. if "port is already allocated" in err or "address already in use" in err_lower: logger.warning(f"Port {port} rejected by Docker (already allocated), retrying with next port") _next_start = port + 1 continue # Container-name conflict: another process may have already started # the deterministic sandbox container for this sandbox_id. Try to # discover and adopt the existing container instead of failing. if "is already in use by container" in err_lower or "conflict. the container name" in err_lower: logger.warning(f"Container name {container_name} already in use, attempting to discover existing sandbox instance") existing = self.discover(sandbox_id) if existing is not None: return existing raise else: raise RuntimeError("Could not start sandbox container: all candidate ports are already allocated by Docker") # When running inside Docker (DooD), sandbox containers are reachable via # host.docker.internal rather than localhost (they run on the host daemon). sandbox_host = os.environ.get("DEER_FLOW_SANDBOX_HOST", "localhost") return SandboxInfo( sandbox_id=sandbox_id, sandbox_url=f"http://{sandbox_host}:{port}", container_name=container_name, container_id=container_id, ) def destroy(self, info: SandboxInfo) -> None: """Stop the container and release its port.""" # Prefer container_id, fall back to container_name (both accepted by docker stop). # This ensures containers discovered via list_running() (which only has the name) # can also be stopped. stop_target = info.container_id or info.container_name if stop_target: self._stop_container(stop_target) # Extract port from sandbox_url for release try: from urllib.parse import urlparse port = urlparse(info.sandbox_url).port if port: release_port(port) except Exception: pass def is_alive(self, info: SandboxInfo) -> bool: """Check if the container is still running (lightweight, no HTTP).""" if info.container_name: return self._is_container_running(info.container_name) return False def discover(self, sandbox_id: str) -> SandboxInfo | None: """Discover an existing container by its deterministic name. Checks if a container with the expected name is running, retrieves its port, and verifies it responds to health checks. Args: sandbox_id: The deterministic sandbox ID (determines container name). Returns: SandboxInfo if container found and healthy, None otherwise. """ container_name = f"{self._container_prefix}-{sandbox_id}" if not self._is_container_running(container_name): return None port = self._get_container_port(container_name) if port is None: return None sandbox_host = os.environ.get("DEER_FLOW_SANDBOX_HOST", "localhost") sandbox_url = f"http://{sandbox_host}:{port}" if not wait_for_sandbox_ready(sandbox_url, timeout=5): return None return SandboxInfo( sandbox_id=sandbox_id, sandbox_url=sandbox_url, container_name=container_name, ) def list_running(self) -> list[SandboxInfo]: """Enumerate all running containers matching the configured prefix. Uses a single ``docker ps`` call to list container names, then a single batched ``docker inspect`` call to retrieve creation timestamp and port mapping for all containers at once. Total subprocess calls: 2 (down from 2N+1 in the naive per-container approach). Note: Docker's ``--filter name=`` performs *substring* matching, so a secondary ``startswith`` check is applied to ensure only containers with the exact prefix are included. Containers without port mappings are still included (with empty sandbox_url) so that startup reconciliation can adopt orphans regardless of their port state. """ # Step 1: enumerate container names via docker ps try: result = subprocess.run( [ self._runtime, "ps", "--filter", f"name={self._container_prefix}-", "--format", "{{.Names}}", ], capture_output=True, text=True, timeout=10, ) if result.returncode != 0: stderr = (result.stderr or "").strip() logger.warning( "Failed to list running containers with %s ps (returncode=%s, stderr=%s)", self._runtime, result.returncode, stderr or "", ) return [] if not result.stdout.strip(): return [] except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError, OSError) as e: logger.warning(f"Failed to list running containers: {e}") return [] # Filter to names matching our exact prefix (docker filter is substring-based) container_names = [name.strip() for name in result.stdout.strip().splitlines() if name.strip().startswith(self._container_prefix + "-")] if not container_names: return [] # Step 2: batched docker inspect — single subprocess call for all containers inspections = self._batch_inspect(container_names) infos: list[SandboxInfo] = [] sandbox_host = os.environ.get("DEER_FLOW_SANDBOX_HOST", "localhost") for container_name in container_names: data = inspections.get(container_name) if data is None: # Container disappeared between ps and inspect, or inspect failed continue created_at, host_port = data sandbox_id = container_name[len(self._container_prefix) + 1 :] sandbox_url = f"http://{sandbox_host}:{host_port}" if host_port else "" infos.append( SandboxInfo( sandbox_id=sandbox_id, sandbox_url=sandbox_url, container_name=container_name, created_at=created_at, ) ) logger.info(f"Found {len(infos)} running sandbox container(s)") return infos def _batch_inspect(self, container_names: list[str]) -> dict[str, tuple[float, int | None]]: """Batch-inspect containers in a single subprocess call. Returns a mapping of ``container_name -> (created_at, host_port)``. Missing containers or parse failures are silently dropped from the result. """ if not container_names: return {} try: result = subprocess.run( [self._runtime, "inspect", *container_names], capture_output=True, text=True, timeout=15, ) except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError, OSError) as e: logger.warning(f"Failed to batch-inspect containers: {e}") return {} if result.returncode != 0: stderr = (result.stderr or "").strip() logger.warning( "Failed to batch-inspect containers with %s inspect (returncode=%s, stderr=%s)", self._runtime, result.returncode, stderr or "", ) return {} try: payload = json.loads(result.stdout or "[]") except json.JSONDecodeError as e: logger.warning(f"Failed to parse docker inspect output as JSON: {e}") return {} out: dict[str, tuple[float, int | None]] = {} for entry in payload: # ``Name`` is prefixed with ``/`` in the docker inspect response name = (entry.get("Name") or "").lstrip("/") if not name: continue created_at = _parse_docker_timestamp(entry.get("Created", "")) host_port = _extract_host_port(entry, 8080) out[name] = (created_at, host_port) return out # ── Container operations ───────────────────────────────────────────── def _start_container( self, container_name: str, port: int, extra_mounts: list[tuple[str, str, bool]] | None = None, ) -> str: """Start a new container. Args: container_name: Name for the container. port: Host port to map to container port 8080. extra_mounts: Additional volume mounts. Returns: The container ID. Raises: RuntimeError: If container fails to start. """ cmd = [self._runtime, "run"] # Docker-specific security options if self._runtime == "docker": cmd.extend(["--security-opt", "seccomp=unconfined"]) cmd.extend( [ "--rm", "-d", "-p", f"{port}:8080", "--name", container_name, ] ) # Environment variables for key, value in self._environment.items(): cmd.extend(["-e", f"{key}={value}"]) # Config-level volume mounts for mount in self._config_mounts: cmd.extend( _format_container_mount( self._runtime, mount.host_path, mount.container_path, mount.read_only, ) ) # Extra mounts (thread-specific, skills, etc.) if extra_mounts: for host_path, container_path, read_only in extra_mounts: cmd.extend( _format_container_mount( self._runtime, host_path, container_path, read_only, ) ) cmd.append(self._image) logger.info(f"Starting container using {self._runtime}: {' '.join(cmd)}") try: result = subprocess.run(cmd, capture_output=True, text=True, check=True) container_id = result.stdout.strip() logger.info(f"Started container {container_name} (ID: {container_id}) using {self._runtime}") return container_id except subprocess.CalledProcessError as e: logger.error(f"Failed to start container using {self._runtime}: {e.stderr}") raise RuntimeError(f"Failed to start sandbox container: {e.stderr}") def _stop_container(self, container_id: str) -> None: """Stop a container (--rm ensures automatic removal).""" try: subprocess.run( [self._runtime, "stop", container_id], capture_output=True, text=True, check=True, ) logger.info(f"Stopped container {container_id} using {self._runtime}") except subprocess.CalledProcessError as e: logger.warning(f"Failed to stop container {container_id}: {e.stderr}") def _is_container_running(self, container_name: str) -> bool: """Check if a named container is currently running. This enables cross-process container discovery — any process can detect containers started by another process via the deterministic container name. """ try: result = subprocess.run( [self._runtime, "inspect", "-f", "{{.State.Running}}", container_name], capture_output=True, text=True, timeout=5, ) return result.returncode == 0 and result.stdout.strip().lower() == "true" except (subprocess.CalledProcessError, subprocess.TimeoutExpired): return False def _get_container_port(self, container_name: str) -> int | None: """Get the host port of a running container. Args: container_name: The container name to inspect. Returns: The host port mapped to container port 8080, or None if not found. """ try: result = subprocess.run( [self._runtime, "port", container_name, "8080"], capture_output=True, text=True, timeout=5, ) if result.returncode == 0 and result.stdout.strip(): # Output format: "0.0.0.0:PORT" or ":::PORT" port_str = result.stdout.strip().split(":")[-1] return int(port_str) except (subprocess.CalledProcessError, subprocess.TimeoutExpired, ValueError): pass return None