"""Built-in guardrail providers that ship with DeerFlow."""

from __future__ import annotations

import fnmatch
import json
import logging
import re
import shlex
from datetime import UTC, datetime
from pathlib import PurePosixPath
from typing import Any

from deerflow.guardrails.provider import GuardrailDecision, GuardrailReason, GuardrailRequest

logger = logging.getLogger(__name__)


class AllowlistProvider:
    """Simple allowlist/denylist provider. No external dependencies."""

    name = "allowlist"

    def __init__(self, *, allowed_tools: list[str] | None = None, denied_tools: list[str] | None = None):
        """Store the allow and deny sets.

        Args:
            allowed_tools: Tool names that may run. ``None`` or an empty list
                disables the allowlist check entirely (every tool passes it).
            denied_tools: Tool names that are always rejected.
        """
        self._allowed = set(allowed_tools) if allowed_tools else None
        self._denied = set(denied_tools) if denied_tools else set()

    def evaluate(self, request: GuardrailRequest) -> GuardrailDecision:
        """Return a deny decision when the tool fails the allowlist or is on the denylist.

        The allowlist (when configured) is checked first, then the denylist.
        Tool names are compared exactly as given in ``request.tool_name``.
        """
        if self._allowed is not None and request.tool_name not in self._allowed:
            return GuardrailDecision(
                allow=False,
                reasons=[GuardrailReason(code="oap.tool_not_allowed", message=f"tool '{request.tool_name}' not in allowlist")],
            )
        if request.tool_name in self._denied:
            return GuardrailDecision(
                allow=False,
                reasons=[GuardrailReason(code="oap.tool_not_allowed", message=f"tool '{request.tool_name}' is denied")],
            )
        return GuardrailDecision(allow=True, reasons=[GuardrailReason(code="oap.allowed")])

    async def aevaluate(self, request: GuardrailRequest) -> GuardrailDecision:
        """Async entry point; delegates to the synchronous :meth:`evaluate`."""
        return self.evaluate(request)


class SensitiveDataProvider:
    """Block tool calls that may access sensitive files such as .env and keys."""

    name = "sensitive-data"

    # Tools whose arguments are inspected when no explicit list is configured.
    _DEFAULT_PROTECTED_TOOLS = {
        "read_file",
        "write_file",
        "str_replace",
        "ls",
        "glob",
        "grep",
        "bash",
    }
    # Exact (lower-cased) file names that are always considered sensitive.
    _DEFAULT_DENY_BASENAMES = {".env"}
    # fnmatch patterns applied to the lower-cased file name.
    _DEFAULT_DENY_GLOBS = {
        ".env.*",
        "*.pem",
        "*.key",
        "id_rsa*",
        "secrets.*",
        "credentials.*",
    }
    # Whitespace and shell punctuation that can be glued to a file name inside
    # a single shlex token (e.g. ``cat<.env``, ``$(cat .env)``, ``sh -c 'cat .env'``).
    _SHELL_SEPARATORS = re.compile(r"[\s;|&<>()`${}=:,]+")

    def __init__(
        self,
        *,
        protected_tools: list[str] | None = None,
        deny_basenames: list[str] | None = None,
        deny_globs: list[str] | None = None,
        block_skills_env: bool = True,
        **_: Any,
    ):
        """Configure which tools are inspected and which file names are denied.

        Args:
            protected_tools: Tool names to inspect; falls back to the built-in
                defaults when ``None`` or empty.
            deny_basenames: Exact file names to block; falls back to the
                defaults when ``None`` or empty.
            deny_globs: fnmatch patterns to block; falls back to the defaults
                when ``None`` or empty.
            block_skills_env: When true, ``.env`` / ``.env.*`` files under
                ``/mnt/skills/`` are blocked regardless of the deny lists.
            **_: Extra keyword arguments are accepted and ignored.

        All configured names and patterns are lower-cased so matching is
        case-insensitive.
        """
        self._protected_tools = {t.lower() for t in (protected_tools or list(self._DEFAULT_PROTECTED_TOOLS))}
        self._deny_basenames = {n.lower() for n in (deny_basenames or list(self._DEFAULT_DENY_BASENAMES))}
        self._deny_globs = {p.lower() for p in (deny_globs or list(self._DEFAULT_DENY_GLOBS))}
        self._block_skills_env = block_skills_env

    def _normalize_candidate(self, raw: str | None) -> str:
        """Return *raw* stripped of surrounding whitespace and quote characters."""
        if not raw:
            return ""
        return str(raw).strip().strip("\"'")

    def _looks_sensitive_path(self, raw_path: str) -> bool:
        """Return True when the final path component of *raw_path* is denied.

        Matching is done on the lower-cased basename, first against the
        skills-directory ``.env`` rule (if enabled), then the exact deny
        names, then the deny globs.
        """
        value = self._normalize_candidate(raw_path)
        if not value:
            return False

        lowered = value.lower()
        basename = PurePosixPath(lowered).name

        # The skills rule applies even when the deny lists were customised.
        if self._block_skills_env and "/mnt/skills/" in lowered:
            if basename == ".env" or basename.startswith(".env."):
                return True

        if basename in self._deny_basenames:
            return True
        return any(fnmatch.fnmatch(basename, pat) for pat in self._deny_globs)

    def _extract_bash_candidates(self, command: str) -> list[str]:
        """Return the de-duplicated file-name candidates found in a shell command.

        Every token is a candidate, not only those that look like paths:
        bare names such as ``id_rsa`` or ``server.pem`` have neither a slash
        nor a leading dot but must still reach the deny checks. Tokens are
        additionally split on whitespace and shell punctuation so names that
        are nested in a quoted sub-command or glued to an operator
        (``sh -c 'cat .env'``, ``cat<.env``, ``$(cat .env)``) are seen too.
        """
        if not command:
            return []

        try:
            tokens = shlex.split(command)
        except ValueError:
            # Unbalanced quotes etc.: fall back to plain whitespace splitting.
            tokens = command.split()

        # dict keys give order-preserving de-duplication.
        seen: dict[str, None] = {}
        for token in tokens:
            stripped = token.strip()
            if not stripped:
                continue
            seen[stripped] = None
            for piece in self._SHELL_SEPARATORS.split(stripped):
                if piece:
                    seen[piece] = None
        return list(seen)

    def _collect_candidates(self, request: GuardrailRequest) -> list[str]:
        """Gather the strings from the tool input that may name a file.

        File tools contribute ``path``; ``glob``/``grep`` contribute ``path``
        and ``glob``; ``bash`` contributes the candidates extracted from
        ``command``. Non-dict tool input and unknown tools yield no candidates.
        """
        args = request.tool_input if isinstance(request.tool_input, dict) else {}
        # Lower-case the name so dispatch agrees with the case-insensitive
        # protected-tool check in evaluate(); otherwise "Bash" would be
        # treated as protected yet produce no candidates.
        tool = (request.tool_name or "").lower()
        candidates: list[str] = []

        if tool in {"read_file", "write_file", "str_replace", "ls"}:
            path = args.get("path")
            if isinstance(path, str):
                candidates.append(path)
        elif tool in {"glob", "grep"}:
            path = args.get("path")
            if isinstance(path, str):
                candidates.append(path)
            glob_pat = args.get("glob")
            if isinstance(glob_pat, str):
                candidates.append(glob_pat)
        elif tool == "bash":
            command = str(args.get("command") or "")
            candidates.extend(self._extract_bash_candidates(command))
            # Fast-path for common secret exposure commands
            if re.search(r"\b(printenv|env)\b", command, flags=re.IGNORECASE):
                candidates.append(".env")

        return candidates

    def _audit(self, request: GuardrailRequest, decision: GuardrailDecision) -> None:
        """Log a JSON audit record at WARNING level for a denied request.

        Allowed decisions are not logged. Only metadata is recorded (tool
        name, reason code, thread and agent ids), not the tool input.
        """
        if decision.allow:
            return
        code = decision.reasons[0].code if decision.reasons else "oap.blocked_pattern"
        rec = {
            "timestamp": datetime.now(UTC).isoformat(),
            "provider": self.name,
            "tool_name": request.tool_name,
            "reason_code": code,
            "thread_id": request.thread_id,
            "agent_id": request.agent_id,
        }
        logger.warning("[SensitiveDataGuardrail] %s", json.dumps(rec, ensure_ascii=False))

    def evaluate(self, request: GuardrailRequest) -> GuardrailDecision:
        """Deny the request when any candidate path looks sensitive.

        Tools outside the protected set are allowed without inspection.
        Denied requests are audited before the decision is returned.
        """
        tool = (request.tool_name or "").lower()
        if tool not in self._protected_tools:
            return GuardrailDecision(allow=True, reasons=[GuardrailReason(code="oap.allowed")])

        candidates = self._collect_candidates(request)
        if any(self._looks_sensitive_path(c) for c in candidates):
            decision = GuardrailDecision(
                allow=False,
                reasons=[GuardrailReason(code="oap.blocked_pattern", message="sensitive path access is blocked by policy")],
                policy_id="sensitive-data.v1",
            )
            self._audit(request, decision)
            return decision

        return GuardrailDecision(allow=True, reasons=[GuardrailReason(code="oap.allowed")])

    async def aevaluate(self, request: GuardrailRequest) -> GuardrailDecision:
        """Async entry point; delegates to the synchronous :meth:`evaluate`."""
        return self.evaluate(request)